Issue "XCom pickle results from DatabricksSqlOperator" Fixed #59151
+108
−8
Fix DatabricksSqlOperator XCom pickle serialization
closes: #59103
Description
This PR fixes the issue where `DatabricksSqlOperator` fails with `_pickle.PicklingError: Can't pickle <class 'airflow.providers.databricks.hooks.databricks_sql.Row'>` when XCom push is enabled (`do_xcom_push=True`).
Root Cause
The Databricks SQL connector returns `databricks.sql.types.Row` objects, which are dynamically created classes that cannot be pickled. XCom requires all return values to be picklable for storage in the Airflow metadata database. When using the default `fetch_all_handler`, these unpicklable Row objects were returned directly without conversion.
Solution
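Introduced a new `PicklableRow` wrapper class in `DatabricksSqlHook` that:
- supports pickling through a `__reduce__` method
- keeps the namedtuple-style interface: `_fields`, `_asdict()`, iteration, and attribute access
- renames invalid field names (e.g. `count(1)` → `_0`)
Changes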
- Updated `DatabricksSqlHook.run()` to always convert Row objects to `PicklableRow`, even when no handler is provided
- Updated `_make_common_data_structure()` to use `PicklableRow` instead of dynamic namedtuples
- Added `test_xcom_pickle_results_with_row_objects()` to verify that pickle serialization works correctly
Testing
- `_fields` attribute returns properly renamed field names
- `_asdict()` method returns dictionaries with original field names
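
For readers who want to see the approach in isolation, here is a minimal sketch of a picklable row wrapper. It is not the code from this PR: the constructor signature, the `_original_fields` and `_values` attributes, and the use of `namedtuple(..., rename=True)` to derive the renamed field names are all assumptions made for illustration. Only the class name `PicklableRow`, the `__reduce__` hook, `_fields`, `_asdict()`, and the `count(1)` → `_0` renaming come from the description above.

```python
import pickle
from collections import namedtuple


class PicklableRow:
    """Illustrative stand-in for the wrapper described in this PR (assumed design)."""

    def __init__(self, fields, values):
        # Assumed internal layout: original column names plus a tuple of values.
        self._original_fields = tuple(fields)
        self._values = tuple(values)
        # rename=True replaces names that are not valid identifiers with _<index>,
        # which is how "count(1)" in position 0 becomes "_0".
        self._fields = namedtuple("Row", self._original_fields, rename=True)._fields

    def __reduce__(self):
        # Rebuild from plain tuples, so no dynamically created class is pickled.
        return (PicklableRow, (self._original_fields, self._values))

    def __getattr__(self, name):
        # Called only when normal lookup fails; read __dict__ directly to avoid recursion.
        fields = self.__dict__.get("_fields", ())
        if name in fields:
            return self._values[fields.index(name)]
        raise AttributeError(name)

    def __iter__(self):
        return iter(self._values)

    def __len__(self):
        return len(self._values)

    def __getitem__(self, index):
        return self._values[index]

    def _asdict(self):
        # Keyed by the original column names, not the renamed ones.
        return dict(zip(self._original_fields, self._values))


row = PicklableRow(["count(1)", "name"], [3, "a"])
restored = pickle.loads(pickle.dumps(row))
```

Because `__reduce__` returns a module-level class and two plain tuples, the round trip does not depend on pickle being able to locate a class that was generated at runtime, which is the failure mode described under Root Cause.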