Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4820e1e
Fix max_active_runs lost during DAG serialisation when value equals s…
seruman Apr 15, 2026
9ce5587
Merge branch 'main' into fix-max-active-runs-serialisation
seruman Apr 15, 2026
dfc21ab
Merge branch 'main' into fix-max-active-runs-serialisation
seruman Apr 15, 2026
5ff1b01
Merge branch 'main' into fix-max-active-runs-serialisation
seruman Apr 16, 2026
72055a7
Merge branch 'main' into fix-max-active-runs-serialisation
seruman Apr 16, 2026
176c309
Merge branch 'main' into fix-max-active-runs-serialisation
seruman Apr 18, 2026
6076f0b
Merge branch 'main' into fix-max-active-runs-serialisation
seruman Apr 27, 2026
bf505e8
Merge branch 'main' into fix-max-active-runs-serialisation
seruman Apr 27, 2026
3c6082d
Merge branch 'main' into fix-max-active-runs-serialisation
seruman Apr 27, 2026
60a1d31
Merge branch 'main' into fix-max-active-runs-serialisation
seruman Apr 28, 2026
415add1
Merge branch 'main' into fix-max-active-runs-serialisation
seruman Apr 28, 2026
966e991
Merge branch 'main' into fix-max-active-runs-serialisation
seruman Apr 28, 2026
eff48ad
Merge branch 'main' into fix-max-active-runs-serialisation
seruman Apr 29, 2026
ba58a37
Merge branch 'main' into fix-max-active-runs-serialisation
seruman Apr 29, 2026
10d06aa
fix: introduce DAG_DEFAULTS
seruman Apr 30, 2026
da8186e
Merge branch 'main' into fix-max-active-runs-serialisation
seruman Apr 30, 2026
02ae43d
chore: add test for catchup
seruman Apr 30, 2026
6a5eb19
fix: revert comment change, that wording was better
seruman Apr 30, 2026
3015027
fix: preserve DAG fields whose defaults match the schema default
seruman May 4, 2026
bf1a61c
Merge branch 'main' into fix-max-active-runs-serialisation
seruman May 4, 2026
91d10ee
fix: update schema default check script
seruman May 4, 2026
091b12e
Merge branch 'main' into fix-max-active-runs-serialisation
seruman May 10, 2026
e7a7217
fix: address review comments on serialisation tests
seruman May 12, 2026
4a82bba
fix: add disable_bundle_versioning case to parametrised serialisation…
seruman May 12, 2026
6971495
fix: improve docstring for config-driven fields serialisation test
seruman May 12, 2026
984240b
Merge branch 'main' into fix-max-active-runs-serialisation
seruman May 12, 2026
d26a964
fix: use tuple for pytest.mark.parametrize first argument
seruman May 13, 2026
4557699
Merge branch 'main' into fix-max-active-runs-serialisation
seruman May 13, 2026
b466fd5
Merge branch 'main' into fix-max-active-runs-serialisation
seruman May 14, 2026
662f13e
Merge branch 'main' into fix-max-active-runs-serialisation
seruman May 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions airflow-core/src/airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@
"value": { "$ref": "#/definitions/dict" }
}
},
"catchup": { "type": "boolean", "default": false },
"catchup": { "type": "boolean" },
"allowed_run_types": {
"anyOf": [
{ "type": "array", "items": { "type": "string" } },
Expand Down Expand Up @@ -204,9 +204,9 @@
]
},
"_concurrency": { "type" : "number"},
"max_active_tasks": { "type" : "number", "default": 16},
"max_active_runs": { "type" : "number", "default": 16},
"max_consecutive_failed_dag_runs": { "type" : "number", "default": 0},
"max_active_tasks": { "type" : "number" },
"max_active_runs": { "type" : "number" },
"max_consecutive_failed_dag_runs": { "type" : "number" },
"default_args": { "$ref": "#/definitions/dict" },
"start_date": { "$ref": "#/definitions/datetime" },
"end_date": { "$ref": "#/definitions/datetime" },
Expand All @@ -224,7 +224,7 @@
]},
"edge_info": { "$ref": "#/definitions/edge_info" },
"dag_dependencies": { "$ref": "#/definitions/dag_dependencies" },
"disable_bundle_versioning": {"type": "boolean", "default": false }
"disable_bundle_versioning": {"type": "boolean" }
},
"required": [
"dag_id",
Expand Down
18 changes: 18 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,24 @@ def test_max_active_runs_explicit_value_is_used(self, testing_dag_bundle, sessio
orm_dag = session.get(DagModel, "dag_max_runs")
assert orm_dag.max_active_runs == 3

@pytest.mark.parametrize(
("field", "cfg_key", "schema_default"),
[
("max_active_runs", "max_active_runs_per_dag", 16),
("max_active_tasks", "max_active_tasks_per_dag", 16),
("max_consecutive_failed_dag_runs", "max_consecutive_failed_dag_runs_per_dag", 0),
],
)
def test_config_driven_field_equal_to_schema_default_not_overridden_by_conf(
self, testing_dag_bundle, session, dag_maker, field, cfg_key, schema_default
):
with conf_vars({("core", cfg_key): "1"}):
with dag_maker(f"dag_{field}_schema_default", schedule=None, **{field: schema_default}) as dag:
...
update_dag_parsing_results_in_db("testing", None, [dag], {}, 0.1, set(), session)
orm_dag = session.get(DagModel, f"dag_{field}_schema_default")
assert getattr(orm_dag, field) == schema_default

def test_max_active_runs_defaults_from_conf_when_none(self, testing_dag_bundle, session, dag_maker):
with conf_vars({("core", "max_active_runs_per_dag"): "4"}):
with dag_maker("dag_max_runs_default", schedule=None) as dag:
Expand Down
94 changes: 90 additions & 4 deletions airflow-core/tests/unit/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
from airflow.serialization.serialized_objects import (
BaseSerialization,
DagSerialization,
LazyDeserializedDAG,
OperatorSerialization,
_XComRef,
)
Expand Down Expand Up @@ -333,6 +334,13 @@ def _operator_defaults(overrides):
},
],
"params": [],
# These fields have no schema default; they are always emitted on the wire
# because their real default comes from airflow.cfg at parse time.
"catchup": False,
"disable_bundle_versioning": False,
"max_active_runs": 16,
"max_active_tasks": 16,
"max_consecutive_failed_dag_runs": 0,
Comment thread
ephraimbuddy marked this conversation as resolved.
},
}

