Skip to content

Commit 0250fc3

Browse files
committed
Fix filter not seeing the latest dispatches after periodic fetch()'s
By using a workflow that avoids reassigning the dispatches dict. When `fetch()`, which runs periodically, would assign a fresh map to `self._dispatches` the filter would not see the latest dispatches, as it would still reference the old map. Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent cff2298 commit 0250fc3

File tree

2 files changed

+24
-10
lines changed

2 files changed

+24
-10
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@
1616

1717
* The merge by type class now uses the correct logger path.
1818
* The merge by type was made more robust under heavy load, making sure to use the same `now` for all dispatches that are checked.
19+
* Fix that the merge filter was using an outdated dispatches dict once fetch() ran.

src/frequenz/dispatch/_bg_service.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -366,8 +366,7 @@ async def _fetch(self, timer: Timer) -> None:
366366
"""
367367
self._initial_fetch_event.clear()
368368

369-
old_dispatches = self._dispatches
370-
self._dispatches = {}
369+
new_dispatches = {}
371370

372371
try:
373372
_logger.debug("Fetching dispatches for microgrid %s", self._microgrid_id)
@@ -381,9 +380,9 @@ async def _fetch(self, timer: Timer) -> None:
381380
continue
382381
dispatch = Dispatch(client_dispatch)
383382

384-
self._dispatches[dispatch.id] = dispatch
385-
old_dispatch = old_dispatches.pop(dispatch.id, None)
386-
if not old_dispatch:
383+
new_dispatches[dispatch.id] = dispatch
384+
old_dispatch = self._dispatches.get(dispatch.id, None)
385+
if old_dispatch is None:
387386
_logger.debug("New dispatch: %s", dispatch)
388387
await self._update_dispatch_schedule_and_notify(
389388
dispatch, None, timer
@@ -396,23 +395,37 @@ async def _fetch(self, timer: Timer) -> None:
396395
)
397396
await self._lifecycle_events_tx.send(Updated(dispatch=dispatch))
398397

399-
_logger.debug("Received %s dispatches", len(self._dispatches))
398+
_logger.debug("Received %s dispatches", len(new_dispatches))
400399

401400
except grpc.aio.AioRpcError as error:
402401
_logger.error("Error fetching dispatches: %s", error)
403-
self._dispatches = old_dispatches
404402
return
405403

406-
for dispatch in old_dispatches.values():
404+
# We make a copy because we mutate self._dispatch.keys() inside the loop
405+
for dispatch_id in frozenset(self._dispatches.keys() - new_dispatches.keys()):
406+
try:
407+
dispatch = self._dispatches.pop(dispatch_id)
408+
except KeyError as error:
409+
_logger.warning(
410+
"Inconsistency in cache detected. "
411+
+ "Tried to delete non-existing dispatch %s (%s)",
412+
dispatch_id,
413+
error,
414+
)
415+
continue
416+
407417
_logger.debug("Deleted dispatch: %s", dispatch)
408-
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
409418
await self._update_dispatch_schedule_and_notify(None, dispatch, timer)
410419

411420
# Set deleted only here as it influences the result of dispatch.started
412-
# which is used in above in _running_state_change
421+
# which is used in the func call above ^
413422
dispatch._set_deleted() # pylint: disable=protected-access
414423
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
415424

425+
# Update the dispatch list with the dispatches
426+
self._dispatches.update(new_dispatches)
427+
428+
# Set event to indicate fetch ran at least once
416429
self._initial_fetch_event.set()
417430

418431
async def _update_dispatch_schedule_and_notify(

0 commit comments

Comments
 (0)