Skip to content

Commit f3b1103

Browse files
committed
Add dispatch runner
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 25ebc78 commit f3b1103

File tree

4 files changed

+354
-1
lines changed

4 files changed

+354
-1
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
* We now provide the `DispatchRunnerActor` class, a class to manage actors based on incoming dispatches.
1414

1515
## Bug Fixes
1616

src/frequenz/dispatch/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,15 @@
77
88
* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
99
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
10+
* [DispatchRunnerActor][frequenz.dispatch.DispatchRunnerActor]: An actor to
11+
manage other actors based on incoming dispatches.
1012
* [Created][frequenz.dispatch.Created],
1113
[Updated][frequenz.dispatch.Updated],
1214
[Deleted][frequenz.dispatch.Deleted]: Dispatch event types.
1315
1416
"""
1517

18+
from ._actor_runner import DispatchRunnerActor, DispatchUpdate
1619
from ._dispatch import Dispatch, RunningState
1720
from ._dispatcher import Dispatcher, ReceiverFetcher
1821
from ._event import Created, Deleted, DispatchEvent, Updated
@@ -26,4 +29,6 @@
2629
"Updated",
2730
"Dispatch",
2831
"RunningState",
32+
"DispatchRunnerActor",
33+
"DispatchUpdate",
2934
]
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
# License: All rights reserved
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Helper class to manage actors based on dispatches."""
5+
6+
import logging
7+
from dataclasses import dataclass
8+
from typing import Any
9+
10+
from frequenz.channels import Receiver, Sender
11+
from frequenz.client.dispatch.types import ComponentSelector
12+
from frequenz.sdk.actor import Actor
13+
14+
from ._dispatch import Dispatch, RunningState
15+
16+
_logger = logging.getLogger(__name__)
17+
18+
19+
@dataclass(frozen=True, kw_only=True)
20+
class DispatchUpdate:
21+
"""Event emitted when the dispatch configuration changes."""
22+
23+
components: ComponentSelector
24+
"""Components to be used."""
25+
26+
dry_run: bool
27+
"""Whether this is a dry run."""
28+
29+
payload: dict[str, Any]
30+
"""Additional payload."""
31+
32+
33+
class DispatchRunnerActor(Actor):
34+
"""Helper class to manage actors based on dispatches.
35+
36+
Example usage:
37+
38+
```python
39+
import os
40+
import asyncio
41+
from frequenz.dispatch import Dispatcher, DispatchRunnerActor, DispatchUpdate
42+
from frequenz.client.dispatch.types import ComponentSelector
43+
from frequenz.client.common.microgrid.components import ComponentCategory
44+
45+
from frequenz.channels import Receiver, Broadcast
46+
from unittest.mock import MagicMock
47+
48+
class MyActor(Actor):
49+
def __init__(self, config_channel: Receiver[DispatchUpdate]):
50+
super().__init__()
51+
self._config_channel = config_channel
52+
self._dry_run: bool
53+
self._payload: dict[str, Any]
54+
55+
async def _run(self) -> None:
56+
while True:
57+
config = await self._config_channel.receive()
58+
print("Received config:", config)
59+
60+
self.set_components(config.components)
61+
self._dry_run = config.dry_run
62+
self._payload = config.payload
63+
64+
def set_components(self, components: ComponentSelector) -> None:
65+
match components:
66+
case [int(), *_] as component_ids:
67+
print("Dispatch: Setting components to %s", components)
68+
case [ComponentCategory.BATTERY, *_]:
69+
print("Dispatch: Using all battery components")
70+
case unsupported:
71+
print(
72+
"Dispatch: Requested an unsupported selector %r, "
73+
"but only component IDs or category BATTERY are supported.",
74+
unsupported,
75+
)
76+
77+
async def run():
78+
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
79+
key = os.getenv("DISPATCH_API_KEY", "some-key")
80+
81+
microgrid_id = 1
82+
83+
dispatcher = Dispatcher(
84+
microgrid_id=microgrid_id,
85+
server_url=url,
86+
key=key
87+
)
88+
89+
# Create config channel to receive (re-)configuration events pre-start and mid-run
90+
dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel")
91+
92+
# Start actor and supporting actor, give each a config channel receiver
93+
my_actor = MyActor(dispatch_updates_channel.new_receiver())
94+
95+
# Optional: One (or more) additional actors can be added to the set
96+
supporting_actor = MagicMock(dispatch_updates_channel.new_receiver())
97+
98+
status_receiver = dispatcher.running_status_change.new_receiver()
99+
100+
dispatch_runner = DispatchRunnerActor(
101+
actors=frozenset([my_actor, supporting_actor]),
102+
dispatch_type="EXAMPLE",
103+
running_status_receiver=status_receiver,
104+
configuration_sender=dispatch_updates_channel.new_sender(),
105+
)
106+
107+
await asyncio.gather(dispatcher.start(), dispatch_runner.start())
108+
```
109+
"""
110+
111+
def __init__(
112+
self,
113+
actors: frozenset[Actor],
114+
dispatch_type: str,
115+
running_status_receiver: Receiver[Dispatch],
116+
configuration_sender: Sender[DispatchUpdate] | None = None,
117+
) -> None:
118+
"""Initialize the dispatch handler.
119+
120+
Args:
121+
actors: The actors to handle.
122+
dispatch_type: The type of dispatches to handle.
123+
running_status_receiver: The receiver for dispatch running status changes.
124+
configuration_sender: The sender for dispatch configuration events
125+
"""
126+
super().__init__()
127+
self._dispatch_rx = running_status_receiver
128+
self._actors = actors
129+
self._dispatch_type = dispatch_type
130+
self._configuration_sender = configuration_sender
131+
132+
def _start_actors(self) -> None:
133+
"""Start all actors."""
134+
for actor in self._actors:
135+
if actor.is_running:
136+
_logger.warning("Actor %s is already running", actor.name)
137+
else:
138+
actor.start()
139+
140+
async def _stop_actors(self, msg: str) -> None:
141+
"""Stop all actors.
142+
143+
Args:
144+
msg: The message to be passed to the actors being stopped.
145+
"""
146+
for actor in self._actors:
147+
if actor.is_running:
148+
await actor.stop(msg)
149+
else:
150+
_logger.warning("Actor %s is not running", actor.name)
151+
152+
async def _run(self) -> None:
153+
"""Wait for dispatches and handle them."""
154+
async for dispatch in self._dispatch_rx:
155+
await self._handle_dispatch(dispatch=dispatch)
156+
157+
async def _handle_dispatch(self, dispatch: Dispatch) -> None:
158+
"""Handle a dispatch.
159+
160+
Args:
161+
dispatch: The dispatch to handle.
162+
"""
163+
running = dispatch.running(self._dispatch_type)
164+
match running:
165+
case RunningState.STOPPED:
166+
_logger.info("Stopped by dispatch %s", dispatch.id)
167+
await self._stop_actors("Dispatch stopped")
168+
case RunningState.RUNNING:
169+
if self._configuration_sender is not None:
170+
_logger.info("Updated configuration by dispatch %s", dispatch.id)
171+
await self._configuration_sender.send(
172+
DispatchUpdate(
173+
components=dispatch.selector,
174+
dry_run=dispatch.dry_run,
175+
payload=dispatch.payload,
176+
)
177+
)
178+
179+
_logger.info("Started by dispatch %s", dispatch.id)
180+
self._start_actors()
181+
case RunningState.DIFFERENT_TYPE:
182+
_logger.debug(
183+
"Unknown dispatch! Ignoring dispatch of type %s", dispatch.type
184+
)

