Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/docs/authoring-and-scheduling/assets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

.. _asset_definitions:

Asset definitions
Asset Definitions
=================

.. versionadded:: 2.4
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/docs/authoring-and-scheduling/datasets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
specific language governing permissions and limitations
under the License.

Data-aware scheduling
=====================
Asset-Aware Scheduling
======================

.. versionadded:: 2.4

Expand Down
7 changes: 5 additions & 2 deletions airflow-core/docs/authoring-and-scheduling/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ Authoring and Scheduling
Here you can find detailed documentation about advanced authoring and scheduling airflow dags.
It's recommended that you first review the pages in :doc:`core concepts </core-concepts/index>`

.. _authoring-section:

**Authoring**

.. toctree::
Expand All @@ -32,6 +34,7 @@ It's recommended that you first review the pages in :doc:`core concepts </core-c
dynamic-task-mapping
assets

.. _scheduling-section:

**Scheduling**

Expand All @@ -40,6 +43,6 @@ It's recommended that you first review the pages in :doc:`core concepts </core-c

cron
timezone
Data-ware scheduling with assets <datasets>
Asset-Aware Scheduling <datasets>
timetable
Event-driven scheduling <event-scheduling>
Event-Driven Scheduling <event-scheduling>
8 changes: 4 additions & 4 deletions airflow-core/docs/best-practices.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ There are certain limitations and overhead introduced by this operator:
same worker might be affected by previous tasks creating/modifying files etc.

You can see detailed examples of using :class:`airflow.providers.standard.operators.python.PythonVirtualenvOperator` in
:ref:`Taskflow Virtualenv example <taskflow/virtualenv_example>`
:ref:`this section in the Taskflow API tutorial <taskflow-dynamically-created-virtualenv>`.


Using ExternalPythonOperator
Expand Down Expand Up @@ -1083,7 +1083,7 @@ The nice thing about this is that you can switch the decorator back at any time
developing it "dynamically" with ``PythonVirtualenvOperator``.

You can see detailed examples of using :class:`airflow.providers.standard.operators.python.ExternalPythonOperator` in
:ref:`Taskflow External Python example <taskflow/external_python_example>`
:ref:`Taskflow External Python example <taskflow-external-python-environment>`

Using DockerOperator or Kubernetes Pod Operator
-----------------------------------------------
Expand Down Expand Up @@ -1147,9 +1147,9 @@ The drawbacks:
containers etc. in order to author a DAG that uses those operators.

You can see detailed examples of using :class:`airflow.operators.providers.Docker` in
:ref:`Taskflow Docker example <taskflow/docker_example>`
:ref:`Taskflow Docker example <taskflow-docker_environment>`
and :class:`airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator`
:ref:`Taskflow Kubernetes example <taskflow/kubernetes_example>`
:ref:`Taskflow Kubernetes example <tasfklow-kpo>`

Using multiple Docker Images and Celery Queues
----------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/docs/core-concepts/taskflow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ Sensors and the TaskFlow API
.. versionadded:: 2.5.0

For an example of writing a Sensor using the TaskFlow API, see
:ref:`Using the TaskFlow API with Sensor operators <taskflow/task_sensor_example>`.
:ref:`Using the TaskFlow API with Sensor operators <taskflow-using-sensors>`.

History
-------
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/docs/tutorial/fundamentals.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@



Airflow Fundamentals
====================
Airflow 101: Building Your First Workflow
=========================================
Welcome to world of Apache Airflow! In this tutorial, we'll guide you through the essential concepts of Airflow, helping
you understand how to write your first DAG. Whether you're familiar with Python or just starting out, we'll make the
journey enjoyable and straightforward.
Expand Down
160 changes: 97 additions & 63 deletions airflow-core/docs/tutorial/objectstorage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,117 +18,151 @@



Object Storage
==============
Cloud-Native Workflows with Object Storage
==========================================

This tutorial shows how to use the Object Storage API to manage objects that
reside on object storage, like S3, gcs and azure blob storage. The API is introduced
as part of Airflow 2.8.
.. versionadded:: 2.8

