Skip to content

Commit 0359ef6

Browse files
feat: Add ClpKeyValuePairStreamHandler which supports logging dictionary-type log events into CLP's key-value pair IR format. (#46)
Co-authored-by: kirkrodrigues <[email protected]>
1 parent 53242ec commit 0359ef6

File tree

6 files changed

+518
-8
lines changed

6 files changed

+518
-8
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ readme = "README.md"
1414
requires-python = ">=3.7"
1515
dependencies = [
1616
"backports.zoneinfo >= 0.2.1; python_version < '3.9'",
17-
"clp-ffi-py >= 0.0.11",
17+
"clp-ffi-py >= 0.0.14",
1818
"typing-extensions >= 3.7.4",
1919
"tzlocal == 5.1; python_version < '3.8'",
2020
"tzlocal >= 5.2; python_version >= '3.8'",
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import logging
2+
from typing import Any, Dict
3+
4+
from clp_logging.utils import Timestamp
5+
6+
TIMESTAMP_KEY: str = "timestamp"
7+
TIMESTAMP_UNIX_MILLISECS_KEY: str = "unix_millisecs"
8+
TIMESTAMP_UTC_OFFSET_SECS_KEY: str = "utc_offset_secs"
9+
10+
LEVEL_KEY: str = "level"
11+
LEVEL_NUM_KEY: str = "num"
12+
LEVEL_NAME_KEY: str = "name"
13+
14+
SOURCE_LOCATION_KEY: str = "source_location"
15+
SOURCE_LOCATION_PATH_KEY: str = "path"
16+
SOURCE_LOCATION_LINE_KEY: str = "line"
17+
18+
19+
class AutoGeneratedKeyValuePairsBuffer:
20+
"""
21+
A reusable buffer for auto-generated key-value pairs.
22+
23+
This buffer maintains a predefined dictionary for common metadata fields, to
24+
enable efficient reuse without creating new dictionaries for each log event.
25+
"""
26+
27+
def __init__(self) -> None:
28+
self._buf: Dict[str, Any] = {
29+
TIMESTAMP_KEY: {
30+
TIMESTAMP_UNIX_MILLISECS_KEY: None,
31+
TIMESTAMP_UTC_OFFSET_SECS_KEY: None,
32+
},
33+
LEVEL_KEY: {
34+
LEVEL_NUM_KEY: None,
35+
LEVEL_NAME_KEY: None,
36+
},
37+
SOURCE_LOCATION_KEY: {
38+
SOURCE_LOCATION_PATH_KEY: None,
39+
SOURCE_LOCATION_LINE_KEY: None,
40+
},
41+
}
42+
43+
def generate(self, ts: Timestamp, record: logging.LogRecord) -> Dict[str, Any]:
44+
"""
45+
Generates the auto-generated key-value pairs by populating the
46+
underlying buffer with the given log event metadata.
47+
48+
:param ts: The timestamp assigned to the log event.
49+
:param record: The LogRecord containing metadata for the log event.
50+
:return: The populated underlying buffer as the auto-generated key-value
51+
pairs.
52+
"""
53+
54+
self._buf[TIMESTAMP_KEY][TIMESTAMP_UNIX_MILLISECS_KEY] = ts.get_unix_ts()
55+
self._buf[TIMESTAMP_KEY][TIMESTAMP_UTC_OFFSET_SECS_KEY] = ts.get_utc_offset()
56+
57+
# NOTE: We don't add all the metadata contained in `record`. Instead, we only add the
58+
# following fields:
59+
# - Log level
60+
# - Source location
61+
62+
self._buf[LEVEL_KEY][LEVEL_NUM_KEY] = record.levelno
63+
self._buf[LEVEL_KEY][LEVEL_NAME_KEY] = record.levelname
64+
65+
self._buf[SOURCE_LOCATION_KEY][SOURCE_LOCATION_PATH_KEY] = record.pathname
66+
self._buf[SOURCE_LOCATION_KEY][SOURCE_LOCATION_LINE_KEY] = record.lineno
67+
68+
return self._buf

src/clp_logging/handlers.py

Lines changed: 178 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,23 @@
33
import socket
44
import sys
55
import time
6+
import warnings
67
from abc import ABCMeta, abstractmethod
8+
from contextlib import nullcontext
79
from math import floor
810
from pathlib import Path
911
from queue import Empty, Queue
1012
from signal import SIGINT, signal, SIGTERM
1113
from threading import Thread, Timer
1214
from types import FrameType
13-
from typing import Callable, ClassVar, Dict, IO, Optional, Tuple, Union
15+
from typing import Any, Callable, ClassVar, Dict, IO, Optional, Tuple, Union
1416

1517
import tzlocal
16-
from clp_ffi_py.ir import FourByteEncoder
18+
from clp_ffi_py.ir import FourByteEncoder, Serializer
19+
from clp_ffi_py.utils import serialize_dict_to_msgpack
1720
from zstandard import FLUSH_FRAME, ZstdCompressionWriter, ZstdCompressor
1821

22+
from clp_logging.auto_generated_kv_pairs_utils import AutoGeneratedKeyValuePairsBuffer
1923
from clp_logging.protocol import (
2024
BYTE_ORDER,
2125
EOF_CHAR,
@@ -25,12 +29,15 @@
2529
UINT_MAX,
2630
ULONG_MAX,
2731
)
32+
from clp_logging.utils import Timestamp
2833

2934
# TODO: lock writes to zstream if GIL ever goes away
3035
# Note: no need to quote "Queue[Tuple[int, bytes]]" in python 3.9
3136

3237
DEFAULT_LOG_FORMAT: str = " %(levelname)s %(name)s %(message)s"
3338
WARN_PREFIX: str = " [WARN][clp_logging]"
39+
AUTO_GENERATED_KV_PAIRS_KEY: str = "auto_generated_kv_pairs"
40+
USER_GENERATED_KV_PAIRS_KEY: str = "user_generated_kv_pairs"
3441

3542

3643
def _init_timeinfo(fmt: Optional[str], tz: Optional[str]) -> Tuple[str, str]:
@@ -129,9 +136,9 @@ def _write(self, loglevel: int, msg: str) -> None:
129136
# override
130137
def emit(self, record: logging.LogRecord) -> None:
131138
"""
132-
Override `logging.Handler.emit` in base class to ensure
133-
`logging.Handler.handleError` is always called and avoid requiring a
134-
`logging.LogRecord` to call internal writing functions.
139+
Implements `logging.Handler.emit` to ensure
140+
`logging.Handler.handleError` is always called and so derived classes
141+
only need to implement `_write` instead of implementing this method.
135142
"""
136143
msg: str = self.format(record) + "\n"
137144
try:
@@ -792,3 +799,169 @@ def __init__(
792799
super().__init__(
793800
open(fpath, mode), enable_compression, timestamp_format, timezone, loglevel_timeout
794801
)
802+
803+
804+
class ClpKeyValuePairStreamHandler(logging.Handler):
805+
"""
806+
A custom logging handler that serializes key-value pair log events into the
807+
CLP key-value pair IR format.
808+
809+
Differences from `logging.StreamHandler`:
810+
811+
- Log events (`logging.LogRecord`) should contain the key-value pairs that a user wants to log
812+
as a Python dictionary.
813+
- As a result, the key-value pairs will not be formatted into a string before being written.
814+
- The key-value pairs will be serialized into the CLP key-value pair IR format before writing to
815+
the stream.
816+
817+
Key-value pairs in the log event must abide by the following rules:
818+
- Keys must be of type `str`.
819+
- Values must be one of the following types:
820+
- Primitives: `int`, `float`, `str`, `bool`, or `None`.
821+
- Arrays, where each array:
822+
- may contain primitive values, dictionaries, or nested arrays.
823+
- can be empty.
824+
- Dictionaries, where each dictionary:
825+
- must adhere to the aforementioned rules for keys and values.
826+
- can be empty.
827+
828+
:param stream: A writable byte output stream to which the handler will write the serialized IR
829+
byte sequences.
830+
:param enable_compression: Whether to compress the serialized IR byte sequences using Zstandard.
831+
"""
832+
833+
def __init__(
834+
self,
835+
stream: IO[bytes],
836+
enable_compression: bool = True,
837+
) -> None:
838+
super().__init__()
839+
840+
self._enable_compression: bool = enable_compression
841+
self._serializer: Optional[Serializer] = None
842+
self._formatter: Optional[logging.Formatter] = None
843+
self._ostream: IO[bytes] = stream
844+
845+
self._auto_gen_kv_pairs_buf: AutoGeneratedKeyValuePairsBuffer = (
846+
AutoGeneratedKeyValuePairsBuffer()
847+
)
848+
849+
self._init_new_serializer(stream)
850+
851+
# override
852+
def setFormatter(self, fmt: Optional[logging.Formatter]) -> None:
853+
if fmt is None:
854+
return
855+
warnings.warn(
856+
f"{self.__class__.__name__} doesn't currently support Formatters",
857+
category=RuntimeWarning,
858+
)
859+
self._formatter = fmt
860+
861+
# override
862+
def emit(self, record: logging.LogRecord) -> None:
863+
"""
864+
Implements `logging.Handler.emit` to encode the given record into CLP's
865+
IR format before it's written to the underlying stream.
866+
867+
:param record: The log event to serialize.
868+
"""
869+
try:
870+
self._write(record)
871+
except Exception:
872+
self.handleError(record)
873+
874+
# override
875+
def setStream(self, stream: IO[bytes]) -> Optional[IO[bytes]]:
876+
"""
877+
Sets the instance's stream to the given value, if it's different from
878+
the current value. The old stream is flushed before the new stream is
879+
set.
880+
881+
NOTE: The old stream will also be closed by this method.
882+
883+
:param stream: A writable byte output stream to which the handler will write the serialized
884+
IR byte sequences.
885+
:return: The old stream if the stream was changed, or `None` if it wasn't.
886+
"""
887+
888+
# NOTE: This function is implemented by mirroring CPython's implementation.
889+
890+
if stream is self._ostream:
891+
return None
892+
893+
old_stream: IO[bytes] = self._ostream
894+
with self.lock if self.lock else nullcontext():
895+
# TODO: The following call will close the old stream whereas `logging.StreamHandler`'s
896+
# implementation will only flush the stream without closing it. To support
897+
# `logging.StreamHandler`'s behaviour, we need `clp_ffi_py.ir.Serializer` to allow
898+
# closing the serializer without closing the underlying output stream.
899+
self._init_new_serializer(stream)
900+
self._ostream = stream
901+
return old_stream
902+
903+
# override
904+
def close(self) -> None:
905+
if self._is_closed():
906+
return
907+
self._close_serializer()
908+
super().close()
909+
910+
def _is_closed(self) -> bool:
911+
return self._serializer is None
912+
913+
def _close_serializer(self) -> None:
914+
"""
915+
Closes the current serializer if it's open.
916+
917+
NOTE: The underlying output stream will also be closed.
918+
"""
919+
if self._is_closed():
920+
return
921+
assert self._serializer is not None
922+
self._serializer.close()
923+
self._serializer = None
924+
925+
def _init_new_serializer(self, stream: IO[bytes]) -> None:
926+
"""
927+
Initializes a new serializer that will write to the given stream.
928+
929+
:param stream: The stream that the underlying serializer will write to.
930+
"""
931+
self._close_serializer()
932+
self._serializer = Serializer(
933+
ZstdCompressor().stream_writer(stream) if self._enable_compression else stream
934+
)
935+
936+
def _write(self, record: logging.LogRecord) -> None:
937+
"""
938+
Writes the log event into the underlying serializer.
939+
940+
:param record: The log event to serialize.
941+
:raise RuntimeError: If the handler has been already closed.
942+
:raise TypeError: If `record.msg` is not a Python dictionary.
943+
"""
944+
if self._is_closed():
945+
raise RuntimeError("Stream already closed.")
946+
947+
if not isinstance(record.msg, dict):
948+
raise TypeError("`record.msg` must be a Python dictionary.")
949+
950+
self._serialize_kv_pair_log_event(
951+
self._auto_gen_kv_pairs_buf.generate(Timestamp.now(), record), record.msg
952+
)
953+
954+
def _serialize_kv_pair_log_event(
955+
self, auto_gen_kv_pairs: Dict[str, Any], user_gen_kv_pairs: Dict[str, Any]
956+
) -> None:
957+
"""
958+
:param auto_gen_kv_pairs: A dict of auto-generated kv-pairs.
959+
:param user_gen_kv_pairs: A dict of user-generated kv-pairs.
960+
"""
961+
if self._is_closed():
962+
raise RuntimeError("Stream already closed.")
963+
assert self._serializer is not None
964+
self._serializer.serialize_log_event_from_msgpack_map(
965+
serialize_dict_to_msgpack(auto_gen_kv_pairs),
966+
serialize_dict_to_msgpack(user_gen_kv_pairs),
967+
)

src/clp_logging/utils.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from __future__ import annotations
2+
3+
import time
4+
from math import floor
5+
6+
7+
class Timestamp:
8+
"""
9+
A timestamp represented as a Unix timestamp and a timezone offset from UTC.
10+
"""
11+
12+
@staticmethod
13+
def now() -> Timestamp:
14+
"""
15+
:return: A `Timestamp` instance representing the current time.
16+
"""
17+
ts: float = time.time()
18+
return Timestamp(
19+
unix_ts=floor(ts * 1000),
20+
utc_offset=time.localtime(ts).tm_gmtoff,
21+
)
22+
23+
def __init__(self, unix_ts: int, utc_offset: int):
24+
"""
25+
Initializes a `Timestamp` instance with the given time.
26+
27+
:param unix_ts: Unix timestamp in milliseconds.
28+
:param utc_offset: The number of seconds the timezone is ahead of
29+
(positive) or behind (negative) UTC.
30+
"""
31+
self._utc_offset: int = utc_offset
32+
self._unix_ts: int = unix_ts
33+
34+
def get_unix_ts(self) -> int:
35+
"""
36+
:return: The Unix timestamp in milliseconds.
37+
"""
38+
return self._unix_ts
39+
40+
def get_utc_offset(self) -> int:
41+
"""
42+
:return: The number of seconds the timezone is ahead of (positive) or behind (negative) UTC.
43+
"""
44+
return self._utc_offset

tests/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import unittest
22
from typing import Iterable, Optional, Union
33

4-
from tests.test_handlers import TestCLPBase, TestCLPSegmentStreamingBase
4+
from tests.test_handlers import (
5+
TestCLPBase,
6+
TestClpKeyValuePairLoggingBase,
7+
TestCLPSegmentStreamingBase,
8+
)
59

610

711
def add_tests(suite: unittest.TestSuite, loader: unittest.TestLoader, test_class: type) -> None:
@@ -35,4 +39,7 @@ def load_tests(
3539
for seg_test_class in TestCLPSegmentStreamingBase.__subclasses__():
3640
add_tests(suite, loader, seg_test_class)
3741

42+
for kv_pair_handler_test_class in TestClpKeyValuePairLoggingBase.__subclasses__():
43+
add_tests(suite, loader, kv_pair_handler_test_class)
44+
3845
return suite

0 commit comments

Comments
 (0)