18
18
19
19
logger = logging .getLogger (__name__ )
20
20
21
+ REQUEST_CONFIG = configure_opensearch_bulk_settings ()
21
22
22
23
# Cluster functions
23
24
@@ -27,11 +28,15 @@ def configure_opensearch_client(url: str) -> OpenSearch:
27
28
28
29
Includes the appropriate AWS credentials configuration if the URL is not localhost.
29
30
"""
31
+ logger .info ("OpenSearch request configurations: %s" , REQUEST_CONFIG )
30
32
if url == "localhost" :
31
33
return OpenSearch (
32
34
hosts = [{"host" : url , "port" : "9200" }],
33
35
http_auth = ("admin" , "admin" ),
34
36
connection_class = RequestsHttpConnection ,
37
+ max_retries = REQUEST_CONFIG ["OPENSEARCH_BULK_MAX_RETRIES" ],
38
+ retry_on_timeout = True ,
39
+ timeout = REQUEST_CONFIG ["OPENSEARCH_REQUEST_TIMEOUT" ],
35
40
)
36
41
37
42
credentials = boto3 .Session ().get_credentials ()
@@ -43,6 +48,9 @@ def configure_opensearch_client(url: str) -> OpenSearch:
43
48
use_ssl = True ,
44
49
verify_certs = True ,
45
50
connection_class = RequestsHttpConnection ,
51
+ max_retries = REQUEST_CONFIG ["OPENSEARCH_BULK_MAX_RETRIES" ],
52
+ retry_on_timeout = True ,
53
+ timeout = REQUEST_CONFIG ["OPENSEARCH_REQUEST_TIMEOUT" ],
46
54
)
47
55
48
56
@@ -315,16 +323,13 @@ def bulk_index(
315
323
Returns total sums of: records created, records updated, errors, and total records
316
324
processed.
317
325
"""
318
- bulk_config = configure_opensearch_bulk_settings ()
319
326
result = {"created" : 0 , "updated" : 0 , "errors" : 0 , "total" : 0 }
320
327
actions = helpers .generate_bulk_actions (index , records , "index" )
321
328
responses = streaming_bulk (
322
329
client ,
323
330
actions ,
324
- max_chunk_bytes = bulk_config ["OPENSEARCH_BULK_MAX_CHUNK_BYTES" ],
325
- max_retries = bulk_config ["OPENSEARCH_BULK_MAX_RETRIES" ],
331
+ max_chunk_bytes = REQUEST_CONFIG ["OPENSEARCH_BULK_MAX_CHUNK_BYTES" ],
326
332
raise_on_error = False ,
327
- request_timeout = bulk_config ["OPENSEARCH_REQUEST_TIMEOUT" ],
328
333
)
329
334
for response in responses :
330
335
if response [0 ] is False :
@@ -350,7 +355,6 @@ def bulk_index(
350
355
logger .info ("All records ingested, refreshing index." )
351
356
response = client .indices .refresh (
352
357
index = index ,
353
- request_timeout = bulk_config ["OPENSEARCH_REQUEST_TIMEOUT" ],
354
358
)
355
359
logger .debug (response )
356
360
return result
0 commit comments