The tutorial covers a simple pattern that is often used in data engineering and data
science workflows: accessing a web api, saving and analyzing the result.
Welcome to the final tutorial in our Airflow series! By now, you've built DAGs with Python and the Taskflow API, passed
data with XComs, and chained tasks together into clear, reusable workflows.

In this tutorial we'll take it a step further by introducing the **Object Storage API**. This API makes it easier to
read from and write to cloud storage -- like Amazon S3, Google Cloud Storage (GCS), or Azure Blob Storage -- without
having to worry about provider-specific SDKs or low-level credentials management.

We'll walk you through a real-world use case:

1. Pulling data from a public API
2. Saving that data to object storage in Parquet format
3. Analyzing it using SQL with DuckDB

Along the way, we'll highlight the new ``ObjectStoragePath`` abstraction, explain how Airflow handles cloud credentials via
connections, and show how this enables portable, cloud-agnostic pipelines.

Why This Matters
----------------

Many data workflows depend on files -- whether it's raw CSVs, intermediate Parquet files, or model artifacts.
Traditionally, you'd need to write S3-specific or GCS-specific code for this. Now, with ``ObjectStoragePath``, you can
write generic code that works across providers, as long as you've configured the right Airflow connection.

Let's get started!

Prerequisites
-------------
To complete this tutorial, you need a few things:

- DuckDB, an in-process analytical database,
which can be installed by running ``pip install duckdb``.
- An S3 bucket, along with the Amazon provider including ``s3fs``. You can install
the provider package by running
``pip install apache-airflow-providers-amazon[s3fs]``.
Alternatively, you can use a different storage provider by changing the URL in
the ``create_object_storage_path`` function to the appropriate URL for your
provider, for example by replacing ``s3://`` with ``gs://`` for Google Cloud
Storage, and installing a different provider.
- ``pandas``, which you can install by running ``pip install pandas``.
Before diving in, make sure you have the following:

- **DuckDB**, an in-process SQL database: Install with ``pip install duckdb``
- **Amazon S3 access** and **Amazon Provider with s3fs**: ``pip install apache-airflow-providers-amazon[s3fs]``
(You can substitute your preferred provider by changing the storage URL protocol and installing the relevant provider.)
- **Pandas** for working with tabular data: ``pip install pandas``

Creating an ObjectStoragePath
-----------------------------

The ObjectStoragePath is a path-like object that represents a path on object storage.
It is the fundamental building block of the Object Storage API.
At the heart of this tutorial is ``ObjectStoragePath``, a new abstraction for handling paths on cloud object stores.
Think of it like ``pathlib.Path``, but for buckets instead of filesystems.

.. exampleinclude:: /../src/airflow/example_dags/tutorial_objectstorage.py
:language: python
:start-after: [START create_object_storage_path]
:end-before: [END create_object_storage_path]

The username part of the URL given to ObjectStoragePath should be a connection ID.
The specified connection will be used to obtain the right credentials to access
the backend. If it is omitted, the default connection for the backend will be used.
|

The URL syntax is simple: ``protocol://bucket/path/to/file``

The connection ID can alternatively be passed in with a keyword argument:
- The ``protocol`` (like ``s3``, ``gs`` or ``azure``) determines the backend
- The "username" part of the URL can be a ``conn_id``, telling Airflow how to authenticate
- If the ``conn_id`` is omitted, Airflow will fall back to the default connection for that backend

You can also provide the ``conn_id`` as keyword argument for clarity:

.. code-block:: python

ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")

This is useful when reusing a URL defined for another purpose (e.g. Asset),
which generally does not contain a username part. The explicit keyword argument
takes precedence over the URL's username value if both are specified.
This is especially handy when reusing a path defined elsewhere (like in an Asset), or when the connection isn't baked
into the URL. The keyword argument always takes precedence.

It is safe to instantiate an ObjectStoragePath at the root of your DAG. Connections
will not be created until the path is used. This means that you can create the
path in the global scope of your DAG and use it in multiple tasks.
.. tip:: You can safely create an ``ObjectStoragePath`` in your global DAG scope. Connections are resolved only when the
path is used, not when it's created.

Saving data to Object Storage
Saving Data to Object Storage
-----------------------------

An ObjectStoragePath behaves mostly like a pathlib.Path object. You can
use it to save and load data directly to and from object storage. So, a typical
flow could look like this:
Let's fetch some data and save it to the cloud.

