fix opensearch output connector (#498)
* use generator parallel_bulk
* update changelog and prepare release
* add MSGSPECSerializer
* handle serialisation error as in jsonserializer
ekneg54 authored Dec 19, 2023
1 parent 8e8c741 commit 5046759
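
For context on the first bullet above: `opensearchpy.helpers.parallel_bulk` returns a lazy generator, so merely calling it sends nothing to the cluster; its results have to be consumed before the worker threads issue any bulk requests, which is why the previous implementation did not deliver any messages. A minimal sketch of the consumption pattern the new `_bulk` adopts (host, index and document values are placeholders, not taken from the connector):

```python
from opensearchpy import OpenSearch, helpers

# Placeholder client and documents, for illustration only.
client = OpenSearch(hosts=["localhost:9200"])
actions = ({"_index": "processed", "message": f"event {i}"} for i in range(1000))

# parallel_bulk is lazy: iterating its result is what makes the worker
# threads actually send the chunked bulk requests.
for success, item in helpers.parallel_bulk(
    client,
    actions=actions,
    thread_count=4,  # worker threads
    queue_size=4,    # chunks buffered for the workers
    chunk_size=500,  # documents per bulk request
):
    if not success:
        print("failed action:", item)
```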
Showing 9 changed files with 122 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish-latest-dev-release-to-pypi.yml
@@ -17,7 +17,7 @@ jobs:
- name: Initialize Python
uses: actions/setup-python@v1
with:
python-version: 3.10
python-version: "3.10"

- name: Install dependencies
run: |
2 changes: 1 addition & 1 deletion .vscode/settings.json
@@ -9,7 +9,7 @@
120
],
"editor.codeActionsOnSave": {
"source.organizeImports": true
"source.organizeImports": "explicit"
},
"autoDocstring.docstringFormat": "numpy",
"files.exclude": {
14 changes: 13 additions & 1 deletion CHANGELOG.md
@@ -5,12 +5,24 @@

### Features

* make thread_count configurable for parallel_bulk in opensearch output connector
### Improvements

### Bugfix


## v9.0.3
### Breaking

### Features

* make `thread_count`, `queue_size` and `chunk_size` configurable for `parallel_bulk` in opensearch output connector

### Improvements

### Bugfix

* fix `parallel_bulk` implementation not delivering messages to opensearch

## v9.0.2

### Bugfix
4 changes: 2 additions & 2 deletions logprep/connector/elasticsearch/output.py
@@ -313,9 +313,9 @@ def _write_backlog(self):
)
self._message_backlog.clear()

def _bulk(self, *args, **kwargs):
def _bulk(self, client, actions, *args, **kwargs):
try:
helpers.bulk(*args, **kwargs)
helpers.bulk(client, actions, *args, **kwargs)
except search.SerializationError as error:
self._handle_serialization_error(error)
except search.ConnectionError as error:
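
Making `client` and `actions` explicit parameters (instead of hiding them in `*args`) gives `_bulk` a signature the OpenSearch subclass below can override and forward to `parallel_bulk` by keyword; the updated test further down (`fake_bulk.call_args.kwargs.get("actions")`) likewise expects `actions` to arrive as a keyword argument.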
63 changes: 46 additions & 17 deletions logprep/connector/opensearch/output.py
@@ -36,14 +36,35 @@
import opensearchpy as search
from attrs import define, field, validators
from opensearchpy import helpers
from opensearchpy.serializer import JSONSerializer

from logprep.abc.output import Output
from logprep.connector.elasticsearch.output import ElasticsearchOutput
from logprep.metrics.metrics import Metric

logging.getLogger("opensearch").setLevel(logging.WARNING)


class MSGPECSerializer(JSONSerializer):
"""A MSGPEC serializer"""

def __init__(self, output_connector: Output, *args, **kwargs):
super().__init__(*args, **kwargs)
self._encoder = output_connector._encoder
self._decoder = output_connector._decoder

def dumps(self, data):
# don't serialize strings
if isinstance(data, str):
return data
try:
return self._encoder.encode(data).decode("utf-8")
except (ValueError, TypeError) as e:
raise search.exceptions.SerializationError(data, e)

def loads(self, data):
return self._decoder.decode(data)


class OpensearchOutput(ElasticsearchOutput):
"""An OpenSearch output connector."""

@@ -56,6 +77,16 @@ class Config(ElasticsearchOutput.Config):
)
"""Number of threads to use for bulk requests."""

queue_size: int = field(
default=4, validator=[validators.instance_of(int), validators.gt(1)]
)
"""Number of queue size to use for bulk requests."""

chunk_size: int = field(
default=500, validator=[validators.instance_of(int), validators.gt(1)]
)
"""Chunk size to use for bulk requests."""

@cached_property
def _search_context(self):
return search.OpenSearch(
@@ -64,6 +95,7 @@ def _search_context(self):
http_auth=self.http_auth,
ssl_context=self.ssl_context,
timeout=self._config.timeout,
serializer=MSGPECSerializer(self),
)

def describe(self) -> str:
@@ -78,23 +110,20 @@ def describe(self) -> str:
base_description = Output.describe(self)
return f"{base_description} - Opensearch Output: {self._config.hosts}"

@Metric.measure_time()
def _write_backlog(self):
if not self._message_backlog:
return

self._bulk(
self._search_context,
self._message_backlog,
max_retries=self._config.max_retries,
chunk_size=len(self._message_backlog) / self._config.thread_count,
thread_count=self._config.thread_count,
)
self._message_backlog.clear()

def _bulk(self, *args, **kwargs):
def _bulk(self, client, actions, *args, **kwargs):
try:
helpers.parallel_bulk(*args, **kwargs)
for success, item in helpers.parallel_bulk(
client,
actions=actions,
chunk_size=self._config.chunk_size,
queue_size=self._config.queue_size,
raise_on_error=True,
raise_on_exception=True,
):
if not success:
result = item[list(item.keys())[0]]
if "error" in result:
raise result.get("error")
except search.SerializationError as error:
self._handle_serialization_error(error)
except search.ConnectionError as error:
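
The `MSGPECSerializer` above replaces the client's default `JSONSerializer` with the connector's msgspec encoder and decoder. A self-contained sketch of the same idea, built on plain `msgspec.json` objects instead of the connector's internal `_encoder`/`_decoder` (the class name and client setup here are illustrative):

```python
import msgspec
from opensearchpy import OpenSearch
from opensearchpy.exceptions import SerializationError
from opensearchpy.serializer import JSONSerializer


class MsgspecJsonSerializer(JSONSerializer):
    """JSON (de)serialization backed by msgspec instead of the stdlib json module."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._encoder = msgspec.json.Encoder()
        self._decoder = msgspec.json.Decoder()

    def dumps(self, data):
        if isinstance(data, str):  # already-serialized payloads pass through untouched
            return data
        try:
            return self._encoder.encode(data).decode("utf-8")
        except (ValueError, TypeError) as error:
            raise SerializationError(data, error)

    def loads(self, data):
        return self._decoder.decode(data)


# Wired into the client the same way the connector does it, via the
# `serializer` argument of the OpenSearch constructor.
client = OpenSearch(hosts=["localhost:9200"], serializer=MsgspecJsonSerializer())
```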
6 changes: 5 additions & 1 deletion quickstart/docker-compose.yml
@@ -60,7 +60,7 @@ services:
- ALLOW_PLAINTEXT_LISTENER=yes
volumes:
- /var/run/docker.sock:/var/run/docker.sock
command: sh -c "((sleep 15 && echo 'kafka up' && kafka-topics.sh --create --if-not-exists --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 3 --topic consumer)&) && /opt/bitnami/scripts/kafka/run.sh"
command: sh -c "((sleep 15 && echo 'kafka up' && kafka-topics.sh --create --if-not-exists --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 4 --topic consumer)&) && /opt/bitnami/scripts/kafka/run.sh"
healthcheck:
test:
[
@@ -92,6 +92,7 @@ services:
depends_on:
- kafka
- opensearch

volumes:
- ../quickstart/:/home/logprep/quickstart/
entrypoint:
@@ -166,3 +167,6 @@ services:
- ../quickstart/exampledata/config/postgresql:/docker-entrypoint-initdb.d
volumes:
data:

networks:
opensearch-net:
21 changes: 18 additions & 3 deletions quickstart/exampledata/config/pipeline.yml
@@ -90,8 +90,19 @@ input:
kafka_config:
bootstrap.servers: 127.0.0.1:9092
group.id: cgroup3
enable.auto.commit: "false"
enable.auto.commit: "true"
auto.commit.interval.ms: "10000"
enable.auto.offset.store: "false"
queued.min.messages: "100000"
queued.max.messages.kbytes: "65536"
statistics.interval.ms: "60000"
preprocessing:
version_info_target_field: Logprep_version_info
log_arrival_time_target_field: event.ingested
hmac:
target: <RAW_MSG>
key: "thisisasecureandrandomkey"
output_field: Full_event

output:
opensearch:
@@ -100,10 +111,13 @@ output:
- 127.0.0.1:9200
default_index: processed
error_index: errors
message_backlog_size: 1000
message_backlog_size: 16000
timeout: 10000
flush_timeout: 60
max_retries: 3
thread_count: 16
queue_size: 32
chunk_size: 500
user: admin
secret: admin
kafka:
Expand All @@ -113,4 +127,5 @@ output:
error_topic: errors
flush_timeout: 300
kafka_config:
bootstrap.servers: 127.0.0.1:9092
bootstrap.servers: 127.0.0.1:9092
statistics.interval.ms: "60000"
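
With these quickstart values, one flush of the 16,000-message backlog splits into 16000 / 500 = 32 chunks of `chunk_size` 500, which matches the `queue_size` of 32 and gives the 16 `parallel_bulk` threads two chunks each; read the numbers as example defaults rather than tuning advice.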
2 changes: 1 addition & 1 deletion quickstart/exampledata/config/prometheus/prometheus.yml
@@ -29,7 +29,7 @@ scrape_configs:
- targets: ["localhost:9090"]
- job_name: "logprep"
static_configs:
- targets: ["localhost:8000"]
- targets: ["localhost:8000", "localhost:8001"]
- job_name: "kafka"
metrics_path: "/metrics"
static_configs:
40 changes: 35 additions & 5 deletions tests/unit/connector/test_opensearch_output.py
@@ -5,7 +5,10 @@
# pylint: disable=attribute-defined-outside-init
# pylint: disable=too-many-arguments
import json
import os
import re
import time
import uuid
from datetime import datetime
from math import isclose
from unittest import mock
@@ -17,6 +20,8 @@

from logprep.abc.component import Component
from logprep.abc.output import FatalOutputError
from logprep.connector.opensearch.output import OpensearchOutput
from logprep.factory import Factory
from logprep.util.time import TimeParser
from tests.unit.connector.base import BaseOutputTestCase

@@ -25,13 +30,15 @@ class NotJsonSerializableMock:
pass


helpers.bulk = mock.MagicMock()
in_ci = os.environ.get("GITHUB_ACTIONS") == "true"

helpers.parallel_bulk = mock.MagicMock()


class TestOpenSearchOutput(BaseOutputTestCase):
CONFIG = {
"type": "opensearch_output",
"hosts": ["host:123"],
"hosts": ["localhost:9200"],
"default_index": "default_index",
"error_index": "error_index",
"message_backlog_size": 1,
@@ -41,7 +48,7 @@ class TestOpenSearchOutput(BaseOutputTestCase):
def test_describe_returns_output(self):
assert (
self.object.describe()
== "OpensearchOutput (Test Instance Name) - Opensearch Output: ['host:123']"
== "OpensearchOutput (Test Instance Name) - Opensearch Output: ['localhost:9200']"
)

def test_store_sends_to_default_index(self):
@@ -193,8 +200,7 @@ def test_handle_bulk_index_error_calls_bulk_with_error_documents(self, fake_bulk):
}
]
self.object._handle_bulk_index_error(mock_bulk_index_error)
call_args = fake_bulk.call_args[0][1]
error_document = call_args[0]
error_document = fake_bulk.call_args.kwargs.get("actions").pop()
assert "reason" in error_document
assert "@timestamp" in error_document
assert "_index" in error_document
@@ -360,3 +366,27 @@ def test_message_backlog_is_cleared_after_it_was_written(self):
self.object._config.message_backlog_size = 1
self.object.store({"event": "test_event"})
assert len(self.object._message_backlog) == 0

@pytest.mark.skip(reason="This test is only for local debugging")
def test_opensearch_parallel_bulk(self):
config = {
"type": "opensearch_output",
"hosts": ["localhost:9200"],
"default_index": "default_index",
"error_index": "error_index",
"message_backlog_size": 1,
"timeout": 5000,
}
output: OpensearchOutput = Factory.create({"opensearch_output": config}, mock.MagicMock())
uuid_str = str(uuid.uuid4())
result = output._search_context.search(
index="defaultindex", body={"query": {"match": {"foo": uuid_str}}}
)
len_before = len(result["hits"]["hits"])
output._message_backlog = [{"foo": uuid_str, "_index": "defaultindex"}]
output._write_backlog()
time.sleep(1)
result = output._search_context.search(
index="defaultindex", body={"query": {"match": {"foo": uuid_str}}}
)
assert len(result["hits"]["hits"]) > len_before
