Skip to content

Commit

Permalink
feat: add datastream logging support to invenio-logging
Browse files Browse the repository at this point in the history
* Introduced `LogManager` to handle structured logging via registered builders.
* Added `SearchBackend` to store logs in a search engine (OpenSearch/Elasticsearch).
* Implemented `LogEvent` for structured log representation.
* Enabled dynamic log builder registration via entry points.
* Integrated Celery task (`log_event_task`) to handle asynchronous log ingestion.
* Ensured template validation in the search engine before indexing logs.
* Configurable log types to support audit, job and others.
* closes CERNDocumentServer/cds-rdm#361

Co-authored-by: Saksham Arora <[email protected]>
  • Loading branch information
jrcastro2 and sakshamarora1 committed Mar 4, 2025
1 parent 6fd9c39 commit 38e403e
Show file tree
Hide file tree
Showing 28 changed files with 995 additions and 6 deletions.
1 change: 0 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,3 @@ jobs:
uses: inveniosoftware/workflows/.github/workflows/tests-python.yml@master
with:
db-service: '[""]'
search-service: '[""]'
3 changes: 2 additions & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ recursive-include docs Makefile
recursive-include examples *.py
recursive-include examples *.sh
recursive-include invenio_logging *.html
recursive-include invenio_logging *.json
recursive-include misc *.py
recursive-include misc *.rst
recursive-include tests *.py
include .git-blame-ignore-revs
include .git-blame-ignore-revs
41 changes: 41 additions & 0 deletions invenio_logging/datastream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Invenio OpenSearch Datastream Logging module."""

from __future__ import absolute_import, print_function

from importlib_metadata import entry_points

from . import config
from .datastreams.managers import LogManager
from .ext import InvenioLoggingBase


class InvenioLoggingDatastreams(InvenioLoggingBase):
"""Invenio-Logging extension for OpenSearch Datastreams."""

def init_app(self, app):
"""Initialize app.
:param app: An instance of :class:`~flask.Flask`.
"""
self.init_manager(app)
self.load_builders()
app.extensions["invenio-logging-datastreams"] = self

def init_manager(self, app):
"""Initialize the logging manager."""
manager = LogManager()
self.manager = manager

def load_builders(self):
"""Load log builders from entry points."""
for ep in entry_points(group="invenio_logging.datastreams.builders"):
builder_class = ep.load()
self.manager.register_builder(ep.name, builder_class)
9 changes: 9 additions & 0 deletions invenio_logging/datastreams/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Invenio module for datastream logging management."""
17 changes: 17 additions & 0 deletions invenio_logging/datastreams/audit_logs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Invenio module for audit-logs management."""

from .backends import SearchAuditLogBackend
from .builders import AuditLogBuilder

__all__ = (
"SearchAuditLogBackend",
"AuditLogBuilder",
)
19 changes: 19 additions & 0 deletions invenio_logging/datastreams/audit_logs/backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Invenio module for datastream logging management."""

from invenio_logging.datastreams.backends import SearchBackend


class SearchAuditLogBackend(SearchBackend):
"""Backend for storing audit logs in datastreams."""

def __init__(self):
"""Initialize backend for audit logs."""
super().__init__(log_type="audit")
31 changes: 31 additions & 0 deletions invenio_logging/datastreams/audit_logs/builders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Datastream Logging Builder module."""

from invenio_logging.datastreams.builders import LogBuilder

from .backends import SearchAuditLogBackend


class AuditLogBuilder(LogBuilder):
"""Builder for structured audit logs."""

type = "audit"

backend_cls = SearchAuditLogBackend

@classmethod
def build(cls, log_event):
"""Build an audit log event context."""
return cls.validate(log_event)

@classmethod
def send(cls, log_event):
"""Send log event using the backend."""
cls.backend_cls().send(log_event)
99 changes: 99 additions & 0 deletions invenio_logging/datastreams/backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Invenio module for datastream backends."""


from abc import ABC, abstractmethod

from flask import current_app
from invenio_search import current_search_client


class LogBackend(ABC):
"""Abstract base class for log backends."""

@abstractmethod
def send(self, log_type, log_event):
"""Send a log event to the backend."""
raise NotImplementedError()


class SearchBackend(LogBackend):
"""Generic backend for storing logs in datastreams index."""

def __init__(self, log_type):
"""
Initialize SearchBackend.
:param log_type: Type of log (e.g., "audit", "task", "system").
"""
self.client = current_search_client
self.log_type = log_type
self.template_name = "datastream-log-v1.0.0"
self.index_name = f"logs-{log_type}"
self.search_fields = [
"message",
"event.action",
"user.id",
"user.email",
"resource.id",
]

self._ensure_template_exists()