tests/test_runner.py

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
# LICENSE: ALL RIGHTS RESERVED
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Test the dispatch runner."""
5+
6+
import asyncio
7+
from dataclasses import dataclass, replace
8+
from datetime import datetime, timedelta, timezone
9+
from typing import AsyncIterator, Iterator
10+
11+
import async_solipsism
12+
import pytest
13+
import time_machine
14+
from frequenz.channels import Broadcast, Receiver, Sender
15+
from frequenz.client.dispatch.test.generator import DispatchGenerator
16+
from frequenz.client.dispatch.types import Frequency
17+
from frequenz.sdk.actor import Actor
18+
from pytest import fixture
19+
20+
from frequenz.dispatch import Dispatch, DispatchRunnerActor, DispatchUpdate
21+
22+
23+
# This method replaces the event loop for all tests in the file.
24+
@pytest.fixture
25+
def event_loop_policy() -> async_solipsism.EventLoopPolicy:
26+
"""Return an event loop policy that uses the async solipsism event loop."""
27+
return async_solipsism.EventLoopPolicy()
28+
29+
30+
@fixture
31+
def fake_time() -> Iterator[time_machine.Coordinates]:
32+
"""Replace real time with a time machine that doesn't automatically tick."""
33+
# destination can be a datetime or a timestamp (int), so are moving to the
34+
# epoch (in UTC!)
35+
with time_machine.travel(destination=0, tick=False) as traveller:
36+
yield traveller
37+
38+
39+
def _now() -> datetime:
40+
"""Return the current time in UTC."""
41+
return datetime.now(tz=timezone.utc)
42+
43+
44+
class MockActor(Actor):
45+
"""Mock actor for testing."""
46+
47+
async def _run(self) -> None:
48+
while True:
49+
await asyncio.sleep(1)
50+
51+
52+
@dataclass
53+
class TestEnv:
54+
"""Test environment."""
55+
56+
actor: Actor
57+
runner_actor: DispatchRunnerActor
58+
running_status_sender: Sender[Dispatch]
59+
configuration_receiver: Receiver[DispatchUpdate]
60+
generator: DispatchGenerator = DispatchGenerator()
61+
62+
63+
@fixture
64+
async def test_env() -> AsyncIterator[TestEnv]:
65+
"""Create a test environment."""
66+
channel = Broadcast[Dispatch](name="dispatch ready test channel")
67+
config_channel = Broadcast[DispatchUpdate](name="dispatch config test channel")
68+
69+
actor = MockActor()
70+
71+
runner_actor = DispatchRunnerActor(
72+
actors=frozenset([actor]),
73+
dispatch_type="UNIT_TEST",
74+
running_status_receiver=channel.new_receiver(),
75+
configuration_sender=config_channel.new_sender(),
76+
)
77+
78+
runner_actor.start()
79+
80+
yield TestEnv(
81+
actor=actor,
82+
runner_actor=runner_actor,
83+
running_status_sender=channel.new_sender(),
84+
configuration_receiver=config_channel.new_receiver(),
85+
)
86+
87+
await runner_actor.stop()
88+
89+
90+
async def test_simple_start_stop(
91+
test_env: TestEnv,
92+
fake_time: time_machine.Coordinates,
93+
) -> None:
94+
"""Test behavior when receiving start/stop messages."""
95+
now = _now()
96+
duration = timedelta(minutes=10)
97+
dispatch = test_env.generator.generate_dispatch()
98+
dispatch = replace(
99+
dispatch,
100+
active=True,
101+
dry_run=False,
102+
duration=duration,
103+
start_time=now,
104+
payload={"test": True},
105+
type="UNIT_TEST",
106+
recurrence=replace(
107+
dispatch.recurrence,
108+
frequency=Frequency.UNSPECIFIED,
109+
),
110+
)
111+
112+
await test_env.running_status_sender.send(Dispatch(dispatch))
113+
fake_time.shift(timedelta(seconds=1))
114+
115+
event = await test_env.configuration_receiver.receive()
116+
assert event.payload == {"test": True}
117+
assert event.components == dispatch.selector
118+
assert event.dry_run is False
119+
120+
assert test_env.actor.is_running is True
121+
122+
fake_time.shift(duration)
123+
await test_env.running_status_sender.send(Dispatch(dispatch))
124+
125+
# Give await actor.stop a chance to run in DispatchRunnerActor
126+
await asyncio.sleep(0.1)
127+
128+
assert test_env.actor.is_running is False
129+
130+
131+
async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) -> None:
132+
"""Test the dry run mode."""
133+
dispatch = test_env.generator.generate_dispatch()
134+
dispatch = replace(
135+
dispatch,
136+
dry_run=True,
137+
active=True,
138+
start_time=_now(),
139+
duration=timedelta(minutes=10),
140+
type="UNIT_TEST",
141+
recurrence=replace(
142+
dispatch.recurrence,
143+
frequency=Frequency.UNSPECIFIED,
144+
),
145+
)
146+
147+
await test_env.running_status_sender.send(Dispatch(dispatch))
148+
fake_time.shift(timedelta(seconds=1))
149+
150+
event = await test_env.configuration_receiver.receive()
151+
152+
assert event.dry_run is dispatch.dry_run
153+
assert event.components == dispatch.selector
154+
assert event.payload == dispatch.payload
155+
assert test_env.actor.is_running is True
156+
157+
assert dispatch.duration is not None
158+
fake_time.shift(dispatch.duration)
159+
await test_env.running_status_sender.send(Dispatch(dispatch))
160+
161+
# Give await actor.stop a chance to run in DispatchRunnerActor
162+
await asyncio.sleep(0.1)
163+
164+
assert test_env.actor.is_running is False

0 commit comments

Comments
 (0)