diff --git a/logprep/connector/http/input.py b/logprep/connector/http/input.py index 79885d870..c4cda2d1b 100644 --- a/logprep/connector/http/input.py +++ b/logprep/connector/http/input.py @@ -77,6 +77,7 @@ * Responds with 405 """ +import asyncio import multiprocessing as mp import queue import re @@ -101,7 +102,7 @@ from logprep.util.credentials import CredentialsFactory -def decorator_basic_auth(func: Callable): +def basic_auth(func: Callable): """Decorator to check basic authentication. Will raise 401 on wrong credentials or missing Authorization-Header""" @@ -121,7 +122,7 @@ async def func_wrapper(*args, **kwargs): return func_wrapper -def decorator_request_exceptions(func: Callable): +def handle_request_exceptions(func: Callable): """Decorator to wrap http calls and raise exceptions""" async def func_wrapper(*args, **kwargs): @@ -144,7 +145,7 @@ async def func_wrapper(*args, **kwargs): return func_wrapper -def decorator_add_metadata(func: Callable): +def add_metadata(func: Callable): """Decorator to add metadata to resulting http event. Uses attribute collect_meta of endpoint class to decide over metadata collection Uses attribute metafield_name to define key name for metadata @@ -200,27 +201,34 @@ def __init__( collect_meta: bool, metafield_name: str, credentials: dict, + number_of_http_requests: CounterMetric, ) -> None: self.messages = messages self.collect_meta = collect_meta self.metafield_name = metafield_name self.credentials = credentials + self.number_of_http_requests = number_of_http_requests if self.credentials: self.basicauth_b64 = b64encode( f"{self.credentials.username}:{self.credentials.password}".encode("utf-8") ).decode("utf-8") + def collect_metrics(self): + """Increment number of requests""" + self.number_of_http_requests += 1 + class JSONHttpEndpoint(HttpEndpoint): """:code:`json` endpoint to get json from request""" _decoder = msgspec.json.Decoder() - @decorator_request_exceptions - @decorator_basic_auth - @decorator_add_metadata + @handle_request_exceptions + @basic_auth + @add_metadata async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ """json endpoint method""" + self.collect_metrics() data = await req.stream.read() data = data.decode("utf8") metadata = kwargs.get("metadata", {}) @@ -234,11 +242,12 @@ class JSONLHttpEndpoint(HttpEndpoint): _decoder = msgspec.json.Decoder() - @decorator_request_exceptions - @decorator_basic_auth - @decorator_add_metadata + @handle_request_exceptions + @basic_auth + @add_metadata async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ """jsonl endpoint method""" + self.collect_metrics() data = await req.stream.read() data = data.decode("utf8") event = kwargs.get("metadata", {}) @@ -253,11 +262,12 @@ class PlaintextHttpEndpoint(HttpEndpoint): """:code:`plaintext` endpoint to get the body from request and put it in :code:`message` field""" - @decorator_request_exceptions - @decorator_basic_auth - @decorator_add_metadata + @handle_request_exceptions + @basic_auth + @add_metadata async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ """plaintext endpoint method""" + self.collect_metrics() data = await req.stream.read() metadata = kwargs.get("metadata", {}) event = {"message": data.decode("utf8")} @@ -412,7 +422,11 @@ def setup(self): endpoint_class = self._endpoint_registry.get(endpoint_type) credentials = cred_factory.from_endpoint(endpoint_path) endpoints_config[endpoint_path] = endpoint_class( - self.messages, collect_meta, metafield_name, credentials + self.messages, + collect_meta, + metafield_name, + credentials, + self.metrics.number_of_http_requests, ) app = self._get_asgi_app(endpoints_config) diff --git a/tests/unit/connector/test_http_input.py b/tests/unit/connector/test_http_input.py index 1a21c2c44..cd94bde1e 100644 --- a/tests/unit/connector/test_http_input.py +++ b/tests/unit/connector/test_http_input.py @@ -6,7 +6,6 @@ from copy import deepcopy from unittest import mock -import falcon import pytest import requests import uvicorn @@ -73,6 +72,12 @@ def setup_method(self): }, } + expected_metrics = [ + *BaseInputTestCase.expected_metrics, + "logprep_message_backlog_size", + "logprep_number_of_http_requests", + ] + def teardown_method(self): while not self.object.messages.empty(): self.object.messages.get(timeout=0.001)