Fix DAG-level on_failure_callback not firing #63692
Sathvik-Chowdary-Veerapaneni wants to merge 14 commits into
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
|
Previously only DetachedInstanceError was caught when accessing consumed_asset_events on ORM DagRun objects. Other SQLAlchemy exceptions (e.g. InvalidRequestError) crashed the scheduler. closes: apache#63374
DagRunContext creation could crash when ORM relationship access failed, preventing the callback from being produced entirely. The callback is now sent with minimal context on failure.
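A minimal sketch of the failure mode described in the first commit message, using stand-in exception classes instead of importing SQLAlchemy. The hierarchy mirrors SQLAlchemy's, where DetachedInstanceError and InvalidRequestError both derive from SQLAlchemyError. FakeDagRun, read_events_narrow, read_events_wide, and escapes are hypothetical names for illustration, and the empty-list fallback is an assumption, not the behaviour of the Airflow code.

```python
class SQLAlchemyError(Exception):
    """Stand-in for sqlalchemy.exc.SQLAlchemyError."""


class DetachedInstanceError(SQLAlchemyError):
    """Stand-in for sqlalchemy.orm.exc.DetachedInstanceError."""


class InvalidRequestError(SQLAlchemyError):
    """Stand-in for sqlalchemy.exc.InvalidRequestError."""


class FakeDagRun:
    """Hypothetical ORM-like object whose relationship access raises."""

    def __init__(self, error):
        self._error = error

    @property
    def consumed_asset_events(self):
        raise self._error


def read_events_narrow(dag_run):
    """Handle only DetachedInstanceError; sibling errors propagate."""
    try:
        return dag_run.consumed_asset_events
    except DetachedInstanceError:
        return []  # illustrative fallback only


def read_events_wide(dag_run):
    """Handle the shared base class, which covers both subclasses."""
    try:
        return dag_run.consumed_asset_events
    except SQLAlchemyError:
        return []  # illustrative fallback only


def escapes(func, dag_run):
    """Return True if func lets a SQLAlchemyError reach the caller."""
    try:
        func(dag_run)
    except SQLAlchemyError:
        return True
    return False
```

With the narrow handler, an InvalidRequestError reaches the caller, which in the scheduler would mean an unhandled exception. Catching the base class covers both cases.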
Force-pushed from f49ef66 to dfd11a3.
Do we want this in info? Wondering if debug is enough.
    except Exception:
        self.log.exception(
            "Failed to build DagRunContext for dag_id=%s run_id=%s; "
            "sending callback with minimal context",
            self.dag_id,
            self.run_id,
        )
        context_from_server = None
Even if it happens, this is bad: running the callback without the full context is worse than the callback failing. Also, the exception caught here is too broad.
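To illustrate the reviewer's concern, here is a small sketch of how a broad handler can turn an unrelated programming error into a silent "minimal context" callback. All names (build_context, build_with_broad_except, buggy_loader, on_failure) are hypothetical, and the dictionary-based context is an assumption made for the example, not Airflow's DagRunContext.

```python
def build_context(loader):
    """Hypothetical context builder; loader supplies the DagRun data."""
    return {"dag_run": loader()}


def build_with_broad_except(loader):
    """Mirror the reviewed snippet: any failure degrades to None."""
    try:
        return build_context(loader)
    except Exception:
        return None


def buggy_loader():
    """Simulate a plain bug that has nothing to do with the database."""
    raise TypeError("programming error, unrelated to the database")


def on_failure(context):
    """Hypothetical user callback that expects the full context."""
    return "failed run: " + str(context["dag_run"])


def run_callback_with_broad_handler():
    """Show where the error surfaces once the original cause is swallowed."""
    context = build_with_broad_except(buggy_loader)
    try:
        return on_failure(context)
    except TypeError as exc:
        # The failure now appears inside the user's callback, far from
        # the code that actually broke.
        return "callback crashed: " + type(exc).__name__
```

Without the broad handler, the TypeError would surface at the point where the context is built, which is easier to diagnose than a callback that fails on a missing context.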
        "this DAG processor (serving bundles: %s). Skipping.",
        getattr(req, "dag_id", "unknown"),
        req.bundle_name,
        bundle_names,
Any callback fetched at this point is bound to have bundle_name in bundle_names. Debugging here?
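The point about redundancy can be sketched as follows, assuming the fetch is already scoped to the bundles this processor serves. CallbackRequest, fetch_callbacks, and skipped_by_guard are hypothetical stand-ins, not the DAG processor manager's actual API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CallbackRequest:
    """Hypothetical stand-in for a queued callback request."""

    dag_id: str
    bundle_name: str


def fetch_callbacks(pending, bundle_names):
    """Scoped fetch: return only requests for bundles served here."""
    return [req for req in pending if req.bundle_name in bundle_names]


def skipped_by_guard(fetched, bundle_names):
    """Apply the second guard from the diff to already-scoped results."""
    return [req for req in fetched if req.bundle_name not in bundle_names]


pending = [
    CallbackRequest(dag_id="dag_a", bundle_name="bundle_one"),
    CallbackRequest(dag_id="dag_b", bundle_name="bundle_two"),
]
served = {"bundle_one"}
fetched = fetch_callbacks(pending, served)
```

Because the fetch already filters on bundle name, the guard has nothing left to skip, so the branch and its log line are unreachable under this assumption.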
|
@Sathvik-Chowdary-Veerapaneni can you address open comments? |
|
Thanks for the reminder. I pushed updates addressing the open review comments by narrowing the PR back to the core DAG callback fix:
- Removed duplicate bundle-name guard after scoped callback fetch
- Updated scheduler callback request log from info to debug
- Removed broad fallback around DagRunContext creation
- Restored test name and comments for existing bundle filtering behavior
- Removed regression test for dropped minimal-context callback fallback
I also updated the PR description to remove the stale fallback/logging notes. Verified locally in a clean worktree:
Both passed. |
Fixed DAG-level on_failure_callback not firing.

When the scheduler builds a DagCallbackRequest, DagRunContext needs to read DagRun ORM relationship data. In the reported failure path, SQLAlchemy can raise more than DetachedInstanceError, which prevented the callback request from being produced.

Changes:
- Catch SQLAlchemyError while building DagRunContext and reload the DagRun from the DB before collecting context.
- Log the scheduler callback request at debug.

Tests:
- airflow-core/tests/unit/models/test_dagrun.py::TestDagRun::test_dagrun_update_state_with_handle_callback_failure
- airflow-core/tests/unit/dag_processing/test_manager.py::TestDagFileProcessorManager::test_fetch_callbacks_ignores_other_bundles

closes #63374
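The catch-and-reload approach in the description might look roughly like the sketch below. It is not the PR's code: SQLAlchemyError is a local stand-in for sqlalchemy.exc.SQLAlchemyError, and FakeSession, StaleDagRun, FreshDagRun, and collect_context are hypothetical names chosen for the example.

```python
from dataclasses import dataclass, field


class SQLAlchemyError(Exception):
    """Stand-in for sqlalchemy.exc.SQLAlchemyError."""


@dataclass
class FreshDagRun:
    """Hypothetical run loaded in a live session; attributes are readable."""

    run_id: str
    consumed_asset_events: list = field(default_factory=list)


class StaleDagRun:
    """Hypothetical run whose relationship can no longer be loaded."""

    def __init__(self, run_id):
        self.run_id = run_id

    @property
    def consumed_asset_events(self):
        raise SQLAlchemyError("relationship cannot be loaded")


class FakeSession:
    """Hypothetical session that returns freshly loaded runs by run_id."""

    def __init__(self, rows):
        self._rows = rows

    def get(self, run_id):
        return self._rows[run_id]


def collect_context(dag_run, session):
    """Read relationship data, reloading the run once if the ORM raises."""
    try:
        events = dag_run.consumed_asset_events
    except SQLAlchemyError:
        # Reload from the database, then read the relationship again.
        dag_run = session.get(dag_run.run_id)
        events = dag_run.consumed_asset_events
    return {"run_id": dag_run.run_id, "consumed_asset_events": list(events)}


session = FakeSession({"run-1": FreshDagRun("run-1", ["event-a"])})
context = collect_context(StaleDagRun("run-1"), session)
```

In this sketch the callback still receives the full context after a reload, and an error raised by the reloaded object propagates instead of being replaced by a reduced context.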