Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions airflow-core/src/airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,26 +216,35 @@ def resolve_asset_uri_ref_dag_dep(self, dep_data: dict) -> Iterator[DagDependenc

def resolve_asset_alias_dag_dep(self, dep_data: dict) -> Iterator[DagDependency]:
    """
    Expand one serialized asset-alias dependency into ``DagDependency`` entries.

    The alias name is read from ``dep_data["dependency_id"]`` and looked up in
    ``self.alias_names_to_asset_ids_names``, which is iterated as
    ``(asset_id, asset_name)`` pairs.

    * If the alias maps to at least one asset, two entries are yielded per
      asset: one for the asset itself and one for the alias that links the
      asset to the other end of the dependency.
    * If the alias maps to nothing, a single entry is yielded that mirrors
      ``dep_data``, so the unresolved alias is still reported.

    :param dep_data: serialized dependency; must contain ``dependency_id``,
        ``source``, ``target`` and ``dependency_type``; ``label`` is optional.
    :return: iterator over the resolved dependencies.
    """
    dep_id = dep_data["dependency_id"]
    assets = self.alias_names_to_asset_ids_names[dep_id]

    if not assets:
        # Unresolved alias: keep the direct alias <-> dag edge as serialized.
        yield DagDependency(
            source=dep_data["source"],
            target=dep_data["target"],
            # handle the case that serialized_dag does not have label column (e.g., from 2.x)
            label=dep_data.get("label", dep_id),
            dependency_type=dep_data["dependency_type"],
            dependency_id=dep_id,
        )
        return

    # The direction does not depend on the asset, so compute it once.
    is_source_alias = dep_data["source"] == "asset-alias"
    alias_node = f"asset-alias:{dep_id}"

    for asset_id, asset_name in assets:
        asset_node = f"asset:{asset_id}"
        # asset
        yield DagDependency(
            source="asset" if is_source_alias else alias_node,
            target=alias_node if is_source_alias else "asset",
            label=asset_name,
            dependency_type="asset",
            dependency_id=str(asset_id),
        )
        # asset alias
        yield DagDependency(
            source=asset_node if is_source_alias else dep_data["source"],
            target=dep_data["target"] if is_source_alias else asset_node,
            label=dep_id,
            dependency_type="asset-alias",
            dependency_id=dep_id,
        )


class SerializedDagModel(Base):
Expand Down Expand Up @@ -656,7 +665,6 @@ def load_json(deps_data):
if load_json is not None
else query.all()
)

resolver = _DagDependenciesResolver(dag_id_dependencies=iterator, session=session)
dag_depdendencies_by_dag = resolver.resolve()
return dag_depdendencies_by_dag
Expand Down
80 changes: 78 additions & 2 deletions airflow-core/src/airflow/serialization/dag_dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,85 @@
@dataclass(frozen=True, order=True)
class DagDependency:
"""
Dataclass for representing dependencies between DAGs.
Dataclass for representing dependencies between dags.

These are calculated during serialization and attached to serialized DAGs.
These are calculated during serialization and attached to serialized dags.

The source and target keys store the information of what component depends on what.
For an asset-related dependency, a root node has its source value equal to its dependency_type, and
an end node has its target value equal to its dependency_type. This is easiest to explain by example.

For the example below,

.. code-block:: python

# we assume the asset is active
DAG(dag_id="dag_1", schedule=[Asset.ref(uri="uri")])

we get a dag dependency like

.. code-block:: python

DagDependency(
source="asset",
target="dag_1",
label="name", # asset name, we always use asset name as label
dependency_type="asset",
dependency_id="1", # asset id (stored as a string)
)

This will look like `Asset name` -> `Dag dag_1` on the dependency graph. This is a root asset node because
its source value is "asset", and it points to its target, "dag_1".

For a more complex dependency, such as an asset alias,

.. code-block:: python

# we assume the asset is active
DAG(
dag_id="dag_2",
schedule=[
AssetAlias(name="alias_1"), # resolved into Asset(uri="uri", name="name")
AssetAlias(name="alias_2"), # resolved to nothing
],
)

we'll need to store more data,

.. code-block:: python

[
DagDependency(
source="asset",
target="asset-alias:alias_1",
label="name",
dependency_type="asset",
dependency_id="1",
),
DagDependency(
source="asset:1",
target="dag_2",
label="alias_1",
dependency_type="asset-alias",
dependency_id="alias_1",
),
DagDependency(
source="asset-alias",
target="dag_2",
label="alias_2",
dependency_type="asset-alias",
dependency_id="alias_2",
),
]


We want it to look like `Asset name` -> `AssetAlias alias_1` -> `Dag dag_2` on the dependency graph. The
first entry is a root node that points to an asset alias, so its target is set to the asset alias it points
to. The second entry represents the asset alias: its source is the resolved asset and its target is the dag.
The third entry represents a direct dependency between an asset alias and the dag, as the alias is unresolved.

For asset ref cases, it works similarly to an asset if it's a valid asset ref. If not, it works the same as
an unresolved asset alias.
"""

