Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

datastream: add basic OS logging #82

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,3 @@ jobs:
uses: inveniosoftware/workflows/.github/workflows/tests-python.yml@master
with:
db-service: '[""]'
search-service: '[""]'
3 changes: 2 additions & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ recursive-include docs Makefile
recursive-include examples *.py
recursive-include examples *.sh
recursive-include invenio_logging *.html
recursive-include invenio_logging *.json
recursive-include misc *.py
recursive-include misc *.rst
recursive-include tests *.py
include .git-blame-ignore-revs
include .git-blame-ignore-revs
41 changes: 41 additions & 0 deletions invenio_logging/datastream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Invenio OpenSearch Datastream Logging module."""

from __future__ import absolute_import, print_function

from importlib_metadata import entry_points

from . import config
from .datastreams.managers import LogManager
from .ext import InvenioLoggingBase


class InvenioLoggingDatastreams(InvenioLoggingBase):
"""Invenio-Logging extension for OpenSearch Datastreams."""

def init_app(self, app):
"""Initialize app.
:param app: An instance of :class:`~flask.Flask`.
"""
self.init_manager(app)
self.load_builders()
app.extensions["invenio-logging-datastreams"] = self

def init_manager(self, app):
"""Initialize the logging manager."""
manager = LogManager()
self.manager = manager

def load_builders(self):
"""Load log builders from entry points."""
for ep in entry_points(group="invenio_logging.datastreams.builders"):
builder_class = ep.load()
self.manager.register_builder(ep.name, builder_class)
9 changes: 9 additions & 0 deletions invenio_logging/datastreams/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Invenio module for datastream logging management."""
17 changes: 17 additions & 0 deletions invenio_logging/datastreams/audit_logs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Invenio module for audit-logs management."""

from .backends import SearchAuditLogBackend
from .builders import AuditLogBuilder

__all__ = (
"SearchAuditLogBackend",
"AuditLogBuilder",
)
19 changes: 19 additions & 0 deletions invenio_logging/datastreams/audit_logs/backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Invenio module for datastream logging management."""

from invenio_logging.datastreams.backends import SearchBackend


class SearchAuditLogBackend(SearchBackend):
"""Backend for storing audit logs in datastreams."""

def __init__(self):
"""Initialize backend for audit logs."""
super().__init__(log_type="audit")
37 changes: 37 additions & 0 deletions invenio_logging/datastreams/audit_logs/builders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Datastream Logging Builder module."""

from invenio_logging.datastreams.builders import LogBuilder

from .backends import SearchAuditLogBackend


class AuditLogBuilder(LogBuilder):
"""Builder for structured audit logs."""

type = "audit"

backend_cls = SearchAuditLogBackend

@classmethod
def build(cls, log_event):
"""Build an audit log event context."""
return cls.validate(log_event)

@classmethod
def send(cls, log_event):
"""Send log event using the backend."""
cls.backend_cls().send(log_event)

@classmethod
def search(cls, query):
"""Search logs."""
results = cls.backend_cls().search(query)
return cls.schema.load(results, many=True)
100 changes: 100 additions & 0 deletions invenio_logging/datastreams/backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Invenio module for datastream backends."""


from abc import ABC, abstractmethod

from flask import current_app
from invenio_search import current_search_client


class LogBackend(ABC):
"""Abstract base class for log backends."""

@abstractmethod
def send(self, log_type, log_event):
"""Send a log event to the backend."""
raise NotImplementedError()


class SearchBackend(LogBackend):
"""Generic backend for storing logs in datastreams index."""

def __init__(self, log_type):
"""
Initialize SearchBackend.

:param log_type: Type of log (e.g., "audit", "task", "system").
"""
self.client = current_search_client
self.log_type = log_type
self.template_name = "datastream-log-v1.0.0"
self.index_name = f"logs-{log_type}"
self.search_fields = [
"message",
"event.action",
"user.id",
"user.email",
"resource.id",
"resource.parent.id",
]

self._ensure_template_exists()

def _ensure_template_exists(self):
"""Check if required template exists, enforce if missing."""
index_prefix = current_app.config.get("SEARCH_INDEX_PREFIX", "")
full_template_name = f"{index_prefix}{self.template_name}"
if not self.client.indices.exists_index_template(name=full_template_name):
raise RuntimeError(
f"Required template '{self.template_name}' is missing. "
"Ensure it is created before logging events."
)

