-
Notifications
You must be signed in to change notification settings - Fork 31
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add datastream logging support to invenio-logging
* Introduced `LogManager` to handle structured logging via registered builders. * Added `SearchBackend` to store logs in a search engine (OpenSearch/Elasticsearch). * Implemented `LogEvent` for structured log representation. * Enabled dynamic log builder registration via entry points. * Integrated Celery task (`log_event_task`) to handle asynchronous log ingestion. * Ensured template validation in the search engine before indexing logs. * Configurable log types to support audit, job and others. * closes CERNDocumentServer/cds-rdm#361 Co-authored-by: Saksham Arora <[email protected]>
- Loading branch information
1 parent
6fd9c39
commit 8895059
Showing
28 changed files
with
971 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# This file is part of Invenio. | ||
# Copyright (C) 2025 CERN. | ||
# | ||
# Invenio is free software; you can redistribute it and/or modify it | ||
# under the terms of the MIT License; see LICENSE file for more details. | ||
|
||
"""Invenio OpenSearch Datastream Logging module.""" | ||
|
||
from __future__ import absolute_import, print_function | ||
|
||
from importlib_metadata import entry_points | ||
|
||
from . import config | ||
from .datastreams.managers import LogManager | ||
from .ext import InvenioLoggingBase | ||
|
||
|
||
class InvenioLoggingDatastreams(InvenioLoggingBase): | ||
"""Invenio-Logging extension for OpenSearch Datastreams.""" | ||
|
||
def init_app(self, app): | ||
"""Initialize app. | ||
:param app: An instance of :class:`~flask.Flask`. | ||
""" | ||
self.init_manager(app) | ||
self.load_builders() | ||
app.extensions["invenio-logging-datastreams"] = self | ||
|
||
def init_manager(self, app): | ||
"""Initialize the logging manager.""" | ||
manager = LogManager() | ||
self.manager = manager | ||
|
||
def load_builders(self): | ||
"""Load log builders from entry points.""" | ||
for ep in entry_points(group="invenio_logging.datastreams.builders"): | ||
builder_class = ep.load() | ||
self.manager.register_builder(ep.name, builder_class) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# This file is part of Invenio. | ||
# Copyright (C) 2025 CERN. | ||
# | ||
# Invenio is free software; you can redistribute it and/or modify it | ||
# under the terms of the MIT License; see LICENSE file for more details. | ||
|
||
"""Invenio module for datastream logging management.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# This file is part of Invenio. | ||
# Copyright (C) 2025 CERN. | ||
# | ||
# Invenio is free software; you can redistribute it and/or modify it | ||
# under the terms of the MIT License; see LICENSE file for more details. | ||
|
||
"""Invenio module for audit-logs management.""" | ||
|
||
from .backends import SearchAuditLogBackend | ||
from .builders import AuditLogBuilder | ||
|
||
__all__ = ( | ||
"SearchAuditLogBackend", | ||
"AuditLogBuilder", | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# This file is part of Invenio. | ||
# Copyright (C) 2025 CERN. | ||
# | ||
# Invenio is free software; you can redistribute it and/or modify it | ||
# under the terms of the MIT License; see LICENSE file for more details. | ||
|
||
"""Invenio module for datastream logging management.""" | ||
|
||
from invenio_logging.datastreams.backends import SearchBackend | ||
|
||
|
||
class SearchAuditLogBackend(SearchBackend): | ||
"""Backend for storing audit logs in datastreams.""" | ||
|
||
def __init__(self): | ||
"""Initialize backend for audit logs.""" | ||
super().__init__(log_type="audit") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# This file is part of Invenio. | ||
# Copyright (C) 2025 CERN. | ||
# | ||
# Invenio is free software; you can redistribute it and/or modify it | ||
# under the terms of the MIT License; see LICENSE file for more details. | ||
|
||
"""Datastream Logging Builder module.""" | ||
|
||
from invenio_logging.datastreams.builders import LogBuilder | ||
|
||
from .backends import SearchAuditLogBackend | ||
|
||
|
||
class AuditLogBuilder(LogBuilder): | ||
"""Builder for structured audit logs.""" | ||
|
||
type = "audit" | ||
|
||
backend_cls = SearchAuditLogBackend | ||
|
||
@classmethod | ||
def build(cls, log_event): | ||
"""Build an audit log event context.""" | ||
return cls.validate(log_event) | ||
|
||
@classmethod | ||
def send(cls, log_event): | ||
"""Send log event using the backend.""" | ||
cls.backend_cls().send(log_event) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# This file is part of Invenio. | ||
# Copyright (C) 2025 CERN. | ||
# | ||
# Invenio is free software; you can redistribute it and/or modify it | ||
# under the terms of the MIT License; see LICENSE file for more details. | ||
|
||
"""Invenio module for datastream backends.""" | ||
|
||
|
||
from abc import ABC, abstractmethod | ||
|
||
from flask import current_app | ||
from invenio_search import current_search_client | ||
|
||
|
||
class LogBackend(ABC): | ||
"""Abstract base class for log backends.""" | ||
|
||
@abstractmethod | ||
def send(self, log_type, log_event): | ||
"""Send a log event to the backend.""" | ||
raise NotImplementedError() | ||
|
||
|
||
class SearchBackend(LogBackend): | ||
"""Generic backend for storing logs in datastreams index.""" | ||
|
||
def __init__(self, log_type): | ||
""" | ||
Initialize SearchBackend. | ||
:param log_type: Type of log (e.g., "audit", "task", "system"). | ||
""" | ||
self.client = current_search_client | ||
self.log_type = log_type | ||
self.template_name = "datastream-log-v1.0.0" | ||
self.index_name = f"logs-{log_type}" | ||
self.search_fields = [ | ||
"message", | ||
"event.action", | ||
"user.id", | ||
"user.email", | ||
"resource.id", | ||
] | ||
|
||
self._ensure_template_exists() | ||
|
||
def _ensure_template_exists(self): | ||
"""Check if required template exists, enforce if missing.""" | ||
index_prefix = current_app.config.get("SEARCH_INDEX_PREFIX", "") | ||
full_template_name = f"{index_prefix}{self.template_name}" | ||
if not self.client.indices.exists_index_template(name=full_template_name): | ||
raise RuntimeError( | ||
f"Required template '{self.template_name}' is missing. " | ||
"Ensure it is created before logging events." | ||
) | ||
|
||
def send(self, log_event): | ||
"""Send the log event to Search engine.""" | ||
try: | ||
index_prefix = current_app.config.get("SEARCH_INDEX_PREFIX", "") | ||
full_index_name = f"{index_prefix}{self.index_name}" | ||
self.client.index(index=full_index_name, body=log_event) | ||
|
||
except Exception as e: | ||
current_app.logger.error(f"Failed to send log Search engine: {e}") | ||
raise e | ||
|
||
def search(self, term=None, size=10): | ||
""" | ||
Search log events. | ||
:param size: Number of results to return. | ||
:return: List of log events that match the search term. | ||
""" | ||
try: | ||
index_prefix = current_app.config.get("SEARCH_INDEX_PREFIX", "") | ||
full_index_name = f"{index_prefix}{self.index_name}" | ||
|
||
search_query = { | ||
"size": size, | ||
"query": { | ||
"multi_match": { | ||
"query": term, | ||
"fields": self.search_fields, | ||
"operator": "and", | ||
} | ||
}, | ||
"sort": [{"timestamp": {"order": "desc"}}], | ||
} | ||
# TODO: add pagination? | ||
response = self.client.search(index=full_index_name, body=search_query) | ||
return [hit["_source"] for hit in response.get("hits", {}).get("hits", [])] | ||
|
||
except Exception as e: | ||
current_app.logger.error(f"Failed to search logs: {e}") | ||
raise e |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# This file is part of Invenio. | ||
# Copyright (C) 2025 CERN. | ||
# | ||
# Invenio is free software; you can redistribute it and/or modify it | ||
# under the terms of the MIT License; see LICENSE file for more details. | ||
|
||
"""Datastream Logging Builder module.""" | ||
|
||
from abc import ABC, abstractmethod | ||
|
||
from marshmallow import ValidationError | ||
|
||
from .schema import LogEventSchema | ||
|
||
|
||
class LogBuilder(ABC): | ||
"""Base log builder for structured logging.""" | ||
|
||
context_generators = [] | ||
"""List of ContextGenerator to update log event context.""" | ||
|
||
type = "generic" | ||
"""Type of log event.""" | ||
|
||
schema = LogEventSchema() | ||
"""Schema for validating log events.""" | ||
|
||
@classmethod | ||
def validate(cls, log_event): | ||
"""Validate the log event against the schema.""" | ||
try: | ||
return cls.schema.load(log_event) | ||
except ValidationError as err: | ||
raise ValueError(f"Invalid log data: {err.messages}") | ||
|
||
@classmethod | ||
@abstractmethod | ||
def build(cls, **kwargs): | ||
"""Build log event context based on log type and additional context.""" | ||
raise NotImplementedError() | ||
|
||
@classmethod | ||
def resolve_context(cls, log_event): | ||
"""Resolve all references in the log context.""" | ||
for ctx_func in cls.context_generators: | ||
ctx_func(log_event) | ||
return log_event | ||
|
||
@classmethod | ||
def send(cls, log_event): | ||
"""Send log event to the log backend.""" | ||
raise NotImplementedError() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# This file is part of Invenio. | ||
# Copyright (C) 2025 CERN. | ||
# | ||
# Invenio is free software; you can redistribute it and/or modify it | ||
# under the terms of the MIT License; see LICENSE file for more details. | ||
|
||
"""Datastream log event.""" | ||
|
||
from datetime import datetime | ||
|
||
|
||
class LogEvent: | ||
"""Class to represent a structured log event.""" | ||
|
||
def __init__( | ||
self, event={}, resource={}, user={}, extra={}, timestamp=None, message=None | ||
): | ||
""" | ||
Create a LogEvent instance. | ||
:param event: Dict with `action` (required) and optional `description`. | ||
:param resource: Dict with `type`, `id`, and optional `metadata`. | ||
:param user: Dict with `id`, `email`, and optional `roles` (default: empty). | ||
:param extra: Additional metadata dictionary (default: empty). | ||
:param timestamp: Optional timestamp (defaults to now). | ||
:param message: Optional human-readable message. | ||
""" | ||
self.timestamp = timestamp or datetime.now().isoformat() | ||
self.event = event | ||
self.resource = resource | ||
self.user = user | ||
self.extra = extra | ||
self.message = message | ||
|
||
def to_dict(self): | ||
"""Convert the log event to a dictionary matching the schema.""" | ||
return { | ||
"timestamp": self.timestamp, | ||
"event": self.event, | ||
"message": self.message, | ||
"user": self.user, | ||
"resource": self.resource, | ||
"extra": self.extra, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# This file is part of Invenio. | ||
# Copyright (C) 2025 CERN. | ||
# | ||
# Invenio is free software; you can redistribute it and/or modify it | ||
# under the terms of the MIT License; see LICENSE file for more details. | ||
|
||
"""Invenio module for datastream logging management.""" | ||
|
||
from .log_event import LogEvent | ||
from .tasks import log_event_task | ||
|
||
|
||
class LogManager: | ||
"""Manager for handling logging builders.""" | ||
|
||
def __init__(self): | ||
"""Initialize the log manager with builders.""" | ||
self.builders = {} | ||
|
||
def log(self, log_type, log_event, async_mode=True): | ||
""" | ||
Log an event using the correct builder. | ||
:param log_type: Type of log (e.g., "audit", "job"). | ||
:param log_event: Instance of LogEvent. | ||
:param async_mode: If True, sends logs via Celery. | ||
""" | ||
if log_type not in self.builders: | ||
raise ValueError( | ||
f"No log builder found for type '{log_type}'. Available types: {self.builders.keys()}" | ||
) | ||
if not isinstance(log_event, LogEvent): | ||
raise ValueError("log_event must be an instance of LogEvent") | ||
|
||
log_data = log_event.to_dict() | ||
|
||
if async_mode: | ||
log_event_task.delay(log_type, log_data) | ||
else: | ||
log_builder = self.builders[log_type] | ||
log = log_builder.build(log_data) | ||
log_builder.send(log) | ||
|
||
def register_builder(self, log_type, builder_class): | ||
"""Register a log builder.""" | ||
self.builders[log_type] = builder_class |
Oops, something went wrong.