Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

## Upgrading

<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
- `MetricSample` values now have correct type-hints.

## New Features

Expand Down
129 changes: 101 additions & 28 deletions src/frequenz/client/reporting/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,39 @@
"""Types for the Reporting API client."""

import math
from collections.abc import Iterable, Iterator
from collections.abc import Iterator, MutableSequence
from dataclasses import dataclass
from datetime import datetime
from typing import Any, NamedTuple
from typing import Any, Callable, Generic, NamedTuple, Protocol, TypeVar, cast

# pylint: disable=no-name-in-module
from frequenz.api.common.v1alpha8.metrics.metrics_pb2 import (
MetricSample as PbMetricSample,
)
from frequenz.api.common.v1alpha8.microgrid.electrical_components.electrical_components_pb2 import (
ElectricalComponentDiagnostic as PbElectricalComponentDiagnostic,
)
from frequenz.api.common.v1alpha8.microgrid.electrical_components.electrical_components_pb2 import (
ElectricalComponentStateCode as PbElectricalComponentStateCode,
)
from frequenz.api.common.v1alpha8.microgrid.electrical_components.electrical_components_pb2 import (
ElectricalComponentStateSnapshot as PbElectricalComponentStateSnapshot,
)
from frequenz.api.common.v1alpha8.microgrid.electrical_components.electrical_components_pb2 import (
ElectricalComponentTelemetry as PbElectricalComponentTelemetry,
)
from frequenz.api.common.v1alpha8.microgrid.sensors.sensors_pb2 import (
SensorDiagnostic as PbSensorDiagnostic,
)
from frequenz.api.common.v1alpha8.microgrid.sensors.sensors_pb2 import (
SensorStateCode as PbSensorStateCode,
)
from frequenz.api.common.v1alpha8.microgrid.sensors.sensors_pb2 import (
SensorStateSnapshot as PbSensorStateSnapshot,
)
from frequenz.api.common.v1alpha8.microgrid.sensors.sensors_pb2 import (
SensorTelemetry as PbSensorTelemetry,
)
from frequenz.api.reporting.v1alpha10.reporting_pb2 import (
ReceiveAggregatedMicrogridComponentsDataStreamResponse as PBAggregatedStreamResponse,
)
Expand All @@ -35,13 +62,46 @@ class MetricSample(NamedTuple):

timestamp: datetime
microgrid_id: int
component_id: str
component_id: int | str
metric: str
value: float
value: (
float
| PbElectricalComponentStateCode.ValueType
| PbSensorStateCode.ValueType
| PbElectricalComponentDiagnostic
| PbSensorDiagnostic
)


class _PbMgTelem(Protocol):
"""Protocol for microgrid telemetry from the Reporting API client."""

@property
def microgrid_id(self) -> int:
"""Return the microgrid ID of the telemetry batch."""


class _PbTelem(Protocol):
"""Protocol for telemetry items in the Reporting API client."""

@property
def metric_samples(self) -> MutableSequence[PbMetricSample]:
"""Return the metric samples of the telemetry item."""

@property
def state_snapshots(self) -> MutableSequence[Any]:
"""List of state snapshots associated with this telemetry item."""


_MgTelemT = TypeVar("_MgTelemT", bound=_PbMgTelem)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This type name could be less cryptic for my taste.

_TelemT = TypeVar("_TelemT", bound=_PbTelem)
_StateSnapshotT = TypeVar(
"_StateSnapshotT", bound=PbElectricalComponentStateSnapshot | PbSensorStateSnapshot
)


@dataclass(frozen=True)
class GenericDataBatch:
class GenericDataBatch(Generic[_MgTelemT, _TelemT, _StateSnapshotT]):
"""Base class for batches of microgrid data (components or sensors).

This class serves as a base for handling batches of data related to microgrid
Expand All @@ -50,9 +110,9 @@ class GenericDataBatch:
functionality to work with bounds if applicable.
"""

_data_pb: Any
id_attr: str
items_attr: str
_data_pb: _MgTelemT
id_fetcher: Callable[[_TelemT], int]
items_fetcher: Callable[[_MgTelemT], MutableSequence[_TelemT]]
has_bounds: bool = False

