Skip to content

Commit 998a7b2

Browse files
Use anyio for bulk flush timeout
1 parent 2d85baf commit 998a7b2

File tree

3 files changed

+15
-25
lines changed

3 files changed

+15
-25
lines changed

elasticsearch/_async/helpers.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434
)
3535

3636
import sniffio
37+
from anyio import create_memory_object_stream, create_task_group, move_on_after
3738

38-
from ..compat import safe_task
3939
from ..exceptions import ApiError, NotFoundError, TransportError
4040
from ..helpers.actions import (
4141
_TYPE_BULK_ACTION,
@@ -99,26 +99,28 @@ async def _chunk_actions(
9999
if ret:
100100
yield ret
101101
else:
102-
item_queue: asyncio.Queue[_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY] = (
103-
asyncio.Queue()
104-
)
102+
sender, receiver = create_memory_object_stream[
103+
_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY
104+
]()
105105

106106
async def get_items() -> None:
107107
try:
108108
async for item in actions:
109-
await item_queue.put(item)
109+
await sender.send(item)
110110
finally:
111-
await item_queue.put((BulkMeta.done, None))
111+
await sender.send((BulkMeta.done, None))
112+
113+
async with create_task_group() as tg:
114+
tg.start_soon(get_items)
112115

113-
async with safe_task(get_items()):
114116
timeout: Optional[float] = flush_after_seconds
115117
while True:
116-
try:
117-
action, data = await asyncio.wait_for(
118-
item_queue.get(), timeout=timeout
119-
)
118+
action: _TYPE_BULK_ACTION_WITH_META = {}
119+
data: _TYPE_BULK_ACTION_BODY = None
120+
with move_on_after(timeout) as scope:
121+
action, data = await receiver.receive()
120122
timeout = flush_after_seconds
121-
except asyncio.TimeoutError:
123+
if scope.cancelled_caught:
122124
action, data = BulkMeta.flush, None
123125
timeout = None
124126

elasticsearch/compat.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -105,22 +105,10 @@ def run() -> None:
105105
raise captured_exception
106106

107107

108-
@asynccontextmanager
109-
async def safe_task(coro: Coroutine[Any, Any, Any]) -> AsyncIterator[asyncio.Task[Any]]:
110-
"""Run a background task within a context manager block.
111-
112-
The task is awaited when the block ends.
113-
"""
114-
task = asyncio.create_task(coro)
115-
yield task
116-
await task
117-
118-
119108
__all__ = [
120109
"string_types",
121110
"to_str",
122111
"to_bytes",
123112
"warn_stacklevel",
124113
"safe_thread",
125-
"safe_task",
126114
]

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ dependencies = [
4646
"python-dateutil",
4747
"typing-extensions",
4848
"sniffio",
49+
"anyio",
4950
]
5051

5152
# TODO revert before merging/releasing
@@ -86,7 +87,6 @@ dev = [
8687
"jinja2",
8788
"tqdm",
8889
"trio",
89-
"anyio",
9090
"mypy",
9191
"pyright",
9292
"types-python-dateutil",

0 commit comments

Comments
 (0)