Fix backfill marked complete before DagRuns are created (#62561)
shivaam wants to merge 3 commits into apache:main
Conversation
The scheduler's _mark_backfills_complete() could mark a backfill as completed during the window between the Backfill row commit and DagRun creation. Add an EXISTS guard on BackfillDagRun so backfills still being initialized are skipped.
Force-pushed from dcaf372 to 3372139
Removed commented-out lines for clarity.
eladkal left a comment
LGTM
will need a 2nd reviewer as this is scheduler core area
```python
Backfill.completed_at.is_(None),
# Guard: backfill must have at least one association,
# otherwise it is still being set up (see #61375).
exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == Backfill.id)),
```
Should we fix the root cause instead? _create_backfill() does session.commit() (backfill.py L605) to persist the Backfill row, then creates BackfillDagRun/DagRun rows afterwards — that's what opens the race window. Changing that to session.flush() would still assign br.id (needed as FK for BackfillDagRun) without committing. The create_session() context manager already commits on successful exit, so all rows would be committed atomically — eliminating the race window entirely.
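The visibility point behind the flush suggestion can be sketched with stdlib sqlite3 (toy tables, not Airflow's real schema or sessions): rows inserted inside a still-open transaction are invisible to other connections until the single final commit, so no half-initialized state is ever observable.

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")
creator = sqlite3.connect(path)     # plays _create_backfill()'s session
scheduler = sqlite3.connect(path)   # plays the scheduler's session

creator.executescript("""
CREATE TABLE backfill (id INTEGER PRIMARY KEY);
CREATE TABLE backfill_dag_run (id INTEGER PRIMARY KEY, backfill_id INTEGER);
""")

# Like session.flush(): the row (and its id) is usable inside the
# transaction, but nothing is committed yet.
creator.execute("INSERT INTO backfill (id) VALUES (1)")
print(scheduler.execute("SELECT COUNT(*) FROM backfill").fetchone()[0])  # 0: not visible

creator.execute("INSERT INTO backfill_dag_run (backfill_id) VALUES (1)")
creator.commit()  # both rows land atomically
print(scheduler.execute("SELECT COUNT(*) FROM backfill").fetchone()[0])  # 1
```

The scheduler connection never observes a Backfill row without its associations, which is exactly why the race window closes.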
If the guard approach is preferred, there's an edge case worth considering: if _create_backfill fails after committing the Backfill row but before creating any BackfillDagRun rows (e.g. RuntimeError("No runs to create...") on L616), this guard means _mark_backfills_complete will never clean it up. Combined with the AlreadyRunningBackfill check, that orphaned backfill would block all future backfills for the same DAG permanently.
Good point about the edge case. I looked into the flush approach but I think it introduces a different race condition.
Before creating a backfill, we check if there are any active backfills for the same DAG and throw an error. Currently, we immediately commit the Backfill row, so a concurrent request sees it and raises AlreadyRunningBackfill. If we batch everything into one transaction with flush(), the Backfill row stays invisible to other sessions until all DagRuns are created and the final commit happens. That opens a window of seconds where a concurrent request sees zero active backfills and can create a duplicate backfill.
Check for existing backfills: airflow/airflow-core/src/airflow/models/backfill.py, lines 577 to 591 (at bae2c27)
For the guard approach, I think we can handle the orphan two different ways:
- Add an age-based cleanup in _mark_backfills_complete — backfills with zero BackfillDagRun rows older than 10 minutes get marked complete instead of being stuck forever.
- In _create_backfill, if the DagRun creation fails after the Backfill row is already committed, catch the exception and mark the backfill complete immediately so it doesn't permanently block future backfills for that DAG.
I can add the age-based cleanup in this PR. Happy to also add the exception handling if you think both are worth having. Let me know what you'd prefer.
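The second option (catch the failure and close out the orphan) can be sketched with toy stand-ins; these are illustrative names, not Airflow's real classes or session handling:

```python
from datetime import datetime, timezone

class Backfill:
    """Toy stand-in for the ORM model; only the field we care about."""
    def __init__(self):
        self.completed_at = None

def create_dag_runs(backfill):
    # Simulate the failure mode discussed above, e.g.
    # RuntimeError("No runs to create...") after the row is committed.
    raise RuntimeError("No runs to create...")

def create_backfill_guarded(backfill):
    # The Backfill row is assumed to be already committed at this point.
    try:
        create_dag_runs(backfill)
    except Exception:
        # Close out the orphaned backfill so it cannot permanently block
        # future backfills for the same DAG, then re-raise.
        backfill.completed_at = datetime.now(timezone.utc)
        raise

br = Backfill()
try:
    create_backfill_guarded(br)
except RuntimeError:
    pass
print(br.completed_at is not None)  # True -> orphan closed, not stuck
```

In the real code the `except` branch would also need a `session.commit()` so the completion survives the raised exception.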
Is it possible to also add the Copilot reviewer to this CR as well?
Pull request overview
Fixes a scheduler race where _mark_backfills_complete() could mark a Backfill as complete in the gap between the Backfill row being committed and its DagRuns being created.
Changes:
- Add a scheduler-side guard requiring a Backfill to have at least one BackfillDagRun association before it can be marked complete.
- Add a unit test covering the "initializing backfill" window (no associated DagRun/BackfillDagRun rows yet), ensuring it is not prematurely completed.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| airflow-core/src/airflow/jobs/scheduler_job_runner.py | Updates the backfill completion query to require existence of at least one BackfillDagRun row before completing. |
| airflow-core/tests/unit/jobs/test_scheduler_job.py | Adds a regression test that reproduces the initialization window and asserts the scheduler skips completion until associations exist. |
```python
# todo: AIP-78 simplify this function to an update statement
query = select(Backfill).where(
    Backfill.completed_at.is_(None),
    # Guard: backfill must have at least one association,
    # otherwise it is still being set up (see #61375).
    exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == Backfill.id)),
```
The new EXISTS(backfill_dag_run) guard means a Backfill that gets committed but never manages to create any BackfillDagRun rows (e.g. if _create_backfill() errors/crashes after the session.commit() at airflow/models/backfill.py:605) will never be auto-completed by the scheduler. Since _create_backfill() blocks new backfills by counting Backfill.completed_at IS NULL (airflow/models/backfill.py:577-590), this can leave a DAG permanently unable to start new backfills without manual DB cleanup. Consider adding a bounded “initializing” window (e.g., only require the association for very recent backfills) or introducing an explicit backfill state/failed marker so initialization failures don’t create stuck active backfills.
Suggested change:

```diff
- # todo: AIP-78 simplify this function to an update statement
- query = select(Backfill).where(
-     Backfill.completed_at.is_(None),
-     # Guard: backfill must have at least one association,
-     # otherwise it is still being set up (see #61375).
-     exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == Backfill.id)),
+ # Treat very recent backfills with no associations as "initializing",
+ # but allow older ones without BackfillDagRun rows to be auto-completed
+ # so they don't block new backfills if initialization failed.
+ initializing_cutoff = now - timedelta(minutes=5)
+ # todo: AIP-78 simplify this function to an update statement
+ query = select(Backfill).where(
+     Backfill.completed_at.is_(None),
+     or_(
+         # Backfill has at least one association and is fully initialized.
+         exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == Backfill.id)),
+         # Or it is older than the initializing window; treat it as no longer initializing
+         # even if it has no BackfillDagRun rows (e.g. initialization crashed).
+         Backfill.created_at < initializing_cutoff,
+     ),
```
What

The scheduler's _mark_backfills_complete() prematurely marks a backfill as completed when it runs during the window between the Backfill row commit and the DagRun creation in _create_backfill().

closes: #61375
Why
_create_backfill() works in two steps:

1. Commit the Backfill row (session.commit()).
2. Create the BackfillDagRun/DagRun rows.

The scheduler runs _mark_backfills_complete() every 30 seconds. If it happens to run between step 1 and step 2, it sees a backfill with no running DagRuns (because they don't exist yet) and marks it done. The DagRuns are created afterwards, but the backfill is already completed.
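The window can be reproduced with a stdlib sqlite3 toy (a simplified stand-in for the old completion check, not the actual scheduler query): once the Backfill row is committed but its runs do not exist yet, "no unfinished DagRuns" is trivially true.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
CREATE TABLE backfill (id INTEGER PRIMARY KEY, completed_at TEXT);
CREATE TABLE dag_run (id INTEGER PRIMARY KEY, backfill_id INTEGER, state TEXT);
INSERT INTO backfill (id) VALUES (1);  -- step 1 committed, step 2 pending
""")

# Old-style check: backfill is "done" when it has no unfinished DagRuns --
# which also matches a backfill whose DagRuns simply don't exist yet.
candidates = db.execute("""
    SELECT b.id FROM backfill b
    WHERE b.completed_at IS NULL
      AND NOT EXISTS (SELECT 1 FROM dag_run d
                      WHERE d.backfill_id = b.id AND d.state = 'running')
""").fetchall()
print(candidates)  # [(1,)] -> the still-initializing backfill looks "done"
```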
How
Added an EXISTS check on the backfill_dag_run table in the completion query. Now a backfill needs at least one BackfillDagRun row before it can be marked complete. If it has zero, it means the backfill is still being set up, so we skip it.
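The effect of the guard can be shown with the same toy schema (illustrative SQL, not the SQLAlchemy query from the PR): a backfill with zero backfill_dag_run rows is skipped, while one with an association remains a completion candidate.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
CREATE TABLE backfill (id INTEGER PRIMARY KEY, completed_at TEXT);
CREATE TABLE backfill_dag_run (id INTEGER PRIMARY KEY, backfill_id INTEGER);
INSERT INTO backfill (id) VALUES (1);  -- still initializing: no associations
INSERT INTO backfill (id) VALUES (2);  -- fully set up
INSERT INTO backfill_dag_run (backfill_id) VALUES (2);
""")

# With the EXISTS guard, completion candidates must have at least one
# BackfillDagRun association.
rows = db.execute("""
    SELECT b.id FROM backfill b
    WHERE b.completed_at IS NULL
      AND EXISTS (SELECT 1 FROM backfill_dag_run r WHERE r.backfill_id = b.id)
""").fetchall()
print(rows)  # [(2,)] -> backfill 1 is skipped while it is being set up
```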
Tests
test_mark_backfills_complete_skips_initializing_backfill: verifies that a backfill without any DagRuns is skipped, then completed after the DagRuns finish. If we remove the fix, the test fails.

Was generative AI tooling used to co-author this PR?