Skip to content

Commit ffbd11c

Browse files
authored
Merge pull request #144 from axiomiety/executor-double-acquire-fix
_base.py: fix double acquire by TrioExecuto, issue #143
2 parents 410b9be + 9d6103f commit ffbd11c

File tree

3 files changed

+20
-8
lines changed

3 files changed

+20
-8
lines changed

newsfragments/143.bugfix.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
``TrioExecutor.submit``, called from :meth:`asyncio.loop.run_in_executor`, no longer acquires a token from its `~trio.CapacityLimiter` before calling `~trio.to_thread.run_sync` (which already does its own ``acquire()``).
2+
3+
The previous behaviour led to a double-acquire, leading each worker thread to require two tokens to run instead of one. Tasks could get stuck having acquired the first token but unable to acquire the second as part of `~trio.to_thread.run_sync`, leading to a deadlock.

tests/test_trio_asyncio.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,3 +176,18 @@ async def unshield_later():
176176
]
177177
assert trio.current_time() == 1.5 + (shield * 0.5)
178178
assert scope.cancelled_caught == (not shield)
179+
180+
181+
@pytest.mark.trio
182+
async def test_executor_limiter_deadlock():
183+
def noop():
184+
pass
185+
186+
# capacity of 1 to catch a double-acquire
187+
limiter = trio.CapacityLimiter(1)
188+
executor = trio_asyncio.TrioExecutor(limiter=limiter)
189+
async with trio_asyncio.open_loop() as loop:
190+
with trio.move_on_after(1) as scope:
191+
await trio_asyncio.aio_as_trio(loop.run_in_executor)(executor, noop)
192+
193+
assert not scope.cancelled_caught

trio_asyncio/_base.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,8 @@ def __init__(self, limiter=None, thread_name_prefix=None, max_workers=None):
8484
async def submit(self, func, *args):
8585
if not self._running: # pragma: no cover
8686
raise RuntimeError("Executor is down")
87-
lim = self._limiter
88-
if lim is not None:
89-
await lim.acquire()
90-
try:
91-
return await trio.to_thread.run_sync(func, *args, limiter=self._limiter)
92-
finally:
93-
if lim is not None:
94-
lim.release()
87+
# there is no need to call self._limiter.acquire() here, run_sync does it
88+
return await trio.to_thread.run_sync(func, *args, limiter=self._limiter)
9589

9690
def shutdown(self, wait=None):
9791
self._running = False

0 commit comments

Comments
 (0)