Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 32 additions & 45 deletions src/openhound/core/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ def __init__(
self.base_path = Path(base_path) if base_path else self.default_platform_path()
self.log_file_path: Path | None = None

# Share one rotating handler per file across loggers; opening the same file
# with multiple handlers breaks rotation on Windows (WinError 32).
self._file_handlers: dict[Path, "RotatingFileHandler"] = {}

self.handlers = {
LogMode.CLI: self.cli_handlers,
LogMode.CONTAINER: self.container_handlers,
Expand Down Expand Up @@ -370,33 +374,45 @@ def _file_formatter(self) -> logging.Formatter:
return OpenHoundJSONFormatter()
return OpenHoundTextFormatter()

def container_handlers(self, logger: logging.Logger, file_path: Path) -> None:
"""Set the logging handler/format when running in a container"""

formatter = self._file_formatter()

# Log to stdout for better compatibility with container-based logging systems.
# Output is human-readable text by default; set runtime.log_format = "JSON" for structured JSON.
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(formatter)
logger.addHandler(stdout_handler)

# But also log the same format to a file for persistence and debugging when needed
def _build_file_handler(self, file_path: Path) -> "RotatingFileHandler":
    """Create and configure a rotating file handler for the given path.

    Args:
        file_path: Log file the new handler writes to.

    Returns:
        A new ``RotatingFileHandler`` built from this logger's rotation
        settings (``rotate_when``, ``interval``, ``backup_count``,
        ``max_bytes``), carrying the formatter returned by
        ``_file_formatter()`` and a widened ``extMatch`` pattern.
    """
    handler = RotatingFileHandler(
        file_path,
        when=self.rotate_when,
        interval=self.interval,
        backupCount=self.backup_count,
        max_bytes=self.max_bytes,
    )
    # The formatter is resolved here rather than taken from a caller-side
    # local, so the helper is self-contained and can be shared by every mode.
    handler.setFormatter(self._file_formatter())
    # This regular expression overrides the default extMatch to recognize both
    # default time based rotation filenames and size based rotation filenames
    # (which get seconds appended as well).
    handler.extMatch = re.compile(
        r"(?<!\d)\d{4}-\d{2}-\d{2}_\d{2}(-\d{2}-\d{2})?(?!\d)", re.ASCII
    )
    return handler

logger.addHandler(rotating_file_handler)
def _get_file_handler(self, file_path: Path) -> "RotatingFileHandler":
"""Return a shared rotating file handler for the path (one open file per path)."""
key = Path(file_path).resolve()
handler = self._file_handlers.get(key)
if handler is None:
handler = self._build_file_handler(file_path)
self._file_handlers[key] = handler
return handler

def container_handlers(self, logger: logging.Logger, file_path: Path) -> None:
    """Attach the handlers used when running inside a container.

    Two handlers are added to ``logger``, in this order: a stream handler
    writing to stdout, then the shared rotating file handler for ``file_path``.
    """
    shared_format = self._file_formatter()

    # stdout first: container logging systems generally collect it directly.
    # Output is human-readable text by default; set runtime.log_format = "JSON"
    # for structured JSON.
    console = logging.StreamHandler(sys.stdout)
    console.setFormatter(shared_format)
    logger.addHandler(console)

    # The same format also goes to a file, for persistence and debugging.
    persistent = self._get_file_handler(file_path)
    logger.addHandler(persistent)

def cli_handlers(self, logger: logging.Logger, file_path: Path) -> None:
"""Set the logging handler/format when running as a standalone CLI tool"""
Expand All @@ -416,40 +432,11 @@ def cli_handlers(self, logger: logging.Logger, file_path: Path) -> None:
logger.addHandler(console_handler)

# But also save the logs to a file using the configured format (text by default)
file_formatter = self._file_formatter()
rotating_file_handler = RotatingFileHandler(
file_path,
when=self.rotate_when,
interval=self.interval,
backupCount=self.backup_count,
max_bytes=self.max_bytes,
)
rotating_file_handler.setFormatter(file_formatter)
# This regular expression overrides the default extMatch to recognize both
# default time based rotation filenames and size based rotation filenames (which gets a seconds added as well)
rotating_file_handler.extMatch = re.compile(
r"(?<!\d)\d{4}-\d{2}-\d{2}_\d{2}(-\d{2}-\d{2})?(?!\d)", re.ASCII
)

logger.addHandler(rotating_file_handler)
logger.addHandler(self._get_file_handler(file_path))

def service_handlers(self, logger: logging.Logger, file_path: Path) -> None:
    """Set the logging handler/format when running the OpenHound service.

    Only the shared rotating file handler for ``file_path`` is attached. The
    handler comes from ``_get_file_handler`` rather than being constructed
    here, because opening the same log file through more than one handler
    breaks rotation on Windows (WinError 32).

    Args:
        logger: Logger that receives the file handler.
        file_path: Log file the service writes to.
    """
    logger.addHandler(self._get_file_handler(file_path))

@property
def runtime_mode(self) -> LogMode:
Expand Down
97 changes: 74 additions & 23 deletions src/openhound/core/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import errno
import logging
import sys
import time
from abc import ABC, abstractmethod

from dlt.common.configuration.exceptions import ConfigFieldMissingException
Expand All @@ -8,33 +12,80 @@

from openhound.core.exceptions import ConfigException, ParseException

# Module-level logger; used to report each retry of a transient failure.
logger = logging.getLogger(__name__)

# Windows can transiently lock freshly written load-package files, so dlt's
# per-file rename fallback raises PermissionError (WinError 5); retry the run.
# errno values that mark an OSError as such a transient failure.
_TRANSIENT_RENAME_ERRNOS = {errno.EACCES, errno.EPERM}
# Upper bound on the retry loop in BasePipeline._run. It bounds the total number
# of attempts (the first run included), not the number of retries after it.
_MAX_TRANSIENT_RETRIES = 5
# Base delay in seconds passed to time.sleep between attempts; it is multiplied
# by the 1-based attempt number, so the wait grows linearly.
_TRANSIENT_RETRY_BACKOFF = 0.25


def _transient_filesystem_cause(err: BaseException) -> OSError | None:
"""Return the underlying transient filesystem error in the cause chain, if any."""
seen: set[int] = set()
current: BaseException | None = err
while current is not None and id(current) not in seen:
seen.add(id(current))
if isinstance(current, OSError) and current.errno in _TRANSIENT_RENAME_ERRNOS:
return current
nested = getattr(current, "exception", None)
if nested is None:
nested = current.__cause__ or current.__context__
current = nested
return None


class BasePipeline(ABC):
    """Base class for pipelines; wraps ``pipeline.run`` with error handling."""

    @property
    @abstractmethod
    def pipeline(self) -> "Pipeline":
        """Pipeline object whose ``run`` method ``_run`` delegates to."""
        ...

    def _run(self, source, **kwargs) -> LoadInfo:
        """Run the pipeline, translating known failures and retrying transient ones.

        Args:
            source: Passed through to ``self.pipeline.run``.
            **kwargs: Passed through to ``self.pipeline.run``.

        Returns:
            The value returned by ``self.pipeline.run``.

        Raises:
            ConfigException: A step failed with ``ConfigFieldMissingException``.
            ParseException: A step failed with ``ResourceExtractionError``.
            PipelineStepFailed: Any other step failure, or a transient
                filesystem failure that persisted through every attempt.
            PermissionError: Raised directly by the run and either not
                transient (non-Windows, or an unexpected errno) or still
                failing on the last attempt.
        """
        last_err: BaseException | None = None
        for attempt in range(_MAX_TRANSIENT_RETRIES):
            try:
                return self.pipeline.run(source, **kwargs)
            except PipelineStepFailed as err:
                if isinstance(err.exception, ConfigFieldMissingException):
                    config_cause: ConfigFieldMissingException = err.exception
                    raise ConfigException(
                        pipeline_name=err.pipeline.pipeline_name,
                        destination=str(err.pipeline.destination),
                        dataset_name=err.pipeline.dataset_name,
                        spec_name=config_cause.spec_name,
                        message=str(err.exception),
                    ) from None

                if isinstance(err.exception, ResourceExtractionError):
                    extract_cause: ResourceExtractionError = err.exception
                    raise ParseException(
                        pipeline_name=err.pipeline.pipeline_name,
                        destination=str(err.pipeline.destination),
                        dataset_name=err.pipeline.dataset_name,
                        step=extract_cause.pipe_name,
                        message=extract_cause.msg,
                    )

                # Anything that is not a transient filesystem failure is
                # re-raised untouched.
                if _transient_filesystem_cause(err) is None:
                    raise
                last_err = err
            except PermissionError as err:
                # A bare PermissionError is only retried on Windows.
                if sys.platform != "win32" or err.errno not in _TRANSIENT_RENAME_ERRNOS:
                    raise
                last_err = err

            # Out of attempts: skip the pointless final sleep and re-raise below.
            if attempt + 1 >= _MAX_TRANSIENT_RETRIES:
                break

            logger.warning(
                "Transient filesystem error; retrying run (%d/%d): %s",
                attempt + 1,
                _MAX_TRANSIENT_RETRIES,
                last_err,
            )
            # Linear backoff: the wait grows with every failed attempt.
            time.sleep(_TRANSIENT_RETRY_BACKOFF * (attempt + 1))

        # Only reachable without an error if the attempt budget is zero.
        if last_err is None:
            raise RuntimeError("unreachable: retry loop exited without an error")
        raise last_err
106 changes: 106 additions & 0 deletions tests/test_log_handlers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
from pathlib import Path

import pytest

Expand Down Expand Up @@ -156,3 +157,108 @@ def test_text_formatter_produces_plain_text():

with pytest.raises(json.JSONDecodeError):
json.loads(output)


def test_get_file_handler_caches_handler_per_path(tmp_path):
    """One handler instance per path: repeated or equivalent lookups hit the
    cache, while a different path gets a handler of its own."""
    log_setup = CustomLogger("openhound.log", base_path=str(tmp_path))
    primary_path = tmp_path / "openhound.log"
    secondary_path = tmp_path / "ext_test.log"

    primary = log_setup._get_file_handler(primary_path)
    try:
        # Same Path object, then an equal Path rebuilt from its string form.
        repeated = log_setup._get_file_handler(primary_path)
        assert repeated is primary, (
            "The same path should return the cached handler instance"
        )
        rebuilt = log_setup._get_file_handler(Path(str(primary_path)))
        assert rebuilt is primary, (
            "Equivalent paths should resolve to the same cached handler"
        )

        # Another file must not reuse the first file's handler.
        secondary = log_setup._get_file_handler(secondary_path)
        assert secondary is not primary, (
            "A different path should produce a different handler instance"
        )
        assert isinstance(secondary, RotatingFileHandler), (
            "Cached handlers should be RotatingFileHandler instances"
        )
    finally:
        # Release every file opened through the cache.
        for opened in log_setup._file_handlers.values():
            opened.close()


def test_root_and_dlt_loggers_share_single_file_handler(tmp_path, monkeypatch):
    """Root and dlt loggers must write to openhound.log through one shared
    handler instance, so the file is opened only once; on Windows an extra
    open handle blocks the rename that rotation performs."""

    def rotating_handlers(name=None):
        # Rotating file handlers attached to the named logger (root when None).
        attached = logging.getLogger(name).handlers
        return [h for h in attached if isinstance(h, RotatingFileHandler)]

    # A sibling module may set RUNTIME__LOG_PATH on import; keep our explicit base_path.
    monkeypatch.delenv("RUNTIME__LOG_PATH", raising=False)
    log_setup = CustomLogger("openhound.log", base_path=str(tmp_path))

    try:
        log_setup.setup()

        on_root = rotating_handlers()
        on_dlt = rotating_handlers("dlt")

        assert len(on_root) == 1, (
            "The root logger should have exactly one rotating file handler"
        )
        assert len(on_dlt) == 1, (
            "The dlt logger should have exactly one rotating file handler"
        )
        shared = on_root[0]
        assert shared is on_dlt[0], (
            "Root and dlt loggers must share the same handler instance for openhound.log"
        )
        assert shared.baseFilename.endswith("openhound.log"), (
            "The shared handler should write to 'openhound.log'"
        )
    finally:
        # Restore the shared global logging state for subsequent tests.
        logger_override.setup()


def test_build_file_handler_applies_rotation_settings(tmp_path):
    """A freshly built handler carries the logger's rotation settings, the
    custom extMatch pattern and the formatter selected by log_format."""
    settings = {
        "base_path": str(tmp_path),
        "backup_count": 7,
        "max_bytes": 1234,
        "log_format": "json",
    }
    log_setup = CustomLogger("openhound.log", **settings)

    built = log_setup._build_file_handler(tmp_path / "openhound.log")
    try:
        assert isinstance(built, RotatingFileHandler), (
            "The built handler should be a RotatingFileHandler"
        )
        assert built.backupCount == 7, "The configured backup_count should be applied"
        assert built.max_bytes == 1234, "The configured max_bytes should be applied"
        assert isinstance(built.formatter, OpenHoundJSONFormatter), (
            "log_format 'json' should attach the JSON formatter to the handler"
        )
        # Both suffix shapes must be accepted: hour-granular (time-based) and
        # the longer one with minutes and seconds (size-based).
        suffix_pattern = built.extMatch
        assert suffix_pattern.match("2024-01-02_03"), (
            "extMatch should recognize the time-based rotation suffix"
        )
        assert suffix_pattern.match("2024-01-02_03-04-05"), (
            "extMatch should recognize the size-based rotation suffix"
        )
    finally:
        built.close()
Loading