Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,18 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- **`WindowSide` Enum**: Introduced the `WindowSide` enum to represent the sides of a resampling window:
  - `LEFT`: Represents the start of the resampling window.
  - `RIGHT`: Represents the end of the resampling window.
  - The `WindowSide` enum is publicly available for import from the `frequenz.sdk.timeseries` module.

- **`closed` Parameter**: Added the `closed` parameter to `ResamplerConfig`, allowing users to configure which side of the resampling window is included. This parameter uses the new `WindowSide` enum:
  - `WindowSide.RIGHT`: Includes the end of the window, excludes the start. (Default)
  - `WindowSide.LEFT`: Includes the start of the window, excludes the end.

- **`label` Parameter**: Added the `label` parameter to `ResamplerConfig`, allowing users to configure the timestamp labeling of the resampled data. This parameter uses the new `WindowSide` enum:
  - `WindowSide.RIGHT`: The resampled timestamp represents the end of the resampling window. (Default)
  - `WindowSide.LEFT`: The resampled timestamp represents the start of the resampling window.

## Bug Fixes

Expand Down
2 changes: 2 additions & 0 deletions src/frequenz/sdk/timeseries/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
ResamplerConfig2,
ResamplingFunction,
ResamplingFunction2,
WindowSide,
)
from ._resampling._exceptions import ResamplingError, SourceStoppedError
from ._resampling._wall_clock_timer import (
Expand Down Expand Up @@ -82,4 +83,5 @@
"TickInfo",
"WallClockTimer",
"WallClockTimerConfig",
"WindowSide",
]
34 changes: 34 additions & 0 deletions src/frequenz/sdk/timeseries/_resampling/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from collections.abc import Sequence
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Protocol

from frequenz.core.datetime import UNIX_EPOCH
Expand Down Expand Up @@ -44,6 +45,16 @@
"""


class WindowSide(Enum):
    """Represents a side of a resampling window.

    The left side is the start of the window and the right side is its end.
    `ResamplerConfig` uses this enum for its `closed` field (which boundary
    is included in the window) and its `label` field (which boundary is used
    as the timestamp of the resampled data).
    """

    LEFT = "left"
    """The left side of the resampling window."""

    RIGHT = "right"
    """The right side of the resampling window."""


class ResamplingFunction(Protocol):
"""Combine multiple samples into a new one.

Expand Down Expand Up @@ -126,6 +137,29 @@ class ResamplerConfig:
value.
"""

closed: WindowSide = WindowSide.RIGHT
"""Indicates which side of the resampling window is closed.

If `WindowSide.RIGHT` (default), the resampling window is closed on the
right side and open on the left, meaning it includes samples with timestamps
within the range (start, end], where `start` and `end` are the boundaries of
the window.

If `WindowSide.LEFT`, the resampling window is closed on the left side and
open on the right, meaning it includes samples with timestamps within the
range [start, end), where `start` and `end` are the boundaries of the
window.
"""

label: WindowSide = WindowSide.RIGHT
"""Indicates the timestamp label of the resampled data.

If `WindowSide.RIGHT` (default), the timestamp of the resampled data
corresponds to the right boundary of the resampling window. If
`WindowSide.LEFT`, the timestamp corresponds to the left boundary of the
resampling window.
"""

initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT
"""The initial length of the resampling buffer.

Expand Down
35 changes: 25 additions & 10 deletions src/frequenz/sdk/timeseries/_resampling/_resampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import itertools
import logging
import math
from bisect import bisect
from bisect import bisect, bisect_left
from collections import deque
from datetime import datetime, timedelta, timezone
from typing import assert_never
Expand All @@ -20,7 +20,7 @@
from ..._internal._asyncio import cancel_and_await
from .._base_types import Sample
from ._base_types import Sink, Source, SourceProperties
from ._config import ResamplerConfig, ResamplerConfig2
from ._config import ResamplerConfig, ResamplerConfig2, WindowSide
from ._exceptions import ResamplingError, SourceStoppedError
from ._wall_clock_timer import TickInfo, WallClockTimer