def _ensure_template_exists(self):
"""Check if required template exists, enforce if missing."""
index_prefix = current_app.config.get("SEARCH_INDEX_PREFIX", "")
full_template_name = f"{index_prefix}{self.template_name}"
if not self.client.indices.exists_index_template(name=full_template_name):
raise RuntimeError(
f"Required template '{self.template_name}' is missing. "
"Ensure it is created before logging events."
)

def send(self, log_event):
"""Send the log event to Search engine."""
try:
index_prefix = current_app.config.get("SEARCH_INDEX_PREFIX", "")
full_index_name = f"{index_prefix}{self.index_name}"
self.client.index(index=full_index_name, body=log_event)

except Exception as e:
current_app.logger.error(f"Failed to send log Search engine: {e}")
raise e

def search(self, term=None, size=10):
"""
Search log events.
:param size: Number of results to return.
:return: List of log events that match the search term.
"""
try:
index_prefix = current_app.config.get("SEARCH_INDEX_PREFIX", "")
full_index_name = f"{index_prefix}{self.index_name}"

search_query = {
"size": size,
"query": {
"multi_match": {
"query": term,
"fields": self.search_fields,
"operator": "and",
}
},
"sort": [{"timestamp": {"order": "desc"}}],
}
# TODO: add pagination?
response = self.client.search(index=full_index_name, body=search_query)
return [hit["_source"] for hit in response.get("hits", {}).get("hits", [])]

except Exception as e:
current_app.logger.error(f"Failed to search logs: {e}")
raise e
54 changes: 54 additions & 0 deletions invenio_logging/datastreams/builders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Datastream Logging Builder module."""

from abc import ABC, abstractmethod

from marshmallow import ValidationError

from .schema import LogEventSchema


class LogBuilder(ABC):
"""Base log builder for structured logging."""

context_generators = []
"""List of ContextGenerator to update log event context."""

type = "generic"
"""Type of log event."""

schema = LogEventSchema()
"""Schema for validating log events."""

@classmethod
def validate(cls, log_event):
"""Validate the log event against the schema."""
try:
return cls.schema.load(log_event)
except ValidationError as err:
raise ValueError(f"Invalid log data: {err.messages}")

@classmethod
@abstractmethod
def build(cls, **kwargs):
"""Build log event context based on log type and additional context."""
raise NotImplementedError()

@classmethod
def resolve_context(cls, log_event):
"""Resolve all references in the log context."""
for ctx_func in cls.context_generators:
ctx_func(log_event)
return log_event

@classmethod
def send(cls, log_event):
"""Send log event to the log backend."""
raise NotImplementedError()
46 changes: 46 additions & 0 deletions invenio_logging/datastreams/log_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Datastream log event."""

from datetime import datetime


class LogEvent:
"""Class to represent a structured log event."""

def __init__(
self, event={}, resource={}, user={}, extra={}, timestamp=None, message=None
):
"""
Create a LogEvent instance.
:param event: Dict with `action` (required) and optional `description`.
:param resource: Dict with `type`, `id`, and optional `metadata`.
:param user: Dict with `id`, `email`, and optional `roles` (default: empty).
:param extra: Additional metadata dictionary (default: empty).
:param timestamp: Optional timestamp (defaults to now).
:param message: Optional human-readable message.
"""
self.timestamp = timestamp or datetime.now().isoformat()
self.event = event
self.resource = resource
self.user = user
self.extra = extra
self.message = message

def to_dict(self):
"""Convert the log event to a dictionary matching the schema."""
return {
"timestamp": self.timestamp,
"event": self.event,
"message": self.message,
"user": self.user,
"resource": self.resource,
"extra": self.extra,
}
48 changes: 48 additions & 0 deletions invenio_logging/datastreams/managers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Invenio module for datastream logging management."""

from .log_event import LogEvent
from .tasks import log_event_task


class LogManager:
"""Manager for handling logging builders."""

def __init__(self):
"""Initialize the log manager with builders."""
self.builders = {}

def log(self, log_type, log_event, async_mode=True):
"""
Log an event using the correct builder.
:param log_type: Type of log (e.g., "audit", "job").
:param log_event: Instance of LogEvent.
:param async_mode: If True, sends logs via Celery.
"""
if log_type not in self.builders:
raise ValueError(
f"No log builder found for type '{log_type}'. Available types: {self.builders.keys()}"
)
if not isinstance(log_event, LogEvent):
raise ValueError("log_event must be an instance of LogEvent")

log_data = log_event.to_dict()

if async_mode:
log_event_task.delay(log_type, log_data)
else:
log_builder = self.builders[log_type]
log = log_builder.build(log_data)
log_builder.send(log)

def register_builder(self, log_type, builder_class):
"""Register a log builder."""
self.builders[log_type] = builder_class
Loading

0 comments on commit 38e403e

Please sign in to comment.