Skip to content

fix: allow deadline callbacks within the same dag module#66702

Open
dondaum wants to merge 1 commit into
apache:mainfrom
dondaum:fix/deadline-callbacks-in-dag-modules
Open

fix: allow deadline callbacks within the same dag module#66702
dondaum wants to merge 1 commit into
apache:mainfrom
dondaum:fix/deadline-callbacks-in-dag-modules

Conversation

@dondaum
Copy link
Copy Markdown
Contributor

@dondaum dondaum commented May 11, 2026

This enables the use of callables from the same Dag module for deadline alert callbacks. Airflow's Dag serialization adjusts the name of the Dag module during parsing to ensure its uniqueness. Consequently, once the callable is part of the same module, the module path can no longer be used.

Currently, this works only if you define the Callable in a separate module and subsequently import it into the Dag module—as described here.

How to reproduce

# sync_deadline_test.py
from datetime import datetime, timedelta
from airflow.sdk import SyncCallback, DAG, DeadlineAlert, DeadlineReference, task
from airflow.providers.standard.operators.empty import EmptyOperator


def run_sync_callback(text: str):
    print(text)


with DAG(
    dag_id="custom_callback_deadline_alert_sync",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(seconds=10),
        callback=SyncCallback(
            run_sync_callback,
            kwargs={
                "text": "🚨 Dag {{ dag_run.dag_id }} missed deadline at {{ deadline.deadline_time }}. DagRun: {{ dag_run }}"
            },
        ),
    ),
):

    c = EmptyOperator(task_id="example_task")

    @task()
    def wait():
        import time
        time.sleep(60)

    
    c >> wait()

Logs