Expand Down Expand Up @@ -3805,13 +3813,14 @@ def test_dag_schema_defaults_optimization():
dag_with_defaults = DAG(
dag_id="test_defaults_dag",
start_date=datetime(2023, 1, 1),
# These should match schema defaults and be excluded
catchup=False,
# These match remaining schema defaults and should be excluded
fail_fast=False,
render_template_as_native_obj=False,
# These are config-driven: no schema default, always emitted on the wire
catchup=False,
max_active_runs=16,
max_active_tasks=16,
max_consecutive_failed_dag_runs=0,
render_template_as_native_obj=False,
disable_bundle_versioning=False,
# These should be excluded as None
description=None,
Expand All @@ -3826,6 +3835,16 @@ def test_dag_schema_defaults_optimization():
for field in DagSerialization.get_schema_defaults("dag").keys():
assert field not in dag_data, f"Schema default field '{field}' should be excluded"

# Config-driven fields have no schema default and are always present on the wire
for field in (
"catchup",
"max_active_runs",
"max_active_tasks",
"max_consecutive_failed_dag_runs",
"disable_bundle_versioning",
):
assert field in dag_data, f"Config-driven field '{field}' must always be serialised"

# None fields should also be excluded
none_fields = ["description", "doc_md"]
for field in none_fields:
Expand All @@ -3834,7 +3853,8 @@ def test_dag_schema_defaults_optimization():
# Test deserialization restores defaults correctly
deserialized_dag = DagSerialization.from_dict(serialized)

# Verify schema defaults are restored
# Verify values round-trip correctly: schema-default fields are restored from the schema,
# config-driven fields are read directly from the wire.
assert deserialized_dag.catchup is False
assert deserialized_dag.fail_fast is False
assert deserialized_dag.max_active_runs == 16
Expand Down Expand Up @@ -3864,6 +3884,72 @@ def test_dag_schema_defaults_optimization():
assert dag_non_defaults_data["description"] == "Test description"


@pytest.mark.parametrize(
("cfg_overrides", "dag_kwargs", "expected_wire"),
[
pytest.param(
{
("core", "max_active_runs_per_dag"): "1",
("core", "max_active_tasks_per_dag"): "1",
("core", "max_consecutive_failed_dag_runs_per_dag"): "1",
},
{
"dag_id": "test_dag_fields_cfg_ne_user",
"max_active_runs": 16,
"max_active_tasks": 16,
"max_consecutive_failed_dag_runs": 0,
},
{"max_active_runs": 16, "max_active_tasks": 16, "max_consecutive_failed_dag_runs": 0},
id="user_value_differs_from_cfg",
),
pytest.param(
{
("core", "max_active_runs_per_dag"): "16",
("core", "max_active_tasks_per_dag"): "16",
("core", "max_consecutive_failed_dag_runs_per_dag"): "0",
},
{
"dag_id": "test_dag_fields_cfg_eq_user",
"max_active_runs": 16,
"max_active_tasks": 16,
"max_consecutive_failed_dag_runs": 0,
},
{"max_active_runs": 16, "max_active_tasks": 16, "max_consecutive_failed_dag_runs": 0},
id="user_value_equals_cfg",
),
pytest.param(
{("scheduler", "catchup_by_default"): "True"},
{"dag_id": "test_dag_catchup_override", "catchup": False},
{"catchup": False},
id="catchup_false_with_catchup_by_default_true",
),
pytest.param(
{("dag_processor", "disable_bundle_versioning"): "False"},
{"dag_id": "test_dag_disable_bundle_versioning", "disable_bundle_versioning": True},
{"disable_bundle_versioning": True},
id="disable_bundle_versioning_true_with_cfg_false",
),
],
)
def test_dag_config_driven_fields_always_serialized(cfg_overrides, dag_kwargs, expected_wire):
"""Config-driven DAG fields are always present on the wire regardless of the airflow.cfg value.

Fields like max_active_runs and other config-driven fields were silently dropped during
serialisation when their value matched the schema default, regardless of what airflow.cfg
was set to. #55849 excluded any field whose value matched the schema default.
"""
with conf_vars(cfg_overrides):
dag = DAG(start_date=datetime(2023, 1, 1), **dag_kwargs)
serialized = DagSerialization.to_dict(dag)

for field, value in expected_wire.items():
assert serialized["dag"][field] == value

lazy_dag = LazyDeserializedDAG(data=serialized)
for field, value in expected_wire.items():
assert getattr(lazy_dag, field) == value


def test_email_optimization_removes_email_attrs_when_email_empty():
"""Test that email_on_failure and email_on_retry are removed when email is empty."""
with DAG(dag_id="test_email_optimization") as dag:
Expand Down
11 changes: 10 additions & 1 deletion scripts/in_container/run_schema_defaults_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,16 @@ def compare_dag_defaults() -> list[str]:
if (
server_value is not None
and server_value not in [[], {}, (), set()]
and field_name not in ["dag_id", "dag_display_name"]
and field_name
not in [
"dag_id",
"dag_display_name",
"max_active_runs",
"max_active_tasks",
"max_consecutive_failed_dag_runs",
"catchup",
"disable_bundle_versioning",
]
):
errors.append(
f"DAG server field '{field_name}' has default {server_value!r} but no schema default"
Expand Down
Loading