
Commit 7ad1b60

Add flush_after_seconds option to streaming_bulk() (#3064) (#3117)
* Add flush option to streaming_bulk()
* unit tests
* bulk timeouts
* use context manager to run the timeout background tasks
* format code
* integration tests
* docstrings

(cherry picked from commit 6fbdecb)

Co-authored-by: Miguel Grinberg <[email protected]>
1 parent 14adafa commit 7ad1b60
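
To make the new behavior concrete, here is a minimal usage sketch (the client URL, index name, and document generator are illustrative, not part of this commit): with flush_after_seconds set, buffered documents are sent once the interval elapses without new input, instead of waiting for a full chunk.

import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk


async def main() -> None:
    client = AsyncElasticsearch("http://localhost:9200")

    async def slow_actions():
        # A slow trickle of documents; without flush_after_seconds these
        # would sit in the buffer until chunk_size (500 docs) or
        # max_chunk_bytes (100MB) is reached, or the iterator is exhausted.
        for i in range(10):
            await asyncio.sleep(2)
            yield {"_index": "my-index", "_id": i, "value": i}

    # Flush whatever is buffered after 0.5s without a new action, so each
    # document is indexed shortly after it is produced.
    async for ok, item in async_streaming_bulk(
        client, slow_actions(), flush_after_seconds=0.5
    ):
        if not ok:
            print("failed:", item)

    await client.close()


asyncio.run(main())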

File tree

7 files changed: +343 -50 lines changed

elasticsearch/_async/helpers.py

Lines changed: 58 additions & 9 deletions
@@ -33,12 +33,16 @@
     Union,
 )
 
+from ..compat import safe_task
 from ..exceptions import ApiError, NotFoundError, TransportError
 from ..helpers.actions import (
     _TYPE_BULK_ACTION,
     _TYPE_BULK_ACTION_BODY,
     _TYPE_BULK_ACTION_HEADER,
     _TYPE_BULK_ACTION_HEADER_AND_BODY,
+    _TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY,
+    _TYPE_BULK_ACTION_WITH_META,
+    BulkMeta,
     _ActionChunker,
     _process_bulk_chunk_error,
     _process_bulk_chunk_success,

@@ -54,9 +58,10 @@
 
 
 async def _chunk_actions(
-    actions: AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY],
+    actions: AsyncIterable[_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY],
     chunk_size: int,
     max_chunk_bytes: int,
+    flush_after_seconds: Optional[float],
     serializer: Serializer,
 ) -> AsyncIterable[
     Tuple[

@@ -76,10 +81,42 @@ async def _chunk_actions(
     chunker = _ActionChunker(
         chunk_size=chunk_size, max_chunk_bytes=max_chunk_bytes, serializer=serializer
     )
-    async for action, data in actions:
-        ret = chunker.feed(action, data)
-        if ret:
-            yield ret
+
+    if not flush_after_seconds:
+        async for action, data in actions:
+            ret = chunker.feed(action, data)
+            if ret:
+                yield ret
+    else:
+        item_queue: asyncio.Queue[_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY] = (
+            asyncio.Queue()
+        )
+
+        async def get_items() -> None:
+            try:
+                async for item in actions:
+                    await item_queue.put(item)
+            finally:
+                await item_queue.put((BulkMeta.done, None))
+
+        async with safe_task(get_items()):
+            timeout: Optional[float] = flush_after_seconds
+            while True:
+                try:
+                    action, data = await asyncio.wait_for(
+                        item_queue.get(), timeout=timeout
+                    )
+                    timeout = flush_after_seconds
+                except asyncio.TimeoutError:
+                    action, data = BulkMeta.flush, None
+                    timeout = None
+
+                if action is BulkMeta.done:
+                    break
+                ret = chunker.feed(action, data)
+                if ret:
+                    yield ret
+
     ret = chunker.flush()
     if ret:
         yield ret

@@ -159,9 +196,13 @@ async def azip(
 
 async def async_streaming_bulk(
     client: AsyncElasticsearch,
-    actions: Union[Iterable[_TYPE_BULK_ACTION], AsyncIterable[_TYPE_BULK_ACTION]],
+    actions: Union[
+        Iterable[_TYPE_BULK_ACTION_WITH_META],
+        AsyncIterable[_TYPE_BULK_ACTION_WITH_META],
+    ],
     chunk_size: int = 500,
     max_chunk_bytes: int = 100 * 1024 * 1024,
+    flush_after_seconds: Optional[float] = None,
     raise_on_error: bool = True,
     expand_action_callback: Callable[
         [_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY

@@ -194,6 +235,9 @@ async def async_streaming_bulk(
     :arg actions: iterable or async iterable containing the actions to be executed
     :arg chunk_size: number of docs in one chunk sent to es (default: 500)
    :arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB)
+    :arg flush_after_seconds: time in seconds after which a chunk is written even
+        if it hasn't reached `chunk_size` or `max_chunk_bytes`. Set to 0 to not use a
+        timeout-based flush. (default: 0)
     :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
         from the execution of the last chunk when some occur. By default we raise.
     :arg raise_on_exception: if ``False`` then don't propagate exceptions from

@@ -220,9 +264,14 @@
     if isinstance(retry_on_status, int):
         retry_on_status = (retry_on_status,)
 
-    async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
+    async def map_actions() -> (
+        AsyncIterable[_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY]
+    ):
         async for item in aiter(actions):
-            yield expand_action_callback(item)
+            if isinstance(item, BulkMeta):
+                yield item, None
+            else:
+                yield expand_action_callback(item)
 
     serializer = client.transport.serializers.get_serializer("application/json")

@@ -234,7 +283,7 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
     ]
     bulk_actions: List[bytes]
     async for bulk_data, bulk_actions in _chunk_actions(
-        map_actions(), chunk_size, max_chunk_bytes, serializer
+        map_actions(), chunk_size, max_chunk_bytes, flush_after_seconds, serializer
     ):
         for attempt in range(max_retries + 1):
             to_retry: List[bytes] = []
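
The heart of the change is the queue-and-timeout loop added to _chunk_actions() above: a background task (supervised by safe_task) drains the caller's iterator into an asyncio.Queue, while the consumer waits on the queue with asyncio.wait_for() and injects a flush marker when the wait times out. Stripped of the bulk-specific types, the pattern looks roughly like this; the FLUSH/DONE sentinels are illustrative stand-ins for BulkMeta.flush and BulkMeta.done:

import asyncio
from typing import AsyncIterator, Optional

# Illustrative sentinels standing in for BulkMeta.flush / BulkMeta.done.
FLUSH = object()
DONE = object()


async def items_with_flushes(
    items: AsyncIterator[object], flush_after_seconds: float
) -> AsyncIterator[object]:
    """Re-yield `items`, inserting FLUSH whenever the source goes quiet."""
    queue: "asyncio.Queue[object]" = asyncio.Queue()

    async def producer() -> None:
        try:
            async for item in items:
                await queue.put(item)
        finally:
            await queue.put(DONE)  # always signal completion, even on error

    task = asyncio.create_task(producer())
    try:
        timeout: Optional[float] = flush_after_seconds
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), timeout=timeout)
                timeout = flush_after_seconds  # reset the clock on each item
            except asyncio.TimeoutError:
                # Nothing arrived in time: emit one FLUSH, then wait without
                # a timeout so an idle source triggers a single flush only.
                item, timeout = FLUSH, None
            if item is DONE:
                break
            yield item
    finally:
        await task

Note the deliberate asymmetry: after a timeout the next wait uses timeout=None, so an idle source produces a single flush rather than one per interval; the clock restarts only once a real item arrives.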

elasticsearch/compat.py

Lines changed: 43 additions & 1 deletion
@@ -15,11 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import asyncio
 import inspect
 import os
 import sys
+from contextlib import asynccontextmanager, contextmanager
 from pathlib import Path
-from typing import Tuple, Type, Union
+from threading import Thread
+from typing import Any, AsyncIterator, Callable, Coroutine, Iterator, Tuple, Type, Union
 
 string_types: Tuple[Type[str], Type[bytes]] = (str, bytes)
 

@@ -76,9 +79,48 @@ def warn_stacklevel() -> int:
     return 0
 
 
+@contextmanager
+def safe_thread(
+    target: Callable[..., Any], *args: Any, **kwargs: Any
+) -> Iterator[Thread]:
+    """Run a thread within a context manager block.
+
+    The thread is automatically joined when the block ends. If the thread raised
+    an exception, it is raised in the caller's context.
+    """
+    captured_exception = None
+
+    def run() -> None:
+        try:
+            target(*args, **kwargs)
+        except BaseException as exc:
+            nonlocal captured_exception
+            captured_exception = exc
+
+    thread = Thread(target=run)
+    thread.start()
+    yield thread
+    thread.join()
+    if captured_exception:
+        raise captured_exception
+
+
+@asynccontextmanager
+async def safe_task(coro: Coroutine[Any, Any, Any]) -> AsyncIterator[asyncio.Task[Any]]:
+    """Run a background task within a context manager block.
+
+    The task is awaited when the block ends.
+    """
+    task = asyncio.create_task(coro)
+    yield task
+    await task
+
+
 __all__ = [
     "string_types",
     "to_str",
     "to_bytes",
     "warn_stacklevel",
+    "safe_thread",
+    "safe_task",
 ]
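
A short sketch of how the new safe_task() helper behaves (the worker coroutine and results list are illustrative); safe_thread() is its synchronous counterpart, re-raising any exception captured in the thread once it is joined:

import asyncio

from elasticsearch.compat import safe_task


async def main() -> None:
    results = []

    async def worker() -> None:
        for i in range(3):
            await asyncio.sleep(0.01)
            results.append(i)

    # worker() runs in the background while the block executes and is
    # awaited on exit, so an exception raised inside it surfaces here
    # rather than being lost in a dangling task.
    async with safe_task(worker()):
        pass  # do other work concurrently with worker()

    assert results == [0, 1, 2]


asyncio.run(main())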

elasticsearch/helpers/__init__.py

Lines changed: 10 additions & 1 deletion
@@ -19,12 +19,21 @@
 from .._utils import fixup_module_metadata
 from .actions import _chunk_actions  # noqa: F401
 from .actions import _process_bulk_chunk  # noqa: F401
-from .actions import bulk, expand_action, parallel_bulk, reindex, scan, streaming_bulk
+from .actions import (
+    BULK_FLUSH,
+    bulk,
+    expand_action,
+    parallel_bulk,
+    reindex,
+    scan,
+    streaming_bulk,
+)
 from .errors import BulkIndexError, ScanError
 
 __all__ = [
     "BulkIndexError",
     "ScanError",
+    "BULK_FLUSH",
     "expand_action",
     "streaming_bulk",
     "bulk",