Skip to content

Logstash rework #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Nov 25, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 113 additions & 5 deletions tests/extras/test_logstash.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,115 @@
from yellowbox.extras.logstash import LogstashService
from time import sleep

# rebuilding the logstash service is an ongoing ticket, for now we just want to make sure it runs alright
import pytest
import json
import socket
from yellowbox.extras.logstash import FakeLogstashService

@pytest.fixture
def logstash():
ls = FakeLogstashService()
ls.start()
return ls


def create_socket(logstash):
return socket.create_connection((logstash.local_host, logstash.port))


def send_record(logstash, **kwargs):
s = create_socket(logstash)
s.sendall(json.dumps(kwargs).encode("utf-8")+b"\n")
s.close()
sleep(0.01)


def send_records(logstash, *records):
s = create_socket(logstash)
data = "\n".join([json.dumps(record) for record in records]) + "\n"
data = data.encode("utf-8")
s.sendall(data)
s.close()
sleep(0.01)


def test_sanity(logstash):
send_record(logstash, msg="test")
assert logstash.records[0] == {"msg": "test"}


def test_multiple_records(logstash):
send_records(logstash, {"msg": "hello"}, {"msg": "meow"})
assert logstash.records == [{"msg": "hello"}, {"msg": "meow"}]


def test_multiple_connections(logstash):
send_record(logstash, msg="test")
send_record(logstash, msg="test2")
assert logstash.records == [{"msg": "test"}, {"msg": "test2"}]


def test_half_record(logstash):
s = create_socket(logstash)
s.sendall(b'{"ms')
s.sendall(b'g": "t')
s.sendall(b'est"}\n')
s.close()
sleep(0.01)
assert logstash.records[0] == {"msg": "test"}


def test_bad_record(logstash):
s = create_socket(logstash)
s.sendall(b"{'sdafasdgsdgs\n")
sleep(0.05)

# Bad socket was closed
with pytest.raises(BrokenPipeError):
s.sendall(b"asdasd")
sleep(0.05)
s.sendall(b"asdasd")

# Server still works
send_record(logstash, msg="hello")
assert logstash.records == [{"msg": "hello"}]


def test_assert_logs(logstash):
send_record(logstash, level="INFO")

with pytest.raises(AssertionError):
logstash.assert_logs("ERROR")

send_record(logstash, level="ERROR")

# Doesn't throw an error
logstash.assert_logs("ERROR")


def test_assert_no_logs(logstash):
send_record(logstash, level="INFO")

# Doesn't throw an error
logstash.assert_no_logs("ERROR")

send_record(logstash, level="ERROR", message="hello")

with pytest.raises(AssertionError):
logstash.assert_no_logs("ERROR")


def test_filter_records(logstash):
send_records(logstash, {"level": "INFO"}, {"level": "WARNING"},
{"level": "ERROR"})
assert list(logstash.filter_records("warning")) == [
{"level": "WARNING"}, {"level": "ERROR"}]


def test_is_alive():
logstash = FakeLogstashService()
assert not logstash.is_alive()
logstash.start()
assert logstash.is_alive()
logstash.stop()
assert not logstash.is_alive()

def test_make(docker_client):
with LogstashService.run(docker_client):
pass
92 changes: 54 additions & 38 deletions yellowbox/extras/logstash.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import selectors
import socket
import threading
from typing import Any, Callable, Dict, Iterable, List, Union, cast
from typing import Any, Callable, Dict, Iterator, List, Union, cast
from weakref import WeakMethod

from yellowbox.subclasses import YellowService
Expand All @@ -18,6 +18,23 @@
_CLOSE_SENTINEL = b"\0"


def _level_to_int(level: Union[str, int]) -> int:
"""Convert a log level in str or int into an int.

Args:
level: Can be a string or int

Returns:
level as int.
"""
if not isinstance(level, int):
level = logging.getLevelName(level.upper())

# In rare cases it might be false, but it shouldn't generally happen.
assert isinstance(level, int)
return level


class FakeLogstashService(YellowService):
"""Implements a fake logging service that closely resembles Logstash.

Expand Down Expand Up @@ -50,25 +67,6 @@ class FakeLogstashService(YellowService):
>>> ls.assert_logs("ERROR")
>>> assert ls.records[0]["record"] == "value"
"""
port: int
records: List[Dict[str, Any]]
"""
Records generated by Python's numerous Logstash packages have at least the
following keys in common:
* @timestamp: str - Log timestamp in ISO-8601 format.
* @version: str - Logstash format version (always 1)
* message: str - Log message.
* host: str - Host sending the message.
* path: str - Path to the module writing the log.
* tags: List[str] - ?
* type: str - "Logstash"
* level: str - An all upper-case name of the log level
* logger_name: str - Logger name
* stack_info: Optional[str] - Formatted stacktrace if one exists.

More keys may be added by the specific software sending the logs.
"""

