Skip to content

Fetch dispatches in sync with periodic reconnect of client.stream #155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* The dispatcher offers two new parameters to control the client's call and stream timeout:
- `call_timeout`: The maximum time to wait for a response from the client.
- `stream_timeout`: The maximum time to wait before restarting a stream.
* While the dispatch stream restarts, we also refresh our dispatch cache to ensure we haven't missed any updates.

## Bug Fixes

Expand Down
6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ dependencies = [
# Make sure to update the version for cross-referencing also in the
# mkdocs.yml file when changing the version here (look for the config key
# plugins.mkdocstrings.handlers.python.import)
"frequenz-sdk >= 1.0.0-rc1900, < 1.0.0-rc2100",
"frequenz-sdk >= 1.0.0-rc2002, < 1.0.0-rc2100",
"frequenz-channels >= 1.6.1, < 2.0.0",
"frequenz-client-dispatch >= 0.10.2, < 0.11.0",
"frequenz-client-dispatch >= 0.11.0, < 0.12.0",
"frequenz-client-common >= 0.3.2, < 0.4.0",
"frequenz-client-base >= 0.11.0, < 0.12.0",
]
dynamic = ["version"]

Expand Down
48 changes: 31 additions & 17 deletions src/frequenz/dispatch/_actor_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
from frequenz.channels import Broadcast, Receiver, Sender, select
from frequenz.channels.timer import SkipMissedAndDrift, Timer
from frequenz.client.common.microgrid.components import ComponentCategory, ComponentId
from frequenz.client.dispatch.types import DispatchId
from frequenz.client.dispatch.types import TargetComponents as ClientTargetComponents
from frequenz.core.id import BaseId
from frequenz.sdk.actor import Actor, BackgroundService

from ._dispatch import Dispatch
Expand All @@ -26,6 +29,18 @@
"""


class DispatchActorId(BaseId, str_prefix="DA"):
    """Identifier for a dispatch-managed actor (string form prefixed with ``DA``)."""

    def __init__(self, dispatch_id: DispatchId | int) -> None:
        """Create an actor ID derived from a dispatch ID.

        Args:
            dispatch_id: The ID of the dispatch that this actor belongs to.
        """
        numeric_id = int(dispatch_id)
        super().__init__(numeric_id)


@dataclass(frozen=True, kw_only=True)
class DispatchInfo:
"""Event emitted when the dispatch changes."""
Expand Down Expand Up @@ -161,7 +176,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
[DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]
],
running_status_receiver: Receiver[Dispatch],
dispatch_identity: Callable[[Dispatch], int] | None = None,
dispatch_identity: Callable[[Dispatch], DispatchActorId] | None = None,
retry_interval: timedelta = timedelta(seconds=60),
) -> None:
"""Initialize the dispatch handler.
Expand All @@ -175,36 +190,25 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
retry_interval: How long to wait until trying to start failed actors again.
"""
super().__init__()
self._dispatch_identity: Callable[[Dispatch], int] = (
dispatch_identity if dispatch_identity else lambda d: d.id
self._dispatch_identity: Callable[[Dispatch], DispatchActorId] = (
dispatch_identity if dispatch_identity else lambda d: DispatchActorId(d.id)
)

self._dispatch_rx = running_status_receiver
self._retry_timer_rx = Timer(retry_interval, SkipMissedAndDrift())
self._actor_factory = actor_factory
self._actors: dict[int, ActorDispatcher.ActorAndChannel] = {}
self._failed_dispatches: dict[int, Dispatch] = {}
self._actors: dict[DispatchActorId, ActorDispatcher.ActorAndChannel] = {}
self._failed_dispatches: dict[DispatchActorId, Dispatch] = {}
"""Failed dispatches that will be retried later."""

def start(self) -> None:
"""Start the background service."""
self._tasks.add(asyncio.create_task(self._run()))

def _get_target_components_from_dispatch(
self, dispatch: Dispatch
) -> TargetComponents:
if all(isinstance(comp, int) for comp in dispatch.target):
# We've confirmed all elements are integers, so we can cast.
int_components = cast(list[int], dispatch.target)
return [ComponentId(cid) for cid in int_components]
# If not all are ints, then it must be a list of ComponentCategory
# based on the definition of ClientTargetComponents.
return cast(list[ComponentCategory], dispatch.target)

async def _start_actor(self, dispatch: Dispatch) -> None:
"""Start the actor the given dispatch refers to."""
dispatch_update = DispatchInfo(
components=self._get_target_components_from_dispatch(dispatch),
components=_convert_target_components(dispatch.target),
dry_run=dispatch.dry_run,
options=dispatch.payload,
_src=dispatch,
Expand Down Expand Up @@ -301,3 +305,13 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None:
await self._start_actor(dispatch)
else:
await self._stop_actor(dispatch, "Dispatch stopped")


def _convert_target_components(target: ClientTargetComponents) -> TargetComponents:
    """Translate client target components into SDK target components.

    A target made up entirely of integers is interpreted as a list of
    component IDs; otherwise it must be a list of ``ComponentCategory``
    values, the only other shape ``ClientTargetComponents`` permits.
    """
    for entry in target:
        if not isinstance(entry, int):
            # At least one non-int entry: the whole list is categories.
            return cast(list[ComponentCategory], target)
    # Every entry is an int, so wrapping each one in ComponentId is safe.
    return [ComponentId(component_id) for component_id in cast(list[int], target)]
113 changes: 77 additions & 36 deletions src/frequenz/dispatch/_bg_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,18 @@
import grpc.aio
from frequenz.channels import Broadcast, Receiver, select, selected_from
from frequenz.channels.timer import SkipMissedAndResync, Timer
from frequenz.client.base.streaming import (
StreamFatalError,
StreamRetrying,
StreamStarted,
)
from frequenz.client.common.microgrid import MicrogridId
from frequenz.client.dispatch import DispatchApiClient
from frequenz.client.dispatch.types import Event
from frequenz.client.dispatch.types import DispatchEvent as ApiDispatchEvent
from frequenz.client.dispatch.types import DispatchId, Event
from frequenz.sdk.actor import BackgroundService

from ._actor_dispatcher import DispatchActorId
from ._dispatch import Dispatch
from ._event import Created, Deleted, DispatchEvent, Updated

Expand All @@ -33,11 +41,13 @@ class MergeStrategy(ABC):
"""Base class for strategies to merge running intervals."""

@abstractmethod
def identity(self, dispatch: Dispatch) -> int:
def identity(self, dispatch: Dispatch) -> DispatchActorId:
"""Identity function for the merge criteria."""

@abstractmethod
def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
def filter(
self, dispatches: Mapping[DispatchId, Dispatch], dispatch: Dispatch
) -> bool:
"""Filter dispatches based on the strategy.

