Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

logging: adds datastream logging #70

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions invenio_jobs/logging/__init__.py
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type should be application instead of job. To be used by any other logging produced by the app (that is not audit logs). Consider moving this somewhere more core than invenio-jobs

Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2025 CERN.
#
# Invenio-Jobs is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Loggigng module for jobs."""

from .backends import SearchJobLogBackend
from .builders import JobLogBuilder

__all__ = (
"SearchJobLogBackend",
"JobLogBuilder",
)
18 changes: 18 additions & 0 deletions invenio_jobs/logging/backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2025 CERN.
#
# Invenio-Jobs is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Loggigng module for jobs."""

from invenio_logging.datastreams.backends import SearchBackend


class SearchJobLogBackend(SearchBackend):
"""Backend for storing job logs in datastreams."""

def __init__(self):
"""Initialize backend for job logs."""
super().__init__(log_type="job")
36 changes: 36 additions & 0 deletions invenio_jobs/logging/builders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2025 CERN.
#
# Invenio-Jobs is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Loggigng module for jobs."""

from invenio_logging.datastreams.builders import LogBuilder

from .backends import SearchJobLogBackend


class JobLogBuilder(LogBuilder):
"""Builder for structured job logs."""

Copy link
Author

@jrcastro2 jrcastro2 Mar 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of scope for now: Consider adding context_processors with information useful to debug, such as host, ipaddress ...

type = "job"

backend_cls = SearchJobLogBackend

@classmethod
def build(cls, log_event):
"""Build an job log event context."""
return cls.validate(log_event)

@classmethod
def send(cls, log_event):
"""Send log event using the backend."""
cls.backend_cls().send(log_event)

@classmethod
def search(cls, query):
"""Search logs."""
results = cls.backend_cls().search(query)
return cls.schema.load(results, many=True)
5 changes: 2 additions & 3 deletions invenio_jobs/resources/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,12 @@ def read(self):
def logs(self):
"""Read an item."""
identity = g.identity
hits = self.service.search(
hits = self.service.read_logs(
identity=identity,
job_id=resource_requestctx.view_args["job_id"],
run_id=resource_requestctx.view_args["run_id"],
params=resource_requestctx.args,
)
return hits.to_dict(), 200
return hits, 200

@request_view_args
@response_handler()
Expand Down
3 changes: 3 additions & 0 deletions invenio_jobs/services/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class TasksPermissionPolicy(BasePermissionPolicy):

can_search = [Administration(), SystemProcess()]
can_read = [Administration(), SystemProcess()]
can_read_logs = [Administration(), SystemProcess()]


class JobPermissionPolicy(BasePermissionPolicy):
Expand All @@ -26,6 +27,7 @@ class JobPermissionPolicy(BasePermissionPolicy):
can_search = [Administration(), SystemProcess()]
can_create = [Administration(), SystemProcess()]
can_read = [Administration(), SystemProcess()]
can_read_logs = [Administration(), SystemProcess()]
can_update = [Administration(), SystemProcess()]
can_delete = [Administration(), SystemProcess()]

Expand All @@ -39,6 +41,7 @@ class RunPermissionPolicy(BasePermissionPolicy):
can_search = [Administration(), SystemProcess()]
can_create = [Administration(), SystemProcess()]
can_read = [Administration(), SystemProcess()]
can_read_logs = [Administration(), SystemProcess()]
can_update = [Administration(), SystemProcess()]
can_delete = [Administration(), SystemProcess()]
can_stop = [Administration(), SystemProcess()]
49 changes: 48 additions & 1 deletion invenio_jobs/services/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
import uuid

import sqlalchemy as sa
from flask import current_app
from invenio_logging.datastreams.log_event import LogEvent
from invenio_logging.datastreams.uow import LoggingOp
from invenio_logging.proxies import current_datastream_logging_manager
from invenio_records_resources.records.systemfields import IndexField
from invenio_records_resources.services.base import LinksTemplate
from invenio_records_resources.services.base.utils import map_search_params
from invenio_records_resources.services.records import RecordService
Expand All @@ -21,6 +26,7 @@
TaskRevokeOp,
unit_of_work,
)
from invenio_search.engine import dsl

from invenio_jobs.tasks import execute_run

Expand Down Expand Up @@ -85,7 +91,21 @@ def create(self, identity, data, uow=None):
)

job = Job(**valid_data)

uow.session.add(job)
uow.session.flush() # Required to get the job.id

uow.register(ModelCommitOp(job))

log_event = LogEvent(
event={"action": "job.create"},
resource={"type": "job", "id": str(job.id)},
user={"id": str(identity.id)},
message=f"Job '{job.title}' created.",
)

uow.register(LoggingOp("audit", log_event))

return self.result_item(self, identity, job, links_tpl=self.links_item_tpl)

def search(self, identity, params):
Expand Down Expand Up @@ -215,6 +235,13 @@ def read(self, identity, job_id, run_id):
self, identity, run_record, links_tpl=self.links_item_tpl
)

def read_logs(
self, identity, job_id, run_id, params=None, search_preference=None, **kwargs
):
"""Retrieve logs for a run."""
search_result = current_datastream_logging_manager.search("job", str(run_id))
return search_result

@unit_of_work()
def create(self, identity, job_id, data, uow=None):
"""Create a run."""
Expand All @@ -236,11 +263,31 @@ def create(self, identity, job_id, data, uow=None):
status=RunStatusEnum.QUEUED,
**valid_data,
)
# We want to flush to gain access to the run.id
uow.session.flush()
uow.register(ModelCommitOp(run))

log_event = LogEvent(
event={"action": "job.run"},
resource={
"type": "job",
"id": str(job_id),
"parent": {"type": "run", "id": str(run.id)},
},
user={"id": str(identity.id)},
message=f"Run '{job.title}' created.",
)

uow.register(LoggingOp("job", log_event))

uow.register(
TaskOp.for_async_apply(
execute_run,
kwargs={"run_id": run.id},
kwargs={
"run_id": run.id,
"log_data": log_event.to_dict(),
"log_type": "job",
},
task_id=str(run.task_id),
queue=run.queue,
)
Expand Down
7 changes: 5 additions & 2 deletions invenio_jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ def update_run(run, **kwargs):


@shared_task(bind=True, ignore_result=True)
def execute_run(self, run_id, kwargs=None):
def execute_run(self, run_id, log_data={}, log_type=None):
"""Execute and manage a run state and task."""
run = Run.query.filter_by(id=run_id).one_or_none()
task = current_jobs.registry.get(run.job.task).task
update_run(run, status=RunStatusEnum.RUNNING, started_at=datetime.utcnow())
kwargs = run.args.copy()
kwargs["log_data"] = log_data
kwargs["log_type"] = log_type
try:
result = task.apply(kwargs=run.args, throw=True)
result = task.apply(kwargs=kwargs, throw=True)
except SystemExit as e:
update_run(
run,
Expand Down
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ invenio_base.api_finalize_app =
jobs = invenio_jobs.ext:finalize_app
invenio_celery.tasks =
jobs = invenio_jobs.tasks
invenio_logging.datastreams.builders =
job = invenio_jobs.logging:JobLogBuilder

[build_sphinx]
source-dir = docs/
Expand Down
Loading