@@ -133,19 +133,25 @@ async def main():
133
133
```
134
134
"""
135
135
136
- class RetryFailedDispatches :
137
- """Manages the retry of failed dispatches."""
136
+ class FailedDispatchesRetrier ( BackgroundService ) :
137
+ """Manages the retring of failed dispatches."""
138
138
139
139
def __init__ (self , retry_interval : timedelta ) -> None :
140
140
"""Initialize the retry manager.
141
141
142
142
Args:
143
143
retry_interval: The interval between retries.
144
144
"""
145
+ super ().__init__ ()
145
146
self ._retry_interval = retry_interval
146
147
self ._channel = Broadcast [Dispatch ](name = "retry_channel" )
147
148
self ._sender = self ._channel .new_sender ()
148
- self ._tasks : set [asyncio .Task [None ]] = set ()
149
+
150
+ def start (self ) -> None :
151
+ """Start the background service.
152
+
153
+ This is a no-op.
154
+ """
149
155
150
156
def new_receiver (self ) -> Receiver [Dispatch ]:
151
157
"""Create a new receiver for dispatches to retry.
@@ -187,7 +193,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
187
193
],
188
194
running_status_receiver : Receiver [Dispatch ],
189
195
dispatch_identity : Callable [[Dispatch ], int ] | None = None ,
190
- retry_interval : timedelta | None = timedelta (seconds = 60 ),
196
+ retry_interval : timedelta = timedelta (seconds = 60 ),
191
197
) -> None :
192
198
"""Initialize the dispatch handler.
193
199
@@ -197,7 +203,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
197
203
running_status_receiver: The receiver for dispatch running status changes.
198
204
dispatch_identity: A function to identify to which actor a dispatch refers.
199
205
By default, it uses the dispatch ID.
200
- retry_interval: The interval between retries. If `None`, retries are disabled.
206
+ retry_interval: The interval between retries.
201
207
"""
202
208
super ().__init__ ()
203
209
self ._dispatch_identity : Callable [[Dispatch ], int ] = (
@@ -211,11 +217,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
211
217
name = "dispatch_updates_channel" , resend_latest = True
212
218
)
213
219
self ._updates_sender = self ._updates_channel .new_sender ()
214
- self ._retrier = (
215
- ActorDispatcher .RetryFailedDispatches (retry_interval )
216
- if retry_interval
217
- else None
218
- )
220
+ self ._retrier = ActorDispatcher .FailedDispatchesRetrier (retry_interval )
219
221
220
222
def start (self ) -> None :
221
223
"""Start the background service."""
@@ -258,12 +260,7 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
258
260
dispatch .type ,
259
261
exc_info = e ,
260
262
)
261
- if self ._retrier :
262
- self ._retrier .retry (dispatch )
263
- else :
264
- _logger .error (
265
- "No retry mechanism enabled, dispatch %r failed" , dispatch
266
- )
263
+ self ._retrier .retry (dispatch )
267
264
else :
268
265
# No exception occurred, so we can add the actor to the list
269
266
self ._actors [identity ] = actor
@@ -286,10 +283,7 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
286
283
287
284
async def _run (self ) -> None :
288
285
"""Run the background service."""
289
- if not self ._retrier :
290
- async for dispatch in self ._dispatch_rx :
291
- await self ._handle_dispatch (dispatch )
292
- else :
286
+ async with self ._retrier :
293
287
retry_recv = self ._retrier .new_receiver ()
294
288
295
289
async for selected in select (retry_recv , self ._dispatch_rx ):
0 commit comments