Skip to content

Commit 812909f

Browse files
authored
Meta injection (#366)
* add meta injection on backend * add docs * add test * update docs * update docs * fix meta from request None * fix test * fmt
1 parent 81c05b1 commit 812909f

File tree

4 files changed

+94
-5
lines changed

4 files changed

+94
-5
lines changed

django_celery_results/backends/database.py

+19-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import json
33

44
from celery import maybe_signature
5-
from celery.backends.base import BaseDictBackend
5+
from celery.backends.base import BaseDictBackend, get_current_task
66
from celery.exceptions import ChordError
77
from celery.result import GroupResult, allow_join_result, result_from_tuple
88
from celery.utils.log import get_logger
@@ -99,6 +99,16 @@ def _get_extended_properties(self, request, traceback):
9999

100100
return extended_props
101101

102+
def _get_meta_from_request(self, request=None):
103+
"""
104+
Use the request or get_current_task to evaluate the `meta` attribute.
105+
106+
With this, is possible to assign arbitrary data in request.meta to be
107+
retrieve and stored on the TaskResult.
108+
"""
109+
request = request or getattr(get_current_task(), "request", None)
110+
return getattr(request, "meta", {})
111+
102112
def _store_result(
103113
self,
104114
task_id,
@@ -110,14 +120,19 @@ def _store_result(
110120
):
111121
"""Store return value and status of an executed task."""
112122
content_type, content_encoding, result = self.encode_content(result)
113-
_, _, meta = self.encode_content(
114-
{'children': self.current_task_children(request)}
123+
124+
meta = {
125+
**self._get_meta_from_request(request),
126+
"children": self.current_task_children(request),
127+
}
128+
_, _, encoded_meta = self.encode_content(
129+
meta,
115130
)
116131

117132
task_props = {
118133
'content_encoding': content_encoding,
119134
'content_type': content_type,
120-
'meta': meta,
135+
'meta': encoded_meta,
121136
'result': result,
122137
'status': status,
123138
'task_id': task_id,

docs/index.rst

+2-1
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ Contents
99

1010
.. toctree::
1111
:maxdepth: 1
12-
12+
1313
getting_started
14+
injecting_metadata
1415
copyright
1516

1617
.. toctree::

docs/injecting_metadata.rst

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
Injecting metadata
2+
===============
3+
4+
5+
To save arbitrary data on the field TaskResult.meta, the Celery Task Request must be manipulated as such:
6+
7+
.. code-block:: python
8+
9+
from celery import Celery
10+
11+
app = Celery('hello', broker='amqp://guest@localhost//')
12+
13+
@app.task(bind=True)
14+
def hello(task_instance):
15+
task_instance.request.meta = {'some_key': 'some_value'}
16+
task_instance.update_state(
17+
state='PROGRESS',
18+
meta='Task current result'
19+
)
20+
# If TaskResult is queried from DB at this momento it will yield
21+
# TaskResult(
22+
# result='Task current result',
23+
# meta={'some_key': 'some_value'} # some discrepancies apply as I didn't document the json parse and children data
24+
# )
25+
return 'hello world'
26+
27+
# After task is completed, if TaskResult is queried from DB at this momento it will yield
28+
# TaskResult(
29+
# result='hello world',
30+
# meta={'some_key': 'some_value'} # some discrepancies apply as I didn't document the json parse and children data
31+
# )
32+
33+
This way, the value of ``task_instance.request.meta`` will be stored on ``TaskResult.meta``.
34+
35+
Note that the `meta` arg in the method `update_state` is not really a metadata and it's not stored on ``TaskResult.meta``.
36+
This arg is used to save the CURRENT result of the task. So it's stored on ``TaskResult.result``.
37+
38+
It works this way because while a task is executing, the `TaskResult` is used really as current task state; holding information, temporarily, until the task completes.
39+
Subsequent calls to `update_state` will update the same `TaskResult`, overwriting what was there previously.
40+
Upon completion of the task, the results of the task are stored in the same TaskResult, overwriting the previous state of the task.
41+
So the return from the function is stored in ``TaskResult.result`` and ``TaskResult.status`` is set to 'SUCCESS' (or 'FAILURE').
42+

t/unit/backends/test_database.py

+31
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,37 @@ def test_backend__json_serialization__str_result__protocol_1(self):
517517
assert json.loads(tr.task_args) == ['a', 1, True]
518518
assert json.loads(tr.task_kwargs) == {'c': 6, 'd': 'e', 'f': False}
519519

520+
def test_backend__task_result_meta_injection(self):
521+
self.app.conf.result_serializer = 'json'
522+
self.app.conf.accept_content = {'pickle', 'json'}
523+
self.b = DatabaseBackend(app=self.app)
524+
525+
tid2 = uuid()
526+
request = self._create_request(
527+
task_id=tid2,
528+
name='my_task',
529+
args=[],
530+
kwargs={},
531+
task_protocol=1,
532+
)
533+
result = None
534+
535+
# inject request meta arbitrary data
536+
request.meta = {
537+
'key': 'value'
538+
}
539+
540+
self.b.mark_as_done(tid2, result, request=request)
541+
mindb = self.b.get_task_meta(tid2)
542+
543+
# check task meta
544+
assert mindb.get('result') is None
545+
assert mindb.get('task_name') == 'my_task'
546+
547+
# check task_result object
548+
tr = TaskResult.objects.get(task_id=tid2)
549+
assert json.loads(tr.meta) == {'key': 'value', 'children': []}
550+
520551
def xxx_backend(self):
521552
tid = uuid()
522553

0 commit comments

Comments
 (0)