Skip to content

Commit ec2bc21

Browse files
committed
Fix wrong running behaviors with duration=None
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent bb46cbc commit ec2bc21

File tree

4 files changed

+61
-7
lines changed

4 files changed

+61
-7
lines changed

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ dependencies = [
4141
# plugins.mkdocstrings.handlers.python.import)
4242
"frequenz-sdk == 1.0.0-rc900",
4343
"frequenz-channels >= 1.1.0, < 2.0.0",
44-
"frequenz-client-dispatch >= 0.6.0, < 0.7.0",
44+
# "frequenz-client-dispatch >= 0.6.0, < 0.7.0",
45+
"frequenz-client-dispatch @ git+https://github.com/frequenz-floss/[email protected]",
4546
]
4647
dynamic = ["version"]
4748

src/frequenz/dispatch/_dispatch.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,14 @@ def running(self, type_: str) -> RunningState:
117117
if not self.active or self.deleted:
118118
return RunningState.STOPPED
119119

120-
# A dispatch without duration is always running
120+
now = datetime.now(tz=timezone.utc)
121+
122+
# A dispatch without duration is always running once it started
121123
if self.duration is None:
122-
return RunningState.RUNNING
124+
if self.start_time <= now:
125+
return RunningState.RUNNING
126+
return RunningState.STOPPED
123127

124-
now = datetime.now(tz=timezone.utc)
125128
if until := self._until(now):
126129
return RunningState.RUNNING if now < until else RunningState.STOPPED
127130

@@ -189,6 +192,7 @@ def next_run_after(self, after: datetime) -> datetime | None:
189192
if (
190193
not self.recurrence.frequency
191194
or self.recurrence.frequency == Frequency.UNSPECIFIED
195+
or self.duration is None # Infinite duration
192196
):
193197
if after > self.start_time:
194198
return None
@@ -240,7 +244,13 @@ def _until(self, now: datetime) -> datetime | None:
240244
241245
Returns:
242246
The time when the dispatch should end or None if the dispatch is not running.
247+
248+
Raises:
249+
ValueError: If the dispatch has no duration.
243250
"""
251+
if self.duration is None:
252+
raise ValueError("_until: Dispatch has no duration")
253+
244254
if (
245255
not self.recurrence.frequency
246256
or self.recurrence.frequency == Frequency.UNSPECIFIED

src/frequenz/dispatch/actor.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,17 @@ async def _fetch(self) -> None:
118118

119119
for dispatch in old_dispatches.values():
120120
_logger.info("Deleted dispatch: %s", dispatch)
121-
dispatch._set_deleted() # pylint: disable=protected-access
122-
await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
123121
if task := self._scheduled.pop(dispatch.id, None):
124122
task.cancel()
125123

126124
if self._running_state_change(None, dispatch):
127125
await self._send_running_state_change(dispatch)
128126

127+
# Set deleted only here as it influences the result of dispatch.running()
128+
# which is used in above in _running_state_change
129+
dispatch._set_deleted() # pylint: disable=protected-access
130+
await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
131+
129132
def _update_dispatch_schedule(
130133
self, dispatch: Dispatch, old_dispatch: Dispatch | None
131134
) -> None:

tests/test_frequenz_dispatch.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,14 @@
1818
from frequenz.client.dispatch.types import Frequency
1919
from pytest import fixture
2020

21-
from frequenz.dispatch import Created, Deleted, Dispatch, DispatchEvent, Updated
21+
from frequenz.dispatch import (
22+
Created,
23+
Deleted,
24+
Dispatch,
25+
DispatchEvent,
26+
RunningState,
27+
Updated,
28+
)
2229
from frequenz.dispatch.actor import DispatchingActor
2330

2431

@@ -220,6 +227,39 @@ async def test_existing_dispatch_deleted(
220227
assert dispatch == Dispatch(sample, deleted=True)
221228

222229

230+
async def test_dispatch_inf_duration_deleted(
231+
actor_env: ActorTestEnv,
232+
generator: DispatchGenerator,
233+
fake_time: time_machine.Coordinates,
234+
) -> None:
235+
"""Test that a dispatch with infinite duration can be deleted while running."""
236+
# Generate a dispatch with infinite duration (duration=None)
237+
sample = generator.generate_dispatch()
238+
sample = replace(
239+
sample, active=True, duration=None, start_time=_now() + timedelta(seconds=5)
240+
)
241+
# Create the dispatch
242+
sample = await _test_new_dispatch_created(actor_env, sample)
243+
# Advance time to when the dispatch should start
244+
fake_time.shift(timedelta(seconds=40))
245+
await asyncio.sleep(40)
246+
# Expect notification of the dispatch being ready to run
247+
ready_dispatch = await actor_env.ready_dispatches.receive()
248+
print("Received ready dispatch")
249+
assert ready_dispatch.running(sample.type) == RunningState.RUNNING
250+
251+
# Now delete the dispatch
252+
await actor_env.client.delete(
253+
microgrid_id=actor_env.microgrid_id, dispatch_id=sample.id
254+
)
255+
fake_time.shift(timedelta(seconds=10))
256+
print("Deleted dispatch")
257+
await asyncio.sleep(1)
258+
# Expect notification to stop the dispatch
259+
done_dispatch = await actor_env.ready_dispatches.receive()
260+
assert done_dispatch.running(sample.type) == RunningState.STOPPED
261+
262+
223263
async def test_dispatch_schedule(
224264
actor_env: ActorTestEnv,
225265
generator: DispatchGenerator,

0 commit comments

Comments
 (0)