source: str
Expand Down
57 changes: 55 additions & 2 deletions airflow-core/tests/unit/models/test_serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@
from sqlalchemy import func, select, update

import airflow.example_dags as example_dags_module
from airflow.models.asset import AssetActive, AssetModel
from airflow.models.asset import AssetActive, AssetAliasModel, AssetModel
from airflow.models.dag import DAG as SchedulerDAG, DagModel
from airflow.models.dag_version import DagVersion
from airflow.models.dagbag import DagBag
from airflow.models.serialized_dag import SerializedDagModel as SDM
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import DAG, Asset, task as task_decorator
from airflow.sdk import DAG, Asset, AssetAlias, task as task_decorator
from airflow.serialization.dag_dependency import DagDependency
from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG
from airflow.settings import json
Expand Down Expand Up @@ -431,6 +431,59 @@ def test_get_dependencies_with_asset_ref(self, dag_maker, session):

db.clear_db_assets()

def test_get_dependencies_with_asset_alias(self, dag_maker, session):
    """
    Check the dag dependencies reported for a dag scheduled on two asset aliases.

    ``alias_1`` has one active asset attached, so it is expected to produce an
    asset entry plus an asset-alias entry. ``alias_2`` has no asset attached
    and is expected to produce a single direct asset-alias entry.
    """
    # Start from a clean slate so the hard-coded asset id below is free.
    db.clear_db_assets()

    dag_id = "test_get_dependencies_with_asset_alias"

    asset = AssetModel(id=1, uri="test://asset1", name="name")
    resolved_alias = AssetAliasModel(name="alias_1")  # resolve to asset
    unresolved_alias = AssetAliasModel(name="alias_2")  # resolve to nothing

    session.add_all([resolved_alias, unresolved_alias, asset, AssetActive.for_asset(asset)])
    resolved_alias.assets.append(asset)
    session.commit()

    # NOTE(review): the source indentation is lost; sync/write are assumed to run
    # after the dag_maker context exits -- confirm against the real file.
    with dag_maker(
        dag_id=dag_id,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule=[AssetAlias(name="alias_1"), AssetAlias(name="alias_2")],
    ) as dag:
        BashOperator(task_id="any", bash_command="sleep 5")
    dag.sync_to_db()
    SDM.write_dag(dag, bundle_name="testing")

    expected = [
        # alias_1 -> asset "name" (root asset node pointing at the alias)
        DagDependency(
            source="asset",
            target="asset-alias:alias_1",
            label="name",
            dependency_type="asset",
            dependency_id="1",
        ),
        # alias_1 itself, linking the resolved asset to the dag
        DagDependency(
            source="asset:1",
            target=dag_id,
            label="alias_1",
            dependency_type="asset-alias",
            dependency_id="alias_1",
        ),
        # alias_2 is unresolved, so it points at the dag directly
        DagDependency(
            source="asset-alias",
            target=dag_id,
            label="alias_2",
            dependency_type="asset-alias",
            dependency_id="alias_2",
        ),
    ]

    dependencies = SDM.get_dag_dependencies(session=session)
    assert dependencies == {dag_id: expected}

    db.clear_db_assets()

@pytest.mark.parametrize("min_update_interval", [0, 10])
@mock.patch.object(DagVersion, "get_latest_version")
def test_min_update_interval_is_respected(
Expand Down