Args:
Expand Down Expand Up @@ -75,7 +85,7 @@ class QueueItem:
to consider the start event when deciding whether to execute the
stop event.
"""
dispatch_id: int
dispatch_id: DispatchId
dispatch: Dispatch = field(compare=False)

def __init__(
Expand All @@ -90,7 +100,7 @@ def __init__(
# pylint: disable=too-many-arguments
def __init__(
self,
microgrid_id: int,
microgrid_id: MicrogridId,
client: DispatchApiClient,
) -> None:
"""Initialize the background service.
Expand All @@ -102,7 +112,7 @@ def __init__(
super().__init__(name="dispatch")

self._client = client
self._dispatches: dict[int, Dispatch] = {}
self._dispatches: dict[DispatchId, Dispatch] = {}
self._microgrid_id = microgrid_id

self._lifecycle_events_channel = Broadcast[DispatchEvent](
Expand Down Expand Up @@ -230,8 +240,21 @@ async def _run(self) -> None:
) as next_event_timer:
# Initial fetch
await self._fetch(next_event_timer)
stream = self._client.stream(microgrid_id=self._microgrid_id)

# pylint: disable-next=protected-access
streamer = self._client._get_stream(microgrid_id=self._microgrid_id)
stream = streamer.new_receiver(include_events=True)

# We track stream start events linked to retries to avoid re-fetching
# dispatches that were already retrieved during an initial stream start.
# The initial fetch gets all dispatches, and the StreamStarted event
# isn't always reliable due to parallel receiver creation and stream
# task initiation.
# This way we get a deterministic behavior where we only fetch
# dispatches once initially and then only when the stream is restarted.
is_retry_attempt = False

# Streaming updates
async for selected in select(next_event_timer, stream):
if selected_from(selected, next_event_timer):
if not self._scheduled_events:
Expand All @@ -240,36 +263,54 @@ async def _run(self) -> None:
heappop(self._scheduled_events).dispatch, next_event_timer
)
elif selected_from(selected, stream):
_logger.debug("Received dispatch event: %s", selected.message)
dispatch = Dispatch(selected.message.dispatch)
match selected.message.event:
case Event.CREATED:
self._dispatches[dispatch.id] = dispatch
await self._update_dispatch_schedule_and_notify(
dispatch, None, next_event_timer
)
await self._lifecycle_events_tx.send(
Created(dispatch=dispatch)
)
case Event.UPDATED:
await self._update_dispatch_schedule_and_notify(
dispatch,
self._dispatches[dispatch.id],
next_event_timer,
)
self._dispatches[dispatch.id] = dispatch
await self._lifecycle_events_tx.send(
Updated(dispatch=dispatch)
)
case Event.DELETED:
self._dispatches.pop(dispatch.id)
await self._update_dispatch_schedule_and_notify(
None, dispatch, next_event_timer
)

