Hi, thanks for finding the time to read this.
🟦 Current State
We operate a centralized Airflow 3 installation that serves the entire company.
Each team owns a bundle of DAGs that may act as Producers or Consumers.
Producer DAGs emit asset events when they complete.
Consumer DAGs should start only after all required producer DAGs have finished.
Each asset event includes an extra field containing a customerId.
The goal is to trigger a consumer DAG only when all producer asset events for the same customer have been published.
🟥 The Issue
I initially considered using Airflow’s Asset Scheduling feature and composing conditions like:
producer1Asset & producer2Asset
However, the challenge is that each asset event contains a customerId, and Airflow’s asset scheduling does not natively support:
- Grouping or correlating asset events by a dynamic field such as customerId
- Triggering a DAG only when all required assets for the same customer have arrived
- Querying asset events by the extra field via the REST API
One workaround would be to trigger the consumer DAG on every asset event and have it query the REST API to check whether all required events exist.
But this has two problems:
1. The REST API cannot filter by the extra field (e.g., customerId), so the filtering would have to happen client-side.
2. It would cause a large number of unnecessary DAG runs that exit immediately because not all prerequisites are met.
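To make the workaround concrete, here is a rough sketch of the client-side check the consumer would have to run on every trigger. It is plain Python, not Airflow code: the event shape (a dict with `asset` and `extra` keys), the `REQUIRED_ASSETS` set, and the `customers_ready` helper are all assumptions made for illustration, and fetching the events from Airflow is left out.

```python
from collections import defaultdict

# Hypothetical names of the producer assets the consumer depends on.
REQUIRED_ASSETS = frozenset({"producer1Asset", "producer2Asset"})


def customers_ready(events, required=REQUIRED_ASSETS):
    """Return the customerIds for which every required asset has an event.

    `events` is assumed to be an iterable of dicts shaped like
    {"asset": "<asset name>", "extra": {"customerId": "<id>"}}.
    """
    seen = defaultdict(set)  # customerId -> asset names seen so far
    for event in events:
        customer_id = (event.get("extra") or {}).get("customerId")
        if customer_id is None:
            continue  # events without a customerId cannot be correlated
        seen[customer_id].add(event["asset"])
    return {cid for cid, assets in seen.items() if required <= assets}


sample_events = [
    {"asset": "producer1Asset", "extra": {"customerId": "c1"}},
    {"asset": "producer2Asset", "extra": {"customerId": "c1"}},
    {"asset": "producer1Asset", "extra": {"customerId": "c2"}},
]
ready = customers_ready(sample_events)  # only "c1" has both events
```

Every run that finds its customer missing from the result would simply exit, which is exactly the wasted-run problem described above.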
🟩 Proposed Direction / Possible Solution
One idea is to introduce a shared identifier such as:
- traceId
- correlationId
- groupId
This ID would be included in each producer’s asset event.
The consumer DAG would then trigger only when all required asset events with the same correlationId have been published.
This raises the question:
Can Airflow’s asset scheduling or event‑based triggers support correlating asset events by a shared dynamic identifier (e.g., correlationId) and triggering a DAG only when all matching events are present?
I can say that:
- There is a single starting point that launches all the producers, so I can generate the correlation ID there.
- The asset events are published within a well-defined time window.
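To illustrate the behaviour I am after, here is a minimal sketch of the correlation logic in plain Python. It is not an Airflow feature or API: `CorrelationTracker`, its `record` method, and the asset names are hypothetical, and the time window is modelled as a number of seconds since the first event of a group.

```python
from dataclasses import dataclass, field


@dataclass
class CorrelationTracker:
    """Hypothetical tracker: reports when all required assets arrived for one ID."""

    required: frozenset
    window_seconds: float
    # correlationId -> (timestamp of first event, set of asset names seen)
    _groups: dict = field(default_factory=dict)

    def record(self, correlation_id, asset, now):
        """Record one event; return True when the group for this ID is complete."""
        first_seen, assets = self._groups.get(correlation_id, (now, set()))
        if now - first_seen > self.window_seconds:
            # The window has expired, so discard the stale partial group.
            first_seen, assets = now, set()
        assets.add(asset)
        self._groups[correlation_id] = (first_seen, assets)
        if self.required <= assets:
            del self._groups[correlation_id]  # complete: fire once, then reset
            return True
        return False


tracker = CorrelationTracker(
    required=frozenset({"producer1Asset", "producer2Asset"}),
    window_seconds=60,
)
first = tracker.record("corr-1", "producer1Asset", now=0)    # incomplete
other = tracker.record("corr-2", "producer2Asset", now=1)    # different ID
second = tracker.record("corr-1", "producer2Asset", now=10)  # completes corr-1
```

In this sketch the consumer would be triggered only when `record` returns `True`, i.e. once per correlationId, instead of once per asset event.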
I couldn't find such a feature in the docs. Do you think this can be done?