WIP: Add test for filter calls between fetch() #175

Open · wants to merge 7 commits into base: v0.x.x
11 changes: 5 additions & 6 deletions RELEASE_NOTES.md
@@ -6,15 +6,14 @@

## Upgrading

The `frequenz.dispatch.TargetComponents` type was removed; use `frequenz.client.dispatch.TargetComponents` instead.
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->

## New Features

* The dispatcher offers two new parameters to control the client's call and stream timeout:
- `call_timeout`: The maximum time to wait for a response from the client.
- `stream_timeout`: The maximum time to wait before restarting a stream.
* While the dispatch stream restarts, we also refresh the dispatch cache to ensure no updates were missed.
<!-- Here goes the main new features and examples or instructions on how to use them -->

## Bug Fixes

* Fixed a bug where dispatches were never retried on failure; instead, an infinite loop of retry logs was triggered.
* The merge-by-type class now uses the correct logger path.
* The merge-by-type filter was made more robust under heavy load by using the same `now` for all dispatches being checked.
* Fixed the merge filter using an outdated dispatches dict after `fetch()` ran.
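The two new timeout parameters could be exercised roughly like this. This is a minimal sketch: the `TimeoutConfig` class and `call_with_timeout` helper are hypothetical, and only the parameter names `call_timeout` and `stream_timeout` come from the release notes above.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class TimeoutConfig:
    # Parameter names come from the release notes; this config class is hypothetical.
    call_timeout: float = 30.0    # max seconds to wait for a client response
    stream_timeout: float = 60.0  # max seconds before restarting a stream


async def call_with_timeout(coro, config: TimeoutConfig):
    # Bound a single client call by the configured call timeout.
    return await asyncio.wait_for(coro, timeout=config.call_timeout)


async def _demo() -> str:
    async def fake_client_call() -> str:
        await asyncio.sleep(0.01)
        return "ok"

    return await call_with_timeout(fake_client_call(), TimeoutConfig(call_timeout=1.0))


result = asyncio.run(_demo())
```

A call exceeding `call_timeout` would raise `asyncio.TimeoutError` instead of blocking indefinitely.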
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -40,7 +40,7 @@ dependencies = [
# plugins.mkdocstrings.handlers.python.import)
"frequenz-sdk >= 1.0.0-rc2100, < 1.0.0-rc2200",
"frequenz-channels >= 1.6.1, < 2.0.0",
"frequenz-client-dispatch >= 0.11.0, < 0.12.0",
"frequenz-client-dispatch >= 0.11.1, < 0.12.0",
"frequenz-client-common >= 0.3.2, < 0.4.0",
"frequenz-client-base >= 0.11.0, < 0.12.0",
]
4 changes: 3 additions & 1 deletion src/frequenz/dispatch/_actor_dispatcher.py
@@ -253,11 +253,13 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
identity = self._dispatch_identity(stopping_dispatch)

if actor_and_channel := self._actors.pop(identity, None):
_logger.info("Stopping actor for dispatch type %r", stopping_dispatch.type)
await actor_and_channel.actor.stop(msg)
await actor_and_channel.channel.close()
else:
_logger.warning(
"Actor for dispatch type %r is not running", stopping_dispatch.type
"Actor for dispatch type %r is not running, ignoring stop request",
stopping_dispatch.type,
)

async def _run(self) -> None:
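The `pop`-with-default plus walrus pattern in `_stop_actor` above can be sketched in isolation; all names here are hypothetical stand-ins for the actor bookkeeping in the real class.

```python
actors: dict[str, str] = {"dispatch-1": "actor-1"}

stopped: list[str] = []
warnings: list[str] = []


def stop_actor(identity: str) -> None:
    # pop() with a default never raises; the walrus binds the removed
    # entry (or None), so the running and not-running branches stay symmetrical.
    if actor := actors.pop(identity, None):
        stopped.append(actor)
    else:
        warnings.append(f"Actor {identity!r} is not running, ignoring stop request")


stop_actor("dispatch-1")
stop_actor("dispatch-1")  # second stop only warns
```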
27 changes: 14 additions & 13 deletions src/frequenz/dispatch/_bg_service.py
@@ -337,7 +337,7 @@ async def _run(self) -> None:
pass

async def _execute_scheduled_event(self, dispatch: Dispatch, timer: Timer) -> None:
"""Execute a scheduled event.
"""Execute a scheduled event and schedules the next one if any.

Args:
dispatch: The dispatch to execute.
@@ -366,8 +366,8 @@ async def _fetch(self, timer: Timer) -> None:
"""
self._initial_fetch_event.clear()

old_dispatches = self._dispatches
self._dispatches = {}
old_dispatches = set(self._dispatches.keys())
new_dispatches = {}

try:
_logger.debug("Fetching dispatches for microgrid %s", self._microgrid_id)
@@ -381,9 +381,9 @@
continue
dispatch = Dispatch(client_dispatch)

self._dispatches[dispatch.id] = dispatch
old_dispatch = old_dispatches.pop(dispatch.id, None)
if not old_dispatch:
new_dispatches[dispatch.id] = dispatch
old_dispatch = self._dispatches.get(dispatch.id, None)
if old_dispatch is None:
_logger.debug("New dispatch: %s", dispatch)
await self._update_dispatch_schedule_and_notify(
dispatch, None, timer
@@ -396,22 +396,23 @@
)
await self._lifecycle_events_tx.send(Updated(dispatch=dispatch))

_logger.debug("Received %s dispatches", len(self._dispatches))
_logger.debug("Received %s dispatches", len(new_dispatches))

except grpc.aio.AioRpcError as error:
_logger.error("Error fetching dispatches: %s", error)
self._dispatches = old_dispatches
return

for dispatch in old_dispatches.values():
# Delete old dispatches
for dispatch_id in old_dispatches:
if dispatch_id in new_dispatches:
continue

dispatch = self._dispatches.pop(dispatch_id)
_logger.debug("Deleted dispatch: %s", dispatch)
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
await self._update_dispatch_schedule_and_notify(None, dispatch, timer)

# Set deleted only here as it influences the result of dispatch.started
# which is used above in _running_state_change
dispatch._set_deleted() # pylint: disable=protected-access
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
self._dispatches.update(new_dispatches)

Comment on lines +405 to 416, Copilot AI (Jul 17, 2025):

> The dispatches dictionary is updated after processing deletions, but the deletion logic uses the old dispatches dictionary. This could lead to inconsistent state if the same dispatch ID exists in both old and new dispatches.

self._initial_fetch_event.set()

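The reworked `_fetch` above only mutates the dispatch cache once the fetch has succeeded, then reconciles it against the freshly fetched set. The core reconciliation can be sketched generically; the data types here are hypothetical, and the real code also emits lifecycle events and reschedules timers.

```python
def reconcile(
    current: dict[int, str], fetched: dict[int, str]
) -> tuple[dict[int, str], list[str]]:
    """Return the updated cache and the entries that were deleted.

    `current` is only mutated after the fetch has fully succeeded, so a
    failed fetch leaves the old cache untouched (mirroring the PR's fix).
    """
    deleted: list[str] = []
    for dispatch_id in set(current.keys()):
        if dispatch_id in fetched:
            continue
        # Present before, gone now: remove and record the deletion.
        deleted.append(current.pop(dispatch_id))
    # New and updated entries overwrite the survivors in one step.
    current.update(fetched)
    return current, deleted


cache = {1: "d1", 2: "d2"}
updated, removed = reconcile(cache, {2: "d2-new", 3: "d3"})
```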
21 changes: 20 additions & 1 deletion src/frequenz/dispatch/_dispatch.py
@@ -43,10 +43,29 @@ def started(self) -> bool:
Returns:
True if the dispatch is started, False otherwise.
"""
now = datetime.now(tz=timezone.utc)
return self.started_at(now)

def started_at(self, now: datetime) -> bool:
"""Check if the dispatch has started.
A dispatch is considered started if the current time is after the start
time but before the end time.
Recurring dispatches are considered started if the current time is after
the start time of the last occurrence but before the end time of the
last occurrence.
Args:
now: time to use as now
Returns:
True if the dispatch is started
"""
if self.deleted:
return False

return super().started
return super().started_at(now)

# noqa is needed because of a bug in pydoclint that makes it think a `return` without a return
# value needs documenting
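Splitting `started` into a property plus an explicit `started_at(now)` lets callers evaluate many dispatches against one shared timestamp. A minimal sketch of the pattern, using a hypothetical `Window` class in place of the real `Dispatch`:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class Window:
    """A start/end window with the same started()/started_at() split as Dispatch."""

    start: datetime
    end: datetime

    @property
    def started(self) -> bool:
        # Convenience form: samples the clock itself.
        return self.started_at(datetime.now(tz=timezone.utc))

    def started_at(self, now: datetime) -> bool:
        # Explicit form: callers checking many windows pass one shared `now`,
        # so all checks agree even under heavy load.
        return self.start <= now < self.end


now = datetime(2025, 7, 17, tzinfo=timezone.utc)
w = Window(start=now - timedelta(seconds=5), end=now + timedelta(seconds=5))
```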
16 changes: 11 additions & 5 deletions src/frequenz/dispatch/_merge_strategies.py
@@ -5,6 +5,7 @@

import logging
from collections.abc import Mapping
from datetime import datetime, timezone
from sys import maxsize
from typing import Any

@@ -15,6 +15,8 @@
from ._bg_service import MergeStrategy
from ._dispatch import Dispatch

_logger = logging.getLogger(__name__)


def _hash_positive(args: Any) -> int:
"""Make a positive hash."""
@@ -39,21 +39,24 @@ def filter(
Keeps stop events only if no other dispatches matching the
strategy's criteria are running.
"""
if dispatch.started:
logging.debug("Keeping start event %s", dispatch.id)
now = datetime.now(tz=timezone.utc)

if dispatch.started_at(now):
_logger.debug("Keeping start event %s", dispatch.id)
return True

other_dispatches_running = any(
existing_dispatch.started
existing_dispatch.started_at(now)
for existing_dispatch in dispatches.values()
if (
self.identity(existing_dispatch) == self.identity(dispatch)
and existing_dispatch.id != dispatch.id
)
)

logging.debug(
"stop event %s because other_dispatches_running=%s",
_logger.debug(
"%s stop event %s because other_dispatches_running=%s",
"Ignoring" if other_dispatches_running else "Allowing",
dispatch.id,
other_dispatches_running,
)
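The filter logic above keeps start events unconditionally and drops stop events only while another dispatch with the same identity is running. A self-contained sketch of that decision, using hypothetical tuples in place of real `Dispatch` objects:

```python
def keep_event(
    dispatch_id: int,
    identity: int,
    running: bool,
    others: list[tuple[int, int, bool]],  # (id, identity, running)
) -> bool:
    """Start events always pass; a stop event passes only if no *other*
    dispatch with the same identity is still running."""
    if running:
        return True
    return not any(
        other_running
        for other_id, other_identity, other_running in others
        if other_identity == identity and other_id != dispatch_id
    )


# Dispatch 1 stops, but dispatch 2 shares its identity and is still running,
# so the stop event is suppressed; once dispatch 2 stops too, it passes.
blocked = keep_event(1, identity=7, running=False, others=[(2, 7, True)])
allowed = keep_event(1, identity=7, running=False, others=[(2, 7, False)])
```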
83 changes: 82 additions & 1 deletion tests/test_frequenz_dispatch.py
@@ -13,12 +13,13 @@
import pytest
import time_machine
from frequenz.channels import Receiver
from frequenz.channels.timer import SkipMissedAndResync, Timer
from frequenz.client.common.microgrid import MicrogridId
from frequenz.client.dispatch.recurrence import Frequency, RecurrenceRule
from frequenz.client.dispatch.test.client import FakeClient, to_create_params
from frequenz.client.dispatch.test.generator import DispatchGenerator
from frequenz.client.dispatch.types import Dispatch as BaseDispatch
from frequenz.client.dispatch.types import TargetIds
from frequenz.client.dispatch.types import DispatchId, TargetIds
from pytest import fixture

from frequenz.dispatch import (
@@ -692,6 +693,86 @@ async def test_multiple_dispatches_sequential_intervals_merge(
assert not stopped.started


async def test_sequential_overlapping_dispatches_between_fetch(
fake_time: time_machine.Coordinates,
generator: DispatchGenerator,
) -> None:
"""Test that sequential overlapping dispatches are handled correctly."""
microgrid_id = MicrogridId(randint(1, 100))
client = FakeClient()
service = DispatchScheduler(microgrid_id=microgrid_id, client=client)
service.start()

receiver = await service.new_running_state_event_receiver(
"TEST_TYPE", merge_strategy=MergeByType()
)

# Create two overlapping dispatches
dispatch1 = replace(
generator.generate_dispatch(),
active=True,
duration=timedelta(seconds=10),
target=TargetIds(1, 2),
start_time=_now() + timedelta(seconds=5),
recurrence=RecurrenceRule(),
type="TEST_TYPE",
)
dispatch2 = replace(
generator.generate_dispatch(),
active=True,
duration=timedelta(seconds=10),
target=TargetIds(3, 4),
start_time=_now() + timedelta(seconds=8), # overlaps with dispatch1
recurrence=RecurrenceRule(),
type="TEST_TYPE",
)
await client.create(**to_create_params(microgrid_id, dispatch1))

timer = Timer(timedelta(seconds=100), SkipMissedAndResync(), auto_start=False)
await service._fetch(timer) # pylint: disable=protected-access
Copilot AI (Jul 17, 2025):

> [nitpick] Accessing protected methods in tests can make tests brittle. Consider exposing a public method for testing or using dependency injection to make the fetch operation testable.

await client.create(**to_create_params(microgrid_id, dispatch2))

# Move time forward to start first
fake_time.shift(timedelta(seconds=6))
await asyncio.sleep(1)
import logging

Comment on lines +739 to +740, Copilot AI (Jul 17, 2025):

> Import statements should be at the top of the file, not inside functions. Move this import to the top with other imports. (Suggested change: remove the inline `import logging`.)
logging.debug("We see: %s", service._dispatches)
Comment on lines +739 to +741, Copilot AI (Jul 17, 2025):

> Debug logging and protected member access should be removed from test code. This appears to be leftover debugging code. (Suggested change: remove the `import logging` and the debug log line.)

started1 = await receiver.receive()
assert started1.id == DispatchId(1)

# Move time to second dispatch
fake_time.shift(timedelta(seconds=6))
await asyncio.sleep(1)

started2 = await receiver.receive()
assert started2.id == DispatchId(2)
assert started2.started
assert started1.started

# Now we move to when the first one ended
fake_time.shift(timedelta(seconds=5))
await asyncio.sleep(1)

with pytest.raises(asyncio.TimeoutError):
logging.debug("Wait for now starts %s", _now())
Copilot AI (Jul 17, 2025):

> Debug logging should be removed from test code. This appears to be leftover debugging code. (Suggested change: remove the `logging.debug(...)` line.)
started3 = await receiver.receive()
assert started3.id != started2.id, "Received unexpected event"

assert not started1.started
assert started2.started
await asyncio.sleep(1)

# Next we move to when all dispatches should have stopped
fake_time.shift(timedelta(seconds=4))
started4 = await receiver.receive()

# We only expect a message for dispatch2, dispatch1 should never send a stop
assert started4.id == DispatchId(2)


@pytest.mark.parametrize("merge_strategy", [MergeByType(), MergeByTypeTarget()])
async def test_at_least_one_running_filter(
fake_time: time_machine.Coordinates,