Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impl: signature in periodic task #361

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Changes from 10 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
ebe6975
feature: PeriodicTask by signature
SunnyCapt Sep 2, 2020
0b91eca
feature: Sign field
SunnyCapt Sep 3, 2020
967f200
Bugfix: id_rsa.pubpermissions & key files creating
SunnyCapt Sep 3, 2020
93431fb
Feature: callback in periodic task (support for option)
SunnyCapt Sep 3, 2020
f8e3f17
Fix: sign hash of serialized task signature
SunnyCapt Sep 7, 2020
6388db4
added tests of periodic tasks with task signatures
SunnyCapt Sep 8, 2020
7b02ec0
refactor of keys loading
SunnyCapt Sep 10, 2020
1d96917
Fix deprication warnings in tests and refactor key generating & loading
SunnyCapt Sep 10, 2020
834a432
Update authors file
SunnyCapt Sep 10, 2020
18b0290
Merge branch 'master' into master
SunnyCapt Oct 25, 2020
89cb908
fix imports
SunnyCapt Oct 26, 2020
e7fdac4
useless commit
SunnyCapt Oct 26, 2020
9b0c91e
Merge remote-tracking branch 'celery/master' into master
SunnyCapt Dec 6, 2020
7c9479c
call some functions before calling real apply_async
SunnyCapt Apr 20, 2021
28d6391
added comments about app.conf.call_before_run_periodic_task
SunnyCapt Apr 21, 2021
c85b77d
django_celery_beat.schedulers.DatabaseScheduler.apply_async refactoring
SunnyCapt Apr 21, 2021
bd74d99
Merge branch 'master' of https://github.com/celery/django-celery-beat
SunnyCapt Apr 21, 2021
7bfca12
fix schedulers.py imports
SunnyCapt Apr 21, 2021
02778de
fix performing an action before starting a periodic task
SunnyCapt Apr 21, 2021
2604ab9
fix tests
SunnyCapt Apr 21, 2021
f2ec316
added readable info about serialized task
SunnyCapt Apr 22, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
@@ -92,3 +92,4 @@ Wes Winham <[email protected]>
Williams Mendez <[email protected]>
WoLpH <[email protected]>
dongweiming <[email protected]>
SunnyCapt <[email protected]>
22 changes: 15 additions & 7 deletions django_celery_beat/admin.py
Original file line number Diff line number Diff line change
@@ -69,6 +69,7 @@ class PeriodicTaskForm(forms.ModelForm):
required=False,
max_length=200,
)
# todo: add field for task_signature

class Meta:
"""Form metadata."""
@@ -198,19 +199,26 @@ def toggle_tasks(self, request, queryset):

def run_tasks(self, request, queryset):
self.celery_app.loader.import_default_modules()
tasks = [(self.celery_app.tasks.get(task.task),
loads(task.args),
loads(task.kwargs),
task.queue)
for task in queryset]
tasks = [
(
task.get_verified_task_signature(raise_exceptions=False)
if task.task_signature is not None
else self.celery_app.tasks.get(task.task),
loads(task.args),
loads(task.kwargs),
task.queue
) for task in queryset
]

if any(t[0] is None for t in tasks):
for i, t in enumerate(tasks):
if t[0] is None:
break

# variable "i" will be set because list "tasks" is not empty
not_found_task_name = queryset[i].task
not_found_task_name = queryset[i].get_verified_task_signature(raise_exceptions=False).name \
if queryset[i].task_signature is not None and queryset[i].get_verified_task_signature(
raise_exceptions=False) is not None else queryset[i].task

self.message_user(
request,
@@ -222,7 +230,7 @@ def run_tasks(self, request, queryset):
task_ids = [task.apply_async(args=args, kwargs=kwargs, queue=queue)
if queue and len(queue)
else task.apply_async(args=args, kwargs=kwargs)
for task, args, kwargs, queue in tasks]
for task, args, kwargs, queue in tasks if task is not None]
tasks_run = len(task_ids)
self.message_user(
request,
23 changes: 23 additions & 0 deletions django_celery_beat/migrations/0015_periodictask_task_signature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 2.2.16 on 2020-09-01 10:17

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('django_celery_beat', '0014_remove_clockedschedule_enabled'),
]