Expand Down Expand Up @@ -411,7 +411,8 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]:
"""Generate a new sample based on all the current *relevant* samples.

Args:
timestamp: The timestamp to be used to calculate the new sample.
timestamp: The reference timestamp for the resampling process. This
timestamp indicates the end of the resampling period.

Returns:
A new sample generated by calling the resampling function with all
Expand All @@ -437,12 +438,20 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]:
)
minimum_relevant_timestamp = timestamp - period * conf.max_data_age_in_periods

min_index = bisect(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think how the resampling behaves, i.e. whether the window is left- or right-open and how the result is labeled, should be controlled by config parameters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should make left- or right-open configurable, with the corresponding label, such as:
right open: [t, t+1) -> labeled as t
left open: (t-1, t] -> labeled as t (the current behavior)
Or do you want to additionally allow the user to do something like:
right open, label at the end: [t, t+1) -> labeled as t+1
left open, label at the beginning: (t-1, t] -> labeled as t-1
Because I think the latter could lead to a lot of confusion (not sure if it's ever needed).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the latter are also reasonable options (see e.g. https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.resample.html), but I don't see a strong reason to implement them now if they are not needed. If it's well-documented, users can also adjust the timestamps trivially. So your proposal sounds good to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, whatever we do, we should probably be much more explicit about how output samples are calculated and structured.

self._buffer,
minimum_relevant_timestamp,
key=lambda s: s[0],
)
max_index = bisect(self._buffer, timestamp, key=lambda s: s[0])
if self._config.closed == WindowSide.LEFT:
min_index = bisect_left(
self._buffer,
minimum_relevant_timestamp,
key=lambda s: s[0],
)
max_index = bisect_left(self._buffer, timestamp, key=lambda s: s[0])
else:
min_index = bisect(
self._buffer,
minimum_relevant_timestamp,
key=lambda s: s[0],
)
max_index = bisect(self._buffer, timestamp, key=lambda s: s[0])
# Using itertools for slicing doesn't look very efficient, but
# experiments with a custom (ring) buffer that can slice showed that
# it is not that bad. See:
Expand All @@ -458,7 +467,13 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]:
if relevant_samples
else None
)
return Sample(timestamp, None if value is None else Quantity(value))

sample_time = (
timestamp - conf.resampling_period
if self._config.label == WindowSide.LEFT
else timestamp
)
return Sample(sample_time, None if value is None else Quantity(value))

def _log_no_relevant_samples(
self, minimum_relevant_timestamp: datetime, timestamp: datetime
Expand Down
172 changes: 172 additions & 0 deletions tests/timeseries/test_resampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
Sink,
Source,
SourceProperties,
WindowSide,
)
from frequenz.sdk.timeseries._resampling._exceptions import (
ResamplingError,
Expand Down Expand Up @@ -1504,6 +1505,177 @@ async def test_resampling_all_zeros(
assert _get_buffer_len(resampler, source_receiver) == 3


@pytest.mark.parametrize("closed", [WindowSide.RIGHT, WindowSide.LEFT])
async def test_resampler_closed_option(
    closed: WindowSide,
    fake_time: time_machine.Coordinates,
    source_chan: Broadcast[Sample[Quantity]],
) -> None:
    """Check which window boundary is included for each `closed` setting.

    Two resampling runs are performed with a 2 second period and a maximum
    data age of one period. For each run the test verifies the sample sent to
    the sink and the exact samples handed to the resampling function, which
    differ depending on whether the window is closed on the right or the left.
    """
    start = datetime.now(timezone.utc)
    period_s = 2
    resampled_value = 42.0

    def sample_at(offset_s: float, value: float) -> Sample[Quantity]:
        """Build a sample `offset_s` seconds after the start of the test."""
        return Sample(start + timedelta(seconds=offset_s), value=Quantity(value))

    resampling_fun_mock = MagicMock(
        spec=ResamplingFunction, return_value=resampled_value
    )
    config = ResamplerConfig(
        resampling_period=timedelta(seconds=period_s),
        max_data_age_in_periods=1.0,
        resampling_function=resampling_fun_mock,
        closed=closed,
    )
    resampler = Resampler(config)

    source_receiver = source_chan.new_receiver()
    source_sender = source_chan.new_sender()

    sink_mock = AsyncMock(spec=Sink, return_value=True)

    resampler.add_timeseries("test", source_receiver, sink_mock)
    source_props = resampler.get_source_properties(source_receiver)

    # Test timeline
    #
    # t(s)    0          1          2    2.5   3          4
    #         |----------|----------R----|-----|----------R-----> (no more samples)
    # value   5.0        10.0       15.0 1.0   4.0        5.0
    #
    # R = resampling is done

    # --- First run: samples at 0s, 1s and 2s, resampled at 2s ---
    first_batch = [sample_at(0, 5.0), sample_at(1, 10.0), sample_at(2, 15.0)]
    for sample in first_batch:
        await source_sender.send(sample)

    await _advance_time(fake_time, period_s)
    await resampler.resample(one_shot=True)

    assert datetime.now(timezone.utc).timestamp() == 2
    sink_mock.assert_called_once_with(
        Sample(start + timedelta(seconds=period_s), Quantity(resampled_value))
    )
    # Right-closed (0, 2] drops the sample at 0s; left-closed [0, 2) drops
    # the sample at 2s.
    expected_window = {
        WindowSide.RIGHT: first_batch[1:],
        WindowSide.LEFT: first_batch[:2],
    }[closed]
    resampling_fun_mock.assert_called_once_with(
        a_sequence(*(as_float_tuple(sample) for sample in expected_window)),
        config,
        source_props,
    )
    assert source_props == SourceProperties(
        sampling_start=start, received_samples=3, sampling_period=None
    )
    assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len
    sink_mock.reset_mock()
    resampling_fun_mock.reset_mock()

    # --- Second run: samples at 2.5s, 3s and 4s, output stamped at 4s ---
    second_batch = [sample_at(2.5, 1.0), sample_at(3, 4.0), sample_at(4, 5.0)]
    for sample in second_batch:
        await source_sender.send(sample)

    await _advance_time(fake_time, period_s * 2)
    await resampler.resample(one_shot=True)

    sink_mock.assert_called_once_with(
        Sample(start + timedelta(seconds=period_s * 2), Quantity(resampled_value))
    )
    # Right-closed (2, 4] holds the three new samples; left-closed [2, 4)
    # picks up the sample at 2s instead of the one at 4s.
    expected_window = {
        WindowSide.RIGHT: second_batch,
        WindowSide.LEFT: [first_batch[2], *second_batch[:2]],
    }[closed]
    resampling_fun_mock.assert_called_once_with(
        a_sequence(*(as_float_tuple(sample) for sample in expected_window)),
        config,
        source_props,
    )
    assert source_props == SourceProperties(
        sampling_start=start, received_samples=6, sampling_period=None
    )
    assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len


@pytest.mark.parametrize("label", [WindowSide.LEFT, WindowSide.RIGHT])
async def test_resampler_label_option(
    label: WindowSide,
    fake_time: time_machine.Coordinates,
    source_chan: Broadcast[Sample[Quantity]],
) -> None:
    """Check that `label` selects the timestamp of the resampled sample.

    After one resampling run over a 2 second period, the sample delivered to
    the sink must be stamped with the window start for `WindowSide.LEFT` and
    with the window end for `WindowSide.RIGHT`.
    """
    start = datetime.now(timezone.utc)
    period_s = 2
    resampled_value = 42.0

    resampling_fun_mock = MagicMock(
        spec=ResamplingFunction, return_value=resampled_value
    )
    resampler = Resampler(
        ResamplerConfig(
            resampling_period=timedelta(seconds=period_s),
            max_data_age_in_periods=1.0,
            resampling_function=resampling_fun_mock,
            label=label,
        )
    )

    source_receiver = source_chan.new_receiver()
    source_sender = source_chan.new_sender()

    sink_mock = AsyncMock(spec=Sink, return_value=True)

    resampler.add_timeseries("test", source_receiver, sink_mock)

    # Feed two samples (at 0s and 1s) and trigger a single resampling run.
    incoming = (
        Sample(start, value=Quantity(5.0)),
        Sample(start + timedelta(seconds=1), value=Quantity(10.0)),
    )
    for sample in incoming:
        await source_sender.send(sample)

    await _advance_time(fake_time, period_s)
    await resampler.resample(one_shot=True)

    # The label decides which window boundary timestamps the output.
    if label == WindowSide.LEFT:
        expected_timestamp = start
    else:
        expected_timestamp = start + timedelta(seconds=period_s)

    sink_mock.assert_called_once_with(
        Sample(expected_timestamp, Quantity(resampled_value))
    )


def _get_buffer_len(resampler: Resampler, source_receiver: Source) -> int:
# pylint: disable-next=protected-access
blen = resampler._resamplers[source_receiver]._helper._buffer.maxlen
Expand Down