Skip to content

Fix deadlock created by API#66765

Open
Usuychik wants to merge 5 commits into
apache:mainfrom
Usuychik:main
Open

Fix deadlock created by API#66765
Usuychik wants to merge 5 commits into
apache:mainfrom
Usuychik:main

Conversation

@Usuychik
Copy link
Copy Markdown

After local test for Pull Request #65920 was found that there are was still deadlocks in Postgres DB
Regarding Issue #65818
related: #65836

2026-05-12 10:25:09.658 UTC [pid_XXXXXX] ERROR: deadlock detected 2026-05-12 10:25:09.658 UTC [pid_XXXXXX] DETAIL: Process pid_XXXXXX waits for ShareLock on transaction txn_XXXXXXXX; blocked by process pid_YYYYYY. Process pid_YYYYYY waits for ShareLock on transaction txn_YYYYYYYY; blocked by process pid_XXXXXX. Process pid_XXXXXX: SELECT task_instance.id AS task_instance_id, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.map_index AS task_instance_map_index, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.try_number AS task_instance_try_number, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.custom_operator_name AS task_instance_custom_operator_name, task_instance.queued_dttm AS task_instance_queued_dttm Process pid_YYYYYY: UPDATE task_instance SET updated_at='2026-05-12T10:25:08.658913+00:00'::timestamptz, dag_version_id='xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'::uuid::UUID WHERE task_instance.dag_id = 'dag-name-redacted' AND task_instance.run_id = 'manual__2026-05-12T10:24:21.114597+00:00' AND task_instance.state IN (NULL, 'restarting', 'scheduled', 'up_for_retry', 'running', 'up_for_reschedule', 'deferred', 'queued') 2026-05-12 10:25:09.658 UTC [pid_XXXXXX] HINT: See server log for query details. 2026-05-12 10:25:09.658 UTC [pid_XXXXXX] CONTEXT: while locking tuple (XXXX,X) in relation "dag_run" 2026-05-12 10:25:09.658 UTC [pid_XXXXXX] STATEMENT: SELECT task_instance.id AS task_instance_id, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.map_index AS task_instance_map_index, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.try_number AS task_instance_try_number, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.custom_operator_name AS task_instance_custom_operator_name, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.scheduled_dttm AS task_instance_scheduled_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.last_heartbeat_at AS task_instance_last_heartbeat_at, task_instance.pid AS task_instance_pid, task_instance.executor AS task_instance_executor, task_instance.executor_config AS task_instance_executor_config, task_instance.updated_at AS task_instance_updated_at, task_instance.rendered_map_index AS task_instance_rendered_map_index, task_instance.context_carrier AS task_instance_context_carrier, task_instance.span_status AS task_instance_span_status, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, task_instance.task_display_name AS task_instance_task_display_name, task_instance.dag_version_id AS task_instance_dag_version_id, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.logical_date AS dag_run_1_logical_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.state AS dag_run_1_state, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.triggered_by AS dag_run_1_triggered_by, dag_run_1.triggering_user_name AS dag_run_1_triggering_user_name, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.run_after AS dag_run_1_run_after, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.log_template_id AS dag_run_1_log_template_id, dag_run_1.created_at AS dag_run_1_created_at, dag_run_1.updated_at AS dag_run_1_updated_at, dag_run_1.clear_number AS dag_run_1_clear_number, dag_run_1.backfill_id AS dag_run_1_backfill_id, dag_run_1.bundle_version AS dag_run_1_bundle_version, dag_run_1.scheduled_by_job_id AS dag_run_1_scheduled_by_job_id, dag_run_1.context_carrier AS dag_run_1_context_carrier, dag_run_1.span_status AS dag_run_1_span_status, dag_run_1.created_dag_version_id AS dag_run_1_created_dag_version_id, dag_run_1.partition_key AS dag_run_1_partition_key, dag_run_1.partition_date AS dag_run_1_partition_date FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id WHERE task_instance.id = 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'::uuid::UUID FOR UPDATE 2026-05-12 10:25:09.664 UTC [pid_XXXXXX] ERROR: current transaction is aborted, commands ignored until end of transaction block 2026-05-12 10:25:09.664 UTC [pid_XXXXXX] STATEMENT: SELECT task_instance.id AS task_instance_id, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.map_index AS task_instance_map_index, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.try_number AS task_instance_try_number, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.custom_operator_name AS task_instance_custom_operator_name, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.scheduled_dttm AS task_instance_scheduled_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.last_heartbeat_at AS task_instance_last_heartbeat_at, task_instance.pid AS task_instance_pid, task_instance.executor AS task_instance_executor, task_instance.executor_config AS task_instance_executor_config, task_instance.updated_at AS task_instance_updated_at, task_instance.rendered_map_index AS task_instance_rendered_map_index, task_instance.context_carrier AS task_instance_context_carrier, task_instance.span_status AS task_instance_span_status, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, task_instance.task_display_name AS task_instance_task_display_name, task_instance.dag_version_id AS task_instance_dag_version_id, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.logical_date AS dag_run_1_logical_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.state AS dag_run_1_state, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.triggered_by AS dag_run_1_triggered_by, dag_run_1.triggering_user_name AS dag_run_1_triggering_user_name, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.run_after AS dag_run_1_run_after, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.log_template_id AS dag_run_1_log_template_id, dag_run_1.created_at AS dag_run_1_created_at, dag_run_1.updated_at AS dag_run_1_updated_at, dag_run_1.clear_number AS dag_run_1_clear_number, dag_run_1.backfill_id AS dag_run_1_backfill_id, dag_run_1.bundle_version AS dag_run_1_bundle_version, dag_run_1.scheduled_by_job_id AS dag_run_1_scheduled_by_job_id, dag_run_1.context_carrier AS dag_run_1_context_carrier, dag_run_1.span_status AS dag_run_1_span_status, dag_run_1.created_dag_version_id AS dag_run_1_created_dag_version_id, dag_run_1.partition_key AS dag_run_1_partition_key, dag_run_1.partition_date AS dag_run_1_partition_date FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id WHERE task_instance.id = 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'::uuid::UUID FOR UPDATE

Pull request contains #65920 work also

@boring-cyborg boring-cyborg Bot added area:API Airflow's REST/HTTP API area:Scheduler including HA (high availability) scheduler area:task-sdk area:Triggerer labels May 12, 2026
@boring-cyborg
Copy link
Copy Markdown

boring-cyborg Bot commented May 12, 2026

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example Dag that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

state=State.RUNNING,
start_date=DEFAULT_START_DATE,
)
session.commit()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If possible , could we use this

def create_session(scoped: bool = True) -> Generator[SASession, None, None]:

with create_session so we don't need to commit explicitly ?

WDYT ?

Copy link
Copy Markdown
Author

@Usuychik Usuychik May 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can be wrong, cos don`t know so deep airflow stack, but from observation:
Why session.commit() is required here:
The HTTP client that hits the route handler opens its own separate DB session. The create_task_instance data must be committed before the client request so the route's session can see it. This is why every single test in
TestTIUpdateState does:
ti = create_task_instance(...)
session.commit() # ← makes the TI visible to the route's session
response = client.patch(...)

Copy link
Copy Markdown
Contributor

@Prab-27 Prab-27 May 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks !! will take a look and get back to you

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes ! we need this session.commit() here ! Thanks !!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:API Airflow's REST/HTTP API area:Scheduler including HA (high availability) scheduler area:task-sdk area:Triggerer

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants