Skip to content

Commit 60a8286

Browse files
authored
Close all *pool's receivers when stopping the pool (#1150)
2 parents fc74e8f + 6a1d39f commit 60a8286

File tree

7 files changed

+36
-12
lines changed

7 files changed

+36
-12
lines changed

RELEASE_NOTES.md

+2
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,5 @@ This release includes a new `ConfigManager` class to simplify managing the confi
6363
- Fix a bug where if a string was passed to the `ConfigManagingActor` it would be interpreted as a sequence of 1 character strings.
6464

6565
- Remove a confusing log message in the power distributing actor.
66+
67+
- Close all receivers owned by a *pool when stopping the pool.

src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,12 @@ def __init__( # pylint: disable=too-many-arguments
9090
self._working_batteries: set[int] = set()
9191

9292
self._update_battery_status_task: asyncio.Task[None] | None = None
93+
self._batteries_status_receiver: Receiver[ComponentPoolStatus] = (
94+
batteries_status_receiver
95+
)
9396
if self._batteries:
9497
self._update_battery_status_task = asyncio.create_task(
95-
self._update_battery_status(batteries_status_receiver)
98+
self._update_battery_status(self._batteries_status_receiver)
9699
)
97100

98101
self._min_update_interval: timedelta = min_update_interval
@@ -128,6 +131,7 @@ async def stop(self) -> None:
128131
if self._update_battery_status_task:
129132
tasks_to_stop.append(cancel_and_await(self._update_battery_status_task))
130133
await asyncio.gather(*tasks_to_stop)
134+
self._batteries_status_receiver.close()
131135

132136
def _get_all_batteries(self) -> frozenset[int]:
133137
"""Get all batteries from the microgrid.

src/frequenz/sdk/timeseries/battery_pool/_component_metric_fetcher.py

+10
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
ComponentMetricId,
2222
InverterData,
2323
)
24+
from typing_extensions import override
2425

2526
from ..._internal._asyncio import AsyncConstructible
2627
from ..._internal._constants import MAX_BATTERY_DATA_AGE_SEC
@@ -68,6 +69,10 @@ async def async_new(
6869
async def fetch_next(self) -> ComponentMetricsData | None:
6970
"""Fetch metrics for this component."""
7071

72+
@abstractmethod
73+
def stop(self) -> None:
74+
"""Stop the metric fetcher."""
75+
7176

7277
class LatestMetricsFetcher(ComponentMetricFetcher, Generic[T], ABC):
7378
"""Subscribe for the latest component data and extract the needed metrics."""
@@ -143,6 +148,11 @@ async def fetch_next(self) -> ComponentMetricsData | None:
143148

144149
return ComponentMetricsData(self._component_id, data.timestamp, metrics)
145150

151+
@override
152+
def stop(self) -> None:
153+
"""Stop the metric fetcher."""
154+
self._receiver.close()
155+
146156
@abstractmethod
147157
def _extract_metric(self, data: T, mid: ComponentMetricId) -> float: ...
148158

src/frequenz/sdk/timeseries/battery_pool/_methods.py

+14-11
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ def __init__(
112112
set()
113113
)
114114

115+
self._fetchers: dict[int, ComponentMetricFetcher] = {}
116+
115117
@classmethod
116118
def name(cls) -> str:
117119
"""Get name of the method.
@@ -166,28 +168,29 @@ async def stop(self) -> None:
166168
await asyncio.gather(
167169
*[cancel_and_await(self._send_task), cancel_and_await(self._update_task)]
168170
)
171+
for fetcher in self._fetchers.values():
172+
fetcher.stop()
169173

170-
async def _create_data_fetchers(self) -> dict[int, ComponentMetricFetcher]:
174+
async def _create_data_fetchers(self) -> None:
171175
fetchers: dict[int, ComponentMetricFetcher] = {
172176
cid: await LatestBatteryMetricsFetcher.async_new(cid, metrics)
173177
for cid, metrics in self._metric_calculator.battery_metrics.items()
174178
}
179+
self._fetchers.update(fetchers)
175180
inverter_fetchers = {
176181
cid: await LatestInverterMetricsFetcher.async_new(cid, metrics)
177182
for cid, metrics in self._metric_calculator.inverter_metrics.items()
178183
}
179-
fetchers.update(inverter_fetchers)
180-
return fetchers
184+
self._fetchers.update(inverter_fetchers)
181185

182-
def _remove_metric_fetcher(
183-
self, fetchers: dict[int, ComponentMetricFetcher], component_id: int
184-
) -> None:
186+
def _remove_metric_fetcher(self, component_id: int) -> None:
185187
_logger.error(
186188
"Removing component %d from the %s formula.",
187189
component_id,
188190
self._result_channel._name, # pylint: disable=protected-access
189191
)
190-
fetchers.pop(component_id)
192+
fetcher = self._fetchers.pop(component_id)
193+
fetcher.stop()
191194

192195
def _metric_updated(self, new_metrics: ComponentMetricsData) -> bool:
193196
cid = new_metrics.component_id
@@ -197,11 +200,11 @@ def _metric_updated(self, new_metrics: ComponentMetricsData) -> bool:
197200

198201
async def _update_and_notify(self) -> None:
199202
"""Receive component metrics and send notification when they change."""
200-
fetchers = await self._create_data_fetchers()
203+
await self._create_data_fetchers()
201204

202205
self._pending_data_fetchers = {
203206
asyncio.create_task(fetcher.fetch_next(), name=str(cid))
204-
for cid, fetcher in fetchers.items()
207+
for cid, fetcher in self._fetchers.items()
205208
}
206209
while len(self._pending_data_fetchers) > 0:
207210
done, self._pending_data_fetchers = await asyncio.wait(
@@ -210,7 +213,7 @@ async def _update_and_notify(self) -> None:
210213
for item in done:
211214
metrics = item.result()
212215
if metrics is None:
213-
self._remove_metric_fetcher(fetchers, int(item.get_name()))
216+
self._remove_metric_fetcher(int(item.get_name()))
214217
continue
215218
if self._metric_updated(metrics):
216219
self._update_event.set()
@@ -220,7 +223,7 @@ async def _update_and_notify(self) -> None:
220223
self._cached_metrics[cid] = metrics
221224
# Add fetcher back to the processing list.
222225
self._pending_data_fetchers.add(
223-
asyncio.create_task(fetchers[cid].fetch_next(), name=str(cid))
226+
asyncio.create_task(self._fetchers[cid].fetch_next(), name=str(cid))
224227
)
225228

226229
async def _send_on_update(self, min_update_interval: timedelta) -> None:

src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py

+1
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,4 @@ async def stop(self) -> None:
107107
"""Stop all tasks and channels owned by the EVChargerPool."""
108108
await self.formula_pool.stop()
109109
await self.bounds_tracker.stop()
110+
self.status_receiver.close()

src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py

+3
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ async def _stop(self) -> None:
117117
if self._task is None:
118118
return
119119
await cancel_and_await(self._task)
120+
_, fetchers = self._builder.finalize()
121+
for fetcher in fetchers.values():
122+
fetcher.stream.close()
120123

121124
@classmethod
122125
def from_receiver(

src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py

+1
Original file line numberDiff line numberDiff line change
@@ -108,3 +108,4 @@ async def stop(self) -> None:
108108
"""Stop all tasks and channels owned by the PVInverterPool."""
109109
await self.formula_pool.stop()
110110
await self.bounds_tracker.stop()
111+
self.status_receiver.close()

0 commit comments

Comments
 (0)