diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index dc08c0b5f..fad6db0dc 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -12,7 +12,7 @@ ## New Features - +- The SDK's data pipeline now supports retrieving telemetry from CHPs. ## Bug Fixes diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py index 316c950d0..d43da8e58 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py @@ -20,6 +20,7 @@ from ...timeseries import Sample from .._old_component_data import ( BatteryData, + ChpData, EVChargerData, InverterData, MeterData, @@ -121,6 +122,36 @@ Metric.AC_REACTIVE_POWER_PHASE_3: lambda msg: msg.reactive_power_per_phase[2], } +_CHP_DATA_METHODS: dict[Metric | TransitionalMetric, Callable[[ChpData], float]] = { + Metric.AC_ACTIVE_POWER: lambda msg: msg.active_power, + Metric.AC_ACTIVE_POWER_PHASE_1: lambda msg: msg.active_power_per_phase[0], + Metric.AC_ACTIVE_POWER_PHASE_2: lambda msg: msg.active_power_per_phase[1], + Metric.AC_ACTIVE_POWER_PHASE_3: lambda msg: msg.active_power_per_phase[2], + TransitionalMetric.ACTIVE_POWER_INCLUSION_LOWER_BOUND: lambda msg: ( + msg.active_power_inclusion_lower_bound + ), + TransitionalMetric.ACTIVE_POWER_EXCLUSION_LOWER_BOUND: lambda msg: ( + msg.active_power_exclusion_lower_bound + ), + TransitionalMetric.ACTIVE_POWER_EXCLUSION_UPPER_BOUND: lambda msg: ( + msg.active_power_exclusion_upper_bound + ), + TransitionalMetric.ACTIVE_POWER_INCLUSION_UPPER_BOUND: lambda msg: ( + msg.active_power_inclusion_upper_bound + ), + Metric.AC_CURRENT_PHASE_1: lambda msg: msg.current_per_phase[0], + Metric.AC_CURRENT_PHASE_2: lambda msg: msg.current_per_phase[1], + Metric.AC_CURRENT_PHASE_3: lambda msg: msg.current_per_phase[2], + Metric.AC_VOLTAGE_PHASE_1_N: lambda msg: msg.voltage_per_phase[0], + Metric.AC_VOLTAGE_PHASE_2_N: lambda msg: msg.voltage_per_phase[1], + Metric.AC_VOLTAGE_PHASE_3_N: lambda msg: msg.voltage_per_phase[2], + Metric.AC_FREQUENCY: lambda msg: msg.frequency, + Metric.AC_REACTIVE_POWER: lambda msg: msg.reactive_power, + Metric.AC_REACTIVE_POWER_PHASE_1: lambda msg: msg.reactive_power_per_phase[0], + Metric.AC_REACTIVE_POWER_PHASE_2: lambda msg: msg.reactive_power_per_phase[1], + Metric.AC_REACTIVE_POWER_PHASE_3: lambda msg: msg.reactive_power_per_phase[2], +} + class MicrogridApiSource: """Fetches requested metrics from the Microgrid API. @@ -250,6 +281,31 @@ async def _check_inverter_request( connection_manager.get().api_client, comp_id ) + async def _check_chp_request( + self, + comp_id: ComponentId, + requests: dict[Metric | TransitionalMetric, list[ComponentMetricRequest]], + ) -> None: + """Check if the requests are valid CHP metrics. + + Raises: + ValueError: if the requested metric is not available for CHP. + + Args: + comp_id: The id of the requested component. + requests: A list of metric requests received from external actors + for the given CHP. + """ + for metric in requests: + if metric not in _CHP_DATA_METHODS: + err = f"Unknown metric {metric} for CHP id {comp_id}" + _logger.error(err) + raise ValueError(err) + if comp_id not in self.comp_data_receivers: + self.comp_data_receivers[comp_id] = ChpData.subscribe( + connection_manager.get().api_client, comp_id + ) + async def _check_meter_request( self, comp_id: ComponentId, @@ -304,6 +360,8 @@ async def _check_requested_component_and_metrics( await self._check_inverter_request(comp_id, requests) elif category == ComponentCategory.METER: await self._check_meter_request(comp_id, requests) + elif category == ComponentCategory.CHP: + await self._check_chp_request(comp_id, requests) else: err = f"Unknown component category {category}" _logger.error(err) @@ -333,6 +391,8 @@ def _get_data_extraction_method( return _METER_DATA_METHODS[metric] if category == ComponentCategory.EV_CHARGER: return _EV_CHARGER_DATA_METHODS[metric] + if category == ComponentCategory.CHP: + return _CHP_DATA_METHODS[metric] err = f"Unknown component category {category}" _logger.error(err) raise ValueError(err) @@ -422,13 +482,6 @@ async def clean_tasks( sending_tasks = await clean_tasks(sending_tasks) await asyncio.gather(*sending_tasks) - await asyncio.gather( - *[ - self._registry.close_and_remove(r.get_channel_name()) - for requests in self._req_streaming_metrics[comp_id].values() - for r in requests - ] - ) except Exception: _logger.exception( "Unexpected error while handling data stream for component %d (%s), " diff --git a/src/frequenz/sdk/microgrid/_old_component_data.py b/src/frequenz/sdk/microgrid/_old_component_data.py index 48c00eefc..4808061e9 100644 --- a/src/frequenz/sdk/microgrid/_old_component_data.py +++ b/src/frequenz/sdk/microgrid/_old_component_data.py @@ -839,6 +839,268 @@ def to_samples(self) -> ComponentDataSamples: ) +@dataclass(kw_only=True) +class ChpData(ComponentData): # pylint: disable=too-many-instance-attributes + """A wrapper class for holding CHP data.""" + + active_power: float = 0.0 + """The total active 3-phase AC power, in Watts (W). + + Represented in the passive sign convention. + + * Positive means consumption from the grid. + * Negative means supply into the grid. + """ + + active_power_per_phase: PhaseTuple = (0.0, 0.0, 0.0) + """The per-phase AC active power for phase 1, 2, and 3 respectively, in Watt (W). + + Represented in the passive sign convention. + + * Positive means consumption from the grid. + * Negative means supply into the grid. + """ + + reactive_power: float = 0.0 + """The total reactive 3-phase AC power, in Volt-Ampere Reactive (VAr). + + * Positive power means capacitive (current leading w.r.t. voltage). + * Negative power means inductive (current lagging w.r.t. voltage). + """ + + reactive_power_per_phase: PhaseTuple = (0.0, 0.0, 0.0) + """The per-phase AC reactive power, in Volt-Ampere Reactive (VAr). + + The provided values are for phase 1, 2, and 3 respectively. + + * Positive power means capacitive (current leading w.r.t. voltage). + * Negative power means inductive (current lagging w.r.t. voltage). + """ + + current_per_phase: PhaseTuple = (0.0, 0.0, 0.0) + """AC current in Amperes (A) for phase/line 1, 2 and 3 respectively. + + Represented in the passive sign convention. + + * Positive means consumption from the grid. + * Negative means supply into the grid. + """ + + voltage_per_phase: PhaseTuple = (0.0, 0.0, 0.0) + """The AC voltage in Volts (V) between the line and the neutral wire for + phase/line 1, 2 and 3 respectively. + """ + + active_power_inclusion_lower_bound: float = 0.0 + """Lower inclusion bound for CHP power in watts. + + This is the lower limit of the range within which power requests are allowed for the + CHP. + + See [`frequenz.api.common.metrics_pb2.Metric.system_inclusion_bounds`][] and + [`frequenz.api.common.metrics_pb2.Metric.system_exclusion_bounds`][] for more + details. + """ + + active_power_exclusion_lower_bound: float = 0.0 + """Lower exclusion bound for CHP power in watts. + + This is the lower limit of the range within which power requests are not allowed for + the CHP. + + See [`frequenz.api.common.metrics_pb2.Metric.system_inclusion_bounds`][] and + [`frequenz.api.common.metrics_pb2.Metric.system_exclusion_bounds`][] for more + details. + """ + + active_power_inclusion_upper_bound: float = 0.0 + """Upper inclusion bound for CHP power in watts. + + This is the upper limit of the range within which power requests are allowed for the + CHP. + + See [`frequenz.api.common.metrics_pb2.Metric.system_inclusion_bounds`][] and + [`frequenz.api.common.metrics_pb2.Metric.system_exclusion_bounds`][] for more + details. + """ + + active_power_exclusion_upper_bound: float = 0.0 + """Upper exclusion bound for CHP power in watts. + + This is the upper limit of the range within which power requests are not allowed for + the CHP. + + See [`frequenz.api.common.metrics_pb2.Metric.system_inclusion_bounds`][] and + [`frequenz.api.common.metrics_pb2.Metric.system_exclusion_bounds`][] for more + details. + """ + + frequency: float = 0.0 + """AC frequency, in Hertz (Hz).""" + + CATEGORY: ClassVar[ComponentCategory] = ComponentCategory.CHP + + METRICS: ClassVar[frozenset[Metric]] = frozenset( + [ + Metric.AC_ACTIVE_POWER, + Metric.AC_ACTIVE_POWER_PHASE_1, + Metric.AC_ACTIVE_POWER_PHASE_2, + Metric.AC_ACTIVE_POWER_PHASE_3, + Metric.AC_REACTIVE_POWER, + Metric.AC_REACTIVE_POWER_PHASE_1, + Metric.AC_REACTIVE_POWER_PHASE_2, + Metric.AC_REACTIVE_POWER_PHASE_3, + Metric.AC_CURRENT_PHASE_1, + Metric.AC_CURRENT_PHASE_2, + Metric.AC_CURRENT_PHASE_3, + Metric.AC_VOLTAGE_PHASE_1_N, + Metric.AC_VOLTAGE_PHASE_2_N, + Metric.AC_VOLTAGE_PHASE_3_N, + Metric.AC_FREQUENCY, + ] + ) + """The metrics of this component.""" + + @override + @classmethod + # pylint: disable-next=too-many-branches + def from_samples(cls, samples: ComponentDataSamples) -> Self: + """Create a new instance from a component data object.""" + if not samples.metric_samples: + raise ValueError("No metrics in the samples.") + + self = cls._from_samples(cls, samples) + + active_power_per_phase: list[float] = [0.0, 0.0, 0.0] + reactive_power_per_phase: list[float] = [0.0, 0.0, 0.0] + current_per_phase: list[float] = [0.0, 0.0, 0.0] + voltage_per_phase: list[float] = [0.0, 0.0, 0.0] + + for sample in samples.metric_samples: + value = sample.as_single_value() or 0.0 + match sample.metric: + case Metric.AC_ACTIVE_POWER: + self.active_power = value + ( + self.active_power_inclusion_lower_bound, + self.active_power_inclusion_upper_bound, + self.active_power_exclusion_lower_bound, + self.active_power_exclusion_upper_bound, + ) = _bound_ranges_to_inclusion_exclusion( + sample.bounds, "AC_ACTIVE_POWER", sample + ) + case Metric.AC_ACTIVE_POWER_PHASE_1: + active_power_per_phase[0] = value + case Metric.AC_ACTIVE_POWER_PHASE_2: + active_power_per_phase[1] = value + case Metric.AC_ACTIVE_POWER_PHASE_3: + active_power_per_phase[2] = value + case Metric.AC_REACTIVE_POWER: + self.reactive_power = value + case Metric.AC_REACTIVE_POWER_PHASE_1: + reactive_power_per_phase[0] = value + case Metric.AC_REACTIVE_POWER_PHASE_2: + reactive_power_per_phase[1] = value + case Metric.AC_REACTIVE_POWER_PHASE_3: + reactive_power_per_phase[2] = value + case Metric.AC_CURRENT_PHASE_1: + current_per_phase[0] = value + case Metric.AC_CURRENT_PHASE_2: + current_per_phase[1] = value + case Metric.AC_CURRENT_PHASE_3: + current_per_phase[2] = value + case Metric.AC_VOLTAGE_PHASE_1_N: + voltage_per_phase[0] = value + case Metric.AC_VOLTAGE_PHASE_2_N: + voltage_per_phase[1] = value + case Metric.AC_VOLTAGE_PHASE_3_N: + voltage_per_phase[2] = value + case Metric.AC_FREQUENCY: + self.frequency = value + case unexpected: + _logger.warning( + "Unexpected metric %s in CHP data sample: %r", + unexpected, + sample, + ) + + self.active_power_per_phase = cast(PhaseTuple, tuple(active_power_per_phase)) + self.reactive_power_per_phase = cast( + PhaseTuple, tuple(reactive_power_per_phase) + ) + self.current_per_phase = cast(PhaseTuple, tuple(current_per_phase)) + self.voltage_per_phase = cast(PhaseTuple, tuple(voltage_per_phase)) + + return self + + @override + def to_samples(self) -> ComponentDataSamples: + """Convert the component data to a component data object.""" + return ComponentDataSamples( + component_id=self.component_id, + metric_samples=[ + MetricSample( + sampled_at=self.timestamp, + metric=Metric.AC_ACTIVE_POWER, + value=self.active_power, + bounds=_inclusion_exclusion_bounds_to_ranges( + self.active_power_inclusion_lower_bound, + self.active_power_inclusion_upper_bound, + self.active_power_exclusion_lower_bound, + self.active_power_exclusion_upper_bound, + ), + ), + *( + MetricSample( + sampled_at=self.timestamp, metric=metric, value=value, bounds=[] + ) + for metric, value in [ + ( + Metric.AC_ACTIVE_POWER_PHASE_1, + self.active_power_per_phase[0], + ), + ( + Metric.AC_ACTIVE_POWER_PHASE_2, + self.active_power_per_phase[1], + ), + ( + Metric.AC_ACTIVE_POWER_PHASE_3, + self.active_power_per_phase[2], + ), + (Metric.AC_REACTIVE_POWER, self.reactive_power), + ( + Metric.AC_REACTIVE_POWER_PHASE_1, + self.reactive_power_per_phase[0], + ), + ( + Metric.AC_REACTIVE_POWER_PHASE_2, + self.reactive_power_per_phase[1], + ), + ( + Metric.AC_REACTIVE_POWER_PHASE_3, + self.reactive_power_per_phase[2], + ), + (Metric.AC_CURRENT_PHASE_1, self.current_per_phase[0]), + (Metric.AC_CURRENT_PHASE_2, self.current_per_phase[1]), + (Metric.AC_CURRENT_PHASE_3, self.current_per_phase[2]), + (Metric.AC_VOLTAGE_PHASE_1_N, self.voltage_per_phase[0]), + (Metric.AC_VOLTAGE_PHASE_2_N, self.voltage_per_phase[1]), + (Metric.AC_VOLTAGE_PHASE_3_N, self.voltage_per_phase[2]), + (Metric.AC_FREQUENCY, self.frequency), + ] + ), + ], + states=[ + ComponentStateSample( + sampled_at=self.timestamp, + states=frozenset(self.states), + warnings=frozenset(self.warnings), + errors=frozenset(self.errors), + ) + ], + ) + + @dataclass(kw_only=True) class EVChargerData(ComponentData): # pylint: disable=too-many-instance-attributes """A wrapper class for holding ev_charger data."""