From cf0877c07f8c8e474a5a642c9795889c27426ca7 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 17 Feb 2026 17:30:49 +0100 Subject: [PATCH 1/4] Implement the `ChpData` class for holding data from CHP Signed-off-by: Sahas Subramanian --- .../sdk/microgrid/_old_component_data.py | 262 ++++++++++++++++++ 1 file changed, 262 insertions(+) diff --git a/src/frequenz/sdk/microgrid/_old_component_data.py b/src/frequenz/sdk/microgrid/_old_component_data.py index 48c00eefc..4808061e9 100644 --- a/src/frequenz/sdk/microgrid/_old_component_data.py +++ b/src/frequenz/sdk/microgrid/_old_component_data.py @@ -839,6 +839,268 @@ def to_samples(self) -> ComponentDataSamples: ) +@dataclass(kw_only=True) +class ChpData(ComponentData): # pylint: disable=too-many-instance-attributes + """A wrapper class for holding CHP data.""" + + active_power: float = 0.0 + """The total active 3-phase AC power, in Watts (W). + + Represented in the passive sign convention. + + * Positive means consumption from the grid. + * Negative means supply into the grid. + """ + + active_power_per_phase: PhaseTuple = (0.0, 0.0, 0.0) + """The per-phase AC active power for phase 1, 2, and 3 respectively, in Watt (W). + + Represented in the passive sign convention. + + * Positive means consumption from the grid. + * Negative means supply into the grid. + """ + + reactive_power: float = 0.0 + """The total reactive 3-phase AC power, in Volt-Ampere Reactive (VAr). + + * Positive power means capacitive (current leading w.r.t. voltage). + * Negative power means inductive (current lagging w.r.t. voltage). + """ + + reactive_power_per_phase: PhaseTuple = (0.0, 0.0, 0.0) + """The per-phase AC reactive power, in Volt-Ampere Reactive (VAr). + + The provided values are for phase 1, 2, and 3 respectively. + + * Positive power means capacitive (current leading w.r.t. voltage). + * Negative power means inductive (current lagging w.r.t. voltage). + """ + + current_per_phase: PhaseTuple = (0.0, 0.0, 0.0) + """AC current in Amperes (A) for phase/line 1, 2 and 3 respectively. + + Represented in the passive sign convention. + + * Positive means consumption from the grid. + * Negative means supply into the grid. + """ + + voltage_per_phase: PhaseTuple = (0.0, 0.0, 0.0) + """The AC voltage in Volts (V) between the line and the neutral wire for + phase/line 1, 2 and 3 respectively. + """ + + active_power_inclusion_lower_bound: float = 0.0 + """Lower inclusion bound for CHP power in watts. + + This is the lower limit of the range within which power requests are allowed for the + CHP. + + See [`frequenz.api.common.metrics_pb2.Metric.system_inclusion_bounds`][] and + [`frequenz.api.common.metrics_pb2.Metric.system_exclusion_bounds`][] for more + details. + """ + + active_power_exclusion_lower_bound: float = 0.0 + """Lower exclusion bound for CHP power in watts. + + This is the lower limit of the range within which power requests are not allowed for + the CHP. + + See [`frequenz.api.common.metrics_pb2.Metric.system_inclusion_bounds`][] and + [`frequenz.api.common.metrics_pb2.Metric.system_exclusion_bounds`][] for more + details. + """ + + active_power_inclusion_upper_bound: float = 0.0 + """Upper inclusion bound for CHP power in watts. + + This is the upper limit of the range within which power requests are allowed for the + CHP. + + See [`frequenz.api.common.metrics_pb2.Metric.system_inclusion_bounds`][] and + [`frequenz.api.common.metrics_pb2.Metric.system_exclusion_bounds`][] for more + details. + """ + + active_power_exclusion_upper_bound: float = 0.0 + """Upper exclusion bound for CHP power in watts. + + This is the upper limit of the range within which power requests are not allowed for + the CHP. + + See [`frequenz.api.common.metrics_pb2.Metric.system_inclusion_bounds`][] and + [`frequenz.api.common.metrics_pb2.Metric.system_exclusion_bounds`][] for more + details. + """ + + frequency: float = 0.0 + """AC frequency, in Hertz (Hz).""" + + CATEGORY: ClassVar[ComponentCategory] = ComponentCategory.CHP + + METRICS: ClassVar[frozenset[Metric]] = frozenset( + [ + Metric.AC_ACTIVE_POWER, + Metric.AC_ACTIVE_POWER_PHASE_1, + Metric.AC_ACTIVE_POWER_PHASE_2, + Metric.AC_ACTIVE_POWER_PHASE_3, + Metric.AC_REACTIVE_POWER, + Metric.AC_REACTIVE_POWER_PHASE_1, + Metric.AC_REACTIVE_POWER_PHASE_2, + Metric.AC_REACTIVE_POWER_PHASE_3, + Metric.AC_CURRENT_PHASE_1, + Metric.AC_CURRENT_PHASE_2, + Metric.AC_CURRENT_PHASE_3, + Metric.AC_VOLTAGE_PHASE_1_N, + Metric.AC_VOLTAGE_PHASE_2_N, + Metric.AC_VOLTAGE_PHASE_3_N, + Metric.AC_FREQUENCY, + ] + ) + """The metrics of this component.""" + + @override + @classmethod + # pylint: disable-next=too-many-branches + def from_samples(cls, samples: ComponentDataSamples) -> Self: + """Create a new instance from a component data object.""" + if not samples.metric_samples: + raise ValueError("No metrics in the samples.") + + self = cls._from_samples(cls, samples) + + active_power_per_phase: list[float] = [0.0, 0.0, 0.0] + reactive_power_per_phase: list[float] = [0.0, 0.0, 0.0] + current_per_phase: list[float] = [0.0, 0.0, 0.0] + voltage_per_phase: list[float] = [0.0, 0.0, 0.0] + + for sample in samples.metric_samples: + value = sample.as_single_value() or 0.0 + match sample.metric: + case Metric.AC_ACTIVE_POWER: + self.active_power = value + ( + self.active_power_inclusion_lower_bound, + self.active_power_inclusion_upper_bound, + self.active_power_exclusion_lower_bound, + self.active_power_exclusion_upper_bound, + ) = _bound_ranges_to_inclusion_exclusion( + sample.bounds, "AC_ACTIVE_POWER", sample + ) + case Metric.AC_ACTIVE_POWER_PHASE_1: + active_power_per_phase[0] = value + case Metric.AC_ACTIVE_POWER_PHASE_2: + active_power_per_phase[1] = value + case Metric.AC_ACTIVE_POWER_PHASE_3: + active_power_per_phase[2] = value + case Metric.AC_REACTIVE_POWER: + self.reactive_power = value + case Metric.AC_REACTIVE_POWER_PHASE_1: + reactive_power_per_phase[0] = value + case Metric.AC_REACTIVE_POWER_PHASE_2: + reactive_power_per_phase[1] = value + case Metric.AC_REACTIVE_POWER_PHASE_3: + reactive_power_per_phase[2] = value + case Metric.AC_CURRENT_PHASE_1: + current_per_phase[0] = value + case Metric.AC_CURRENT_PHASE_2: + current_per_phase[1] = value + case Metric.AC_CURRENT_PHASE_3: + current_per_phase[2] = value + case Metric.AC_VOLTAGE_PHASE_1_N: + voltage_per_phase[0] = value + case Metric.AC_VOLTAGE_PHASE_2_N: + voltage_per_phase[1] = value + case Metric.AC_VOLTAGE_PHASE_3_N: + voltage_per_phase[2] = value + case Metric.AC_FREQUENCY: + self.frequency = value + case unexpected: + _logger.warning( + "Unexpected metric %s in CHP data sample: %r", + unexpected, + sample, + ) + + self.active_power_per_phase = cast(PhaseTuple, tuple(active_power_per_phase)) + self.reactive_power_per_phase = cast( + PhaseTuple, tuple(reactive_power_per_phase) + ) + self.current_per_phase = cast(PhaseTuple, tuple(current_per_phase)) + self.voltage_per_phase = cast(PhaseTuple, tuple(voltage_per_phase)) + + return self + + @override + def to_samples(self) -> ComponentDataSamples: + """Convert the component data to a component data object.""" + return ComponentDataSamples( + component_id=self.component_id, + metric_samples=[ + MetricSample( + sampled_at=self.timestamp, + metric=Metric.AC_ACTIVE_POWER, + value=self.active_power, + bounds=_inclusion_exclusion_bounds_to_ranges( + self.active_power_inclusion_lower_bound, + self.active_power_inclusion_upper_bound, + self.active_power_exclusion_lower_bound, + self.active_power_exclusion_upper_bound, + ), + ), + *( + MetricSample( + sampled_at=self.timestamp, metric=metric, value=value, bounds=[] + ) + for metric, value in [ + ( + Metric.AC_ACTIVE_POWER_PHASE_1, + self.active_power_per_phase[0], + ), + ( + Metric.AC_ACTIVE_POWER_PHASE_2, + self.active_power_per_phase[1], + ), + ( + Metric.AC_ACTIVE_POWER_PHASE_3, + self.active_power_per_phase[2], + ), + (Metric.AC_REACTIVE_POWER, self.reactive_power), + ( + Metric.AC_REACTIVE_POWER_PHASE_1, + self.reactive_power_per_phase[0], + ), + ( + Metric.AC_REACTIVE_POWER_PHASE_2, + self.reactive_power_per_phase[1], + ), + ( + Metric.AC_REACTIVE_POWER_PHASE_3, + self.reactive_power_per_phase[2], + ), + (Metric.AC_CURRENT_PHASE_1, self.current_per_phase[0]), + (Metric.AC_CURRENT_PHASE_2, self.current_per_phase[1]), + (Metric.AC_CURRENT_PHASE_3, self.current_per_phase[2]), + (Metric.AC_VOLTAGE_PHASE_1_N, self.voltage_per_phase[0]), + (Metric.AC_VOLTAGE_PHASE_2_N, self.voltage_per_phase[1]), + (Metric.AC_VOLTAGE_PHASE_3_N, self.voltage_per_phase[2]), + (Metric.AC_FREQUENCY, self.frequency), + ] + ), + ], + states=[ + ComponentStateSample( + sampled_at=self.timestamp, + states=frozenset(self.states), + warnings=frozenset(self.warnings), + errors=frozenset(self.errors), + ) + ], + ) + + @dataclass(kw_only=True) class EVChargerData(ComponentData): # pylint: disable=too-many-instance-attributes """A wrapper class for holding ev_charger data.""" From 38006c3254719593299d706524f5e3af484089fb Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 17 Feb 2026 17:31:40 +0100 Subject: [PATCH 2/4] Support fetching CHP data from the DataSourcingActor Signed-off-by: Sahas Subramanian --- .../_data_sourcing/microgrid_api_source.py | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py index 316c950d0..577c24275 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py @@ -20,6 +20,7 @@ from ...timeseries import Sample from .._old_component_data import ( BatteryData, + ChpData, EVChargerData, InverterData, MeterData, @@ -121,6 +122,36 @@ Metric.AC_REACTIVE_POWER_PHASE_3: lambda msg: msg.reactive_power_per_phase[2], } +_CHP_DATA_METHODS: dict[Metric | TransitionalMetric, Callable[[ChpData], float]] = { + Metric.AC_ACTIVE_POWER: lambda msg: msg.active_power, + Metric.AC_ACTIVE_POWER_PHASE_1: lambda msg: msg.active_power_per_phase[0], + Metric.AC_ACTIVE_POWER_PHASE_2: lambda msg: msg.active_power_per_phase[1], + Metric.AC_ACTIVE_POWER_PHASE_3: lambda msg: msg.active_power_per_phase[2], + TransitionalMetric.ACTIVE_POWER_INCLUSION_LOWER_BOUND: lambda msg: ( + msg.active_power_inclusion_lower_bound + ), + TransitionalMetric.ACTIVE_POWER_EXCLUSION_LOWER_BOUND: lambda msg: ( + msg.active_power_exclusion_lower_bound + ), + TransitionalMetric.ACTIVE_POWER_EXCLUSION_UPPER_BOUND: lambda msg: ( + msg.active_power_exclusion_upper_bound + ), + TransitionalMetric.ACTIVE_POWER_INCLUSION_UPPER_BOUND: lambda msg: ( + msg.active_power_inclusion_upper_bound + ), + Metric.AC_CURRENT_PHASE_1: lambda msg: msg.current_per_phase[0], + Metric.AC_CURRENT_PHASE_2: lambda msg: msg.current_per_phase[1], + Metric.AC_CURRENT_PHASE_3: lambda msg: msg.current_per_phase[2], + Metric.AC_VOLTAGE_PHASE_1_N: lambda msg: msg.voltage_per_phase[0], + Metric.AC_VOLTAGE_PHASE_2_N: lambda msg: msg.voltage_per_phase[1], + Metric.AC_VOLTAGE_PHASE_3_N: lambda msg: msg.voltage_per_phase[2], + Metric.AC_FREQUENCY: lambda msg: msg.frequency, + Metric.AC_REACTIVE_POWER: lambda msg: msg.reactive_power, + Metric.AC_REACTIVE_POWER_PHASE_1: lambda msg: msg.reactive_power_per_phase[0], + Metric.AC_REACTIVE_POWER_PHASE_2: lambda msg: msg.reactive_power_per_phase[1], + Metric.AC_REACTIVE_POWER_PHASE_3: lambda msg: msg.reactive_power_per_phase[2], +} + class MicrogridApiSource: """Fetches requested metrics from the Microgrid API. @@ -250,6 +281,31 @@ async def _check_inverter_request( connection_manager.get().api_client, comp_id ) + async def _check_chp_request( + self, + comp_id: ComponentId, + requests: dict[Metric | TransitionalMetric, list[ComponentMetricRequest]], + ) -> None: + """Check if the requests are valid CHP metrics. + + Raises: + ValueError: if the requested metric is not available for CHP. + + Args: + comp_id: The id of the requested component. + requests: A list of metric requests received from external actors + for the given CHP. + """ + for metric in requests: + if metric not in _CHP_DATA_METHODS: + err = f"Unknown metric {metric} for CHP id {comp_id}" + _logger.error(err) + raise ValueError(err) + if comp_id not in self.comp_data_receivers: + self.comp_data_receivers[comp_id] = ChpData.subscribe( + connection_manager.get().api_client, comp_id + ) + async def _check_meter_request( self, comp_id: ComponentId, @@ -304,6 +360,8 @@ async def _check_requested_component_and_metrics( await self._check_inverter_request(comp_id, requests) elif category == ComponentCategory.METER: await self._check_meter_request(comp_id, requests) + elif category == ComponentCategory.CHP: + await self._check_chp_request(comp_id, requests) else: err = f"Unknown component category {category}" _logger.error(err) @@ -333,6 +391,8 @@ def _get_data_extraction_method( return _METER_DATA_METHODS[metric] if category == ComponentCategory.EV_CHARGER: return _EV_CHARGER_DATA_METHODS[metric] + if category == ComponentCategory.CHP: + return _CHP_DATA_METHODS[metric] err = f"Unknown component category {category}" _logger.error(err) raise ValueError(err) From e90bd422fd7be106674edc9cb14421b4d4321b3d Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 17 Feb 2026 17:33:01 +0100 Subject: [PATCH 3/4] Keep data sourcing actor channels open for non-streaming components To be able to keep the resampling actor from crashing or closing the stream for components that are not streaming, we have to keep their upstream channels open. This will be revamped soon when we start supporting stream lifetimes based on downstream needs. Signed-off-by: Sahas Subramanian --- .../sdk/microgrid/_data_sourcing/microgrid_api_source.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py index 577c24275..d43da8e58 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py @@ -482,13 +482,6 @@ async def clean_tasks( sending_tasks = await clean_tasks(sending_tasks) await asyncio.gather(*sending_tasks) - await asyncio.gather( - *[ - self._registry.close_and_remove(r.get_channel_name()) - for requests in self._req_streaming_metrics[comp_id].values() - for r in requests - ] - ) except Exception: _logger.exception( "Unexpected error while handling data stream for component %d (%s), " From b66862a7879c4cb15892b76c20f10fae3e600b96 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 17 Feb 2026 17:38:38 +0100 Subject: [PATCH 4/4] Update release notes Signed-off-by: Sahas Subramanian --- RELEASE_NOTES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index dc08c0b5f..fad6db0dc 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -12,7 +12,7 @@ ## New Features - +- The SDK's data pipeline now supports retrieving telemetry from CHPs. ## Bug Fixes