diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index f80f12e..0d56f81 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,7 +6,7 @@ ## Upgrading - +- `MetricSample` values now have correct type-hints. ## New Features diff --git a/src/frequenz/client/reporting/_types.py b/src/frequenz/client/reporting/_types.py index 7b090ba..e6fa4a8 100644 --- a/src/frequenz/client/reporting/_types.py +++ b/src/frequenz/client/reporting/_types.py @@ -4,12 +4,39 @@ """Types for the Reporting API client.""" import math -from collections.abc import Iterable, Iterator +from collections.abc import Iterator, MutableSequence from dataclasses import dataclass from datetime import datetime -from typing import Any, NamedTuple +from typing import Any, Callable, Generic, NamedTuple, Protocol, TypeVar, cast # pylint: disable=no-name-in-module +from frequenz.api.common.v1alpha8.metrics.metrics_pb2 import ( + MetricSample as PbMetricSample, +) +from frequenz.api.common.v1alpha8.microgrid.electrical_components.electrical_components_pb2 import ( + ElectricalComponentDiagnostic as PbElectricalComponentDiagnostic, +) +from frequenz.api.common.v1alpha8.microgrid.electrical_components.electrical_components_pb2 import ( + ElectricalComponentStateCode as PbElectricalComponentStateCode, +) +from frequenz.api.common.v1alpha8.microgrid.electrical_components.electrical_components_pb2 import ( + ElectricalComponentStateSnapshot as PbElectricalComponentStateSnapshot, +) +from frequenz.api.common.v1alpha8.microgrid.electrical_components.electrical_components_pb2 import ( + ElectricalComponentTelemetry as PbElectricalComponentTelemetry, +) +from frequenz.api.common.v1alpha8.microgrid.sensors.sensors_pb2 import ( + SensorDiagnostic as PbSensorDiagnostic, +) +from frequenz.api.common.v1alpha8.microgrid.sensors.sensors_pb2 import ( + SensorStateCode as PbSensorStateCode, +) +from frequenz.api.common.v1alpha8.microgrid.sensors.sensors_pb2 import ( + SensorStateSnapshot as PbSensorStateSnapshot, +) +from frequenz.api.common.v1alpha8.microgrid.sensors.sensors_pb2 import ( + SensorTelemetry as PbSensorTelemetry, +) from frequenz.api.reporting.v1alpha10.reporting_pb2 import ( ReceiveAggregatedMicrogridComponentsDataStreamResponse as PBAggregatedStreamResponse, ) @@ -35,13 +62,46 @@ class MetricSample(NamedTuple): timestamp: datetime microgrid_id: int - component_id: str + component_id: int | str metric: str - value: float + value: ( + float + | PbElectricalComponentStateCode.ValueType + | PbSensorStateCode.ValueType + | PbElectricalComponentDiagnostic + | PbSensorDiagnostic + ) + + +class _PbMgTelem(Protocol): + """Protocol for microgrid telemetry from the Reporting API client.""" + + @property + def microgrid_id(self) -> int: + """Return the microgrid ID of the telemetry batch.""" + + +class _PbTelem(Protocol): + """Protocol for telemetry items in the Reporting API client.""" + + @property + def metric_samples(self) -> MutableSequence[PbMetricSample]: + """Return the metric samples of the telemetry item.""" + + @property + def state_snapshots(self) -> MutableSequence[Any]: + """List of state snapshots associated with this telemetry item.""" + + +_MgTelemT = TypeVar("_MgTelemT", bound=_PbMgTelem) +_TelemT = TypeVar("_TelemT", bound=_PbTelem) +_StateSnapshotT = TypeVar( + "_StateSnapshotT", bound=PbElectricalComponentStateSnapshot | PbSensorStateSnapshot +) @dataclass(frozen=True) -class GenericDataBatch: +class GenericDataBatch(Generic[_MgTelemT, _TelemT, _StateSnapshotT]): """Base class for batches of microgrid data (components or sensors). This class serves as a base for handling batches of data related to microgrid @@ -50,9 +110,9 @@ class GenericDataBatch: functionality to work with bounds if applicable. """ - _data_pb: Any - id_attr: str - items_attr: str + _data_pb: _MgTelemT + id_fetcher: Callable[[_TelemT], int] + items_fetcher: Callable[[_MgTelemT], MutableSequence[_TelemT]] has_bounds: bool = False def is_empty(self) -> bool: @@ -61,15 +121,13 @@ def is_empty(self) -> bool: Returns: True if the batch contains no valid data. """ - items = getattr(self._data_pb, self.items_attr, []) + items = self.items_fetcher(self._data_pb) if not items: return True for item in items: - if not getattr(item, "metric_samples", []) and not getattr( - item, "states", [] - ): - return True - return False + if item.metric_samples or item.state_snapshots: + return False + return True # pylint: disable=too-many-locals # pylint: disable=too-many-branches @@ -89,11 +147,11 @@ def __iter__(self) -> Iterator[MetricSample]: * value: The metric value. """ mid = self._data_pb.microgrid_id - items = getattr(self._data_pb, self.items_attr) + items = self.items_fetcher(self._data_pb) for item in items: - cid = getattr(item, self.id_attr) - for sample in getattr(item, "metric_samples", []): + cid = self.id_fetcher(item) + for sample in item.metric_samples: ts = datetime_from_proto(sample.sample_time) met = enum_from_proto(sample.metric, Metric, allow_invalid=False).name @@ -128,21 +186,26 @@ def __iter__(self) -> Iterator[MetricSample]: ts, mid, cid, f"{met}_bound_{i}_upper", upper ) - for state in getattr(item, "state_snapshots", []): + for state in item.state_snapshots: + state = cast(_StateSnapshotT, state) ts = datetime_from_proto(state.origin_time) for category, category_items in { - "state": getattr(state, "states", []), - "warning": getattr(state, "warnings", []), - "error": getattr(state, "errors", []), + "state": state.states, + "warning": state.warnings, + "error": state.errors, }.items(): - if not isinstance(category_items, Iterable): - continue for s in category_items: yield MetricSample(ts, mid, cid, category, s) @dataclass(frozen=True) -class ComponentsDataBatch(GenericDataBatch): +class ComponentsDataBatch( + GenericDataBatch[ + PBReceiveMicrogridComponentsDataStreamResponse, + PbElectricalComponentTelemetry, + PbElectricalComponentStateSnapshot, + ] +): """Batch of microgrid components data.""" def __init__(self, data_pb: PBReceiveMicrogridComponentsDataStreamResponse): @@ -153,14 +216,20 @@ def __init__(self, data_pb: PBReceiveMicrogridComponentsDataStreamResponse): """ super().__init__( data_pb, - id_attr="electrical_component_id", - items_attr="components", + id_fetcher=lambda item: item.electrical_component_id, + items_fetcher=lambda pb: pb.components, has_bounds=True, ) @dataclass(frozen=True) -class SensorsDataBatch(GenericDataBatch): +class SensorsDataBatch( + GenericDataBatch[ + PBReceiveMicrogridSensorsDataStreamResponse, + PbSensorTelemetry, + PbSensorStateSnapshot, + ] +): """Batch of microgrid sensors data.""" def __init__(self, data_pb: PBReceiveMicrogridSensorsDataStreamResponse): @@ -169,7 +238,11 @@ def __init__(self, data_pb: PBReceiveMicrogridSensorsDataStreamResponse): Args: data_pb: The underlying protobuf message. """ - super().__init__(data_pb, id_attr="sensor_id", items_attr="sensors") + super().__init__( + data_pb, + id_fetcher=lambda item: item.sensor_id, + items_fetcher=lambda pb: pb.sensors, + ) @dataclass(frozen=True)