def send(self, log_event):
"""Send the log event to Search engine."""
try:
index_prefix = current_app.config.get("SEARCH_INDEX_PREFIX", "")
full_index_name = f"{index_prefix}{self.index_name}"
self.client.index(index=full_index_name, body=log_event)

except Exception as e:
current_app.logger.error(f"Failed to send log Search engine: {e}")
raise e

def search(self, query=None, size=10):
"""
Search log events.

:param size: Number of results to return.
:return: List of log events that match the search query.
"""
try:
index_prefix = current_app.config.get("SEARCH_INDEX_PREFIX", "")
full_index_name = f"{index_prefix}{self.index_name}"

search_query = {
"size": size,
"query": {
"multi_match": {
"query": query,
"fields": self.search_fields,
"operator": "and",
}
},
"sort": [{"timestamp": {"order": "desc"}}],
}
# TODO: add pagination?
response = self.client.search(index=full_index_name, body=search_query)
return [hit["_source"] for hit in response.get("hits", {}).get("hits", [])]

except Exception as e:
current_app.logger.error(f"Failed to search logs: {e}")
raise e
58 changes: 58 additions & 0 deletions invenio_logging/datastreams/builders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Datastream Logging Builder module."""

from abc import ABC, abstractmethod

from marshmallow import ValidationError

from .schema import LogEventSchema


class LogBuilder(ABC):
"""Base log builder for structured logging."""

context_generators = []
"""List of ContextGenerator to update log event context."""

type = "generic"
"""Type of log event."""

schema = LogEventSchema()
"""Schema for validating log events."""

@classmethod
def validate(cls, log_event):
"""Validate the log event against the schema."""
try:
return cls.schema.dump(log_event)
except ValidationError as err:
raise ValueError(f"Invalid log data: {err.messages}")

@classmethod
@abstractmethod
def build(cls, **kwargs):
"""Build log event context based on log type and additional context."""
raise NotImplementedError()

@classmethod
def resolve_context(cls, log_event):
"""Resolve all references in the log context."""
for ctx_func in cls.context_generators:
ctx_func(log_event)
return log_event

@classmethod
def send(cls, log_event):
"""Send log event to the log backend."""
raise NotImplementedError()

def search(self, query):
"""Search logs."""
raise NotImplementedError()
47 changes: 47 additions & 0 deletions invenio_logging/datastreams/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Decorators for datastream logging."""

from invenio_logging.datastreams.log_event import LogEvent
from invenio_logging.proxies import current_datastream_logging_manager


def log_task():
"""Decorate log task events.

Useful for celery tasks that need to log events by passing down the log type and log data.
"""

def decorator(func):
def wrapper(*args, **kwargs):
log_type = kwargs.get(
"log_type", "TODO"
) # Should we have a default log type?
log_data = kwargs.get("log_data", {})

def _log_event(
message=None, event=None, user=None, resource=None, extra=None
):
"""Log event."""
log_data["message"] = message if message else log_data.get("message")
log_data["event"] = event if event else log_data.get("event")
log_data["user"] = user if user else log_data.get("user")
log_data["resource"] = (
resource if resource else log_data.get("resource")
)
log_data["extra"] = extra if extra else log_data.get("extra")
log_event = LogEvent(**log_data)
current_datastream_logging_manager.log(log_type, log_event)

kwargs["_log_event"] = _log_event
return func(*args, **kwargs)

return wrapper

return decorator
46 changes: 46 additions & 0 deletions invenio_logging/datastreams/log_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Datastream log event."""

from datetime import datetime


class LogEvent:
"""Class to represent a structured log event."""

def __init__(
self, event={}, resource={}, user={}, extra={}, timestamp=None, message=None
):
"""
Create a LogEvent instance.
:param event: Dict with `action` (required) and optional `description`.
:param resource: Dict with `type`, `id`, and optional `metadata`.
:param user: Dict with `id`, `email`, and optional `roles` (default: empty).
:param extra: Additional metadata dictionary (default: empty).
:param timestamp: Optional timestamp (defaults to now).
:param message: Optional human-readable message.
"""
self.timestamp = timestamp or datetime.now().isoformat()
self.event = event
self.resource = resource
self.user = user
self.extra = extra
self.message = message

def to_dict(self):
"""Convert the log event to a dictionary matching the schema."""
return {
"timestamp": self.timestamp,
"event": self.event,
"message": self.message,
"user": self.user,
"resource": self.resource,
"extra": self.extra,
}
Loading
Loading