Skip to content

Commit 0bcf61c

Browse files
Add a Moving Window Class (#190)
2 parents 2524cb2 + 7aa5499 commit 0bcf61c

File tree

4 files changed

+270
-3
lines changed

4 files changed

+270
-3
lines changed

RELEASE_NOTES.md

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
* A new class `OrderedRingBuffer` is now available, providing a sorted ring buffer of datetime-value pairs with tracking of any values that have not yet been written.
1616
* Add logical meter formula for EV power.
17+
* A `MovingWindow` class has been added that consumes a data stream from a logical meter and updates an `OrderedRingBuffer`.
1718

1819
## Bug Fixes
1920

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A data window that moves with the latest datapoints of a data stream."""
5+
6+
from __future__ import annotations
7+
8+
import asyncio
9+
import logging
10+
from collections.abc import Sequence
11+
from datetime import datetime, timedelta
12+
13+
import numpy as np
14+
from frequenz.channels import Receiver
15+
from numpy.typing import ArrayLike
16+
17+
from .._internal.asyncio import cancel_and_await
18+
from . import Sample
19+
from ._ringbuffer import OrderedRingBuffer
20+
21+
log = logging.getLogger(__name__)
22+
23+
24+
class MovingWindow(Sequence):
25+
"""
26+
A data window that moves with the latest datapoints of a data stream.
27+
28+
After initialization the `MovingWindow` can be accessed by an integer
29+
index or a timestamp. A sub window can be accessed by using a slice of integers
30+
integers or timestamps.
31+
32+
Note that a numpy ndarray is returned and thus users can use
33+
numpys operations directly on a window.
34+
35+
The window uses an ringbuffer for storage and the first element is aligned to
36+
a fixed defined point in time. Since the moving nature of the window, the
37+
date of the first and the last element are constantly changing and therefore
38+
the point in time that defines the alignment can be outside of the time window.
39+
Modulo arithmetic is used to move the `window_alignment` timestamp into the
40+
latest window.
41+
If for example the `window_alignment` parameter is set to `datetime(1, 1, 1)`
42+
and the window size is bigger than one day then the first element will always
43+
be aligned to the midnight. For further information see also the
44+
[`OrderedRingBuffer`][frequenz.sdk.timeseries._ringbuffer.OrderedRingBuffer]
45+
documentation.
46+
47+
48+
**Example1** (calculating the mean of a time interval):
49+
50+
```
51+
window = MovingWindow(size=100, resampled_data_recv=resampled_data_recv)
52+
53+
time_start = datetime.now()
54+
time_end = time_start + timedelta(minutes=5)
55+
56+
# ... wait for 5 minutes until the buffer is filled
57+
await asyncio.sleep(5)
58+
59+
# return an numpy array from the window
60+
a = window[time_start:time_end]
61+
# and use it to for example calculate the mean
62+
mean = a.mean()
63+
'''
64+
65+
**Example2** (create a polars data frame from a `MovingWindow`):
66+
67+
```
68+
import polars as pl
69+
70+
# create a window that stores two days of data
71+
# starting at 1.1.23 with samplerate=1
72+
window = MovingWindow(size = (60 * 60 * 24 * 2), sample_receiver)
73+
74+
# wait for one full day until the buffer is filled
75+
asyncio.sleep(60*60*24)
76+
77+
# create a polars series with one full day of data
78+
time_start = datetime(2023, 1, 1)
79+
time_end = datetime(2023, 1, 2)
80+
s = pl.Series("Jan_1", mv[time_start:time_end])
81+
```
82+
"""
83+
84+
def __init__(
85+
self,
86+
size: int,
87+
resampled_data_recv: Receiver[Sample],
88+
sampling_period: timedelta,
89+
window_alignment: datetime = datetime(1, 1, 1),
90+
) -> None:
91+
"""
92+
Initialize the MovingWindow.
93+
94+
This method creates the underlying ringbuffer and starts a
95+
new task that updates the ringbuffer with new incoming samples.
96+
The task stops running only if the channel receiver is closed.
97+
98+
Args:
99+
size: The number of elements that are stored.
100+
resampled_data_recv: A receiver that delivers samples with a
101+
given sampling period.
102+
sampling_period: The sampling period.
103+
window_alignment: A datetime object that defines a point in time to which
104+
the window is aligned to modulo window size.
105+
(default is midnight 01.01.01)
106+
For further information, consult the class level documentation.
107+
108+
Raises:
109+
asyncio.CancelledError: when the task gets cancelled.
110+
"""
111+
self._resampled_data_recv = resampled_data_recv
112+
self._buffer = OrderedRingBuffer(
113+
np.empty(shape=size, dtype=float),
114+
sampling_period=sampling_period,
115+
time_index_alignment=window_alignment,
116+
)
117+
self._copy_buffer = False
118+
self._update_window_task: asyncio.Task = asyncio.create_task(self._run_impl())
119+
log.debug("Cancelling MovingWindow task: %s", __name__)
120+
121+
async def _run_impl(self) -> None:
122+
"""Awaits samples from the receiver and updates the underlying ringbuffer."""
123+
try:
124+
async for sample in self._resampled_data_recv:
125+
log.debug("Received new sample: %s", sample)
126+
self._buffer.update(sample)
127+
except asyncio.CancelledError:
128+
log.info("MovingWindow task has been cancelled.")
129+
return
130+
131+
log.error("Channel has been closed")
132+
133+
async def stop(self) -> None:
134+
"""Cancel the running task and stop the MovingWindow."""
135+
await cancel_and_await(self._update_window_task)
136+
137+
def __len__(self) -> int:
138+
"""
139+
Return the size of the `MovingWindow`s underlying buffer.
140+
141+
Returns:
142+
The size of the `MovingWindow`.
143+
"""
144+
return len(self._buffer)
145+
146+
def __getitem__(self, key: int | datetime | slice) -> float | ArrayLike:
147+
"""
148+
Return a sub window of the `MovingWindow`.
149+
150+
The `MovingWindow` is accessed either by timestamp or by index
151+
or by a slice of timestamps or integers.
152+
153+
* If the key is an integer, the float value of that key
154+
at the given position is returned.
155+
* If the key is a datetime object, the float value of that key
156+
that corresponds to the timestamp is returned.
157+
* If the key is a slice of timestamps or integers, an ndarray is returned,
158+
where the bounds correspond to the slice bounds.
159+
Note that a half open interval, which is open at the end, is returned.
160+
161+
Args:
162+
key: Either an integer or a timestamp or a slice of timestamps or integers.
163+
164+
Raises:
165+
IndexError: when requesting an out of range timestamp or index
166+
TypeError: when the key is not a datetime or slice object.
167+
168+
Returns:
169+
A float if the key is a number or a timestamp.
170+
an numpy array if the key is a slice.
171+
"""
172+
if isinstance(key, slice):
173+
log.debug("Returning slice for [%s:%s].", key.start, key.stop)
174+
# we are doing runtime typechecks since there is no abstract slice type yet
175+
# see also (https://peps.python.org/pep-0696)
176+
if isinstance(key.start, datetime) and isinstance(key.stop, datetime):
177+
return self._buffer.window(key.start, key.stop, self._copy_buffer)
178+
if isinstance(key.start, int) and isinstance(key.stop, int):
179+
return self._buffer[key]
180+
elif isinstance(key, datetime):
181+
log.debug("Returning value at time %s ", key)
182+
return self._buffer[self._buffer.datetime_to_index(key)]
183+
elif isinstance(key, int):
184+
return self._buffer[key]
185+
186+
raise TypeError(
187+
"Key has to be either a timestamp or an integer "
188+
"or a slice of timestamps or integers"
189+
)

src/frequenz/sdk/timeseries/_ringbuffer.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class Gap:
2828
"""End of the range, exclusive."""
2929

3030
def contains(self, timestamp: datetime):
31-
"""Check if a given timestamp is inside this gap.
31+
"""Check if a given timestamp is inside this gap.
3232
3333
Args:
3434
timestamp: Timestamp to check.
@@ -55,8 +55,8 @@ def __init__(
5555
5656
Args:
5757
buffer: Instance of a buffer container to use internally.
58-
sampling_period: Timedelta of the desired resampling period.
59-
time_index_alignment: Arbitary point in time used to align
58+
sampling_period: Timedelta of the desired sampling period.
59+
time_index_alignment: Arbitrary point in time used to align
6060
timestamped data with the index position in the buffer.
6161
Used to make the data stored in the buffer align with the
6262
beginning and end of the buffer borders.
+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the moving window."""
5+
6+
import asyncio
7+
from datetime import datetime, timedelta
8+
from typing import Sequence, Tuple
9+
10+
import numpy as np
11+
from frequenz.channels import Broadcast, Sender
12+
13+
from frequenz.sdk.timeseries import Sample
14+
from frequenz.sdk.timeseries._moving_window import MovingWindow
15+
16+
17+
async def push_lm_data(sender: Sender[Sample], test_seq: Sequence[float]) -> None:
18+
"""
19+
Push data in the passed sender to mock `LogicalMeter` behaviour.
20+
Starting with the First of January 2023.
21+
22+
Args:
23+
sender: Sender for pushing resampled samples to the `MovingWindow`.
24+
test_seq: The Sequence that is pushed into the `MovingWindow`.
25+
"""
26+
start_ts: datetime = datetime(2023, 1, 1)
27+
for i, j in zip(test_seq, range(0, len(test_seq))):
28+
timestamp = start_ts + timedelta(seconds=j)
29+
await sender.send(Sample(timestamp, float(i)))
30+
31+
await asyncio.sleep(0.0)
32+
33+
34+
def init_moving_window(shape: int) -> Tuple[MovingWindow, Sender[Sample]]:
35+
"""
36+
Initialize the moving window with given shape
37+
38+
Args:
39+
shape: The size of the `MovingWindow`
40+
41+
Returns:
42+
tuple[MovingWindow, Sender[Sample]]: A pair of sender and `MovingWindow`.
43+
"""
44+
lm_chan = Broadcast[Sample]("lm_net_power")
45+
lm_tx = lm_chan.new_sender()
46+
window = MovingWindow(shape, lm_chan.new_receiver(), timedelta(seconds=1))
47+
return window, lm_tx
48+
49+
50+
async def test_access_window_by_index() -> None:
51+
"""Test indexing a window by integer index"""
52+
window, sender = init_moving_window(1)
53+
await push_lm_data(sender, [1])
54+
assert np.array_equal(window[0], 1.0)
55+
56+
57+
async def test_access_window_by_timestamp() -> None:
58+
"""Test indexing a window by timestamp"""
59+
window, sender = init_moving_window(1)
60+
await push_lm_data(sender, [1])
61+
assert np.array_equal(window[datetime(2023, 1, 1)], 1.0)
62+
63+
64+
async def test_access_window_by_int_slice() -> None:
65+
"""Test accessing a subwindow with an integer slice"""
66+
window, sender = init_moving_window(5)
67+
await push_lm_data(sender, range(0, 5))
68+
assert np.array_equal(window[3:5], np.array([3.0, 4.0]))
69+
70+
71+
async def test_access_window_by_ts_slice() -> None:
72+
"""Test accessing a subwindow with a timestamp slice"""
73+
window, sender = init_moving_window(5)
74+
await push_lm_data(sender, range(0, 5))
75+
time_start = datetime(2023, 1, 1) + timedelta(seconds=3)
76+
time_end = time_start + timedelta(seconds=2)
77+
assert np.array_equal(window[time_start:time_end], np.array([3.0, 4.0])) # type: ignore

0 commit comments

Comments
 (0)