Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Meta injection #366

Merged
merged 8 commits into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions django_celery_results/backends/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json

from celery import maybe_signature
from celery.backends.base import BaseDictBackend
from celery.backends.base import BaseDictBackend, get_current_task
from celery.exceptions import ChordError
from celery.result import GroupResult, allow_join_result, result_from_tuple
from celery.utils.log import get_logger
Expand Down Expand Up @@ -99,6 +99,17 @@ def _get_extended_properties(self, request, traceback):

return extended_props

def _get_meta_from_request(self, request=None):
"""
Use the request or get_current_task to evaluate the `meta` attribute.

With this, is possible to assign arbitrary data in request.meta to be
retrieve and stored on the TaskResult.
"""
request = request or getattr(get_current_task(), "request", None)
if request:
return getattr(request, "meta", {})

def _store_result(
self,
task_id,
Expand All @@ -110,14 +121,19 @@ def _store_result(
):
"""Store return value and status of an executed task."""
content_type, content_encoding, result = self.encode_content(result)
_, _, meta = self.encode_content(
{'children': self.current_task_children(request)}

meta = {
**self._get_meta_from_request(request),
"children": self.current_task_children(request),
}
_, _, encoded_meta = self.encode_content(
meta,
)

task_props = {
'content_encoding': content_encoding,
'content_type': content_type,
'meta': meta,
'meta': encoded_meta,
'result': result,
'status': status,
'task_id': task_id,
Expand Down
3 changes: 2 additions & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ Contents

.. toctree::
:maxdepth: 1

getting_started
injecting_metadata
copyright

.. toctree::
Expand Down
42 changes: 42 additions & 0 deletions docs/injecting_metadata.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
Injecting metadata
==================


To save arbitrary data in the ``TaskResult.meta`` field, set the ``meta`` attribute on the Celery task request, as follows:

.. code-block:: python

from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//')

@app.task(bind=True)
def hello(task_instance):
task_instance.request.meta = {'some_key': 'some_value'}
task_instance.update_state(
state='PROGRESS',
meta='Task current result'
)
# If TaskResult is queried from DB at this moment, it will yield
# TaskResult(
# result='Task current result',
# meta={'some_key': 'some_value'} # some discrepancies apply as I didn't document the json parse and children data
# )
return 'hello world'

# After the task is completed, if TaskResult is queried from DB at this moment, it will yield
# TaskResult(
# result='hello world',
# meta={'some_key': 'some_value'} # some discrepancies apply as I didn't document the json parse and children data
# )

This way, the value of ``task_instance.request.meta`` will be stored on ``TaskResult.meta``.

Note that the ``meta`` argument of the ``update_state`` method is not really metadata, and it is not stored on ``TaskResult.meta``.
This argument is used to save the CURRENT result of the task, so it is stored on ``TaskResult.result``.

It works this way because, while a task is executing, the ``TaskResult`` really acts as the current task state, temporarily holding information until the task completes.
Subsequent calls to ``update_state`` will update the same ``TaskResult``, overwriting what was there previously.
Upon completion of the task, the results of the task are stored in the same TaskResult, overwriting the previous state of the task.
So the return from the function is stored in ``TaskResult.result`` and ``TaskResult.status`` is set to 'SUCCESS' (or 'FAILURE').

32 changes: 32 additions & 0 deletions t/unit/backends/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,38 @@ def test_backend__json_serialization__str_result__protocol_1(self):
assert json.loads(tr.task_args) == ['a', 1, True]
assert json.loads(tr.task_kwargs) == {'c': 6, 'd': 'e', 'f': False}

def test_backend__task_result_meta_injection(self):
    """Check that data placed on ``request.meta`` is reported by the backend.

    Marks a task as done with a request carrying an extra ``meta`` dict,
    then inspects both ``get_task_meta`` and the stored ``TaskResult`` row.
    """
    self.app.conf.result_serializer = 'json'
    self.app.conf.accept_content = {'pickle', 'json'}
    self.b = DatabaseBackend(app=self.app)

    task_id = uuid()
    req = self._create_request(
        task_id=task_id,
        name='my_task',
        args=[],
        kwargs={},
        task_protocol=1,
    )
    # Arbitrary data injected on the request before the result is stored.
    req.meta = {'some_key': 'some_value'}

    self.b.mark_as_done(task_id, None, request=req)

    # Meta as returned by the backend.
    task_meta = self.b.get_task_meta(task_id)
    assert task_meta.get('result') is None
    assert task_meta.get('task_name') == 'my_task'
    assert task_meta.get('meta') == {'some_key': 'some_value'}

    # Row persisted in the database.
    # NOTE(review): _store_result appears to persist the injected keys plus
    # 'children' in ``meta``, so expecting an empty dict here looks
    # suspicious -- confirm against the stored value.
    stored_row = TaskResult.objects.get(task_id=task_id)
    assert json.loads(stored_row.meta) == {}

def xxx_backend(self):
tid = uuid()

Expand Down