@@ -617,7 +617,7 @@ async def test_resampling_with_one_window(
617
617
#
618
618
# t(s) 0 1 2 2.5 3 4
619
619
# |----------|----------R----|-----|----------R-----> (no more samples)
620
- # value 5.0 12.0 2 .0 4.0 5.0
620
+ # value 5.0 12.0 0 .0 4.0 5.0
621
621
#
622
622
# R = resampling is done
623
623
@@ -647,7 +647,7 @@ async def test_resampling_with_one_window(
647
647
resampling_fun_mock .reset_mock ()
648
648
649
649
# Second resampling run
650
- sample2_5s = Sample (timestamp + timedelta (seconds = 2.5 ), value = Quantity ( 2.0 ))
650
+ sample2_5s = Sample (timestamp + timedelta (seconds = 2.5 ), value = Quantity . zero ( ))
651
651
sample3s = Sample (timestamp + timedelta (seconds = 3 ), value = Quantity (4.0 ))
652
652
sample4s = Sample (timestamp + timedelta (seconds = 4 ), value = Quantity (5.0 ))
653
653
await source_sender .send (sample2_5s )
@@ -1205,6 +1205,117 @@ async def test_timer_is_aligned(
1205
1205
resampling_fun_mock .reset_mock ()
1206
1206
1207
1207
1208
+ async def test_resampling_all_zeros (
1209
+ fake_time : time_machine .Coordinates , source_chan : Broadcast [Sample [Quantity ]]
1210
+ ) -> None :
1211
+ """Test resampling with one resampling window full of zeros."""
1212
+ timestamp = datetime .now (timezone .utc )
1213
+
1214
+ resampling_period_s = 2
1215
+ expected_resampled_value = 0.0
1216
+
1217
+ resampling_fun_mock = MagicMock (
1218
+ spec = ResamplingFunction , return_value = expected_resampled_value
1219
+ )
1220
+ config = ResamplerConfig (
1221
+ resampling_period = timedelta (seconds = resampling_period_s ),
1222
+ max_data_age_in_periods = 1.0 ,
1223
+ resampling_function = resampling_fun_mock ,
1224
+ initial_buffer_len = 4 ,
1225
+ )
1226
+ resampler = Resampler (config )
1227
+
1228
+ source_receiver = source_chan .new_receiver ()
1229
+ source_sender = source_chan .new_sender ()
1230
+
1231
+ sink_mock = AsyncMock (spec = Sink , return_value = True )
1232
+
1233
+ resampler .add_timeseries ("test" , source_receiver , sink_mock )
1234
+ source_props = resampler .get_source_properties (source_receiver )
1235
+
1236
+ # Test timeline
1237
+ #
1238
+ # t(s) 0 1 2 2.5 3 4
1239
+ # |----------|----------R----|-----|----------R-----> (no more samples)
1240
+ # value 0.0 0.0 0.0 0.0 0.0
1241
+ #
1242
+ # R = resampling is done
1243
+
1244
+ # Send a few samples and run a resample tick, advancing the fake time by one period
1245
+ sample0s = Sample (timestamp , value = Quantity .zero ())
1246
+ sample1s = Sample (timestamp + timedelta (seconds = 1 ), value = Quantity .zero ())
1247
+ await source_sender .send (sample0s )
1248
+ await source_sender .send (sample1s )
1249
+ await _advance_time (fake_time , resampling_period_s )
1250
+ await resampler .resample (one_shot = True )
1251
+
1252
+ assert datetime .now (timezone .utc ).timestamp () == 2
1253
+ sink_mock .assert_called_once_with (
1254
+ Sample (
1255
+ timestamp + timedelta (seconds = resampling_period_s ),
1256
+ Quantity (expected_resampled_value ),
1257
+ )
1258
+ )
1259
+ resampling_fun_mock .assert_called_once_with (
1260
+ a_sequence (sample1s ), config , source_props
1261
+ )
1262
+ assert source_props == SourceProperties (
1263
+ sampling_start = timestamp , received_samples = 2 , sampling_period = None
1264
+ )
1265
+ assert _get_buffer_len (resampler , source_receiver ) == config .initial_buffer_len
1266
+ sink_mock .reset_mock ()
1267
+ resampling_fun_mock .reset_mock ()
1268
+
1269
+ # Second resampling run
1270
+ sample2_5s = Sample (timestamp + timedelta (seconds = 2.5 ), value = Quantity .zero ())
1271
+ sample3s = Sample (timestamp + timedelta (seconds = 3 ), value = Quantity .zero ())
1272
+ sample4s = Sample (timestamp + timedelta (seconds = 4 ), value = Quantity .zero ())
1273
+ await source_sender .send (sample2_5s )
1274
+ await source_sender .send (sample3s )
1275
+ await source_sender .send (sample4s )
1276
+ await _advance_time (fake_time , resampling_period_s )
1277
+ await resampler .resample (one_shot = True )
1278
+
1279
+ assert datetime .now (timezone .utc ).timestamp () == 4
1280
+ sink_mock .assert_called_once_with (
1281
+ Sample (
1282
+ timestamp + timedelta (seconds = resampling_period_s * 2 ),
1283
+ Quantity (expected_resampled_value ),
1284
+ )
1285
+ )
1286
+ resampling_fun_mock .assert_called_once_with (
1287
+ a_sequence (sample2_5s , sample3s , sample4s ), config , source_props
1288
+ )
1289
+ # By now we have a full buffer (5 samples and a buffer of length 4), which
1290
+ # we received in 4 seconds, so we have an input period of 0.8s.
1291
+ assert source_props == SourceProperties (
1292
+ sampling_start = timestamp ,
1293
+ received_samples = 5 ,
1294
+ sampling_period = timedelta (seconds = 0.8 ),
1295
+ )
1296
+ # The buffer should be able to hold 2 seconds of data, and data is coming
1297
+ # every 0.8 seconds, so we should be able to store 3 samples.
1298
+ assert _get_buffer_len (resampler , source_receiver ) == 3
1299
+ sink_mock .reset_mock ()
1300
+ resampling_fun_mock .reset_mock ()
1301
+
1302
+ await _assert_no_more_samples (
1303
+ resampler ,
1304
+ timestamp ,
1305
+ sink_mock ,
1306
+ resampling_fun_mock ,
1307
+ fake_time ,
1308
+ resampling_period_s ,
1309
+ current_iteration = 3 ,
1310
+ )
1311
+ assert source_props == SourceProperties (
1312
+ sampling_start = timestamp ,
1313
+ received_samples = 5 ,
1314
+ sampling_period = timedelta (seconds = 0.8 ),
1315
+ )
1316
+ assert _get_buffer_len (resampler , source_receiver ) == 3
1317
+
1318
+
1208
1319
def _get_buffer_len (resampler : Resampler , source_receiver : Source ) -> int :
1209
1320
# pylint: disable=protected-access
1210
1321
blen = resampler ._resamplers [source_receiver ]._helper ._buffer .maxlen
0 commit comments