Fix max_active_runs lost during DAG serialisation when value equals schema default #65310

Open
seruman wants to merge 30 commits into apache:main from seruman:fix-max-active-runs-serialisation

Conversation

@seruman

@seruman seruman commented Apr 15, 2026

When a DAG explicitly sets max_active_runs=16 and airflow.cfg has max_active_runs_per_dag = 1, the dag table ends up with 1. Setting it to 17 or any other value that isn't 16 works fine.

The serialisation optimisation from #55849 strips DAG fields that match their schema.json default.
This works for static defaults like catchup=False, but max_active_runs, max_active_tasks, and max_consecutive_failed_dag_runs get their defaults from airflow.cfg at parse time, not from the schema.

When the user's explicit value happens to equal the schema default (16), it gets stripped, LazyDeserializedDAG returns None, and collection.py falls back to whatever airflow.cfg says.

The fix skips the schema-default exclusion for these three config-driven fields so they always survive serialisation.
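
A minimal standalone sketch of that carve-out (the `_CONFIG_DRIVEN_FIELDS` name follows the PR diff; `SCHEMA_DEFAULTS` and the `is_excluded` wrapper are simplified stand-ins, not the actual Airflow code in `SerializedDAG._is_excluded`):

```python
# Simplified model of the carve-out; not the actual Airflow implementation.
# SCHEMA_DEFAULTS stands in for the defaults loaded from schema.json.
_CONFIG_DRIVEN_FIELDS = frozenset(
    {"max_active_runs", "max_active_tasks", "max_consecutive_failed_dag_runs"}
)
SCHEMA_DEFAULTS = {"catchup": False, "max_active_runs": 16, "max_active_tasks": 16}

def is_excluded(attrname, value):
    """Return True if the field may be dropped from the serialised payload."""
    if attrname in SCHEMA_DEFAULTS and attrname not in _CONFIG_DRIVEN_FIELDS:
        # Static defaults (e.g. catchup=False) are safe to strip.
        return value == SCHEMA_DEFAULTS[attrname]
    # Config-driven fields always survive serialisation.
    return False

assert is_excluded("catchup", False)           # static default: stripped
assert not is_excluded("max_active_runs", 16)  # kept even at the schema default
```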

After deploying this, the first parse cycle will produce a slightly different serialised payload for every DAG (three extra int fields), which means a one-time dag_hash change and a new DagVersion for DAGs that have running task instances.

closes: #65307
related: #57604
related: #56646


Was generative AI tooling used to co-author this PR?
  • Yes

Generated-by: pi (Claude Opus 4.6) following the guidelines

Note

Prompted it along the lines of "when DAGs and config are configured like this, I observe this behaviour and the metadata rows look like this", and shared my suspicion about the hard-coded default of 16 in the schema so it could point me to the relevant paths. After the proposed fix and unit tests, I went over the code and tests to make sure they are correct and align with the rest of the project, and checked whether there are any alternative solutions. Spawned Airflow with breeze and tested the exact scenarios we had in the real deployment to verify the new behaviour, along with the dag_hash change I mentioned above.


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

…chema default

The serialisation optimisation from apache#55849 strips DAG fields that match
their schema.json default. For max_active_runs, max_active_tasks, and
max_consecutive_failed_dag_runs this is wrong because their runtime
defaults come from airflow.cfg, not the schema. When a user explicitly
sets max_active_runs=16 and the config has max_active_runs_per_dag=1,
the value gets stripped and the dag table ends up with 1.

Skip the schema-default exclusion for these three config-driven fields
so they always survive serialisation.
@seruman seruman changed the title Fix max active runs serialisation Fix max_active_runs lost during DAG serialisation when value equals schema default Apr 15, 2026
Comment thread airflow-core/src/airflow/serialization/serialized_objects.py Outdated
@potiuk potiuk marked this pull request as draft April 22, 2026 19:31
@potiuk
Member

potiuk commented Apr 22, 2026

@seruman Converting to draft — this PR doesn't yet meet our Pull Request quality criteria.

  • Unresolved review comments (1 thread): please walk through each unresolved review thread. Even if a suggestion looks incorrect or irrelevant — and some of them will be, especially any comments left by automated reviewers like GitHub Copilot — it is still the author's responsibility to respond: apply the fix, reply in-thread with a brief explanation of why the suggestion does not apply, or resolve the thread if the feedback is no longer relevant. Leaving threads unaddressed for weeks blocks the PR from moving forward.

See the linked criteria for how to fix each item, then mark the PR "Ready for review". This is not a rejection — just an invitation to bring the PR up to standard. No rush.


Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

@potiuk
Member

potiuk commented Apr 22, 2026

Quick follow-up to the triage comment above — one clarification on the "Unresolved review comments" item:

Once you believe a thread has been addressed — whether by pushing a fix, or by replying in-thread with an explanation of why the suggestion doesn't apply — please mark the thread as resolved yourself by clicking the "Resolve conversation" button at the bottom of each thread. Reviewers don't auto-close their own threads, so an addressed-but-unresolved thread reads as "still waiting on the author" and keeps the PR from moving forward. The author doing the resolve-click is the expected convention on this project.


Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

@seruman
Author

seruman commented Apr 27, 2026

@potiuk I think the triage tool false-flags self-reviews; marked it as resolved. I just wanted reviewers to comment on it.

@seruman seruman marked this pull request as ready for review April 27, 2026 08:49
@uranusjr uranusjr requested a review from kaxil April 29, 2026 08:12
@kaxil
Member

kaxil commented Apr 29, 2026

The fix is correct and the bug is real. Before merging, it's worth weighing it against extending client_defaults instead of adding a new carve-out.

The gap this PR papers over: #55849 added DAG-level schema-default exclusion without extending the client_defaults mechanism from #54569. Doing the latter solves the same bug, keeps the byte optimization for the common case, and generalizes correctly to catchup and disable_bundle_versioning, which have the same structural defect today (their consumers happen to treat None as False, so the symptom doesn't surface, but the field is still being lost on the wire).

Concrete sketch

  1. SDK-side DAG_DEFAULTS mirroring OPERATOR_DEFAULTS:

     DAG_DEFAULTS = {
         "max_active_runs": ("core", "max_active_runs_per_dag"),
         "max_active_tasks": ("core", "max_active_tasks_per_dag"),
         "max_consecutive_failed_dag_runs": ("core", "max_consecutive_failed_dag_runs_per_dag"),
         "catchup": ("scheduler", "catchup_by_default"),
         "disable_bundle_versioning": ("dag_processor", "disable_bundle_versioning"),
     }

  2. DagSerialization.generate_client_defaults() resolves these against the current cfg at serialize time, and to_dict() writes them alongside the tasks entry:

     json_dict["client_defaults"] = {
         "tasks": OperatorSerialization.generate_client_defaults(),
         "dag": DagSerialization.generate_client_defaults(),
     }

  3. SerializedDAG._is_excluded: for fields in DAG_DEFAULTS, exclude iff var == client_defaults["dag"][attrname]. Skip the schema-default branch for these. Drop default: from schema.json for the five fields; it stops being load-bearing.

  4. Read path: LazyDeserializedDAG.__getattr__ (and the existing fallback in collection.py) consults client_defaults["dag"] before any current-cfg fallback. The captured value is the cfg at parse time, which matters across multi-process boundaries and cfg edits between parse and read.

Verified locally, every scenario round-trips correctly (cfg ∈ {1, 16} × user_set ∈ {None, 1, 16, 42}):

| cfg | user sets | on wire | reader sees |
| --- | --- | --- | --- |
| 1 | 16 | 16 | 16 (the PR's bug case) |
| 1 | none | omitted | 1 (from client_defaults["dag"]) |
| 16 | none | omitted | 16 |
| 1 | 1 | omitted | 1 |
| 1 | 42 | 42 | 42 |
| 16 | 16 | omitted | 16 |
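
A toy writer/reader pair reproduces each row of the table above (assumed semantics of the sketch, not Airflow code; `write`/`read` are made-up names for illustration):

```python
# Toy model of the client_defaults proposal (assumed semantics, not Airflow code).
def write(user_set, cfg):
    """Serialise: capture cfg in client_defaults; emit the field only when it differs."""
    payload = {"client_defaults": {"dag": {"max_active_runs": cfg}}}
    if user_set is not None and user_set != cfg:
        payload["max_active_runs"] = user_set
    return payload

def read(payload):
    """Deserialise: an explicit value wins, else fall back to the captured cfg."""
    return payload.get(
        "max_active_runs", payload["client_defaults"]["dag"]["max_active_runs"]
    )

# Each row of the table round-trips:
assert read(write(16, 1)) == 16   # the PR's bug case
assert read(write(None, 1)) == 1
assert read(write(None, 16)) == 16
assert read(write(1, 1)) == 1
assert read(write(42, 1)) == 42
assert read(write(16, 16)) == 16
```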

Why this beats the carve-out

  • Optimization preserved. Common case stays compact, no +85 B/DAG always-emit.
  • Self-registering. The PR's _CONFIG_DRIVEN_FIELDS frozenset would drift, and it's already incomplete (catchup, disable_bundle_versioning).
  • Schema-honest. The payload no longer claims a static default it doesn't honor. Readers don't depend on having access to current cfg.
  • Reuses the infrastructure from #54569 (Decouple Serialization and Deserialization Code for tasks): generate_client_defaults, _matches_client_defaults, the wire slot, the deserializer plumbing.

Tradeoff: ~30-40 line diff vs the current 13. Larger surface, but lands on the architecture already in place, and the one-time dag_hash churn happens once instead of twice if a follow-up extends client_defaults later.

# the hardcoded schema default because the schema default (e.g. 16) may differ
# from the runtime config value (e.g. 1). Excluding them loses explicitly-set
# values that happen to equal the schema default.
_CONFIG_DRIVEN_FIELDS = frozenset(
Member


nit: lift to module scope. As written, this frozenset is rebuilt on every _is_excluded call (which fires once per DAG attribute per serialization). Move it next to the other module-level constants.

Author


Should be obsolete with 10d06aa

"downstream_task_ids": [],
},
"is_paused_upon_creation": False,
"max_active_runs": 16,
Member


The literal 16 here only holds while no test has overridden [core] max_active_runs_per_dag. Wrap the test body in conf_vars(...) like test_max_active_runs_equal_to_schema_default_not_overridden_by_conf already does, so the assertion is self-pinning. Same for the new test_config_driven_dag_fields_always_serialized below -- worth pinning the cfg there too.

Author


)
dag_schema_defaults = cls.get_schema_defaults("dag")
if attrname in dag_schema_defaults:
if attrname in dag_schema_defaults and attrname not in _CONFIG_DRIVEN_FIELDS:
Member


As mentioned I'd prefer solution in #65310 (comment) with client_defaults instead

Author


Yes, that sounds better; that's why I wanted to point it out in #65310 (comment).

How does 10d06aa look?

Member


Realized something, my bad 🤦

@kaxil kaxil added this to the Airflow 3.2.2 milestone Apr 29, 2026
@seruman seruman force-pushed the fix-max-active-runs-serialisation branch from b826307 to e7e330d Compare April 30, 2026 08:15
@seruman
Author

seruman commented Apr 30, 2026

Damn, I failed to rebase 🤦 sorry for the noise.

Edit: I did a bad rebase, which added a bunch of new reviewers via codeowners; I'm honestly sorry for this.

@seruman seruman force-pushed the fix-max-active-runs-serialisation branch from e7e330d to 10d06aa Compare April 30, 2026 08:19
Comment thread airflow-core/tests/unit/serialization/test_dag_serialization.py
@kaxil kaxil added the backport-to-v3-2-test Mark PR with this label to backport to v3-2-test branch label Apr 30, 2026
@kaxil
Member

kaxil commented May 1, 2026

Walking back my earlier suggestion. The client_defaults indirection makes sense for tasks because there are many of them per payload, so factoring N common defaults out of M tasks saves N×M bytes. For DAG-level fields there's exactly one DAG per payload, so the wrapper costs more than it saves and the abstraction doesn't earn its complexity. Measured locally: client_defaults approach is ~50 B/DAG heavier than just always-emitting these fields, on top of ~80 extra lines of code.

Simplest fix: keep the schema.json changes (drop default: for the 5 fields), drop everything else. With no schema default, _is_excluded already returns False for these fields → they're always emitted → reader reads them directly. No DAG_DEFAULTS, no generate_client_defaults, no _matches_client_defaults, no deserialize-time setattr loop, no __getattr__ fallback.
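
Under the stated assumption that a field with no schema default is never matched by the exclusion check, the effect of this "drop the default" fix can be modelled in a few lines (a sketch, not the actual Airflow code):

```python
# Toy model: a field is excluded from the payload only when it equals
# its schema default, so removing the default makes it always emitted.
def serialize(dag_fields, schema_defaults):
    return {k: v for k, v in dag_fields.items() if schema_defaults.get(k) != v}

# Before: schema.json carries default 16, so an explicit 16 is stripped.
before = serialize({"max_active_runs": 16}, {"max_active_runs": 16})
# After: the default is dropped from schema.json, so the field stays on the wire.
after = serialize({"max_active_runs": 16}, {})

assert before == {}
assert after == {"max_active_runs": 16}
```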

The PR's original carve-out approach was actually closer to right than this rewrite. Apologies for the detour 🤦🏻‍♂️.

@seruman
Author

seruman commented May 4, 2026

@kaxil yeah, that makes much more sense. I do not think 50 B/DAG would be much of an issue (at least in my case), but the abstraction was feeling heavy.

Had to add an exclusion list to scripts/in_container/run_schema_defaults_check.py, which doesn't quite feel right. Without the explicit exclusion:

❌ Found discrepancies between schema and server defaults:
  • DAG server field 'catchup' has default False but no schema default
  • DAG server field 'max_active_runs' has default 16 but no schema default
  • DAG server field 'disable_bundle_versioning' has default False but no schema default
  • DAG server field 'dag_id' has default 'temp' but no schema default
  • DAG server field 'max_consecutive_failed_dag_runs' has default 0 but no schema default
  • DAG server field 'max_active_tasks' has default 16 but no schema default
  • DAG server field 'dag_display_name' has default 'temp' but no schema default

The Python-side defaults would diverge from the schema side.

No need for an apology, TIL how Airflow serializes DAGs 🎉

@potiuk
Member

potiuk commented May 5, 2026

@seruman — Your unresolved review thread(s) from @ephraimbuddy, @kaxil appear to have been addressed (post-review commits and/or in-thread replies on every thread, with the latest commit pushed after the most recent thread). I've added the ready for maintainer review label so the PR re-enters the maintainer review queue.

@ephraimbuddy, @kaxil — could you take another look when you have a chance? If you agree the feedback was addressed, please mark the threads as resolved so the queue signal stays accurate. If a thread still needs work, please reply in-line — @seruman will follow up.


Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

@potiuk potiuk added the ready for maintainer review Set after triaging when all criteria pass. label May 5, 2026
Contributor

@ephraimbuddy ephraimbuddy left a comment


A few suggestions on the tests — nothing blocking. Two are about widening coverage so the fix doesn't regress for the sibling fields; the rest are tightening / parametrisation / docstring tweaks.

One extra point that I couldn't leave inline because the line sits outside the diff hunk:

improvement: test_dag_schema_defaults_optimization (around the for field in DagSerialization.get_schema_defaults("dag").keys() loop, ~L3831) is now weaker than its surrounding comments suggest. After this PR, that loop iterates only over the fields that still have a schema default (fail_fast, render_template_as_native_obj, callback flags). It no longer asserts anything about catchup, max_active_runs, max_active_tasks, max_consecutive_failed_dag_runs, or disable_bundle_versioning — yet the DAG above is still constructed with catchup=False, max_active_runs=16, max_active_tasks=16, max_consecutive_failed_dag_runs=0, disable_bundle_versioning=False under a comment that reads "These should match schema defaults and be excluded". That comment is now wrong, and assert deserialized_dag.max_active_runs == 16 succeeds for the opposite reason it used to (the value is on the wire now, not restored from a schema default). Minimal fix: update the comment, and add a positive for field in (...): assert field in dag_data block to lock in the new contract right next to the test that locks in the old one.
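
The suggested positive block might look roughly like this (field list taken from the comment above; the helper name and example payload are hypothetical):

```python
# Hypothetical shape of the suggested positive assertion; the field list comes
# from the review comment, the helper and payload are made up for illustration.
ALWAYS_EMITTED = (
    "catchup",
    "max_active_runs",
    "max_active_tasks",
    "max_consecutive_failed_dag_runs",
    "disable_bundle_versioning",
)

def check_always_on_wire(dag_data):
    """Assert that every config-driven field is present in the serialised dict."""
    for field in ALWAYS_EMITTED:
        assert field in dag_data, f"{field} should always be serialised"

# Example payload with all five fields present: the check passes silently.
check_always_on_wire({field: 0 for field in ALWAYS_EMITTED})
```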


Drafted-by: Claude Code (Opus 4.7); reviewed by @ephraimbuddy before posting

Comment thread airflow-core/tests/unit/dag_processing/test_collection.py Outdated
Comment thread airflow-core/tests/unit/serialization/test_dag_serialization.py
Comment thread airflow-core/tests/unit/serialization/test_dag_serialization.py Outdated
Comment thread airflow-core/tests/unit/serialization/test_dag_serialization.py Outdated
Comment thread airflow-core/tests/unit/serialization/test_dag_serialization.py Outdated

Labels

area:DAG-processing
backport-to-v3-2-test (Mark PR with this label to backport to v3-2-test branch)
ready for maintainer review (Set after triaging when all criteria pass.)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

max_active_runs in DAG definition ignored when value matches built-in default

4 participants