Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
TaskInletAssetReference,
TaskOutletAssetReference,
)
from airflow.models.backfill import Backfill
from airflow.models.backfill import Backfill, BackfillDagRun
from airflow.models.callback import Callback, CallbackType, ExecutorCallback
from airflow.models.dag import DagModel
from airflow.models.dag_version import DagVersion
Expand Down Expand Up @@ -1981,6 +1981,9 @@ def _mark_backfills_complete(self, session: Session = NEW_SESSION) -> None:
# todo: AIP-78 simplify this function to an update statement
query = select(Backfill).where(
Backfill.completed_at.is_(None),
# Guard: backfill must have at least one association,
# otherwise it is still being set up (see #61375).
exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == Backfill.id)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we fix the root cause instead? _create_backfill() does session.commit() (backfill.py L605) to persist the Backfill row, then creates BackfillDagRun/DagRun rows afterwards — that's what opens the race window. Changing that to session.flush() would still assign br.id (needed as FK for BackfillDagRun) without committing. The create_session() context manager already commits on successful exit, so all rows would be committed atomically — eliminating the race window entirely.

If the guard approach is preferred, there's an edge case worth considering: if _create_backfill fails after committing the Backfill row but before creating any BackfillDagRun rows (e.g. RuntimeError("No runs to create...") on L616), this guard means _mark_backfills_complete will never clean it up. Combined with the AlreadyRunningBackfill check, that orphaned backfill would block all future backfills for the same DAG permanently.

Copy link
Contributor Author

@shivaam shivaam Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point about the edge case. I looked into the flush approach but I think it introduces a different race condition.

Before creating a backfill, we check if there are any active backfills for the same DAG and throw an error. Currently, we immediately commit the Backfill row, so a concurrent request sees it and raises AlreadyRunningBackfill. If we batch everything into one transaction with flush(), the Backfill row stays invisible to other sessions until all DagRuns are created and the final commit happens. That opens a window of seconds where a concurrent request sees zero active backfills and can create a duplicate backfill.

[Check for existing backfills](

num_active = session.scalar(
select(func.count()).where(
Backfill.dag_id == dag_id,
Backfill.completed_at.is_(None),
)
)
if num_active is None:
raise UnknownActiveBackfills(dag_id)
if num_active > 0:
raise AlreadyRunningBackfill(
f"Another backfill is running for Dag {dag_id}. "
f"There can be only one running backfill per Dag."
)
dag = serdag.dag
)

For the guard approach, I think we can handle the orphan two different ways:

  • Add an age-based cleanup in _mark_backfills_complete — backfills with zero BackfillDagRun rows older than 10 minutes get marked complete instead of being stuck forever.
  • In _create_backfill, if the DagRun creation fails after the Backfill row is already committed, catch the exception and mark the backfill complete immediately so it doesn't permanently block future backfills for that DAG.

I can add the age-based cleanup in this PR. Happy to also add the exception handling if you think both are worth having. Let me know what you'd prefer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kaxil let me know what you think?

Comment on lines 1981 to +1986
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new EXISTS(backfill_dag_run) guard means a Backfill that gets committed but never manages to create any BackfillDagRun rows (e.g. if _create_backfill() errors/crashes after the session.commit() at airflow/models/backfill.py:605) will never be auto-completed by the scheduler. Since _create_backfill() blocks new backfills by counting Backfill.completed_at IS NULL (airflow/models/backfill.py:577-590), this can leave a DAG permanently unable to start new backfills without manual DB cleanup. Consider adding a bounded “initializing” window (e.g., only require the association for very recent backfills) or introducing an explicit backfill state/failed marker so initialization failures don’t create stuck active backfills.

Suggested change
# todo: AIP-78 simplify this function to an update statement
query = select(Backfill).where(
Backfill.completed_at.is_(None),
# Guard: backfill must have at least one association,
# otherwise it is still being set up (see #61375).
exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == Backfill.id)),
# Treat very recent backfills with no associations as "initializing",
# but allow older ones without BackfillDagRun rows to be auto-completed
# so they don't block new backfills if initialization failed.
initializing_cutoff = now - timedelta(minutes=5)
# todo: AIP-78 simplify this function to an update statement
query = select(Backfill).where(
Backfill.completed_at.is_(None),
or_(
# Backfill has at least one association and is fully initialized.
exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == Backfill.id)),
# Or it is older than the initializing window; treat it as no longer initializing
# even if it has no BackfillDagRun rows (e.g. initialization crashed).
Backfill.created_at < initializing_cutoff,
),

Copilot uses AI. Check for mistakes.
~exists(
select(DagRun.id).where(
and_(DagRun.backfill_id == Backfill.id, DagRun.state.in_(unfinished_states))
Expand Down
53 changes: 52 additions & 1 deletion airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
AssetPartitionDagRun,
PartitionedAssetKeyLog,
)
from airflow.models.backfill import Backfill, _create_backfill
from airflow.models.backfill import Backfill, BackfillDagRun, ReprocessBehavior, _create_backfill
from airflow.models.callback import ExecutorCallback
from airflow.models.dag import DagModel, get_last_dagrun, infer_automated_data_interval
from airflow.models.dag_version import DagVersion
Expand Down Expand Up @@ -8732,6 +8732,57 @@ def test_mark_backfills_completed(dag_maker, session):
assert b.completed_at.timestamp() > 0


def test_mark_backfills_complete_skips_initializing_backfill(dag_maker, session):
    """A backfill with no BackfillDagRun rows is treated as still initializing.

    ``_mark_backfills_complete`` must leave such a backfill untouched; once an
    association row and a finished DagRun exist for it, the same call is
    expected to mark the backfill complete.
    """
    clear_db_backfills()
    dag_id = "test_backfill_race_lifecycle"
    day_one = pendulum.parse("2021-01-01")
    day_two = pendulum.parse("2021-01-02")
    with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"):
        BashOperator(task_id="hi", bash_command="echo hi")

    backfill = Backfill(
        dag_id=dag_id,
        from_date=day_one,
        to_date=pendulum.parse("2021-01-03"),
        max_active_runs=10,
        dag_run_conf={},
        reprocess_behavior=ReprocessBehavior.NONE,
    )
    session.add(backfill)
    session.commit()
    backfill_id = backfill.id
    session.expunge_all()

    job_runner = SchedulerJobRunner(
        job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
    )

    # No BackfillDagRun rows yet: the backfill is "initializing" and must be skipped.
    job_runner._mark_backfills_complete()
    assert session.get(Backfill, backfill_id).completed_at is None
    session.expunge_all()

    run = DagRun(
        dag_id=dag_id,
        run_id="backfill__2021-01-01T00:00:00+00:00",
        run_type=DagRunType.BACKFILL_JOB,
        logical_date=day_one,
        data_interval=(day_one, day_two),
        run_after=day_two,
        state=DagRunState.SUCCESS,
        backfill_id=backfill_id,
    )
    session.add(run)
    session.flush()
    association = BackfillDagRun(
        backfill_id=backfill_id,
        dag_run_id=run.id,
        logical_date=day_one,
        sort_ordinal=1,
    )
    session.add(association)
    session.commit()
    session.expunge_all()

    # Association present and the only run finished: now eligible for completion.
    job_runner._mark_backfills_complete()
    assert session.get(Backfill, backfill_id).completed_at is not None


class Key1Mapper(CorePartitionMapper):
"""Partition Mapper that returns only key-1 as downstream key"""

Expand Down
Loading