Add pint.Quantity units support #881

Open: wants to merge 18 commits into base: master
4 changes: 4 additions & 0 deletions azure-pipelines.yml
@@ -67,6 +67,10 @@ jobs:
IMAGE_NAME: 'ubuntu-latest'
PYTHON_VERSION: '3.10'
TOX_CMD: 'tensorflow-29'
Linux pint_0191:
IMAGE_NAME: 'ubuntu-latest'
PYTHON_VERSION: '3.10'
TOX_CMD: 'pint-0191'
Linux setup:
IMAGE_NAME: 'ubuntu-latest'
PYTHON_VERSION: '3.9'
18 changes: 15 additions & 3 deletions docs/collected_information.rst
@@ -186,6 +186,7 @@ You might want to measure various values during your experiments, such as
the progress of prediction accuracy over training steps.

Sacred supports tracking of numerical series (e.g. int, float) using the Metrics API.
If the value is a `pint.Quantity <https://pint.readthedocs.io/en/stable/>`_, the units will also be tracked.
To access the API in experiments, the experiment must be running and the variable referencing the current experiment
or run must be available in the scope. The ``_run.log_scalar(metric_name, value, step)`` method takes
a metric name (e.g. "training.loss"), the measured value and the iteration step in which the value was taken.
@@ -215,12 +216,22 @@ In any case, the numbers should form an increasing sequence.
# Implicit step counter (0, 1, 2, 3, ...)
# incremented with each call for training.accuracy:
_run.log_scalar("training.accuracy", value * 2)
# Log an entry with units
ureg = pint.UnitRegistry()
_run.log_scalar("training.distance", value * 2 * ureg.meter)
# Another option is to use the Experiment object (must be running)
# The training.diff has its own step counter (0, 1, 2, ...) too
ex.log_scalar("training.diff", value * 2)


Currently, the information is collected only by two observers: the :ref:`mongo_observer` and the :ref:`file_observer`. For the Mongo Observer, metrics are stored in the ``metrics`` collection of MongoDB and are identified by their name (e.g. "training.loss") and the experiment run id they belong to. For the :ref:`file_observer`, metrics are stored in the file ``metrics.json`` in the run id's directory and are organized by metric name (e.g. "training.loss").
Currently, the information is collected only by the following observers:

* :ref:`mongo_observer`
* Metrics are stored in the ``metrics`` collection of MongoDB and are identified by their name (e.g. "training.loss") and the experiment run id they belong to.
* :ref:`file_observer`
* Metrics are stored in the file ``metrics.json`` in the run id's directory and are organized by metric name (e.g. "training.loss"); see the sketch after this list.
* :ref:`google_cloud_storage_observer`
* :ref:`s3_observer`
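
For illustration, a minimal sketch of reading such a ``metrics.json`` (the run directory ``my_runs/1`` is a hypothetical example):

import json

# Load the metrics written by the FileStorageObserver for one run.
with open("my_runs/1/metrics.json") as f:
    metrics = json.load(f)

# Expected shape (values are illustrative):
# {"training.distance": {"steps": [0, 1],
#                        "values": [2.0, 4.0],
#                        "timestamps": ["2023-01-01T00:00:00", "..."],
#                        "meta": {"units": "meter"}}}
print(metrics["training.distance"]["meta"])  # {"units": "meter"}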


Metrics Records
@@ -238,9 +249,10 @@ the step number can be found in ``metric["steps"][i]`` and the time of the measurement in ``metric["timestamps"][i]``.
``_id`` Unique identifier
``name`` The name of the metric (e.g. training.loss)
``run_id`` The identifier of the run (``_id`` in the runs collection)
``steps`` Array of steps (e.g. ``[0, 1, 2, 3, 4]``)
``values`` Array of measured values
``timestamps`` Array of times of capturing the individual measurements
``meta`` Dictionary of metadata (e.g. ``{"units": "meter"}``)
================== =======================================================
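
For orientation, a hedged sketch of one document in the ``metrics`` collection, using the fields above (all values are illustrative):

# Illustrative metrics-collection document (values are made up):
metric_doc = {
    "_id": "64b0c0ffee",          # unique identifier (an ObjectId in MongoDB)
    "name": "training.distance",  # metric name
    "run_id": 1,                  # ``_id`` of the run in the runs collection
    "steps": [0, 1, 2],
    "values": [2.0, 4.0, 6.0],
    "timestamps": ["2023-01-01T00:00:00", "...", "..."],  # capture times
    "meta": {"units": "meter"},   # populated when pint quantities are logged
}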


2 changes: 2 additions & 0 deletions docs/observers.rst
@@ -651,6 +651,8 @@ created in ascending order, and each run directory will contain the files specified in the
FileStorageObserver Directory Structure documentation above.


.. _google_cloud_storage_observer:

Google Cloud Storage Observer
=============================

224 changes: 159 additions & 65 deletions sacred/metrics_logger.py
@@ -1,9 +1,13 @@
#!/usr/bin/env python
# coding=utf-8
from __future__ import annotations
from dataclasses import dataclass, field

import datetime
from typing import Any
import sacred.optional as opt

from queue import Queue, Empty
from queue import Queue


class MetricsLogger:
@@ -16,90 +20,180 @@ class MetricsLogger:
"""

def __init__(self):
# Create a message queue that remembers
# calls of the log_scalar_metric
self._logged_metrics = Queue()
self._metric_step_counter = {}
self.metrics: dict[str, Metric] = {}
self._metric_step_counter: dict[str, int | float] = {}
"""Remembers the last number of each metric."""

def log_scalar_metric(self, metric_name, value, step=None):
self.plugins: list[MetricPlugin] = [PintMetricPlugin(), NumpyMetricPlugin()]

def log_scalar_metric(
self,
metric_name: str,
value: Any,
step: int | float | None = None,
):
"""Add a new measurement.

The measurement will be processed by supported observers
during the heartbeat event.

Parameters
----------
metric_name : str
The name of the metric, e.g. training.loss.
value : Any
The measured value. If the value is a `pint.Quantity` then units
information will be sent to the observer.
step : int | float, optional
The step number, e.g. the iteration number.
If not specified, an internal counter for each metric
is used, incremented by one. By default None
"""
Add a new measurement.

The measurement will be processed by the MongoDB observer
during a heartbeat event.
Other observers are not yet supported.

:param metric_name: The name of the metric, e.g. training.loss.
:param value: The measured value.
:param step: The step number (integer), e.g. the iteration number
If not specified, an internal counter for each metric
is used, incremented by one.
"""
if opt.has_numpy:
np = opt.np
if isinstance(value, np.generic):
value = value.item()
if isinstance(step, np.generic):
step = step.item()
if step is None:
step = self._metric_step_counter.get(metric_name, -1) + 1
self._logged_metrics.put(
ScalarMetricLogEntry(metric_name, step, datetime.datetime.utcnow(), value)
)
self._metric_step_counter[metric_name] = step

def get_last_metrics(self):
metric_log_entry = ScalarMetricLogEntry(step, datetime.datetime.utcnow(), value)
if metric_name not in self.metrics:
self.metrics[metric_name] = Metric(metric_name)
for plugin in self.plugins:
plugin.process_metric(metric_log_entry, self.metrics[metric_name])
self.metrics[metric_name].entries.put(metric_log_entry)

def get_last_metrics(self) -> list[MetricLogEntry]:
"""Read all measurement events since last call of the method.

:return List[ScalarMetricLogEntry]
Returns
-------
list[MetricLogEntry]
"""
read_up_to = self._logged_metrics.qsize()
messages = []
for i in range(read_up_to):
try:
messages.append(self._logged_metrics.get_nowait())
except Empty:
pass
return messages
last_metrics = [
metric.prepare_for_observers() for metric in self.metrics.values()
]
return [metric for metric in last_metrics if len(metric.entries) > 0]


@dataclass
class ScalarMetricLogEntry:
"""Container for measurements of scalar metrics.

There is exactly one ScalarMetricLogEntry per logged scalar metric value.
"""

def __init__(self, name, step, timestamp, value):
self.name = name
self.step = step
self.timestamp = timestamp
self.value = value
step: Any
timestamp: datetime.datetime
value: Any


def linearize_metrics(logged_metrics):
"""
Group metrics by name.
@dataclass
class Metric:
"""Container for metric metadat and log entries."""

name: str
meta: dict = field(default_factory=dict)
entries: Queue[ScalarMetricLogEntry] = field(default_factory=Queue)

def prepare_for_observers(self) -> MetricLogEntry:
"""Captures the current state of the queue for injestion by observers.

Note that this will clear the entries queue.

Returns
-------
MetricLogEntry
Same as metric, but with the current state of the queue rendered as a tuple.
"""
return MetricLogEntry(
self.name,
self.meta,
tuple(self.entries.get_nowait() for _ in range(self.entries.qsize())),
)


@dataclass
class MetricLogEntry(Metric):
"""Metric with entries frozen."""

entries: tuple[ScalarMetricLogEntry, ...] = tuple()


def linearize_metrics(
logged_metrics: list[MetricLogEntry],
) -> dict[str, dict[str, list | dict]]:
"""Group metrics by name.

Takes a list of individual measurements, possibly belonging
to different metrics and groups them by name.

:param logged_metrics: A list of ScalarMetricLogEntries
:return: Measured values grouped by the metric name:
{"metric_name1": {"steps": [0,1,2], "values": [4, 5, 6],
"timestamps": [datetime, datetime, datetime]},
"metric_name2": {...}}
Parameters
----------
logged_metrics : list[ScalarMetricLogEntry]

Returns
-------
dict[str, dict[str, list | dict]]
Measured values grouped by the metric name:
{
"metric_name1": {
"steps": [0,1,2],
"values": [4, 5, 6],
"timestamps": [datetime, datetime, datetime],
"meta": {}
},
"metric_name2": {...}
}
"""
metrics_by_name = {}
for metric_entry in logged_metrics:
if metric_entry.name not in metrics_by_name:
metrics_by_name[metric_entry.name] = {
"steps": [],
"values": [],
"timestamps": [],
"name": metric_entry.name,
}
metrics_by_name[metric_entry.name]["steps"].append(metric_entry.step)
metrics_by_name[metric_entry.name]["values"].append(metric_entry.value)
metrics_by_name[metric_entry.name]["timestamps"].append(metric_entry.timestamp)
return metrics_by_name
return {
metric.name: {
"meta": metric.meta,
"steps": [m.step for m in metric.entries],
"values": [m.value for m in metric.entries],
"timestamps": [m.timestamp for m in metric.entries],
}
for metric in logged_metrics
}


class MetricPlugin:
@staticmethod
def process_metric(metric_entry: ScalarMetricLogEntry, metric: Metric):
"""Transforms `metric_entry` and `metric`.

Parameters
----------
metric_entry : ScalarMetricLogEntry
metric : Metric
"""


class NumpyMetricPlugin(MetricPlugin):
"""Convert numpy types to plain python types."""

@staticmethod
def process_metric(metric_entry: ScalarMetricLogEntry, metric: Metric):
if not opt.has_numpy:
return
import numpy as np

if isinstance(metric_entry.value, np.generic):
metric_entry.value = metric_entry.value.item()
if isinstance(metric_entry.step, np.generic):
metric_entry.step = metric_entry.step.item()


class PintMetricPlugin(MetricPlugin):
"""Convert pint types to python types, track units, and convert between units."""

@staticmethod
def process_metric(metric_entry: ScalarMetricLogEntry, metric: Metric):
if not opt.has_pint:
return
import pint

units = metric.meta["units"] if "units" in metric.meta else None
if isinstance(metric_entry.value, pint.Quantity):
if units is not None:
# Quantity.to returns a new quantity; assign it so the magnitude
# extracted below is expressed in the tracked units.
metric_entry.value = metric_entry.value.to(units)
else:
metric.meta["units"] = str(metric_entry.value.units)
metric_entry.value = metric_entry.value.magnitude
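
To tie the pieces above together, a minimal usage sketch (assuming ``pint`` is installed; expected outputs are noted in comments):

import pint
from sacred.metrics_logger import MetricsLogger

ureg = pint.UnitRegistry()
logger = MetricsLogger()

# The first entry records the metric's units in ``meta``.
logger.log_scalar_metric("training.distance", 3.0 * ureg.meter)
# A later, compatible unit is converted to the tracked one (0.5 km -> 500 m).
logger.log_scalar_metric("training.distance", 0.5 * ureg.kilometer)

(entry,) = logger.get_last_metrics()
print(entry.meta)                        # {'units': 'meter'}
print([e.value for e in entry.entries])  # [3.0, 500.0]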
25 changes: 12 additions & 13 deletions sacred/observers/file_storage.py
@@ -311,19 +311,18 @@ def log_metrics(self, metrics_by_name, info):
for metric_name, metric_ptr in metrics_by_name.items():

if metric_name not in saved_metrics:
saved_metrics[metric_name] = {
"values": [],
"steps": [],
"timestamps": [],
}

saved_metrics[metric_name]["values"] += metric_ptr["values"]
saved_metrics[metric_name]["steps"] += metric_ptr["steps"]

# Manually convert them to avoid passing a datetime dtype handler
# when we're trying to convert into json.
timestamps_norm = [ts.isoformat() for ts in metric_ptr["timestamps"]]
saved_metrics[metric_name]["timestamps"] += timestamps_norm
saved_metrics[metric_name] = metric_ptr.copy()
timestamps_norm = [ts.isoformat() for ts in metric_ptr["timestamps"]]
saved_metrics[metric_name]["timestamps"] = timestamps_norm
else:
saved_metrics[metric_name]["values"] += metric_ptr["values"]
saved_metrics[metric_name]["steps"] += metric_ptr["steps"]

# Manually convert them to avoid passing a datetime dtype handler
# when we're trying to convert into json.
timestamps_norm = [ts.isoformat() for ts in metric_ptr["timestamps"]]
saved_metrics[metric_name]["timestamps"] += timestamps_norm
saved_metrics[metric_name]["meta"] = metric_ptr["meta"]

self.save_json(saved_metrics, "metrics.json")

18 changes: 7 additions & 11 deletions sacred/observers/gcs_observer.py
@@ -308,17 +308,13 @@ def log_metrics(self, metrics_by_name, info):
for metric_name, metric_ptr in metrics_by_name.items():

if metric_name not in self.saved_metrics:
self.saved_metrics[metric_name] = {
"values": [],
"steps": [],
"timestamps": [],
}

self.saved_metrics[metric_name]["values"] += metric_ptr["values"]
self.saved_metrics[metric_name]["steps"] += metric_ptr["steps"]

timestamps_norm = [ts.isoformat() for ts in metric_ptr["timestamps"]]
self.saved_metrics[metric_name]["timestamps"] += timestamps_norm
self.saved_metrics[metric_name] = metric_ptr.copy()
# Normalize datetimes to ISO strings, matching the else branch below,
# so the dictionary stays JSON-serializable.
timestamps_norm = [ts.isoformat() for ts in metric_ptr["timestamps"]]
self.saved_metrics[metric_name]["timestamps"] = timestamps_norm
else:
self.saved_metrics[metric_name]["values"] += metric_ptr["values"]
self.saved_metrics[metric_name]["steps"] += metric_ptr["steps"]
timestamps_norm = [ts.isoformat() for ts in metric_ptr["timestamps"]]
self.saved_metrics[metric_name]["timestamps"] += timestamps_norm
self.saved_metrics[metric_name]["meta"] = metric_ptr["meta"]

self.save_json(self.saved_metrics, "metrics.json")