operations = [
migrations.AddField(
model_name='periodictask',
name='task_signature',
field=models.BinaryField(help_text="Serialized `celery.canvas.Signature` type's object of task (or chain, group, etc.) got by https://pypi.org/project/dill/", null=True),
),
migrations.AddField(
model_name='periodictask',
name='task_signature_sign',
field=models.CharField(help_text="Signature (in hex) of serialized `celery.canvas.Signature` type's object (see task_signature field)", max_length=1028, null=True),
),
]
23 changes: 23 additions & 0 deletions django_celery_beat/migrations/0016_auto_20200903_1356.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 2.2.16 on 2020-09-03 13:56

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('django_celery_beat', '0015_periodictask_task_signature'),
]

operations = [
migrations.AddField(
model_name='periodictask',
name='callback_signature',
field=models.BinaryField(help_text="Serialized `celery.canvas.Signature` type's callback task got by https://pypi.org/project/dill/ (use as link arg in `.apply_async` method)", null=True),
),
migrations.AddField(
model_name='periodictask',
name='callback_signature_sign',
field=models.CharField(help_text="Signature (in hex) of serialized `celery.canvas.Signature` type's callback task (see callback_signature field)", max_length=1028, null=True),
),
]
79 changes: 75 additions & 4 deletions django_celery_beat/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""Database models."""
from datetime import timedelta

import dill
import timezone_field
from celery import schedules, current_app
from celery.utils.log import get_logger
from django.conf import settings
from django.core.exceptions import MultipleObjectsReturned, ValidationError
from django.core.validators import MaxValueValidator, MinValueValidator
@@ -11,10 +13,11 @@
from django.utils.translation import gettext_lazy as _

from . import managers, validators
from .tzcrontab import TzAwareCrontab
from .utils import make_aware, now
from .clockedschedule import clocked
from .tzcrontab import TzAwareCrontab
from .utils import make_aware, now, verify_task_signature

logger = get_logger(__name__)

DAYS = 'days'
HOURS = 'hours'
@@ -388,6 +391,27 @@ class PeriodicTask(models.Model):
help_text=_('The Name of the Celery Task that Should be Run. '
'(Example: "proj.tasks.import_contacts")'),
)
task_signature = models.BinaryField(
null=True,
help_text='Serialized `celery.canvas.Signature` type\'s object of task (or chain, group, '
'etc.) got by https://pypi.org/project/dill/'
)
callback_signature = models.BinaryField(
null=True,
help_text='Serialized `celery.canvas.Signature` type\'s callback task got '
'by https://pypi.org/project/dill/ (use as link arg in `.apply_async` method)'
) # todo: add support for error_callback (link_error option)
task_signature_sign = models.CharField(
null=True,
max_length=1028,
help_text='Signature (in hex) of serialized `celery.canvas.Signature` type\'s object (see task_signature field)'
)
callback_signature_sign = models.CharField(
null=True,
max_length=1028,
help_text='Signature (in hex) of serialized `celery.canvas.Signature` type\'s callback '
'task (see callback_signature field)'
)

# You can only set ONE of the following schedule FK's
# TODO: Redo this as a GenericForeignKey
@@ -548,8 +572,8 @@ def validate_unique(self, *args, **kwargs):
'must be set.'
)

err_msg = 'Only one of clocked, interval, crontab, '\
'or solar must be set'
err_msg = 'Only one of clocked, interval, crontab, ' \
'or solar must be set'
if len(selected_schedule_types) > 1:
error_info = {}
for selected_schedule_type in selected_schedule_types:
@@ -578,6 +602,53 @@ def _clean_expires(self):
_('Only one can be set, in expires and expire_seconds')
)

def get_verified_task_signature(self, raise_exceptions=True):
try:
self.get_verified_callback_signature()
except ValueError as e:
err = 'Wrong callback: {} [{}]'.format(e, self)
logger.error(err)
if raise_exceptions:
raise ValueError(err)
return None

return self._get_verified_obj_signature('task', raise_exceptions)

def get_verified_callback_signature(self, raise_exceptions=True):
return self._get_verified_obj_signature('callback', raise_exceptions)

def _get_verified_obj_signature(self, object_name, raise_exceptions):
assert object_name in ('task', 'callback'), ValueError('Unknown object_name')

obj_signarute = getattr(self, '{}_signature'.format(object_name), None)
obj_signarute_sign = getattr(self, '{}_signature_sign'.format(object_name), None)

if obj_signarute is None:
return None

if obj_signarute_sign is None:
err = 'Not found `{}_signature_sign` for `{}` (use django_celery_be' \
'at.utils.sign to sign). Task disabled.'.format(object_name, self)
self.enabled = False
self.save(update_fields=['enabled'])
logger.error(err)
if raise_exceptions:
raise ValueError(err)
return None

obj_signarute = bytes(obj_signarute)

if not verify_task_signature(obj_signarute, obj_signarute_sign):
err = 'Wrong sign for `{}`. Task disabled.'.format(self)
self.enabled = False
self.save(update_fields=['enabled'])
logger.error(err)
if raise_exceptions:
raise ValueError(err)
return None

return dill.loads(obj_signarute)

@property
def expires_(self):
return self.expires or self.expire_seconds
52 changes: 47 additions & 5 deletions django_celery_beat/schedulers.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
"""Beat Scheduler Implementation."""
from __future__ import absolute_import, unicode_literals

import datetime
import logging
import math

from multiprocessing.util import Finalize
import sys

from celery import current_app
from celery import schedules
from celery.beat import Scheduler, ScheduleEntry

# noinspection PyProtectedMember
from celery.beat import Scheduler, ScheduleEntry, SchedulingError, BeatLazyFunc
# noinspection PyUnresolvedReferences
from celery.five import (
items, monotonic, python_2_unicode_compatible,
reraise, values
)
from celery.utils.encoding import safe_str, safe_repr
from celery.utils.log import get_logger
from celery.utils.time import maybe_make_aware
from kombu.utils.encoding import safe_str, safe_repr
from kombu.utils.json import dumps, loads

from django.conf import settings
# noinspection PyProtectedMember
from django.db import transaction, close_old_connections
from django.db.utils import DatabaseError, InterfaceError
from django.core.exceptions import ObjectDoesNotExist
# noinspection PyUnresolvedReferences
from multiprocessing.util import Finalize

from .models import (
PeriodicTask, PeriodicTasks,
@@ -56,6 +66,8 @@ def __init__(self, model, app=None):
self.app = app or current_app._get_current_object()
self.name = model.name
self.task = model.task
self.task_signature = model.get_verified_task_signature()

try:
self.schedule = model.schedule
except model.DoesNotExist:
@@ -74,7 +86,10 @@ def __init__(self, model, app=None):
)
self._disable(model)

self.options = {}
self.options = {
'link': model.get_verified_callback_signature()
}

for option in ['queue', 'exchange', 'routing_key', 'priority']:
value = getattr(model, option)
if value is None:
@@ -368,3 +383,30 @@ def schedule(self):
repr(entry) for entry in self._schedule.values()),
)
return self._schedule

def apply_async(self, entry, producer=None, advance=True, **kwargs):
# Update time-stamps and run counts before we actually execute,
# so we have that done if an exception is raised (doesn't schedule
# forever.)
entry = self.reserve(entry) if advance else entry
task = entry.task_signature if entry.task_signature is not None else self.app.tasks.get(entry.task)

try:
entry_args = [v() if isinstance(v, BeatLazyFunc) else v for v in (entry.args or [])]
entry_kwargs = {k: v() if isinstance(v, BeatLazyFunc) else v for k, v in entry.kwargs.items()}
if task:
return task.apply_async(entry_args, entry_kwargs,
producer=producer,
**entry.options)
else:
return self.send_task(entry.task, entry_args, entry_kwargs,
producer=producer,
**entry.options)
except Exception as exc: # pylint: disable=broad-except
reraise(SchedulingError, SchedulingError(
"Couldn't apply scheduled task {0.name}: {exc}".format(
entry, exc=exc)), sys.exc_info()[2])
finally:
self._tasks_since_sync += 1
if self.should_sync():
self._do_sync()
83 changes: 83 additions & 0 deletions django_celery_beat/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
"""Utilities."""
import os
from hashlib import sha256

import Crypto.PublicKey.RSA as RSA
# -- XXX This module must not use translation as that causes
# -- a recursive loader import!
from celery.utils.log import get_logger
from django.conf import settings
from django.utils import timezone
from functools import lru_cache

is_aware = timezone.is_aware
# celery schedstate return None will make it not work
@@ -11,6 +17,68 @@
# see Issue #222
now_localtime = getattr(timezone, 'template_localtime', timezone.localtime)

logger = get_logger(__name__)


def generate_keys(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure why we need rsa key?

Copy link
Author

@SunnyCapt SunnyCapt Mar 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's need to sign serialized objects so as not to execute any python code that may appear in the database (execute only signed serilized code which can be signed and added only by main proc)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why noy json serializer? it's default in celery

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JSON serializer should be the default task serializer in the integration packages as well

Copy link
Author

@SunnyCapt SunnyCapt Apr 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, I'll rewrite (I used dill because it was faster and easier to implement)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dill is ok for pickle, but pickle is not the default serializer. so rewriting this with json would be great

private_key_path=os.environ.get('DJANGO_CELERY_BEAT_PRIVATE_KEY_PATH', './id_rsa'),
public_key_path=os.environ.get('DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH', './id_rsa.pub')
):
private_key = RSA.generate(4096, os.urandom)
public_key = private_key.publickey()

if os.path.exists(private_key_path):
if input('Do you realy want to rewrite `{}` key file? [y/n]: '.format(private_key_path)) != 'y':
return

if os.path.exists(public_key_path):
if input('Do you realy want to rewrite `{}` key file? [y/n]: '.format(public_key_path)) != 'y':
return

open(private_key_path, 'wb').close()
os.chmod(private_key_path, 0o600)
with open(private_key_path, 'wb') as id_rsa:
id_rsa.write(private_key.exportKey())

open(public_key_path, 'wb').close()
os.chmod(public_key_path, 0o644)
with open(public_key_path, 'wb') as id_rsa_pub:
id_rsa_pub.write(public_key.exportKey())


@lru_cache(maxsize=None)
def _load_private_key():
private_key_path = os.environ.get('DJANGO_CELERY_BEAT_PRIVATE_KEY_PATH', './id_rsa')

if os.path.exists(private_key_path):
with open(private_key_path, 'rb') as id_rsa:
private_key = RSA.importKey(id_rsa.read())
return private_key

raise FileNotFoundError(
'Private key not found. Use `django_celery_beat.utils.generate_keys` '
'to generate new RSA keys... [{}]'.format(private_key_path)
)


@lru_cache(maxsize=None)
def _load_public_key():
public_key_path = os.environ.get('DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH', './id_rsa.pub')

if os.path.exists(public_key_path):
with open(public_key_path, 'rb') as id_rsa_pub:
_public_key = RSA.importKey(id_rsa_pub.read())
return _public_key

raise FileNotFoundError(
'Private key not found. Use `django_celery_beat.utils.generate_keys` '
'to generate new RSA keys... [{}]'.format(public_key_path)
)


def _load_keys():
return _load_private_key(), _load_public_key()


def make_aware(value):
"""Force datatime to have timezone information."""
@@ -46,3 +114,18 @@ def is_database_scheduler(scheduler):
scheduler == 'django'
or issubclass(symbol_by_name(scheduler), DatabaseScheduler)
)


def sign_task_signature(serialized_task_signature):
"""Sign the bytes data to protect against database changes and return signature in hex"""
private_key = _load_private_key()

assert isinstance(serialized_task_signature, bytes), ValueError('Data must be bytes')
return hex(private_key.sign(sha256(serialized_task_signature).hexdigest().encode(), '')[0])


def verify_task_signature(serialized_task_signature, sign_in_hex):
"""Check the signature and return True if it is correct for the specified data"""
public_key = _load_public_key()

return public_key.verify(sha256(serialized_task_signature).hexdigest().encode(), (int(sign_in_hex, 16),))
3 changes: 3 additions & 0 deletions requirements/default.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
django-timezone-field>=4.0,<5.0
python-crontab>=2.3.4
dill
pycrypto
django-appconf
4 changes: 2 additions & 2 deletions requirements/runtime.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
celery>=4.4,<6.0
Django>=2.2
celery>=4.4.7,<6.0
Django>=2.2
6 changes: 3 additions & 3 deletions t/unit/test_admin.py
Original file line number Diff line number Diff line change
@@ -81,7 +81,7 @@ class ValidateUniqueTests(TestCase):
def test_validate_unique_raises_if_schedule_not_set(self):
with self.assertRaises(ValidationError) as cm:
PeriodicTask(name='task0').validate_unique()
self.assertEquals(
self.assertEqual(
cm.exception.args[0],
'One of clocked, interval, crontab, or solar must be set.',
)
@@ -103,9 +103,9 @@ def test_validate_unique_raises_for_multiple_schedules(self):
with self.assertRaises(ValidationError) as cm:
PeriodicTask(name=name, **options_dict).validate_unique()
errors = cm.exception.args[0]
self.assertEquals(errors.keys(), options_dict.keys())
self.assertEqual(errors.keys(), options_dict.keys())
for error_msg in errors.values():
self.assertEquals(error_msg, [expected_error_msg])
self.assertEqual(error_msg, [expected_error_msg])

def test_validate_unique_not_raises(self):
PeriodicTask(crontab=CrontabSchedule()).validate_unique()
68 changes: 63 additions & 5 deletions t/unit/test_models.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import os
import random
import string

from django.test import TestCase, override_settings
import dill
import timezone_field
from celery.canvas import Signature
from django.apps import apps
from django.db.migrations.state import ProjectState
from django.db.migrations.autodetector import MigrationAutodetector
from django.db.migrations.loader import MigrationLoader
from django.db.migrations.questioner import NonInteractiveMigrationQuestioner

import timezone_field
from django.db.migrations.state import ProjectState
from django.test import TestCase, override_settings

from django_celery_beat import migrations as beat_migrations
from django_celery_beat.models import crontab_schedule_celery_timezone
from django_celery_beat.models import crontab_schedule_celery_timezone, PeriodicTask, IntervalSchedule
from django_celery_beat.utils import sign_task_signature, generate_keys


class MigrationTests(TestCase):
@@ -68,3 +72,57 @@ def test_default_timezone_without_settings_config(self):
@override_settings(CELERY_TIMEZONE=FIRST_VALID_TIMEZONE)
def test_default_timezone_with_settings_config(self):
assert crontab_schedule_celery_timezone() == self.FIRST_VALID_TIMEZONE


class PeriodicTaskSignatureTestCase(TestCase):
test_private_key_path = './test_id_rsa'
test_public_key_path = './test_id_rsa.pub'

@classmethod
def setUpClass(cls):
super(PeriodicTaskSignatureTestCase, cls).setUpClass()

os.environ.update({
'DJANGO_CELERY_BEAT_PRIVATE_KEY_PATH': cls.test_private_key_path,
'DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH': cls.test_public_key_path,
})

generate_keys(
private_key_path=cls.test_private_key_path,
public_key_path=cls.test_public_key_path
)

def test_periodic_task_with_signatures(self):
empty_task_signature = Signature(task='empty_task')

serialized_empty_task = dill.dumps(empty_task_signature)
s = sign_task_signature(serialized_empty_task)

interval, _ = IntervalSchedule.objects.get_or_create(
every=2,
period=IntervalSchedule.MINUTES
)
periodic_task = PeriodicTask.objects.create(
name='test-' + ''.join(random.choices(string.ascii_letters, k=20)),
task_signature=serialized_empty_task,
task_signature_sign=s,
callback_signature=serialized_empty_task,
callback_signature_sign=s,
interval=interval,
)

task_signature = periodic_task.get_verified_callback_signature(raise_exceptions=False)
callback_signature = periodic_task.get_verified_callback_signature(raise_exceptions=False)

self.assertEqual(empty_task_signature, task_signature)
self.assertEqual(empty_task_signature, callback_signature)

@classmethod
def tearDownClass(cls) -> None:
super(PeriodicTaskSignatureTestCase, cls).tearDownClass()

if os.path.exists(cls.test_private_key_path):
os.remove(cls.test_private_key_path)

if os.path.exists(cls.test_public_key_path):
os.remove(cls.test_public_key_path)
46 changes: 46 additions & 0 deletions t/unit/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import os
from unittest import TestCase

import dill
from celery.canvas import Signature

from django_celery_beat.utils import sign_task_signature, verify_task_signature, generate_keys


class UtilsTests(TestCase):
test_private_key_path = './test_id_rsa'
test_public_key_path = './test_id_rsa.pub'

@classmethod
def setUpClass(cls) -> None:
super(UtilsTests, cls).setUpClass()

os.environ.update({
'DJANGO_CELERY_BEAT_PRIVATE_KEY_PATH': cls.test_private_key_path,
'DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH': cls.test_public_key_path,
})

generate_keys(
private_key_path=cls.test_private_key_path,
public_key_path=cls.test_public_key_path
)

def test_sign_verify_task_signature(self):
empty_task_signature = Signature()

serialized_empty_task = dill.dumps(empty_task_signature)
s = sign_task_signature(serialized_empty_task)

is_valid = verify_task_signature(serialized_empty_task, s)

self.assertTrue(is_valid)

@classmethod
def tearDownClass(cls) -> None:
super(UtilsTests, cls).tearDownClass()

if os.path.exists(cls.test_private_key_path):
os.remove(cls.test_private_key_path)

if os.path.exists(cls.test_public_key_path):
os.remove(cls.test_public_key_path)