def is_empty(self) -> bool:
Expand All @@ -61,15 +121,13 @@ def is_empty(self) -> bool:
Returns:
True if the batch contains no valid data.
"""
items = getattr(self._data_pb, self.items_attr, [])
items = self.items_fetcher(self._data_pb)
if not items:
return True
for item in items:
if not getattr(item, "metric_samples", []) and not getattr(
item, "states", []
):
return True
return False
if item.metric_samples or item.state_snapshots:
return False
return True

# pylint: disable=too-many-locals
# pylint: disable=too-many-branches
Expand All @@ -89,11 +147,11 @@ def __iter__(self) -> Iterator[MetricSample]:
* value: The metric value.
"""
mid = self._data_pb.microgrid_id
items = getattr(self._data_pb, self.items_attr)
items = self.items_fetcher(self._data_pb)

for item in items:
cid = getattr(item, self.id_attr)
for sample in getattr(item, "metric_samples", []):
cid = self.id_fetcher(item)
for sample in item.metric_samples:
Comment on lines +150 to +154
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GenericDataBatch.__iter__() now relies on direct attribute access (item.metric_samples, item.state_snapshots, etc.). There are no unit tests covering iteration/unrolling behavior (only is_empty() is tested), so regressions here would be easy to miss. Consider adding a test that constructs a minimal batch (mock/proto) and asserts the yielded MetricSamples for both metric samples and state snapshots.

Copilot uses AI. Check for mistakes.
ts = datetime_from_proto(sample.sample_time)
met = enum_from_proto(sample.metric, Metric, allow_invalid=False).name

Expand Down Expand Up @@ -128,21 +186,26 @@ def __iter__(self) -> Iterator[MetricSample]:
ts, mid, cid, f"{met}_bound_{i}_upper", upper
)

for state in getattr(item, "state_snapshots", []):
for state in item.state_snapshots:
state = cast(_StateSnapshotT, state)
ts = datetime_from_proto(state.origin_time)
for category, category_items in {
"state": getattr(state, "states", []),
"warning": getattr(state, "warnings", []),
"error": getattr(state, "errors", []),
"state": state.states,
"warning": state.warnings,
"error": state.errors,
}.items():
if not isinstance(category_items, Iterable):
continue
for s in category_items:
yield MetricSample(ts, mid, cid, category, s)


@dataclass(frozen=True)
class ComponentsDataBatch(GenericDataBatch):
class ComponentsDataBatch(
GenericDataBatch[
PBReceiveMicrogridComponentsDataStreamResponse,
PbElectricalComponentTelemetry,
PbElectricalComponentStateSnapshot,
]
):
"""Batch of microgrid components data."""

def __init__(self, data_pb: PBReceiveMicrogridComponentsDataStreamResponse):
Expand All @@ -153,14 +216,20 @@ def __init__(self, data_pb: PBReceiveMicrogridComponentsDataStreamResponse):
"""
super().__init__(
data_pb,
id_attr="electrical_component_id",
items_attr="components",
id_fetcher=lambda item: item.electrical_component_id,
items_fetcher=lambda pb: pb.components,
has_bounds=True,
)


@dataclass(frozen=True)
class SensorsDataBatch(GenericDataBatch):
class SensorsDataBatch(
GenericDataBatch[
PBReceiveMicrogridSensorsDataStreamResponse,
PbSensorTelemetry,
PbSensorStateSnapshot,
]
):
"""Batch of microgrid sensors data."""

def __init__(self, data_pb: PBReceiveMicrogridSensorsDataStreamResponse):
Expand All @@ -169,7 +238,11 @@ def __init__(self, data_pb: PBReceiveMicrogridSensorsDataStreamResponse):
Args:
data_pb: The underlying protobuf message.
"""
super().__init__(data_pb, id_attr="sensor_id", items_attr="sensors")
super().__init__(
data_pb,
id_fetcher=lambda item: item.sensor_id,
items_fetcher=lambda pb: pb.sensors,
)


@dataclass(frozen=True)
Expand Down