Skip to content

Commit 5488606

Browse files
committed
fix(worker): Modify kill behaviour to mirror threaded worker
Changed kill to also use the _TERMINATOR sentinel, so the queue is still drained to this point on kill instead of cancelled immediately. This should also fix potential race conditions with flush_async. GH-4581
1 parent 0367ef3 commit 5488606

File tree

1 file changed

+10
-4
lines changed

1 file changed

+10
-4
lines changed

sentry_sdk/worker.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ def _target(self) -> None:
176176

177177
class AsyncWorker(Worker):
178178
def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
179-
self._queue: asyncio.Queue[Callable[[], Any]] = asyncio.Queue(queue_size)
179+
self._queue: asyncio.Queue[Any] = asyncio.Queue(queue_size)
180180
self._task: Optional[asyncio.Task[None]] = None
181181
# Event loop needs to remain in the same process
182182
self._task_for_pid: Optional[int] = None
@@ -194,16 +194,19 @@ def is_alive(self) -> bool:
194194

195195
def kill(self) -> None:
196196
if self._task:
197-
self._task.cancel()
198-
self._task = None
199-
self._task_for_pid = None
197+
try:
198+
self._queue.put_nowait(_TERMINATOR)
199+
except asyncio.QueueFull:
200+
logger.debug("async worker queue full, kill failed")
200201
# Also cancel any active callback tasks
201202
# Avoid modifying the set while cancelling tasks
202203
tasks_to_cancel = set(self._active_tasks)
203204
for task in tasks_to_cancel:
204205
task.cancel()
205206
self._active_tasks.clear()
206207
self._loop = None
208+
self._task = None
209+
self._task_for_pid = None
207210

208211
def start(self) -> None:
209212
if not self.is_alive:
@@ -264,6 +267,9 @@ def submit(self, callback: Callable[[], Any]) -> bool:
264267
async def _target(self) -> None:
265268
while True:
266269
callback = await self._queue.get()
270+
if callback is _TERMINATOR:
271+
self._queue.task_done()
272+
break
267273
# Firing tasks instead of awaiting them allows for concurrent requests
268274
task = asyncio.create_task(self._process_callback(callback))
269275
# Create a strong reference to the task so it can be cancelled on kill

0 commit comments

Comments
 (0)