Skip to content

Commit b06b97c

Browse files
IntegerAlex authored and gitbot committed
refactor(core): implement deterministic async flush synchronization
Replace timing-based best-effort flushing with a robust synchronization barrier using a dedicated _FlushSignal sentinel and threading events. The AsyncLogger.flush method now enqueues a signal that blocks the calling thread until the background worker processes all preceding log entries, ensuring data integrity during shutdown.

Co-authored-by: gitbot <gitbot@gossorg.in>
1 parent 5e8a9a0 commit b06b97c

4 files changed

Lines changed: 109 additions & 25 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10-
## [2.2.0] -
10+
## [0.2.2] - 2026-03-
1111

1212
### Added
1313
- `LOG_LEVEL_DEBUG`, `LOG_LEVEL_INFO`, `LOG_LEVEL_WARNING`, `LOG_LEVEL_ERROR`, and `LOG_LEVEL_CRITICAL` named constants exported from the top-level package, replacing bare integer literals throughout the API.

kakashi/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@
9595
__description__ = "Professional High-Performance Logging Library"
9696
__url__ = "https://github.com/kakashi/logging"
9797

98+
# Backward-compatible aliases for callers using `kakashi.version`/`kakashi.author`.
99+
version = __version__
100+
author = __author__
101+
98102
# ============================================================================
99103
# MAIN EXPORTS
100104
# ============================================================================
@@ -151,6 +155,8 @@
151155
# ---- VERSION AND METADATA ----
152156
"__version__",
153157
"__author__",
158+
"version",
159+
"author",
154160
"__description__",
155161
"__url__",
156-
]
162+
]

kakashi/core/logger.py

Lines changed: 49 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@
3535
LOG_LEVEL_ERROR = 40
3636
LOG_LEVEL_CRITICAL = 50
3737

38-
# Wait time (seconds) for AsyncLogger.close() best-effort flush
39-
_ASYNC_CLOSE_WAIT_SECS = 0.05
40-
4138
# Thread-local storage for lock-free operation
4239
_thread_local = threading.local()
4340

@@ -47,44 +44,74 @@
4744
_async_shutdown = threading.Event()
4845

4946

47+
class _FlushSignal:
48+
"""Queue item used to signal a synchronous flush barrier."""
49+
50+
__slots__ = ("event",)
51+
52+
def __init__(self, event: threading.Event):
53+
self.event = event
54+
55+
5056

5157

5258

5359
def _async_worker_thread():
5460
"""Background worker for async logging."""
5561
batch = []
5662
batch_size = 50 # Optimal batch size for throughput/latency balance
57-
58-
while not _async_shutdown.is_set():
63+
64+
while True:
5965
try:
6066
# Collect batch
6167
batch.clear()
6268
timeout = 0.1 # 100ms batch timeout
63-
69+
shutdown_requested = False
70+
pending_count = 0
71+
6472
try:
6573
# Get first item (blocking)
6674
item = _async_queue.get(timeout=timeout)
67-
if item is None: # Shutdown signal
75+
if item is None:
76+
_async_queue.task_done()
6877
break
78+
if isinstance(item, _FlushSignal):
79+
item.event.set()
80+
_async_queue.task_done()
81+
continue
82+
6983
batch.append(item)
70-
84+
pending_count = 1
85+
7186
# Collect additional items (non-blocking)
7287
for _ in range(batch_size - 1):
7388
try:
7489
item = _async_queue.get_nowait()
75-
if item is None: # Shutdown signal
90+
if item is None:
91+
_async_queue.task_done()
92+
shutdown_requested = True
7693
break
94+
if isinstance(item, _FlushSignal):
95+
item.event.set()
96+
_async_queue.task_done()
97+
continue
7798
batch.append(item)
99+
pending_count += 1
78100
except queue.Empty:
79101
break
80-
102+
81103
except queue.Empty:
82104
continue
83-
105+
84106
# Process batch
85107
if batch:
86108
_process_async_batch(batch)
87-
109+
for _ in range(pending_count):
110+
_async_queue.task_done()
111+
112+
if shutdown_requested:
113+
break
114+
88115
except Exception:
89116
pass # Ignore errors in background thread
90117

@@ -255,12 +282,8 @@ def __init__(self, name: str, min_level: int = LOG_LEVEL_INFO):
255282
_ensure_async_worker()
256283

