fix dag version inflation caused by unmatched serialized result of task using reserialized command #61077
wjddn279 wants to merge 6 commits into apache:main
|
This is not the right fix. The tests can be fixed. The main Airflow implementation should not be changed. |
|
The problem is clear. I've reviewed several approaches, but I'm not sure if this can be resolved without modifying the existing serialize logic. I applied minimal changes, but it's essentially a modification with no functional changes. |
|
You got this wrong: the sorted data used to create the hash is not stored in the DB. What you have in the DB is not sorted. You can read the hashing logic here: airflow/airflow-core/src/airflow/models/serialized_dag.py, lines 362 to 372 in 033d185 |
|
The problem in the current situation is that the hash value changes. The element order differs between "bear": ["bear", "operator"] and "bear": ["operator", "bear"], and that makes the hash values different. |
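To illustrate the point about element order, here is a minimal sketch. It assumes the hash is an MD5 over `json.dumps(..., sort_keys=True)`; the helper name `dag_hash` and the `children` wrapper key are hypothetical and only serve the illustration.

```python
import hashlib
import json


def dag_hash(data: dict) -> str:
    # Assumed hashing scheme for illustration: MD5 over JSON with sorted dict keys.
    return hashlib.md5(json.dumps(data, sort_keys=True).encode("utf-8")).hexdigest()


a = {"children": {"bear": ["bear", "operator"]}}
b = {"children": {"bear": ["operator", "bear"]}}

# sort_keys only reorders dict keys. List element order is preserved,
# so the two payloads serialize differently and hash differently.
hash_a = dag_hash(a)
hash_b = dag_hash(b)
print(hash_a == hash_b)  # False
```

Under this assumption, any list that is left unsorted before hashing can change the hash even though the DAG itself is unchanged.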
6628c9d to 16927b5
|
Instead of changing the existing serialize logic, I modified the sorting method. The serialized values still differ between those created through dag_processor and those that aren't, but the hash values computed after sorting are now the same. Previously tuples were not sorted; with tuple sorting added, the sorted results are identical. |
16927b5 to 152d9cc
Can you add a test for this in the CLI tests for dag reserialize, ensuring that it gives the same value?
Done! thanks! |
357d51e to 2fdd227
|
I added test code to verify that the hash value is identical to the existing dag_processor. If you check the test code, you can see the parts that change during conversion to bytes and decoding for inter-process socket communication (enum -> str, tuple -> list), as explained in the description. |
|
Could you take another look at this? |
|
Hello, we still have the issue with the DAG reserialization process (#60868), apparently this fix was not merged with Airflow 3.1.7 :( Do you have a timeline for this to be integrated? Best regards |
|
@ashb @kaxil @ephraimbuddy @amoghrajesh Kindly ping code owners and reviewers! |
kaxil left a comment
Fix looks correct. The root cause is well understood: serialize_for_task_group() returns tuples (e.g. (DagAttributeTypes.OP, 'bear')) which survive as-is in the CLI reserialize path, but get converted to lists through msgpack round-trip in the dag_processor path. The sort function only handled list, so tuples fell through unsorted, causing hash mismatches and spurious DAG version bumps.
A few comments on the tests below.
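The root cause described in this review can be sketched as follows. This is a simplified stand-in, not the actual `_sort_serialized_dag_dict`: `AttrType` and `sort_nested` are hypothetical names, and the real function has more cases.

```python
from enum import Enum


class AttrType(str, Enum):
    # Hypothetical stand-in for DagAttributeTypes (a str-valued enum).
    OP = "operator"


def sort_nested(obj, sequence_types):
    """Recursively sort dict keys and all-string sequences of the given types."""
    if isinstance(obj, dict):
        return {k: sort_nested(v, sequence_types) for k, v in sorted(obj.items())}
    if isinstance(obj, sequence_types):
        if all(isinstance(i, str) for i in obj):
            return sorted(obj)
        return [sort_nested(i, sequence_types) for i in obj]
    return obj


cli_path = {"children": {"bear": (AttrType.OP, "bear")}}        # tuple survives as-is
processor_path = {"children": {"bear": ["operator", "bear"]}}  # after the round-trip

before_fix = sort_nested(cli_path, (list,))        # tuple falls through unsorted
after_fix = sort_nested(cli_path, (list, tuple))   # tuple is sorted like a list
processor = sort_nested(processor_path, (list,))

print(before_fix == processor)  # False
print(after_fix == processor)   # True
```

Because the enum mixes in `str`, its members compare and sort like their string values, so handling `tuple` alongside `list` is enough to make both paths agree in this sketch.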
359e474 to 7f866dd
from airflow.dag_processing.processor import DagFileParsingResult, DagFileProcessorProcess
from airflow.sdk.execution_time.comms import _RequestFrame, _ResponseFrame
from airflow.serialization.serialized_objects import DagSerialization, LazyDeserializedDAG
Please move the imports to the top of the file.
Pull request overview
This PR fixes a bug where executing airflow dags reserialize would unnecessarily create new DAG versions even though the DAG definition had not changed. The root cause was that task_group.children values, which are (DagAttributeTypes, task_id) tuples in the reserialize path, were not being sorted during hash computation in _sort_serialized_dag_dict. In contrast, the dag_processor's msgpack round-trip converts tuples to lists, which did get sorted — causing hash divergence for task IDs alphabetically before 'operator' (the string value of DagAttributeTypes.OP).
Changes:
- Fix `_sort_serialized_dag_dict` to also process `tuple` values (in addition to `list`), ensuring `(DagAttributeTypes.OP, 'bear')` tuples get sorted identically to the dag_processor's `['operator', 'bear']` lists.
- Add a regression test `test_reserialize_should_make_equal_hash_with_dag_processor` that simulates the dag_processor msgpack encoding/decoding and asserts hash equality.
- Add a test DAG (`test_dag_reserialize.py`) with task ID `'bear'` (which reproduces the bug, as 'bear' < 'operator' alphabetically).
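The alphabetical condition in the list above can be checked with a short sketch. The task ID `'zebra'` and the helper `order_changes_when_sorted` are hypothetical, chosen only to contrast with `'bear'`.

```python
OP_VALUE = "operator"  # string value of DagAttributeTypes.OP, per the overview above


def order_changes_when_sorted(task_id: str) -> bool:
    # The pair starts as [type, task_id]. Sorting only reorders it when
    # the task ID compares lower than "operator".
    pair = [OP_VALUE, task_id]
    return sorted(pair) != pair


for task_id in ("bear", "zebra"):
    print(task_id, order_changes_when_sorted(task_id))
# bear True
# zebra False
```

This suggests why the bug only appeared for some DAGs: when sorting leaves the pair unchanged, the sorted and unsorted paths produce the same order.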
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| airflow-core/src/airflow/models/serialized_dag.py | Core bug fix: extend _sort_serialized_dag_dict to process tuples (in addition to lists) for consistent hash computation |
| airflow-core/tests/unit/cli/commands/test_dag_command.py | Regression test verifying that the hash from dag_reserialize matches the hash computed via the dag_processor msgpack round-trip flow |
| airflow-core/tests/unit/dags/test_dag_reserialize.py | Test DAG with task ID 'bear' specifically chosen to reproduce the original bug |
closed: #60868
Why does the issue occur?
To summarize the issue: when dag_parsing occurs, there is no increase in the Dag version, but when the command airflow dags reserialize is executed, an increase in the Dag version is observed.
For the following Dag, I confirmed that the hash result from parsing in the dag_processor differs from the hash result through the airflow command. Upon checking the serialized values, I found that the order of the following two fields was different:
I understood that sorting should be applied to the values, so I needed to investigate the cause of this discrepancy.
Deep Dive
Initially, I checked whether there were differences in the logic between the dag_processor and the airflow command parsing logic, but they were identical. Therefore, I logged and compared the serialize_dag before sorting was applied.
The location of each log statement is here (dag_data, data_, data_json).
The parsing results were different from the start. In fact, the Dag parsing result should match what's generated from the airflow dags command. However, I inferred that in the dag_processor, the format was changed during the dumps process of the model for inter-subprocess communication.
I confirmed that the result from the airflow dags command was also changed to match the dag_processor format after json.loads was applied (enum -> str, tuple -> list).
Ironically, the dag_processor, where the format was changed, had sorting applied correctly, while the airflow dags command did not, resulting in the discrepancy between the two.
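The format change described above (enum -> str, tuple -> list) can be reproduced with a small sketch. It uses the standard-library `json` module as a stand-in for the inter-process encoding, and `AttrType` is a hypothetical stand-in for `DagAttributeTypes`.

```python
import json
from enum import Enum


class AttrType(str, Enum):
    # Hypothetical stand-in for DagAttributeTypes (a str-valued enum).
    OP = "operator"


original = {"bear": (AttrType.OP, "bear")}

# Encoding and decoding drops the Python-only types:
# the enum member becomes a plain str and the tuple becomes a list.
round_tripped = json.loads(json.dumps(original))

print(round_tripped)                        # {'bear': ['operator', 'bear']}
print(type(round_tripped["bear"]).__name__)  # list
```

After the round-trip the value is a list of strings, which is the shape the existing sorting logic already handled.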
Solution
The standard solution would be to control the values that change in the dag_processor, but this appears to be impossible with the current approach. My first approach therefore modified the existing task_group serialization method to align with the dag_processor's format. In the final version I left the serialize logic unchanged to keep it consistent. Instead, I modified the sorting logic used for generating hash values so that sorting is also applied to tuple values.
I confirmed that applying sorted to a tuple of enum and str, such as (<DagAttributeTypes.OP: 'operator'>, 'bear'), applies sorting the same way as with strings.
('bear', (<DagAttributeTypes.OP: 'operator'>, 'bear'))
Therefore, the hash result values are generated identically.
Was generative AI tooling used to co-author this PR?