.. exampleinclude:: /../src/airflow/example_dags/tutorial_objectstorage.py
:language: python
:start-after: [START get_air_quality_data]
:end-before: [END get_air_quality_data]

The ``get_air_quality_data`` calls the API of the Finnish Meteorological Institute
to obtain the air quality data for the region of Helsinki. It creates a
Pandas DataFrame from the resulting json. It then saves the data to object storage
and converts it on the fly to parquet.
|

Here's what's happening:

The key of the object is automatically generated from the logical date of the task,
so we could run this everyday and it would create a new object for each day. We
concatenate this key with the base path to create the full path to the object. Finally,
after writing the object to storage, we return the path to the object. This allows
us to use the path in the next task.
- We call a public API from the Finnish Meteorological Institute for Helsinki air quality data
- The JSON response is parsed into a pandas DataFrame
- We generate a filename based on the task's logical date
- Using ``ObjectStoragePath``, we write the data directly to cloud storage as Parquet

Analyzing the data
------------------
This is a classic Taskflow pattern. The object key changes each day, allowing us to run this daily and build a dataset
over time. We return the final object path to be used in the next task.

In understanding the data, you typically want to analyze it. Duck DB is a great
tool for this. It is an in-process analytical database that allows you to run
SQL queries on data in memory.
Why this is cool: No boto3, no GCS client setup, no credentials juggling. Just simple file semantics that work across
storage backends.

Because the data is already in parquet format, we can use the ``read_parquet`` and
because both Duck DB and the ObjectStoragePath use ``fsspec`` we can register the
backend of the ObjectStoragePath with Duck DB. ObjectStoragePath exposes the ``fs``
property for this. We can then use the ``register_filesystem`` function from Duck DB
to register the backend with Duck DB.
Analyzing the Data with DuckDB
------------------------------

In Duck DB we can then create a table from the data and run a query on it. The
query is returned as a dataframe, which could be used for further analysis or
saved to object storage.
Now let's analyze that data using SQL with DuckDB.

.. exampleinclude:: /../src/airflow/example_dags/tutorial_objectstorage.py
:language: python
:start-after: [START analyze]
:end-before: [END analyze]

You might note that the ``analyze`` function does not know the original
path to the object, but that it is passed in as a parameter and obtained
through XCom. You do not need to re-instantiate the Path object. Also
the connection details are handled transparently.
|

Putting it all together
-----------------------
A few key things to note:

The final DAG looks like this, which wraps things so that we can run it:
- DuckDB supports reading Parquet natively
- DuckDB and ObjectStoragePath both rely on ``fsspec``, which makes it easy to register the object storage backend
- We use ``path.fs`` to grab the right filesystem object and register it with DuckDB
- Finally, we query the Parquet file using SQL and return a pandas DataFrame

Notice that the function doesn't recreate the path manually -- it gets the full path from the previous task using Xcom.
This makes the task portable and decoupled from earlier logic.

Bringing It All Together
------------------------

Here's the full DAG that ties everything together:

.. exampleinclude:: /../src/airflow/example_dags/tutorial_objectstorage.py
:language: python
:start-after: [START tutorial]
:end-before: [END tutorial]

|

You can trigger this DAG and view it in the Graph View in the Airflow UI. Each task logs its inputs and outputs clearly,
and you can inspect returned paths in the Xcom tab.

What to Explore Next
--------------------

Here are some ways to take this further:

- Use object sensors (like ``S3KeySensor``) to wait for files uploaded by external systems
- Orchestrate S3-to-GCS transfers or cross-region data syncs
- Add branching logic to handle missing or malformed files
- Experiment with different formats like CSV or JSON

**See Also**

- Learn how to securely access cloud services by configuring Airflow connections in the :doc:`Managing Connections guide <../authoring-and-scheduling/connections>`
- Build event-driven pipelines that respond to file uploads or external triggers using the :doc:`Event-Driven Scheduling framework <../authoring-and-scheduling/event-scheduling>`
- Reinforce your understanding of decorators, return values, and task chaining with the :doc:`TaskFlow API guide <../core-concepts/taskflow>`
Loading