diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index dc08c0b5f..2794cb380 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -12,7 +12,18 @@ ## New Features - +- **`WindowSide` Enum**: Introduced the `WindowSide` enum to represent the sides of a resampling window: + - `LEFT`: Represents the start of the resampling window. + - `RIGHT`: Represents the end of the resampling window. + - The `WindowSide` enum is publicly available for import from the `frequenz.sdk.timeseries` module. + +- **`closed` Parameter**: Added the `closed` parameter to `ResamplerConfig`, allowing users to configure which side of the resampling window is closed (i.e. which boundary is included). This parameter uses the new `WindowSide` enum: + - `WindowSide.RIGHT`: Includes the end of the window, excludes the start, i.e. `(start, end]`. (Default) + - `WindowSide.LEFT`: Includes the start of the window, excludes the end, i.e. `[start, end)`. + +- **`label` Parameter**: Added the `label` parameter to `ResamplerConfig`, allowing users to configure the timestamp labeling of the resampled data. This parameter uses the new `WindowSide` enum: + - `WindowSide.RIGHT`: The resampled timestamp represents the end of the resampling window. (Default) + - `WindowSide.LEFT`: The resampled timestamp represents the start of the resampling window. 
## Bug Fixes diff --git a/src/frequenz/sdk/timeseries/__init__.py b/src/frequenz/sdk/timeseries/__init__.py index d5bde1602..288ea6b5a 100644 --- a/src/frequenz/sdk/timeseries/__init__.py +++ b/src/frequenz/sdk/timeseries/__init__.py @@ -49,6 +49,7 @@ ResamplerConfig2, ResamplingFunction, ResamplingFunction2, + WindowSide, ) from ._resampling._exceptions import ResamplingError, SourceStoppedError from ._resampling._wall_clock_timer import ( @@ -82,4 +83,5 @@ "TickInfo", "WallClockTimer", "WallClockTimerConfig", + "WindowSide", ] diff --git a/src/frequenz/sdk/timeseries/_resampling/_config.py b/src/frequenz/sdk/timeseries/_resampling/_config.py index 047892803..55de6854b 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_config.py +++ b/src/frequenz/sdk/timeseries/_resampling/_config.py @@ -10,6 +10,7 @@ from collections.abc import Sequence from dataclasses import dataclass, field from datetime import datetime, timedelta +from enum import Enum from typing import Protocol from frequenz.core.datetime import UNIX_EPOCH @@ -44,6 +45,16 @@ """ +class WindowSide(Enum): + """Represents a side of a resampling window.""" + + LEFT = "left" + """The left side of the resampling window.""" + + RIGHT = "right" + """The right side of the resampling window.""" + + class ResamplingFunction(Protocol): """Combine multiple samples into a new one. @@ -126,6 +137,29 @@ class ResamplerConfig: value. """ + closed: WindowSide = WindowSide.RIGHT + """Indicates which side of the resampling window is closed. + + If `WindowSide.RIGHT` (default), the resampling window is closed on the + right side and open on the left, meaning it includes samples with timestamps + within the range (start, end], where `start` and `end` are the boundaries of + the window. + + If `WindowSide.LEFT`, the resampling window is closed on the left side and + open on the right, meaning it includes samples with timestamps within the + range [start, end), where `start` and `end` are the boundaries of the + window. 
+ """ + + label: WindowSide = WindowSide.RIGHT + """Indicates the timestamp label of the resampled data. + + If `WindowSide.RIGHT` (default), the timestamp of the resampled data + corresponds to the right boundary of the resampling window. If + `WindowSide.LEFT`, the timestamp corresponds to the left boundary of the + resampling window. + """ + initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT """The initial length of the resampling buffer. diff --git a/src/frequenz/sdk/timeseries/_resampling/_resampler.py b/src/frequenz/sdk/timeseries/_resampling/_resampler.py index 81471fc0d..82fb5f0f7 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_resampler.py +++ b/src/frequenz/sdk/timeseries/_resampling/_resampler.py @@ -9,7 +9,7 @@ import itertools import logging import math -from bisect import bisect +from bisect import bisect, bisect_left from collections import deque from datetime import datetime, timedelta, timezone from typing import assert_never @@ -20,7 +20,7 @@ from ..._internal._asyncio import cancel_and_await from .._base_types import Sample from ._base_types import Sink, Source, SourceProperties -from ._config import ResamplerConfig, ResamplerConfig2 +from ._config import ResamplerConfig, ResamplerConfig2, WindowSide from ._exceptions import ResamplingError, SourceStoppedError from ._wall_clock_timer import TickInfo, WallClockTimer @@ -411,7 +411,8 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]: """Generate a new sample based on all the current *relevant* samples. Args: - timestamp: The timestamp to be used to calculate the new sample. + timestamp: The reference timestamp for the resampling process. This + timestamp indicates the end of the resampling period. 
Returns: A new sample generated by calling the resampling function with all @@ -437,12 +438,20 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]: ) minimum_relevant_timestamp = timestamp - period * conf.max_data_age_in_periods - min_index = bisect( - self._buffer, - minimum_relevant_timestamp, - key=lambda s: s[0], - ) - max_index = bisect(self._buffer, timestamp, key=lambda s: s[0]) + if self._config.closed == WindowSide.LEFT: + min_index = bisect_left( + self._buffer, + minimum_relevant_timestamp, + key=lambda s: s[0], + ) + max_index = bisect_left(self._buffer, timestamp, key=lambda s: s[0]) + else: + min_index = bisect( + self._buffer, + minimum_relevant_timestamp, + key=lambda s: s[0], + ) + max_index = bisect(self._buffer, timestamp, key=lambda s: s[0]) # Using itertools for slicing doesn't look very efficient, but # experiments with a custom (ring) buffer that can slice showed that # it is not that bad. See: @@ -458,7 +467,13 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]: if relevant_samples else None ) - return Sample(timestamp, None if value is None else Quantity(value)) + + sample_time = ( + timestamp - conf.resampling_period + if self._config.label == WindowSide.LEFT + else timestamp + ) + return Sample(sample_time, None if value is None else Quantity(value)) def _log_no_relevant_samples( self, minimum_relevant_timestamp: datetime, timestamp: datetime diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index 3629b5f4e..31f22638f 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -26,6 +26,7 @@ Sink, Source, SourceProperties, + WindowSide, ) from frequenz.sdk.timeseries._resampling._exceptions import ( ResamplingError, @@ -1504,6 +1505,177 @@ async def test_resampling_all_zeros( assert _get_buffer_len(resampler, source_receiver) == 3 +@pytest.mark.parametrize("closed", [WindowSide.RIGHT, WindowSide.LEFT]) +async def test_resampler_closed_option( + 
closed: WindowSide, + fake_time: time_machine.Coordinates, + source_chan: Broadcast[Sample[Quantity]], +) -> None: + """Test the `closed` option in ResamplerConfig.""" + timestamp = datetime.now(timezone.utc) + + resampling_period_s = 2 + expected_resampled_value = 42.0 + + resampling_fun_mock = MagicMock( + spec=ResamplingFunction, return_value=expected_resampled_value + ) + config = ResamplerConfig( + resampling_period=timedelta(seconds=resampling_period_s), + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + closed=closed, + ) + resampler = Resampler(config) + + source_receiver = source_chan.new_receiver() + source_sender = source_chan.new_sender() + + sink_mock = AsyncMock(spec=Sink, return_value=True) + + resampler.add_timeseries("test", source_receiver, sink_mock) + source_props = resampler.get_source_properties(source_receiver) + + # Test timeline + # + # t(s) 0 1 2 2.5 3 4 + # |----------|----------R----|-----|----------R-----> (no more samples) + # value 5.0 10.0 15.0 1.0 4.0 5.0 + # + # R = resampling is done + + # Send a few samples and run a resample tick, advancing the fake time by one period + sample1 = Sample(timestamp, value=Quantity(5.0)) + sample2 = Sample(timestamp + timedelta(seconds=1), value=Quantity(10.0)) + sample3 = Sample(timestamp + timedelta(seconds=2), value=Quantity(15.0)) + await source_sender.send(sample1) + await source_sender.send(sample2) + await source_sender.send(sample3) + + await _advance_time(fake_time, resampling_period_s) + await resampler.resample(one_shot=True) + + assert datetime.now(timezone.utc).timestamp() == 2 + sink_mock.assert_called_once_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s), + Quantity(expected_resampled_value), + ) + ) + # Assert the behavior based on the `closed` option + if closed == WindowSide.RIGHT: + resampling_fun_mock.assert_called_once_with( + a_sequence(as_float_tuple(sample2), as_float_tuple(sample3)), + config, + source_props, + ) + elif closed == 
WindowSide.LEFT: + resampling_fun_mock.assert_called_once_with( + a_sequence(as_float_tuple(sample1), as_float_tuple(sample2)), + config, + source_props, + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=3, sampling_period=None + ) + assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len + sink_mock.reset_mock() + resampling_fun_mock.reset_mock() + + # Additional samples at 2.5, 3, and 4 seconds + sample4 = Sample(timestamp + timedelta(seconds=2.5), value=Quantity(1.0)) + sample5 = Sample(timestamp + timedelta(seconds=3), value=Quantity(4.0)) + sample6 = Sample(timestamp + timedelta(seconds=4), value=Quantity(5.0)) + await source_sender.send(sample4) + await source_sender.send(sample5) + await source_sender.send(sample6) + + # Advance time to 4 seconds and resample again + await _advance_time(fake_time, resampling_period_s * 2) + await resampler.resample(one_shot=True) + + sink_mock.assert_called_once_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s * 2), + Quantity(expected_resampled_value), + ) + ) + if closed == WindowSide.RIGHT: + resampling_fun_mock.assert_called_once_with( + a_sequence( + as_float_tuple(sample4), + as_float_tuple(sample5), + as_float_tuple(sample6), + ), + config, + source_props, + ) + elif closed == WindowSide.LEFT: + resampling_fun_mock.assert_called_once_with( + a_sequence( + as_float_tuple(sample3), + as_float_tuple(sample4), + as_float_tuple(sample5), + ), + config, + source_props, + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=6, sampling_period=None + ) + assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len + + +@pytest.mark.parametrize("label", [WindowSide.LEFT, WindowSide.RIGHT]) +async def test_resampler_label_option( + label: WindowSide, + fake_time: time_machine.Coordinates, + source_chan: Broadcast[Sample[Quantity]], +) -> None: + """Test the `label` option in 
ResamplerConfig.""" + timestamp = datetime.now(timezone.utc) + + resampling_period_s = 2 + expected_resampled_value = 42.0 + + resampling_fun_mock = MagicMock( + spec=ResamplingFunction, return_value=expected_resampled_value + ) + config = ResamplerConfig( + resampling_period=timedelta(seconds=resampling_period_s), + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + label=label, + ) + resampler = Resampler(config) + + source_receiver = source_chan.new_receiver() + source_sender = source_chan.new_sender() + + sink_mock = AsyncMock(spec=Sink, return_value=True) + + resampler.add_timeseries("test", source_receiver, sink_mock) + + # Send samples and resample + sample1 = Sample(timestamp, value=Quantity(5.0)) + sample2 = Sample(timestamp + timedelta(seconds=1), value=Quantity(10.0)) + await source_sender.send(sample1) + await source_sender.send(sample2) + + await _advance_time(fake_time, resampling_period_s) + await resampler.resample(one_shot=True) + + # Assert the timestamp of the resampled sample + expected_timestamp = ( + timestamp + if label == WindowSide.LEFT + else timestamp + timedelta(seconds=resampling_period_s) + ) + sink_mock.assert_called_once_with( + Sample(expected_timestamp, Quantity(expected_resampled_value)) + ) + + def _get_buffer_len(resampler: Resampler, source_receiver: Source) -> int: # pylint: disable-next=protected-access blen = resampler._resamplers[source_receiver]._helper._buffer.maxlen