257284
def close(self) -> None:
258-
"""Best-effort flush of pending messages for this logger by waiting briefly."""
259-
# Draining items belonging to this logger from the queue would require
260-
# restructuring the shared queue, so we do a best-effort flush by
261-
# waiting briefly for the background worker to catch up. This does not
262-
# guarantee that all enqueued messages have been processed.
263-
time.sleep(_ASYNC_CLOSE_WAIT_SECS)
285+
"""Flush all pending async messages before returning."""
286+
self.flush()
264287

265288
def _log_async(self, level: int, message: str, fields: Optional[Dict[str, Any]] = None) -> None:
266289
"""True asynchronous logging - non-blocking enqueue."""
@@ -310,11 +333,11 @@ def exception(self, message: str, **fields) -> None:
310333
self._log_async(40, message, fields)
311334

312335
def flush(self) -> None:
313-
"""Best-effort, timing-based flush by yielding to the background worker."""
314-
# For async logger, we can't force immediate flush, but we can yield
315-
# briefly to allow background processing. This does not guarantee that
316-
# all pending messages have been processed.
317-
time.sleep(_ASYNC_CLOSE_WAIT_SECS)
336+
"""Block until all queued async messages are processed."""
337+
_ensure_async_worker()
338+
marker = _FlushSignal(threading.Event())
339+
_async_queue.put(marker)
340+
marker.event.wait()
318341

319342

320343
# Lock-free logger cache using thread-local storage
@@ -376,6 +399,9 @@ def shutdown_async_logging() -> None:
376399
"""Shutdown async logging gracefully."""
377400
global _async_worker
378401
if _async_worker and _async_worker.is_alive():
402+
# Ensure all enqueued items are processed before stopping the worker.
403+
_async_queue.join()
404+
379405
# Send shutdown sentinel so the worker can finish processing
380406
try:
381407
_async_queue.put_nowait(None) # Shutdown signal

performance_tests/test_api_compatibility.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from pathlib import Path
1212
from typing import Any
1313
import tempfile
14+
import threading
1415

1516
# Add parent directory to path
1617
sys.path.insert(0, str(Path(__file__).parent.parent))
@@ -240,3 +241,54 @@ def log_messages():
240241

241242
# If we get here without errors, concurrent access works
242243
assert True
244+
245+
246+
class TestAsyncFlushRegression:
247+
"""Regression coverage for AsyncLogger.flush synchronization semantics."""
248+
249+
def test_async_flush_waits_for_worker_processing(self, monkeypatch):
250+
"""
251+
Ensure flush is synchronization-based, not timing-based.
252+
253+
The worker is deliberately blocked while processing a batch. flush()
254+
must block until processing is released.
255+
"""
256+
from kakashi import get_async_logger, shutdown_async_logging
257+
import kakashi.core.logger as core_logger
258+
259+
original_process = core_logger._process_async_batch
260+
processing_started = threading.Event()
261+
release_processing = threading.Event()
262+
flush_finished = threading.Event()
263+
264+
def blocking_process(batch):
265+
processing_started.set()
266+
# If flush is truly synchronized with queue processing, it will
267+
# remain blocked until this event is released.
268+
release_processing.wait(timeout=2.0)
269+
original_process(batch)
270+
271+
monkeypatch.setattr(core_logger, "_process_async_batch", blocking_process)
272+
273+
logger = get_async_logger("test_async_flush_regression")
274+
logger.info("must be processed before flush returns")
275+
276+
# If flush regresses to a fixed sleep, this worker blocking won't matter.
277+
flush_thread = threading.Thread(
278+
target=lambda: (logger.flush(), flush_finished.set()),
279+
daemon=True,
280+
)
281+
flush_thread.start()
282+
283+
assert processing_started.wait(timeout=1.0), "Worker never started processing"
284+
assert not flush_finished.wait(timeout=0.05), (
285+
"flush() returned before queued work finished; likely timing-based regression"
286+
)
287+
288+
release_processing.set()
289+
flush_thread.join(timeout=1.0)
290+
assert not flush_thread.is_alive(), "flush() did not finish after worker was released"
291+
assert flush_finished.is_set(), "flush() did not complete successfully"
292+
293+
# Keep test isolation strong for following tests.
294+
shutdown_async_logging()

0 commit comments

Comments (0)