
Commit e565349

Fix a couple of bugs in the asyncio implementation
Log an informative message if a connection is not closed and the gc is reclaiming it when using an async DBAPI, which does not support running IO at that stage. The ``AsyncAdaptedQueue`` used by default on async DBAPIs should instantiate a queue only when it's first used, to avoid binding it to a possibly wrong event loop.

Fixes: sqlalchemy#5823
Change-Id: Ibfc50e209b1937ae3d6599ae7997f028c7a92c33
1 parent 851a3a3 commit e565349
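
For context, here is a minimal sketch (with a hypothetical asyncpg DSN) of the usage pattern the lazy queue creation accommodates: the engine, and with it the pool, is created at import time before any event loop is running, and is then used inside ``asyncio.run()``, which starts a fresh loop.

    import asyncio

    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import create_async_engine

    # created at module import time, before asyncio.run() has started a loop
    engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")


    async def main():
        async with engine.connect() as conn:
            await conn.execute(text("SELECT 1"))


    # the pool's internal queue must bind to the loop started here, not to
    # whatever loop (if any) was current at import time
    asyncio.run(main())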

File tree

8 files changed: +146 -29 lines

doc/build/changelog/unreleased_14/5823.rst

Lines changed: 3 additions & 1 deletion

@@ -10,4 +10,6 @@
     the connection including transaction rollback or connection close as this
     will often be outside of the event loop.
 
-
+    The ``AsyncAdaptedQueue`` used by default on async dbapis
+    should instantiate a queue only when it's first used
+    to avoid binding it to a possibly wrong event loop.

doc/build/orm/extensions/asyncio.rst

Lines changed: 23 additions & 0 deletions

@@ -255,6 +255,29 @@ differences are as follows:
 concepts, no third party networking libraries as ``gevent`` and ``eventlet``
 provides are in use.
 
+Using multiple asyncio event loops
+----------------------------------
+
+An application that makes use of multiple event loops, for example by combining asyncio
+with multithreading, should not share the same :class:`_asyncio.AsyncEngine`
+with different event loops when using the default pool implementation.
+
+If an :class:`_asyncio.AsyncEngine` is to be passed from one event loop to another,
+the method :meth:`_asyncio.AsyncEngine.dispose()` should be called before it's
+re-used on a new event loop. Failing to do so may lead to a ``RuntimeError``
+along the lines of
+``Task <Task pending ...> got Future attached to a different loop``.
+
+If the same engine must be shared between different loops, it should be configured
+to disable pooling using :class:`~sqlalchemy.pool.NullPool`, preventing the Engine
+from using any connection more than once:
+
+    from sqlalchemy.pool import NullPool
+    engine = create_async_engine(
+        "postgresql+asyncpg://user:pass@host/dbname", poolclass=NullPool
+    )
+
+
 .. currentmodule:: sqlalchemy.ext.asyncio
 
 Engine API Documentation
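
A minimal sketch of the dispose-before-reuse pattern described in the added section, assuming a hypothetical asyncpg DSN; each ``asyncio.run()`` call below starts its own event loop:

    import asyncio

    from sqlalchemy.ext.asyncio import create_async_engine

    engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")


    async def work():
        async with engine.connect() as conn:
            ...


    asyncio.run(work())            # first event loop
    asyncio.run(engine.dispose())  # release connections bound to that loop
    asyncio.run(work())            # the engine can now be used on a second loop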

lib/sqlalchemy/ext/asyncio/engine.py

Lines changed: 0 additions & 1 deletion

@@ -494,7 +494,6 @@ class AsyncEngine(ProxyComparable, AsyncConnectable):
 
     .. versionadded:: 1.4
 
-
     """  # noqa
 
     # AsyncEngine is a thin proxy; no state should be added here

lib/sqlalchemy/pool/base.py

Lines changed: 25 additions & 8 deletions

@@ -26,7 +26,6 @@
 
 
 class _ConnDialect(object):
-
    """partial implementation of :class:`.Dialect`
    which provides DBAPI connection methods.
 
@@ -36,6 +35,8 @@ class _ConnDialect(object):
 
    """
 
+    is_async = False
+
    def do_rollback(self, dbapi_connection):
        dbapi_connection.rollback()
 
@@ -606,11 +607,20 @@ def __connect(self, first_connect_check=False):
 
 
 def _finalize_fairy(
-    connection, connection_record, pool, ref, echo, fairy=None
+    connection,
+    connection_record,
+    pool,
+    ref,  # this is None when called directly, not by the gc
+    echo,
+    fairy=None,
 ):
    """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
    been garbage collected.
 
+    When using an async dialect, no IO can happen here (without using
+    a dedicated thread), since this is called outside the greenlet
+    context and with an already running loop. In this case the function
+    will only log a message and emit a warning.
    """
 
    if ref:
@@ -624,7 +634,8 @@ def _finalize_fairy(
        assert connection is None
        connection = connection_record.connection
 
-    dont_restore_gced = pool._is_asyncio
+    # a NullPool is not _is_asyncio but can also be used with async dialects
+    dont_restore_gced = pool._dialect.is_async
 
    if dont_restore_gced:
        detach = not connection_record or ref
@@ -658,11 +669,17 @@ def _finalize_fairy(
 
                pool._close_connection(connection)
            else:
-                util.warn(
-                    "asyncio connection is being garbage "
-                    "collected without being properly closed: %r"
-                    % connection
-                )
+                message = (
+                    "The garbage collector is trying to clean up "
+                    "connection %r. This feature is unsupported on async "
+                    "dbapi, since no IO can be performed at this stage to "
+                    "reset the connection. Please close out all "
+                    "connections when they are no longer used, calling "
+                    "``close()`` or using a context manager to "
+                    "manage their lifetime."
+                ) % connection
+                pool.logger.error(message)
+                util.warn(message)
 
        except BaseException as e:
            pool.logger.error(
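
To illustrate the situation the new message targets, a sketch (with a hypothetical DSN): a checked-out async connection that is never closed is eventually reclaimed by the garbage collector, at which point no rollback or close can run and the pool can only log the error and warn.

    import asyncio

    from sqlalchemy.ext.asyncio import create_async_engine


    async def leaks_a_connection():
        engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")
        conn = await engine.connect()
        # missing `await conn.close()` (or an `async with` block): when the
        # garbage collector later reclaims the connection, no IO can be
        # performed, so the pool logs and warns instead of resetting it


    asyncio.run(leaks_a_connection())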

lib/sqlalchemy/pool/impl.py

Lines changed: 6 additions & 0 deletions

@@ -13,6 +13,7 @@
 import traceback
 import weakref
 
+from .base import _ConnDialect
 from .base import _ConnectionFairy
 from .base import _ConnectionRecord
 from .base import Pool
@@ -221,9 +222,14 @@ def checkedout(self):
        return self._pool.maxsize - self._pool.qsize() + self._overflow
 
 
+class _AsyncConnDialect(_ConnDialect):
+    is_async = True
+
+
 class AsyncAdaptedQueuePool(QueuePool):
    _is_asyncio = True
    _queue_class = sqla_queue.AsyncAdaptedQueue
+    _dialect = _AsyncConnDialect()
 
 
 class FallbackAsyncAdaptedQueuePool(AsyncAdaptedQueuePool):
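
The ``_finalize_fairy`` comment above is the motivation for this flag: a sketch of the case it covers, assuming (as ``create_async_engine`` does) that the engine attaches its dialect to the pool, so ``pool._dialect.is_async`` is true even for a ``NullPool``, which is not an asyncio-specific pool class and never sets ``_is_asyncio``. The DSN is hypothetical.

    from sqlalchemy.ext.asyncio import create_async_engine
    from sqlalchemy.pool import NullPool

    # NullPool itself is not asyncio-aware, but the async dialect attached to
    # the pool is, which is what _finalize_fairy now checks
    engine = create_async_engine(
        "postgresql+asyncpg://user:pass@host/dbname", poolclass=NullPool
    )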

lib/sqlalchemy/util/queue.py

Lines changed: 25 additions & 7 deletions

@@ -26,6 +26,7 @@
 from .concurrency import asyncio
 from .concurrency import await_fallback
 from .concurrency import await_only
+from .langhelpers import memoized_property
 
 
 __all__ = ["Empty", "Full", "Queue"]
@@ -206,15 +207,32 @@ class AsyncAdaptedQueue:
    await_ = staticmethod(await_only)
 
    def __init__(self, maxsize=0, use_lifo=False):
-        if use_lifo:
-            self._queue = asyncio.LifoQueue(maxsize=maxsize)
-        else:
-            self._queue = asyncio.Queue(maxsize=maxsize)
        self.use_lifo = use_lifo
        self.maxsize = maxsize
-        self.empty = self._queue.empty
-        self.full = self._queue.full
-        self.qsize = self._queue.qsize
+
+    def empty(self):
+        return self._queue.empty()
+
+    def full(self):
+        return self._queue.full()
+
+    def qsize(self):
+        return self._queue.qsize()
+
+    @memoized_property
+    def _queue(self):
+        # Delay creation of the queue until it is first used, to avoid
+        # binding it to a possibly wrong event loop.
+        # By delaying the creation of the pool we accommodate the common
+        # usage pattern of instantiating the engine at module level, where a
+        # different event loop is present compared to when the application
+        # is actually run.
+
+        if self.use_lifo:
+            queue = asyncio.LifoQueue(maxsize=self.maxsize)
+        else:
+            queue = asyncio.Queue(maxsize=self.maxsize)
+        return queue
 
    def put_nowait(self, item):
        try:
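
A standalone sketch of the lazy-initialization idea applied here, using ``functools.cached_property`` as a stand-in for SQLAlchemy's ``memoized_property`` and a hypothetical class name: the ``asyncio.Queue`` is constructed only on first access, so on interpreters where the queue captures the current event loop it binds to the loop in use at that point, not the one (if any) current at construction time.

    import asyncio
    from functools import cached_property  # stand-in for memoized_property


    class LazyQueueHolder:
        """Hypothetical illustration, not SQLAlchemy code."""

        def __init__(self, maxsize=0, use_lifo=False):
            self.maxsize = maxsize
            self.use_lifo = use_lifo

        @cached_property
        def queue(self):
            # constructed lazily, so it binds to the running loop at first use
            cls = asyncio.LifoQueue if self.use_lifo else asyncio.Queue
            return cls(maxsize=self.maxsize)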

test/base/test_concurrency_py3k.py

Lines changed: 50 additions & 0 deletions

@@ -1,12 +1,18 @@
+import threading
+
 from sqlalchemy import exc
 from sqlalchemy import testing
 from sqlalchemy.testing import async_test
 from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises
 from sqlalchemy.testing import expect_raises_message
 from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import is_true
+from sqlalchemy.util import asyncio
 from sqlalchemy.util import await_fallback
 from sqlalchemy.util import await_only
 from sqlalchemy.util import greenlet_spawn
+from sqlalchemy.util import queue
 
 try:
    from greenlet import greenlet
@@ -152,3 +158,47 @@ def run():
            "The current operation required an async execution but none was",
        ):
            await greenlet_spawn(run, _require_await=True)
+
+
+class TestAsyncAdaptedQueue(fixtures.TestBase):
+    def test_lazy_init(self):
+        run = [False]
+
+        def thread_go(q):
+            def go():
+                q.get(timeout=0.1)
+
+            with expect_raises(queue.Empty):
+                asyncio.run(greenlet_spawn(go))
+            run[0] = True
+
+        t = threading.Thread(
+            target=thread_go, args=[queue.AsyncAdaptedQueue()]
+        )
+        t.start()
+        t.join()
+
+        is_true(run[0])
+
+    def test_error_other_loop(self):
+        run = [False]
+
+        def thread_go(q):
+            def go():
+                eq_(q.get(block=False), 1)
+                q.get(timeout=0.1)
+
+            with expect_raises_message(
+                RuntimeError, "Task .* attached to a different loop"
+            ):
+                asyncio.run(greenlet_spawn(go))
+
+            run[0] = True
+
+        q = queue.AsyncAdaptedQueue()
+        q.put_nowait(1)
+        t = threading.Thread(target=thread_go, args=[q])
+        t.start()
+        t.join()
+
+        is_true(run[0])

test/engine/test_pool.py

Lines changed: 14 additions & 12 deletions

@@ -10,6 +10,7 @@
 from sqlalchemy import select
 from sqlalchemy import testing
 from sqlalchemy.engine import default
+from sqlalchemy.pool.impl import _AsyncConnDialect
 from sqlalchemy.testing import assert_raises
 from sqlalchemy.testing import assert_raises_context_ok
 from sqlalchemy.testing import assert_raises_message
@@ -89,10 +90,12 @@ def _queuepool_fixture(self, **kw):
 
    def _queuepool_dbapi_fixture(self, **kw):
        dbapi = MockDBAPI()
-        return (
-            dbapi,
-            pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw),
-        )
+        _is_asyncio = kw.pop("_is_asyncio", False)
+        p = pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw)
+        if _is_asyncio:
+            p._is_asyncio = True
+            p._dialect = _AsyncConnDialect()
+        return dbapi, p
 
 
 class PoolTest(PoolTestBase):
@@ -283,6 +286,8 @@ def _dialect(self):
        canary = []
 
        class PoolDialect(object):
+            is_async = False
+
            def do_rollback(self, dbapi_connection):
                canary.append("R")
                dbapi_connection.rollback()
@@ -361,8 +366,8 @@ def checkout(*arg, **kw):
 
        return p, canary
 
-    def _checkin_event_fixture(self):
-        p = self._queuepool_fixture()
+    def _checkin_event_fixture(self, _is_asyncio=False):
+        p = self._queuepool_fixture(_is_asyncio=_is_asyncio)
        canary = []
 
        @event.listens_for(p, "checkin")
@@ -639,10 +644,7 @@ def test_invalidate_event_exception(self):
 
    @testing.combinations((True, testing.requires.python3), (False,))
    def test_checkin_event_gc(self, detach_gced):
-        p, canary = self._checkin_event_fixture()
-
-        if detach_gced:
-            p._is_asyncio = True
+        p, canary = self._checkin_event_fixture(_is_asyncio=detach_gced)
 
        c1 = p.connect()
 
@@ -1517,11 +1519,11 @@ def handle_checkout_event(dbapi_con, con_record, con_proxy):
    @testing.combinations((True, testing.requires.python3), (False,))
    def test_userspace_disconnectionerror_weakref_finalizer(self, detach_gced):
        dbapi, pool = self._queuepool_dbapi_fixture(
-            pool_size=1, max_overflow=2
+            pool_size=1, max_overflow=2, _is_asyncio=detach_gced
        )
 
        if detach_gced:
-            pool._is_asyncio = True
+            pool._dialect.is_async = True
 
        @event.listens_for(pool, "checkout")
        def handle_checkout_event(dbapi_con, con_record, con_proxy):
