Skip to content

Commit 49ebc13

Browse files
authored
Fix dispatch dict passed to MergeStrategy not getting updated for future fetches (#174)
* We are also switching to using the same `now` while evaluating other dispatches in the MergeStrategy, but this is just an enhancement and not a fix. * We still used logging without the "path" in some places; that has been fixed.
2 parents ab62c06 + 7762602 commit 49ebc13

File tree

6 files changed

+85
-27
lines changed

6 files changed

+85
-27
lines changed

RELEASE_NOTES.md

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,14 @@
66

77
## Upgrading
88

9-
The `frequenz.dispatch.TargetComponents` type was removed, use `frequenz.client.dispatch.TargetComponents` instead.
9+
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
1010

1111
## New Features
1212

13-
* The dispatcher offers two new parameters to control the client's call and stream timeout:
14-
- `call_timeout`: The maximum time to wait for a response from the client.
15-
- `stream_timeout`: The maximum time to wait before restarting a stream.
16-
* While the dispatch stream restarts we refresh our dispatch cache as well, to ensure we didn't miss any updates.
13+
<!-- Here goes the main new features and examples or instructions on how to use them -->
1714

1815
## Bug Fixes
1916

20-
* Fixed that dispatches are never retried on failure, but instead an infinite loop of retry logs is triggered.
17+
* The merge by type class now uses the correct logger path.
18+
* The merge by type was made more robust under heavy load, making sure to use the same `now` for all dispatches that are checked.
19+
* Fixed that the merge filter was using an outdated dispatches dict once fetch() ran.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ dependencies = [
4040
# plugins.mkdocstrings.handlers.python.import)
4141
"frequenz-sdk >= 1.0.0-rc2100, < 1.0.0-rc2200",
4242
"frequenz-channels >= 1.6.1, < 2.0.0",
43-
"frequenz-client-dispatch >= 0.11.0, < 0.12.0",
43+
"frequenz-client-dispatch >= 0.11.1, < 0.12.0",
4444
"frequenz-client-common >= 0.3.2, < 0.4.0",
4545
"frequenz-client-base >= 0.11.0, < 0.12.0",
4646
]

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,11 +253,13 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
253253
identity = self._dispatch_identity(stopping_dispatch)
254254

255255
if actor_and_channel := self._actors.pop(identity, None):
256+
_logger.info("Stopping actor for dispatch type %r", stopping_dispatch.type)
256257
await actor_and_channel.actor.stop(msg)
257258
await actor_and_channel.channel.close()
258259
else:
259260
_logger.warning(
260-
"Actor for dispatch type %r is not running", stopping_dispatch.type
261+
"Actor for dispatch type %r is not running, ignoring stop request",
262+
stopping_dispatch.type,
261263
)
262264

263265
async def _run(self) -> None:

src/frequenz/dispatch/_bg_service.py

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ async def _run(self) -> None:
337337
pass
338338

339339
async def _execute_scheduled_event(self, dispatch: Dispatch, timer: Timer) -> None:
340-
"""Execute a scheduled event.
340+
"""Execute a scheduled event and schedules the next one if any.
341341
342342
Args:
343343
dispatch: The dispatch to execute.
@@ -366,8 +366,7 @@ async def _fetch(self, timer: Timer) -> None:
366366
"""
367367
self._initial_fetch_event.clear()
368368

369-
old_dispatches = self._dispatches
370-
self._dispatches = {}
369+
new_dispatches = {}
371370

372371
try:
373372
_logger.debug("Fetching dispatches for microgrid %s", self._microgrid_id)
@@ -381,9 +380,9 @@ async def _fetch(self, timer: Timer) -> None:
381380
continue
382381
dispatch = Dispatch(client_dispatch)
383382

384-
self._dispatches[dispatch.id] = dispatch
385-
old_dispatch = old_dispatches.pop(dispatch.id, None)
386-
if not old_dispatch:
383+
new_dispatches[dispatch.id] = dispatch
384+
old_dispatch = self._dispatches.get(dispatch.id, None)
385+
if old_dispatch is None:
387386
_logger.debug("New dispatch: %s", dispatch)
388387
await self._update_dispatch_schedule_and_notify(
389388
dispatch, None, timer
@@ -396,23 +395,40 @@ async def _fetch(self, timer: Timer) -> None:
396395
)
397396
await self._lifecycle_events_tx.send(Updated(dispatch=dispatch))
398397

399-
_logger.debug("Received %s dispatches", len(self._dispatches))
398+
_logger.debug("Received %s dispatches", len(new_dispatches))
400399

401400
except grpc.aio.AioRpcError as error:
402401
_logger.error("Error fetching dispatches: %s", error)
403-
self._dispatches = old_dispatches
404402
return
405403

406-
for dispatch in old_dispatches.values():
404+
# We make a copy because we mutate self._dispatches.keys() inside the loop
405+
for dispatch_id in frozenset(self._dispatches.keys() - new_dispatches.keys()):
406+
# Use try/except as the `self._dispatches` cache can be mutated by
407+
# stream delete events while we're iterating
408+
try:
409+
dispatch = self._dispatches.pop(dispatch_id)
410+
except KeyError as error:
411+
_logger.warning(
412+
"Inconsistency in cache detected. "
413+
+ "Tried to delete non-existing dispatch %s (%s)",
414+
dispatch_id,
415+
error,
416+
)
417+
continue
418+
407419
_logger.debug("Deleted dispatch: %s", dispatch)
408-
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
409420
await self._update_dispatch_schedule_and_notify(None, dispatch, timer)
410421

411-
# Set deleted only here as it influences the result of dispatch.started
412-
# which is used in above in _running_state_change
422+
# Set deleted only here as it influences the result of
423+
# dispatch.started, which is used in
424+
# _update_dispatch_schedule_and_notify above.
413425
dispatch._set_deleted() # pylint: disable=protected-access
414426
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
415427

428+
# Update the dispatch list with the dispatches
429+
self._dispatches.update(new_dispatches)
430+
431+
# Set event to indicate fetch ran at least once
416432
self._initial_fetch_event.set()
417433

418434
async def _update_dispatch_schedule_and_notify(

src/frequenz/dispatch/_dispatch.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,29 @@ def started(self) -> bool:
4343
Returns:
4444
True if the dispatch is started, False otherwise.
4545
"""
46+
now = datetime.now(tz=timezone.utc)
47+
return self.started_at(now)
48+
49+
def started_at(self, now: datetime) -> bool:
50+
"""Check if the dispatch has started.
51+
52+
A dispatch is considered started if the current time is after the start
53+
time but before the end time.
54+
55+
Recurring dispatches are considered started if the current time is after
56+
the start time of the last occurrence but before the end time of the
57+
last occurrence.
58+
59+
Args:
60+
now: time to use as now
61+
62+
Returns:
63+
True if the dispatch is started
64+
"""
4665
if self.deleted:
4766
return False
4867

49-
return super().started
68+
return super().started_at(now)
5069

5170
# noqa is needed because of a bug in pydoclint that makes it think a `return` without a return
5271
# value needs documenting

src/frequenz/dispatch/_merge_strategies.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import logging
77
from collections.abc import Mapping
8+
from datetime import datetime, timezone
89
from sys import maxsize
910
from typing import Any
1011

@@ -15,6 +16,8 @@
1516
from ._bg_service import MergeStrategy
1617
from ._dispatch import Dispatch
1718

19+
_logger = logging.getLogger(__name__)
20+
1821

1922
def _hash_positive(args: Any) -> int:
2023
"""Make a positive hash."""
@@ -39,24 +42,43 @@ def filter(
3942
Keeps stop events only if no other dispatches matching the
4043
strategy's criteria are running.
4144
"""
42-
if dispatch.started:
43-
logging.debug("Keeping start event %s", dispatch.id)
45+
now = datetime.now(tz=timezone.utc)
46+
47+
if dispatch.started_at(now):
48+
_logger.debug("Keeping start event %s", dispatch.id)
4449
return True
4550

46-
other_dispatches_running = any(
47-
existing_dispatch.started
51+
running_dispatch_list = [
52+
existing_dispatch
4853
for existing_dispatch in dispatches.values()
4954
if (
5055
self.identity(existing_dispatch) == self.identity(dispatch)
5156
and existing_dispatch.id != dispatch.id
5257
)
58+
]
59+
60+
other_dispatches_running = any(
61+
running_dispatch.started_at(now)
62+
for running_dispatch in running_dispatch_list
5363
)
5464

55-
logging.debug(
56-
"stop event %s because other_dispatches_running=%s",
65+
_logger.debug(
66+
"%s stop event %s because other_dispatches_running=%s",
67+
"Ignoring" if other_dispatches_running else "Allowing",
5768
dispatch.id,
5869
other_dispatches_running,
5970
)
71+
72+
if other_dispatches_running:
73+
if _logger.isEnabledFor(logging.DEBUG):
74+
_logger.debug(
75+
"Active other dispatches: %s",
76+
list(
77+
running_dispatch.id
78+
for running_dispatch in running_dispatch_list
79+
),
80+
)
81+
6082
return not other_dispatches_running
6183

6284

0 commit comments

Comments
 (0)