delimiter: bytes = b"\n"
encoding: str = "utf-8"
local_host: str = "localhost"
Expand All @@ -81,7 +79,24 @@ def __init__(self, port: int = 0) -> None:
port: Port to listen on. By default or if set to 0, port is chosen
by the OS.
"""
self.records = []

self.records: List[Dict[str, Any]] = []
"""
Records generated by Python's numerous Logstash packages have at least the
following keys in common:
* @timestamp: str - Log timestamp in ISO-8601 format.
* @version: str - Logstash format version (always 1)
* message: str - Log message.
* host: str - Host sending the message.
* path: str - Path to the module writing the log.
* tags: List[str] - ?
* type: str - "Logstash"
* level: str - An all upper-case name of the log level
* logger_name: str - Logger name
* stack_info: Optional[str] - Formatted stacktrace if one exists.

More keys may be added by the specific software sending the logs.
"""

root = socket.socket()
root.bind(("0.0.0.0", port))
Expand All @@ -97,7 +112,7 @@ def __init__(self, port: int = 0) -> None:
# Sockets used for signalling shutdown
self._rshutdown, self._wshutdown = socket.socketpair()

self.port = self._root.getsockname()[1]
self.port: int = self._root.getsockname()[1]

def __del__(self):
# Will never happen while thread is running.
Expand Down Expand Up @@ -136,6 +151,8 @@ def process_socket_data():
chunks = data.split(delimiter)

# Single partial chunk (no delimiter)
# todo: Cap max chunks length. Not a security issue as we're using
# todo: Yellowbox solely for testing.
if len(chunks) == 1:
partial_chunks.append(chunks[0])
return
Expand All @@ -152,6 +169,7 @@ def process_socket_data():
self.records.append(record_dict)
except json.JSONDecodeError:
self._selector.unregister(sock)
sock.shutdown(socket.SHUT_RDWR)
sock.close()
# noinspection PyUnboundLocalVariable
_logger.exception("Failed decoding json, closing socket. "
Expand Down Expand Up @@ -183,16 +201,20 @@ def _background_thread(self):
key.data()
except Exception:
_logger.exception("Unknown error occurred, closing connection.")
self._selector.unregister(key.fileobj)
# noinspection PyUnresolvedReferences
key.fileobj.close()
sock = cast(socket.socket, key.fileobj)
self._selector.unregister(sock)
sock.shutdown(socket.SHUT_RDWR)
sock.close()

def start(self, *, retry_spec: None = None) -> None:
"""Start the service.

Args:
retry_spec: Ignored.
"""
if retry_spec:
_logger.warning(f"{self.__class__.__name__} does not support passing "
f"a retry spec.")
self._root.listen()
self._selector.register(self._root, selectors.EVENT_READ)
self._selector.register(self._rshutdown, selectors.EVENT_READ)
Expand Down Expand Up @@ -237,8 +259,10 @@ def disconnect(self, network: Any):
"""Does nothing. Conforms to YellowService interface."""
pass

def _filter_records(self, level: int) -> Iterable[Dict[str, Any]]:
def filter_records(self, level: Union[str, int]) -> Iterator[Dict[str, Any]]:
"""Filter records in the given level or above."""
level = _level_to_int(level)

return (record for record in self.records if
logging.getLevelName(record["level"]) >= level)

Expand All @@ -253,13 +277,9 @@ def assert_logs(self, level: Union[str, int]):
Raises:
AssertionError: No logs above the given level were received.
"""
if not isinstance(level, int):
level = logging.getLevelName(level.upper())

# In rare cases it might be false, but shouldn't happen generally.
assert isinstance(level, int)
level = _level_to_int(level)

if not any(self._filter_records(level)):
if not any(self.filter_records(level)):
raise AssertionError(f"No logs of level {logging.getLevelName(level)} "
f"or above were received.")

Expand All @@ -272,13 +292,9 @@ def assert_no_logs(self, level: Union[str, int]):
Raises:
AssertionError: A log above the given level was received.
"""
if not isinstance(level, int):
level = logging.getLevelName(level.upper())

# In rare cases it might be false, but shouldn't happen generally.
assert isinstance(level, int)
level = _level_to_int(level)

record = next(self._filter_records(level), None)
record = next(self.filter_records(level), None)
if record:
raise AssertionError(f"A log level {record['level']} was received. "
f"Message: {record['message']}")