2026-05-05 10:22:17.561532+00:00 [error    ] Callback execution failed      [callback_runner] callback_kwargs={'text': '🚨 Dag {{ dag_run.dag_id }} missed deadline at {{ deadline.deadline_time }}. DagRun: {{ dag_run }}', 'context': {'dag_run': {'dag_run_id': 'manual__2026-05-05T10:22:06.225020+00:00', 'dag_id': 'custom_callback_deadline_alert_sync', 'logical_date': '2026-05-05T10:22:06Z', 'queued_at': '2026-05-05T10:22:06.233422Z', 'start_date': '2026-05-05T10:22:06.267176Z', 'end_date': None, 'duration': None, 'data_interval_start': '2026-05-05T10:22:06Z', 'data_interval_end': '2026-05-05T10:22:06Z', 'run_after': '2026-05-05T10:22:06.225020Z', 'last_scheduling_decision': '2026-05-05T10:22:16.977390Z', 'run_type': 'manual', 'state': 'running', 'triggered_by': 'ui', 'triggering_user_name': 'admin', 'conf': {}, 'note': None, 'dag_versions': [{'id': '019df7a8-1f1a-771b-bdab-ed3e0499252d', 'version_number': 3, 'dag_id': 'custom_callback_deadline_alert_sync', 'bundle_name': 'dags-folder', 'bundle_version': None, 'created_at': '2026-05-05T10:21:23.610414Z', 'dag_display_name': 'custom_callback_deadline_alert_sync', 'bundle_url': None}], 'bundle_version': None, 'dag_display_name': 'custom_callback_deadline_alert_sync', 'partition_key': None}, 'deadline': {'id': '019df7a8-c5ac-787b-938b-4ab1a1b1d669', 'deadline_time': '2026-05-05T10:22:16.233422Z'}}} callback_path=unusual_prefix_895308c8175461ad72fc679b0fd85850413a13f3_sync_deadline_test.run_sync_callback error_detail=[{'exc_type': 'ModuleNotFoundError', 'exc_value': "No module named 'unusual_prefix_895308c8175461ad72fc679b0fd85850413a13f3_sync_deadline_test'", 'exc_notes': [], 'syntax_error': None, 'is_cause': False, 'frames': [{'filename': '/opt/airflow/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py', 'lineno': 111, 'name': 'execute_callback'}, {'filename': '/usr/python/lib/python3.10/importlib/__init__.py', 'lineno': 126, 'name': 'import_module'}, {'filename': '<frozen importlib._bootstrap>', 'lineno': 1050, 'name': '_gcd_import'}, {'filename': '<frozen importlib._bootstrap>', 'lineno': 1027, 'name': '_find_and_load'}, {'filename': '<frozen importlib._bootstrap>', 'lineno': 1004, 'name': '_find_and_load_unlocked'}], 'is_group': False, 'exceptions': []}] error_msg="Callback execution failed: ModuleNotFoundError: No module named 'unusual_prefix_895308c8175461ad72fc679b0fd85850413a13f3_sync_deadline_test'" loc=callback_supervisor.py:142
2026-05-05 10:22:17.562123+00:00 [error    ] Callback failed                [callback_runner] error="Callback execution failed: ModuleNotFoundError: No module named 'unusual_prefix_895308c8175461ad72fc679b0fd85850413a13f3_sync_deadline_test'" loc=callback_supervisor.py:218
2026-05-05T10:22:17.566313Z [info     ] Workload finished              [callback_supervisor] duration=0.06016514600014489 exit_code=1 loc=callback_supervisor.py:368 workload_id=019df7a8-c5ab-7541-9d7f-c0ba0018c132 workload_type=ExecutorCallback
2026-05-05T10:22:17.566601Z [error    ] Workload execution failed.     [airflow.executors.local_executor.LocalExecutor] loc=local_executor.py:110 workload_type=ExecuteCallback
Traceback (most recent call last):
  File "/opt/airflow/airflow-core/src/airflow/executors/local_executor.py", line 102, in _run_worker
    BaseExecutor.run_workload(
  File "/opt/airflow/airflow-core/src/airflow/executors/base_executor.py", line 670, in run_workload
    return supervise_callback(
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py", line 376, in supervise_callback
    raise RuntimeError(f"Callback subprocess exited with code {exit_code}")
RuntimeError: Callback subprocess exited with code 1
2026-05-05T10:22:18.504254Z [info     ] Received executor event with state failed for callback 019df7a8-c5ab-7541-9d7f-c0ba0018c132 [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:1271
2026-05-05T10:22:18.506357Z [error    ] Callback 019df7a8-c5ab-7541-9d7f-c0ba0018c132 failed: Execution failed [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:1296
2026-05-05T10:22:18.512786Z [info     ] Getting all Callbacks          [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:1118
2026-05-05T10:22:18.513512Z [info     ] executor                       [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:1122
2026-05-05T10:22:18.513625Z [info     ] failed                         [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:1123
2026-05-05T10:22:18.513704Z [info     ] 1                              [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:1124
2026-05-05T10:22:18.513773Z [info     ] {'path': 'unusual_prefix_895308c8175461ad72fc679b0fd85850413a13f3_sync_deadline_test.run_sync_callback', 'dag_id': 'custom_callback_deadline_alert_sync', 'kwargs': {'text': '🚨 Dag {{ dag_run.dag_id }} missed deadline at {{ deadline.deadline_time }}. DagRun: {{ dag_run }}'}, 'prefix': 'deadline_alerts', 'executor': None} [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:1125
2026-05-05T10:22:18.513858Z [info     ] Getting all Callbacks          [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:1126
2026-05-05T10:22:18.513924Z [info     ] No pending callb

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

Comment thread task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py Outdated
@dondaum dondaum force-pushed the fix/deadline-callbacks-in-dag-modules branch from 3599534 to 167316c Compare May 11, 2026 10:27
@eladkal eladkal added this to the Airflow 3.2.2 milestone May 11, 2026
@eladkal eladkal added type:bug-fix Changelog: Bug Fixes backport-to-v3-2-test Mark PR with this label to backport to v3-2-test branch labels May 11, 2026
@eladkal eladkal requested a review from ferruzzi May 11, 2026 15:13
Copy link
Copy Markdown
Contributor

@ferruzzi ferruzzi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing this, it's a bit of tech debt I never did resolve. Left a couple comments.

Comment thread task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py
@dondaum dondaum force-pushed the fix/deadline-callbacks-in-dag-modules branch 2 times, most recently from 9c62c1e to d15dce1 Compare May 13, 2026 09:47
@dondaum dondaum requested a review from potiuk as a code owner May 13, 2026 09:47
@dondaum dondaum force-pushed the fix/deadline-callbacks-in-dag-modules branch from d15dce1 to 1e75cd3 Compare May 13, 2026 11:59
@dondaum
Copy link
Copy Markdown
Contributor Author

dondaum commented May 13, 2026

The failed CI is likely unrelated to this change. I will rebase as soon as the issue in the upstream CI is resolved.

@dondaum dondaum force-pushed the fix/deadline-callbacks-in-dag-modules branch from 1e75cd3 to 965f666 Compare May 13, 2026 15:49
@ferruzzi
Copy link
Copy Markdown
Contributor

Yeah, test timed out, we can re-trigger just that one once the others finish

@dondaum dondaum force-pushed the fix/deadline-callbacks-in-dag-modules branch from 965f666 to b9faa5d Compare May 13, 2026 18:56
@dondaum
Copy link
Copy Markdown
Contributor Author

dondaum commented May 15, 2026

Yeah, test timed out, we can re-trigger just that one once the others finish

should be good now.

Copy link
Copy Markdown
Contributor

@ferruzzi ferruzzi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved with two small nits. Feel free to ignore them, they are pretty minor.

module = import_module(module_path)
# If the callback is defined within the Dag module, the module path is modified during DAG serialization.
# Attempt to import it using the path of the Dag file.
if UNUSUAL_MODULE_PREFIX in module_path:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking: Maybe slightly more accurate to use startswith() here instead of in but that's maybe being pedantic

version=bundle_info.version,
)
bundle.initialize()
if (bundle_path := str(bundle.path)) not in sys.path:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking nit: You are casting bundle_path to a string here, then passing it to execute_callback. execute_callback has this type hinted as a PathLike and then casts it to a Path. Likely may as well cast it to a Path here and skip the steps, but it works either way.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:Executors-core LocalExecutor & SequentialExecutor area:task-sdk backport-to-v3-2-test Mark PR with this label to backport to v3-2-test branch type:bug-fix Changelog: Bug Fixes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants