32
32
from frequenz .channels import Bidirectional , Peekable , Receiver
33
33
from google .protobuf .empty_pb2 import Empty # pylint: disable=no-name-in-module
34
34
35
+ from ... import microgrid
35
36
from ...actor ._decorator import actor
36
37
from ...microgrid import ComponentGraph
37
38
from ...microgrid .client import MicrogridApiClient
41
42
ComponentCategory ,
42
43
InverterData ,
43
44
)
44
- from ...power import DistributionAlgorithm , InvBatPair
45
+ from ...power import DistributionAlgorithm , DistributionResult , InvBatPair
45
46
from ._battery_pool_status import BatteryPoolStatus
46
47
from .request import Request
47
48
from .result import Error , Ignored , OutOfBound , PartialFailure , Result , Success
@@ -140,22 +141,17 @@ class PowerDistributingActor:
140
141
141
142
def __init__ (
142
143
self ,
143
- microgrid_api : MicrogridApiClient ,
144
- component_graph : ComponentGraph ,
145
144
users_channels : Dict [str , Bidirectional .Handle [Result , Request ]],
146
145
wait_for_data_sec : float = 2 ,
147
146
) -> None :
148
147
"""Create class instance.
149
148
150
149
Args:
151
- microgrid_api: api for sending the requests.
152
- component_graph: component graph of the given microgrid api.
153
150
users_channels: BidirectionalHandle for each user. Key should be
154
151
user id and value should be BidirectionalHandle.
155
152
wait_for_data_sec: How long actor should wait before processing first
156
153
request. It is a time needed to collect first components data.
157
154
"""
158
- self ._api = microgrid_api
159
155
self ._wait_for_data_sec = wait_for_data_sec
160
156
161
157
# NOTE: power_distributor_exponent should be received from ConfigManager
@@ -164,19 +160,16 @@ def __init__(
164
160
self .power_distributor_exponent
165
161
)
166
162
167
- batteries = component_graph .components (
168
- component_category = {ComponentCategory .BATTERY }
169
- )
163
+ graph = microgrid .get ().component_graph
164
+ batteries = graph .components (component_category = {ComponentCategory .BATTERY })
170
165
171
166
self ._battery_pool = BatteryPoolStatus (
172
167
battery_ids = {battery .component_id for battery in batteries },
173
168
max_blocking_duration_sec = 30.0 ,
174
169
max_data_age_sec = 10.0 ,
175
170
)
176
171
177
- self ._bat_inv_map , self ._inv_bat_map = self ._get_components_pairs (
178
- component_graph
179
- )
172
+ self ._bat_inv_map , self ._inv_bat_map = self ._get_components_pairs (graph )
180
173
self ._battery_receivers : Dict [int , Peekable [BatteryData ]] = {}
181
174
self ._inverter_receivers : Dict [int , Peekable [InverterData ]] = {}
182
175
@@ -250,6 +243,7 @@ async def run(self) -> None:
250
243
"""
251
244
await self ._create_channels ()
252
245
await self ._battery_pool .async_init ()
246
+ api = microgrid .get ().api_client
253
247
254
248
# Wait few seconds to get data from the channels created above.
255
249
await asyncio .sleep (self ._wait_for_data_sec )
@@ -291,20 +285,10 @@ async def run(self) -> None:
291
285
str (battery_distribution ),
292
286
)
293
287
294
- tasks = {
295
- inverter_id : asyncio .create_task (
296
- self ._api .set_power (inverter_id , power )
297
- )
298
- for inverter_id , power in distribution .distribution .items ()
299
- }
300
-
301
- _ , pending = await asyncio .wait (
302
- tasks .values (),
303
- timeout = request .request_timeout_sec ,
304
- return_when = ALL_COMPLETED ,
288
+ tasks = await self ._set_distributed_power (
289
+ api , distribution , request .request_timeout_sec
305
290
)
306
291
307
- await self ._cancel_tasks (pending )
308
292
failed_power , failed_batteries = self ._parse_result (
309
293
tasks , distribution .distribution , request .request_timeout_sec
310
294
)
@@ -330,6 +314,36 @@ async def run(self) -> None:
330
314
self ._battery_pool .update_last_request_status (response )
331
315
await user .channel .send (response )
332
316
317
+ async def _set_distributed_power (
318
+ self ,
319
+ api : MicrogridApiClient ,
320
+ distribution : DistributionResult ,
321
+ timeout_sec : float ,
322
+ ) -> Dict [int , asyncio .Task [Empty ]]:
323
+ """Send distributed power to the inverters.
324
+
325
+ Args:
326
+ api: Microgrid api client
327
+ distribution: Distribution result
328
+ timeout_sec: How long wait for the response
329
+
330
+ Returns:
331
+ Dict with finished or cancelled task for each inverter.
332
+ """
333
+ tasks = {
334
+ inverter_id : asyncio .create_task (api .set_power (inverter_id , power ))
335
+ for inverter_id , power in distribution .distribution .items ()
336
+ }
337
+
338
+ _ , pending = await asyncio .wait (
339
+ tasks .values (),
340
+ timeout = timeout_sec ,
341
+ return_when = ALL_COMPLETED ,
342
+ )
343
+
344
+ await self ._cancel_tasks (pending )
345
+ return tasks
346
+
333
347
def _check_request (self , request : Request ) -> Optional [Result ]:
334
348
"""Check whether the given request if correct.
335
349
@@ -546,13 +560,12 @@ def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]:
546
560
547
561
async def _create_channels (self ) -> None :
548
562
"""Create channels to get data of components in microgrid."""
563
+ api = microgrid .get ().api_client
549
564
for battery_id , inverter_id in self ._bat_inv_map .items ():
550
- bat_recv : Receiver [BatteryData ] = await self . _api .battery_data (battery_id )
565
+ bat_recv : Receiver [BatteryData ] = await api .battery_data (battery_id )
551
566
self ._battery_receivers [battery_id ] = bat_recv .into_peekable ()
552
567
553
- inv_recv : Receiver [InverterData ] = await self ._api .inverter_data (
554
- inverter_id
555
- )
568
+ inv_recv : Receiver [InverterData ] = await api .inverter_data (inverter_id )
556
569
self ._inverter_receivers [inverter_id ] = inv_recv .into_peekable ()
557
570
558
571
def _parse_result (
0 commit comments