Skip to content

Commit

Permalink
make threadcount configurable for opensearch output connector (#496)
Browse files Browse the repository at this point in the history
* make threadcount configurable

* Update CHANGELOG.md

---------

Co-authored-by: dtrai2 <[email protected]>
  • Loading branch information
ekneg54 and dtrai2 authored Dec 8, 2023
1 parent 73bd324 commit 5fc46d7
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

### Features

* make thread_count configurable for parallel_bulk in opensearch output connector

### Improvements

### Bugfix
Expand Down
25 changes: 25 additions & 0 deletions logprep/connector/opensearch/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,28 @@
from functools import cached_property

import opensearchpy as search
from attrs import define, field, validators
from opensearchpy import helpers

from logprep.abc.output import Output
from logprep.connector.elasticsearch.output import ElasticsearchOutput
from logprep.metrics.metrics import Metric

logging.getLogger("opensearch").setLevel(logging.WARNING)


class OpensearchOutput(ElasticsearchOutput):
"""An OpenSearch output connector."""

@define(kw_only=True, slots=False)
class Config(ElasticsearchOutput.Config):
"""Config for OpensearchOutput."""

thread_count: int = field(
default=4, validator=[validators.instance_of(int), validators.gt(1)]
)
"""Number of threads to use for bulk requests."""

@cached_property
def _search_context(self):
return search.OpenSearch(
Expand All @@ -67,6 +78,20 @@ def describe(self) -> str:
base_description = Output.describe(self)
return f"{base_description} - Opensearch Output: {self._config.hosts}"

@Metric.measure_time()
def _write_backlog(self):
if not self._message_backlog:
return

self._bulk(
self._search_context,
self._message_backlog,
max_retries=self._config.max_retries,
chunk_size=len(self._message_backlog) / self._config.thread_count,
thread_count=self._config.thread_count,
)
self._message_backlog.clear()

def _bulk(self, *args, **kwargs):
try:
helpers.parallel_bulk(*args, **kwargs)
Expand Down

0 comments on commit 5fc46d7

Please sign in to comment.