await self._lifecycle_events_tx.send(
Deleted(dispatch=dispatch)
match selected.message:
case ApiDispatchEvent():
_logger.debug(
"Received dispatch event: %s", selected.message
)
dispatch = Dispatch(selected.message.dispatch)
match selected.message.event:
case Event.CREATED:
self._dispatches[dispatch.id] = dispatch
await self._update_dispatch_schedule_and_notify(
dispatch, None, next_event_timer
)
await self._lifecycle_events_tx.send(
Created(dispatch=dispatch)
)
case Event.UPDATED:
await self._update_dispatch_schedule_and_notify(
dispatch,
self._dispatches[dispatch.id],
next_event_timer,
)
self._dispatches[dispatch.id] = dispatch
await self._lifecycle_events_tx.send(
Updated(dispatch=dispatch)
)
case Event.DELETED:
self._dispatches.pop(dispatch.id)
await self._update_dispatch_schedule_and_notify(
None, dispatch, next_event_timer
)

await self._lifecycle_events_tx.send(
Deleted(dispatch=dispatch)
)

case StreamRetrying():
is_retry_attempt = True

case StreamStarted():
if is_retry_attempt:
_logger.info(
"Dispatch stream restarted, getting dispatches"
)
await self._fetch(next_event_timer)
is_retry_attempt = False

case StreamFatalError():
pass

async def _execute_scheduled_event(self, dispatch: Dispatch, timer: Timer) -> None:
"""Execute a scheduled event.
Expand Down
9 changes: 5 additions & 4 deletions src/frequenz/dispatch/_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
from typing import Awaitable, Callable, Self

from frequenz.channels import Receiver
from frequenz.client.common.microgrid import MicrogridId
from frequenz.client.dispatch import DispatchApiClient
from frequenz.sdk.actor import Actor, BackgroundService
from typing_extensions import override

from ._actor_dispatcher import ActorDispatcher, DispatchInfo
from ._actor_dispatcher import ActorDispatcher, DispatchActorId, DispatchInfo
from ._bg_service import DispatchScheduler, MergeStrategy
from ._dispatch import Dispatch
from ._event import DispatchEvent
Expand Down Expand Up @@ -202,7 +203,7 @@ async def run():
def __init__(
self,
*,
microgrid_id: int,
microgrid_id: MicrogridId,
server_url: str,
key: str,
call_timeout: timedelta = timedelta(seconds=60),
Expand Down Expand Up @@ -328,8 +329,8 @@ async def start_managing(

self._empty_event.clear()

def id_identity(dispatch: Dispatch) -> int:
return dispatch.id
def id_identity(dispatch: Dispatch) -> DispatchActorId:
return DispatchActorId(dispatch.id)

dispatcher = ActorDispatcher(
actor_factory=actor_factory,
Expand Down
21 changes: 16 additions & 5 deletions src/frequenz/dispatch/_merge_strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,34 @@

import logging
from collections.abc import Mapping
from sys import maxsize
from typing import Any

from frequenz.client.dispatch.types import DispatchId
from typing_extensions import override

from ._actor_dispatcher import DispatchActorId
from ._bg_service import MergeStrategy
from ._dispatch import Dispatch


def _hash_positive(args: Any) -> int:
"""Make a positive hash."""
return hash(args) + maxsize + 1


class MergeByType(MergeStrategy):
"""Merge running intervals based on the dispatch type."""

@override
def identity(self, dispatch: Dispatch) -> int:
def identity(self, dispatch: Dispatch) -> DispatchActorId:
"""Identity function for the merge criteria."""
return hash(dispatch.type)
return DispatchActorId(_hash_positive(dispatch.type))

@override
def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
def filter(
self, dispatches: Mapping[DispatchId, Dispatch], dispatch: Dispatch
) -> bool:
"""Filter dispatches based on the merge strategy.

Keeps start events.
Expand Down Expand Up @@ -53,6 +64,6 @@ class MergeByTypeTarget(MergeByType):
"""Merge running intervals based on the dispatch type and target."""

@override
def identity(self, dispatch: Dispatch) -> int:
def identity(self, dispatch: Dispatch) -> DispatchActorId:
"""Identity function for the merge criteria."""
return hash((dispatch.type, tuple(dispatch.target)))
return DispatchActorId(_hash_positive((dispatch.type, tuple(dispatch.target))))
Loading
Loading