Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/66914.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Stop recomputing ``next_dagrun_*`` fields on paused Dags each parse cycle. Before this change, ``DagModel.calculate_dagrun_date_fields`` ran unconditionally on every parse cycle for every Dag, including paused ones, which caused ``next_dagrun_logical_date`` and ``next_dagrun_run_after`` to advance one cron period per cycle while staying strictly before "now" for ``catchup=False`` timetables — visible to external API consumers via ``GET /api/v2/dags`` and ``GET /api/v2/dags/{id}/details``. The fields now stay frozen while the Dag is paused and are recomputed fresh by the REST and CLI unpause paths when ``is_paused`` flips back to ``False``.
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,15 @@ def patch_dag(

data = patch_body.model_dump(include=fields_to_update, by_alias=True)

was_paused = dag.is_paused
for key, val in data.items():
setattr(dag, key, val)

if was_paused and dag.is_paused is False:
# Re-populate next_dagrun_* immediately on unpause so the API and scheduler
# see a fresh value rather than the frozen pre-pause snapshot.
dag.recompute_next_dagrun_fields_after_unpause(session=session)

return dag


Expand Down Expand Up @@ -389,6 +395,14 @@ def patch_dags(
.execution_options(synchronize_session="fetch")
)

if patch_body.is_paused is False:
# Re-populate next_dagrun_* immediately on bulk unpause so the API and
# scheduler see fresh values rather than frozen pre-pause snapshots.
for dag in dags:
session.refresh(dag, ["is_paused"])
if dag.is_paused is False:
dag.recompute_next_dagrun_fields_after_unpause(session=session)

return DAGCollectionResponse(
dags=dags,
total_entries=total_entries,
Expand Down
4 changes: 4 additions & 0 deletions airflow-core/src/airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ def set_is_paused(is_paused: bool, args, *, session: Session = NEW_SESSION) -> N
def _update_is_paused(dag_model: DagModel) -> bool:
old_is_paused = dag_model.is_paused
dag_model.is_paused = is_paused
if old_is_paused and not is_paused:
# Re-populate next_dagrun_* immediately on unpause so the API and
# scheduler see a fresh value rather than the frozen pre-pause one.
dag_model.recompute_next_dagrun_fields_after_unpause(session=session)
return old_is_paused

old_values = [
Expand Down
51 changes: 51 additions & 0 deletions airflow-core/src/airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,21 @@ def calculate_dagrun_date_fields(
# TODO: AIP-76 perhaps we need to add validation for manual runs ensure consistency between
# partition_key / partition_date and run_after

if self.is_paused:
# While the Dag is paused, the scheduler will not materialise any run
# regardless of what these fields hold. Recomputing them every parse
# cycle (the DAG processor calls this for every Dag, paused or not)
# has two user-visible costs: (a) ``catchup=False`` timetables advance
# the ``next_dagrun`` value forward one cron period per parse cycle
# while it stays strictly before "now", so any external consumer of
# ``/api/v2/dags/.../details`` sees a Dag whose "Next Run" is
# perpetually in the past (see #66462 / #66907 / #66552); and (b) we
# pay the timetable computation cost on every paused Dag every parse
# cycle for a value nothing will read until unpause. Stop touching the
# fields here; the unpause path triggers a fresh recompute via
# ``recompute_next_dagrun_fields_after_unpause``.
return

if isinstance(last_automated_run, datetime):
raise ValueError(
"Passing a datetime to `DagModel.calculate_dagrun_date_fields` is not supported. "
Expand Down Expand Up @@ -801,6 +816,42 @@ def calculate_dagrun_date_fields(
next_dagrun_partition_date=str(self.next_dagrun_partition_date),
)

@provide_session
def recompute_next_dagrun_fields_after_unpause(self, *, session: Session = NEW_SESSION) -> None:
"""
Refresh ``next_dagrun_*`` immediately after a Dag transitions to unpaused.

Companion to the ``is_paused`` short-circuit in
``calculate_dagrun_date_fields``: while paused the fields stay frozen, so
callers that flip the Dag back to running need a way to force one fresh
computation without waiting for the next parse cycle. Reads the latest
serialized Dag for the timetable and the most recent automated DagRun,
then delegates to the normal recompute path.

Safe no-op if the serialized Dag is missing (next parse will repopulate).
"""
from airflow.models.dagrun import DagRun
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.types import DagRunType

if self.is_paused:
return
serialized = session.scalar(
select(SerializedDagModel)
.where(SerializedDagModel.dag_id == self.dag_id)
.order_by(SerializedDagModel.last_updated.desc())
.limit(1)
)
if serialized is None:
return
last_automated_run = session.scalar(
select(DagRun)
.where(DagRun.dag_id == self.dag_id, DagRun.run_type != DagRunType.MANUAL)
.order_by(DagRun.logical_date.desc())
.limit(1)
)
self.calculate_dagrun_date_fields(serialized.dag, last_automated_run=last_automated_run)

@provide_session
def get_asset_triggered_next_run_info(
self, *, session: Session = NEW_SESSION
Expand Down
65 changes: 65 additions & 0 deletions airflow-core/tests/unit/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -4087,3 +4087,68 @@ def test_calculate_dagrun_date_fields(
assert dag_model.next_dagrun_create_after == next_run_after
assert dag_model.next_dagrun_partition_key == next_partition_key
assert dag_model.next_dagrun_partition_date == next_partition_date


@pytest.mark.need_serialized_dag
def test_calculate_dagrun_date_fields_short_circuits_when_paused(dag_maker):
"""Paused Dags must not have ``next_dagrun_*`` recomputed each parse cycle."""
with dag_maker(schedule="@daily", catchup=False, start_date=TEST_DATE):
BashOperator(task_id="hi", bash_command="yo")

run = dag_maker.create_dagrun()
serdag = dag_maker.serialized_dag
dag_model = dag_maker.dag_model

# Establish a baseline while unpaused.
dag_model.is_paused = False
dag_model.calculate_dagrun_date_fields(dag=serdag, last_automated_run=run)
baseline_next_dagrun = dag_model.next_dagrun
baseline_create_after = dag_model.next_dagrun_create_after
assert baseline_next_dagrun is not None # sanity

# Pause the Dag and recompute many "parse cycles" later. The fields must not move.
dag_model.is_paused = True
with time_machine.travel("2030-01-01T00:00:00Z", tick=False):
dag_model.calculate_dagrun_date_fields(dag=serdag, last_automated_run=run)
assert dag_model.next_dagrun == baseline_next_dagrun
assert dag_model.next_dagrun_create_after == baseline_create_after


@pytest.mark.need_serialized_dag
def test_recompute_next_dagrun_fields_after_unpause(dag_maker, session):
"""Unpause path must refresh ``next_dagrun_*`` so the API/scheduler see fresh values."""
with dag_maker(schedule="@daily", catchup=False, start_date=TEST_DATE):
BashOperator(task_id="hi", bash_command="yo")

dag_maker.create_dagrun()
dag_model = dag_maker.dag_model

# Simulate the pause-frozen state.
dag_model.is_paused = True
dag_model.next_dagrun = None
dag_model.next_dagrun_create_after = None
session.commit()

# Unpause and call the helper.
dag_model.is_paused = False
dag_model.recompute_next_dagrun_fields_after_unpause(session=session)
assert dag_model.next_dagrun is not None
assert dag_model.next_dagrun_create_after is not None


@pytest.mark.need_serialized_dag
def test_recompute_next_dagrun_fields_after_unpause_noop_when_still_paused(dag_maker, session):
"""The helper must not touch the fields if the Dag is still paused."""
with dag_maker(schedule="@daily", catchup=False, start_date=TEST_DATE):
BashOperator(task_id="hi", bash_command="yo")

dag_maker.create_dagrun()
dag_model = dag_maker.dag_model
dag_model.is_paused = True
dag_model.next_dagrun = None
dag_model.next_dagrun_create_after = None
session.commit()

dag_model.recompute_next_dagrun_fields_after_unpause(session=session)
assert dag_model.next_dagrun is None
assert dag_model.next_dagrun_create_after is None
Loading