No Built-in Way to Identify Tasks Scheduled by Celery Beat #854

Open
Bauerna opened this issue Mar 16, 2025 · 0 comments

Description

Celery Beat currently does not provide any metadata in the task.request object to indicate that a task was scheduled by Beat. This makes it impossible to differentiate between a manually triggered task (.delay() or .apply_async()) and a Beat-scheduled task without adding a custom kwargs parameter.
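For reference, the kwargs workaround mentioned above might look like the following minimal sketch. The `from_beat` flag and the `BEAT_ENTRY` name are hypothetical, chosen only for illustration. The `@celery.task` decorator is left out so the snippet runs without a broker.

```python
import logging

logger = logging.getLogger(__name__)


# In the project from this issue the function would be decorated with
# @celery.task(name="tasks.add"); the decorator is omitted here so the
# sketch runs standalone.
def add(x: int, y: int, *, from_beat: bool = False) -> int:
    # `from_beat` is a hypothetical flag that only the Beat schedule sets.
    source = "beat" if from_beat else "manual"
    logger.info("tasks.add triggered by %s", source)
    return x + y


# Schedule entry that passes the flag; manual callers simply leave it out.
# The 60-second interval stands in for crontab(minute="*").
BEAT_ENTRY = {
    "task": "tasks.add",
    "schedule": 60.0,
    "args": (2, 3),
    "kwargs": {"from_beat": True},
}
```

The drawback is that every scheduled task has to accept the extra parameter, which is the signature pollution this issue would like to avoid.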

Steps to Reproduce

Set up a simple task for your worker to handle.

# tasks/simple_tasks.py
from app.tasks.celery_app import celery


@celery.task(name="tasks.add")
def add(x: int, y: int):
    return x + y

Set up Celery Beat (your producer) with a schedule to trigger the task. Also, connect a task_prerun signal handler so the request logged for each trigger source can be compared.

# tasks/celery_app.py
import logging
from celery import Celery
from celery.signals import task_prerun
from celery.schedules import crontab

logger = logging.getLogger(__name__)

celery = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

celery.conf.beat_schedule = {
    "add_every_minute": {
        "task": "tasks.add",
        "schedule": crontab(minute="*"),  # Runs every minute
        "args": (2, 3),
    }
}

celery.conf.timezone = "UTC"

@task_prerun.connect
def log_scheduled_task(task_id, task, **kwargs):
    logger.info("Request %s", task.request)

In a Python console, trigger the task manually.

from app.tasks.celery_app import celery
from app.tasks.simple_tasks import add
task = add.delay(2, 3)

Finally, wait for the producer to trigger the task.

Capture the logs of both and compare.

Expected Behavior

Celery Beat should provide some metadata (e.g., a celerybeat: true header or a specific delivery_info field) to distinguish Beat-scheduled tasks from manually triggered ones.

Actual Behavior

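Apart from per-message identifiers (id, root_id, correlation_id, reply_to, delivery_tag) and the argsrepr formatting ('(2, 3)' vs. '[2, 3]'), the request object is identical in both cases, and nothing in it marks Beat as the origin: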

Request object from manual triggering:

[2025-03-16 12:44:47,780: INFO/ForkPoolWorker-8] Request <Context: {'lang': 'py', 'task': 'tasks.add', 'id': 'cb3ba2c6-652e-4742-856a-ad0b532d9154', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'cb3ba2c6-652e-4742-856a-ad0b532d9154', 'parent_id': None, 'argsrepr': '(2, 3)', 'kwargsrepr': '{}', 'origin': 'XXX@XXX', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': 'cb3ba2c6-652e-4742-856a-ad0b532d9154', 'reply_to': 'ba8db540-47a4-3eb9-bdd5-520017cccb2d', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '79f94ba2-d732-40c6-87ad-64446ee9c8e9'}, 'reply_to': 'ba8db540-47a4-3eb9-bdd5-520017cccb2d', 'correlation_id': 'cb3ba2c6-652e-4742-856a-ad0b532d9154', 'hostname': 'XXX@XXX', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': False}, 'args': [2, 3], 'kwargs': {}, 'is_eager': False, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None, 'called_directly': False, 'headers': None}>

Request object from beat triggering:

[2025-03-16 12:45:00,004: INFO/ForkPoolWorker-8] Request <Context: {'lang': 'py', 'task': 'tasks.add', 'id': 'f456e265-a921-453d-9b92-82cff94335bc', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'f456e265-a921-453d-9b92-82cff94335bc', 'parent_id': None, 'argsrepr': '[2, 3]', 'kwargsrepr': '{}', 'origin': 'XXX@XXX', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': 'f456e265-a921-453d-9b92-82cff94335bc', 'reply_to': '15316467-2a05-31ec-bfe5-ed1d11a6b3d1', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '7b77e502-0fc2-4df0-974b-3d71a230cf8d'}, 'reply_to': '15316467-2a05-31ec-bfe5-ed1d11a6b3d1', 'correlation_id': 'f456e265-a921-453d-9b92-82cff94335bc', 'hostname': 'XXX@XXX', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': False}, 'args': [2, 3], 'kwargs': {}, 'is_eager': False, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None, 'called_directly': False, 'headers': None}>
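To check the comparison mechanically rather than by eye, a small helper can list the keys on which two request dicts differ. This is only a sketch: `diff_request` is a hypothetical helper, and the sample dicts are trimmed to a few of the fields from the logs above.

```python
def diff_request(a, b, prefix=""):
    """Return the sorted, dotted key paths on which two request dicts differ."""
    differing = []
    for key in sorted(set(a) | set(b)):
        path = f"{prefix}{key}"
        left, right = a.get(key), b.get(key)
        if isinstance(left, dict) and isinstance(right, dict):
            # Recurse into nested dicts such as 'properties'.
            differing.extend(diff_request(left, right, prefix=f"{path}."))
        elif left != right:
            differing.append(path)
    return differing


# Subset of the manually triggered request logged above.
manual = {
    "id": "cb3ba2c6-652e-4742-856a-ad0b532d9154",
    "argsrepr": "(2, 3)",
    "args": [2, 3],
    "headers": None,
    "properties": {
        "delivery_mode": 2,
        "delivery_tag": "79f94ba2-d732-40c6-87ad-64446ee9c8e9",
    },
}

# Same subset of the Beat-triggered request logged above.
beat = {
    "id": "f456e265-a921-453d-9b92-82cff94335bc",
    "argsrepr": "[2, 3]",
    "args": [2, 3],
    "headers": None,
    "properties": {
        "delivery_mode": 2,
        "delivery_tag": "7b77e502-0fc2-4df0-974b-3d71a230cf8d",
    },
}

print(diff_request(manual, beat))
# → ['argsrepr', 'id', 'properties.delivery_tag']
```

Inside the task_prerun handler the input dicts could presumably be obtained with `vars(task.request)`, assuming the request keeps its fields as instance attributes the way the logged repr suggests.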

Proposed Solution

  • Add a custom header (celerybeat: true) when Celery Beat schedules a task.
  • Modify delivery_info to indicate that the task was scheduled by Beat.
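Until something like this exists upstream, the first option can be approximated from user code. The sketch below tags every beat_schedule entry with a marker header through the entry's "options" key, which Beat forwards to apply_async. The names `BEAT_HEADER`, `tag_beat_schedule` and `is_beat_scheduled` are hypothetical. How custom headers surface on the worker is an assumption I have not verified against 5.4.0: they may appear as attributes on task.request or inside task.request.headers, so the check looks in both places.

```python
from types import SimpleNamespace

BEAT_HEADER = "celerybeat"  # hypothetical header name, taken from the proposal


def tag_beat_schedule(schedule):
    """Return a copy of a beat_schedule dict whose entries carry the marker header."""
    tagged = {}
    for name, entry in schedule.items():
        # Copy the nested dicts so the caller's schedule is left untouched.
        options = dict(entry.get("options", {}))
        headers = dict(options.get("headers", {}))
        headers[BEAT_HEADER] = True
        options["headers"] = headers
        tagged[name] = {**entry, "options": options}
    return tagged


def is_beat_scheduled(request):
    """Best-effort check for the marker on a task request (assumed locations)."""
    if getattr(request, BEAT_HEADER, None):
        return True
    headers = getattr(request, "headers", None) or {}
    return bool(headers.get(BEAT_HEADER))


# The 60-second interval stands in for crontab(minute="*").
schedule = {
    "add_every_minute": {"task": "tasks.add", "schedule": 60.0, "args": (2, 3)},
}
tagged = tag_beat_schedule(schedule)

# Stand-ins for task.request, since no worker is running in this sketch.
beat_request = SimpleNamespace(headers={BEAT_HEADER: True})
manual_request = SimpleNamespace(headers=None)
```

In the reproduction above this would be applied as `celery.conf.beat_schedule = tag_beat_schedule({...})`, with `is_beat_scheduled(task.request)` called from the task_prerun handler. It only covers entries defined in configuration; schedules stored in the database by Django Celery Beat would need the header set on each periodic task instead.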

Environment

Celery v5.4.0
Django Celery Beat v2.7.0
Broker: Redis
Backend: Redis
