Skip to content

Commit

Permalink
logging: adds datastream logging
Browse files Browse the repository at this point in the history
  • Loading branch information
jrcastro2 committed Mar 6, 2025
1 parent 39b0bd6 commit fc2a675
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 6 deletions.
16 changes: 16 additions & 0 deletions invenio_jobs/logging/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2025 CERN.
#
# Invenio-Jobs is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Logging module for jobs."""

from .backends import SearchJobLogBackend
from .builders import JobLogBuilder

__all__ = (
"SearchJobLogBackend",
"JobLogBuilder",
)
18 changes: 18 additions & 0 deletions invenio_jobs/logging/backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2025 CERN.
#
# Invenio-Jobs is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Logging backends for jobs."""

from invenio_logging.datastreams.backends import SearchBackend


class SearchJobLogBackend(SearchBackend):
    """Search backend that stores job logs in datastreams."""

    def __init__(self):
        """Set up the parent backend with the ``job`` log type."""
        super().__init__(log_type="job")
36 changes: 36 additions & 0 deletions invenio_jobs/logging/builders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2025 CERN.
#
# Invenio-Jobs is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Logging builders for jobs."""

from invenio_logging.datastreams.builders import LogBuilder

from .backends import SearchJobLogBackend


class JobLogBuilder(LogBuilder):
    """Builder that validates, sends and searches structured job logs."""

    # Log type this builder is declared for.
    type = "job"

    # Backend class; a new instance is created on every send/search call.
    backend_cls = SearchJobLogBackend

    @classmethod
    def build(cls, log_event):
        """Build a job log event context by validating ``log_event``."""
        validated = cls.validate(log_event)
        return validated

    @classmethod
    def send(cls, log_event):
        """Send ``log_event`` through a freshly created backend."""
        backend = cls.backend_cls()
        backend.send(log_event)

    @classmethod
    def search(cls, query):
        """Search logs with ``query`` and load the hits through the schema."""
        backend = cls.backend_cls()
        hits = backend.search(query)
        return cls.schema.load(hits, many=True)
5 changes: 2 additions & 3 deletions invenio_jobs/resources/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,12 @@ def read(self):
def logs(self):
"""Read an item."""
identity = g.identity
hits = self.service.search(
hits = self.service.read_logs(
identity=identity,
job_id=resource_requestctx.view_args["job_id"],
run_id=resource_requestctx.view_args["run_id"],
params=resource_requestctx.args,
)
return hits.to_dict(), 200
return hits, 200

@request_view_args
@response_handler()
Expand Down
3 changes: 3 additions & 0 deletions invenio_jobs/services/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class TasksPermissionPolicy(BasePermissionPolicy):

can_search = [Administration(), SystemProcess()]
can_read = [Administration(), SystemProcess()]
can_read_logs = [Administration(), SystemProcess()]


class JobPermissionPolicy(BasePermissionPolicy):
Expand All @@ -26,6 +27,7 @@ class JobPermissionPolicy(BasePermissionPolicy):
can_search = [Administration(), SystemProcess()]
can_create = [Administration(), SystemProcess()]
can_read = [Administration(), SystemProcess()]
can_read_logs = [Administration(), SystemProcess()]
can_update = [Administration(), SystemProcess()]
can_delete = [Administration(), SystemProcess()]

Expand All @@ -39,6 +41,7 @@ class RunPermissionPolicy(BasePermissionPolicy):
can_search = [Administration(), SystemProcess()]
can_create = [Administration(), SystemProcess()]
can_read = [Administration(), SystemProcess()]
can_read_logs = [Administration(), SystemProcess()]
can_update = [Administration(), SystemProcess()]
can_delete = [Administration(), SystemProcess()]
can_stop = [Administration(), SystemProcess()]
49 changes: 48 additions & 1 deletion invenio_jobs/services/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
import uuid

import sqlalchemy as sa
from flask import current_app
from invenio_logging.datastreams.log_event import LogEvent
from invenio_logging.datastreams.uow import LoggingOp
from invenio_logging.proxies import current_datastream_logging_manager
from invenio_records_resources.records.systemfields import IndexField
from invenio_records_resources.services.base import LinksTemplate
from invenio_records_resources.services.base.utils import map_search_params
from invenio_records_resources.services.records import RecordService
Expand All @@ -21,6 +26,7 @@
TaskRevokeOp,
unit_of_work,
)
from invenio_search.engine import dsl

