Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancements from the last PR #114

Merged
merged 2 commits into from
Mar 4, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 15 additions & 26 deletions src/frequenz/dispatch/_actor_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,25 @@ async def main():
```
"""

class RetryFailedDispatches:
"""Manages the retry of failed dispatches."""
class FailedDispatchesRetrier(BackgroundService):
"""Manages the retring of failed dispatches."""

def __init__(self, retry_interval: timedelta) -> None:
"""Initialize the retry manager.

Args:
retry_interval: The interval between retries.
"""
super().__init__()
self._retry_interval = retry_interval
self._channel = Broadcast[Dispatch](name="retry_channel")
self._sender = self._channel.new_sender()
self._tasks: set[asyncio.Task[None]] = set()

def start(self) -> None:
"""Start the background service.

This is a no-op.
"""

def new_receiver(self) -> Receiver[Dispatch]:
"""Create a new receiver for dispatches to retry.
Expand Down Expand Up @@ -187,7 +193,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
],
running_status_receiver: Receiver[Dispatch],
dispatch_identity: Callable[[Dispatch], int] | None = None,
retry_interval: timedelta | None = timedelta(seconds=60),
retry_interval: timedelta = timedelta(seconds=60),
) -> None:
"""Initialize the dispatch handler.

Expand All @@ -197,7 +203,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
running_status_receiver: The receiver for dispatch running status changes.
dispatch_identity: A function to identify to which actor a dispatch refers.
By default, it uses the dispatch ID.
retry_interval: The interval between retries. If `None`, retries are disabled.
retry_interval: The interval between retries.
"""
super().__init__()
self._dispatch_identity: Callable[[Dispatch], int] = (
Expand All @@ -211,11 +217,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
name="dispatch_updates_channel", resend_latest=True
)
self._updates_sender = self._updates_channel.new_sender()
self._retrier = (
ActorDispatcher.RetryFailedDispatches(retry_interval)
if retry_interval
else None
)
self._retrier = ActorDispatcher.FailedDispatchesRetrier(retry_interval)

def start(self) -> None:
"""Start the background service."""
Expand Down Expand Up @@ -258,12 +260,7 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
dispatch.type,
exc_info=e,
)
if self._retrier:
self._retrier.retry(dispatch)
else:
_logger.error(
"No retry mechanism enabled, dispatch %r failed", dispatch
)
self._retrier.retry(dispatch)
else:
# No exception occurred, so we can add the actor to the list
self._actors[identity] = actor
Expand All @@ -275,26 +272,18 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
stopping_dispatch: The dispatch that is stopping the actor.
msg: The message to be passed to the actors being stopped.
"""
actor: Actor | None = None
identity = self._dispatch_identity(stopping_dispatch)

actor = self._actors.get(identity)

if actor:
if actor := self._actors.pop(identity, None):
await actor.stop(msg)

del self._actors[identity]
else:
_logger.warning(
"Actor for dispatch type %r is not running", stopping_dispatch.type
)

async def _run(self) -> None:
"""Run the background service."""
if not self._retrier:
async for dispatch in self._dispatch_rx:
await self._handle_dispatch(dispatch)
else:
async with self._retrier:
retry_recv = self._retrier.new_receiver()

async for selected in select(retry_recv, self._dispatch_rx):
Expand Down
Loading