From 5fc46d7d24866db7adb136ee4f540ff59e4a8696 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Zimmermann?= <101292599+ekneg54@users.noreply.github.com> Date: Fri, 8 Dec 2023 12:59:59 +0100 Subject: [PATCH] make threadcount configurable for opensearch output connector (#496) * make threadcount configurable * Update CHANGELOG.md --------- Co-authored-by: dtrai2 <95028228+dtrai2@users.noreply.github.com> --- CHANGELOG.md | 2 ++ logprep/connector/opensearch/output.py | 25 +++++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index df9b7fc54..3b713f075 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ ### Features +* make thread_count configurable for parallel_bulk in opensearch output connector + ### Improvements ### Bugfix diff --git a/logprep/connector/opensearch/output.py b/logprep/connector/opensearch/output.py index 8b0142876..e0b2f7509 100644 --- a/logprep/connector/opensearch/output.py +++ b/logprep/connector/opensearch/output.py @@ -34,10 +34,12 @@ from functools import cached_property import opensearchpy as search +from attrs import define, field, validators from opensearchpy import helpers from logprep.abc.output import Output from logprep.connector.elasticsearch.output import ElasticsearchOutput +from logprep.metrics.metrics import Metric logging.getLogger("opensearch").setLevel(logging.WARNING) @@ -45,6 +47,15 @@ class OpensearchOutput(ElasticsearchOutput): """An OpenSearch output connector.""" + @define(kw_only=True, slots=False) + class Config(ElasticsearchOutput.Config): + """Config for OpensearchOutput.""" + + thread_count: int = field( + default=4, validator=[validators.instance_of(int), validators.gt(1)] + ) + """Number of threads to use for bulk requests.""" + @cached_property def _search_context(self): return search.OpenSearch( @@ -67,6 +78,20 @@ def describe(self) -> str: base_description = Output.describe(self) return f"{base_description} - Opensearch Output: {self._config.hosts}" + @Metric.measure_time() + def _write_backlog(self): + if not self._message_backlog: + return + + self._bulk( + self._search_context, + self._message_backlog, + max_retries=self._config.max_retries, + chunk_size=len(self._message_backlog) / self._config.thread_count, + thread_count=self._config.thread_count, + ) + self._message_backlog.clear() + def _bulk(self, *args, **kwargs): try: helpers.parallel_bulk(*args, **kwargs)