from invenio_jobs.tasks import execute_run

Expand Down Expand Up @@ -85,7 +91,21 @@ def create(self, identity, data, uow=None):
)

job = Job(**valid_data)

uow.session.add(job)
uow.session.flush() # Required to get the job.id

uow.register(ModelCommitOp(job))

log_event = LogEvent(
event={"action": "job.create"},
resource={"type": "job", "id": str(job.id)},
user={"id": str(identity.id)},
message=f"Job '{job.title}' created.",
)

uow.register(LoggingOp("audit", log_event))

return self.result_item(self, identity, job, links_tpl=self.links_item_tpl)

def search(self, identity, params):
Expand Down Expand Up @@ -215,6 +235,13 @@ def read(self, identity, job_id, run_id):
self, identity, run_record, links_tpl=self.links_item_tpl
)

def read_logs(
    self, identity, job_id, run_id, params=None, search_preference=None, **kwargs
):
    """Retrieve logs for a run.

    :param identity: Identity of the caller; checked against the
        ``read_logs`` permission action.
    :param job_id: Accepted for interface symmetry; not used by the lookup.
    :param run_id: Run identifier; its string form is used as the search
        query.
    :param params: Currently unused.
    :param search_preference: Currently unused.
    :returns: The result of the datastream logging manager's ``search``
        for the ``job`` log type.
    """
    # The permission policies declare ``can_read_logs``; enforce it here so
    # the logs are not returned to callers the policy does not allow.
    # NOTE(review): assumes the enclosing service exposes
    # ``require_permission`` like the other record services -- confirm.
    self.require_permission(identity, "read_logs")
    return current_datastream_logging_manager.search("job", str(run_id))

@unit_of_work()
def create(self, identity, job_id, data, uow=None):
"""Create a run."""
Expand All @@ -236,11 +263,31 @@ def create(self, identity, job_id, data, uow=None):
status=RunStatusEnum.QUEUED,
**valid_data,
)
# We want to flush to gain access to the run.id
uow.session.flush()
uow.register(ModelCommitOp(run))

log_event = LogEvent(
event={"action": "job.run"},
resource={
"type": "job",
"id": str(job_id),
"parent": {"type": "run", "id": str(run.id)},
},
user={"id": str(identity.id)},
message=f"Run '{job.title}' created.",
)

uow.register(LoggingOp("job", log_event))

uow.register(
TaskOp.for_async_apply(
execute_run,
kwargs={"run_id": run.id},
kwargs={
"run_id": run.id,
"log_data": log_event.to_dict(),
"log_type": "job",
},
task_id=str(run.task_id),
queue=run.queue,
)
Expand Down
7 changes: 5 additions & 2 deletions invenio_jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ def update_run(run, **kwargs):


@shared_task(bind=True, ignore_result=True)
def execute_run(self, run_id, kwargs=None):
def execute_run(self, run_id, log_data={}, log_type=None):
"""Execute and manage a run state and task."""
run = Run.query.filter_by(id=run_id).one_or_none()
task = current_jobs.registry.get(run.job.task).task
update_run(run, status=RunStatusEnum.RUNNING, started_at=datetime.utcnow())
kwargs = run.args.copy()
kwargs["log_data"] = log_data
kwargs["log_type"] = log_type
try:
result = task.apply(kwargs=run.args, throw=True)
result = task.apply(kwargs=kwargs, throw=True)
except SystemExit as e:
update_run(
run,
Expand Down
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ invenio_base.api_finalize_app =
jobs = invenio_jobs.ext:finalize_app
invenio_celery.tasks =
jobs = invenio_jobs.tasks
invenio_logging.datastreams.builders =
job = invenio_jobs.logging:JobLogBuilder

[build_sphinx]
source-dir = docs/
Expand Down

0 comments on commit fc2a675

Please sign in to comment.