Commit 66082ac

v1.1.0 release (#6)
* refactor: todo format
* change: pyproject.toml
* change: handle get_bind exception from alembic
* add: testcases for convert_task_group
* change: rename AVAILABLE_OPERATORS to BUILTIN_OPERATORS
* refactor: delete deprecated code
* add: todo items
* change: support op_module in _collect_dep_ast
* add: sync operators from Airflow==2.10.4
* update: version to 1.1.0
* update: README
* change: support private operators
* add: testcases
1 parent cc007d2 commit 66082ac

File tree: 277 files changed (+3,401 −516 lines)


README.md (+122 −29)
@@ -6,26 +6,26 @@
 
 
 # AirFly: Auto Generate Airflow's `dag.py` On The Fly
 
-Pipeline management is crucial for efficient data operations within a company. Many engineering teams rely on tools like Airflow to help them organize workflows, including ETL processes, reporting pipelines, or machine learning projects.
+Effective data pipeline management is essential for a company's data operations. Many engineering teams rely on tools like Airflow to organize various batch processing tasks, such as ETL/ELT workflows, data reporting pipelines, machine learning projects, and so on.
 
-Airflow offers rich extensibility, allowing developers to arrange workloads into a sequence of tasks. These tasks are then declared within a `DAG` context in a `dag.py` file, specifying task dependencies.
+Airflow is a powerful tool for task scheduling and orchestration, allowing users to define workflows as "DAGs". A typical DAG represents a data pipeline and includes a series of tasks along with their dependencies.
 
-As a workflow grows in complexity, the increasing intricacy of task relations can lead to confusion and disrupt the DAG structure. This complexity often results in decreased code maintainability, particularly in collaborative scenarios.
+As a pipeline grows in complexity, more tasks and interdependencies are added, which often leads to confusion, disrupts the structure of the DAG, and results in code that is harder to maintain and update, especially in collaborative environments.
 
-`airfly` tries to alleviate such pain points and streamline the development life cycle. It operates under the assumption that all tasks are managed within a certain Python module. Developers define task dependencies while creating task objects. During deployment, `airfly` can resolve the dependency tree and automatically generate the `dag.py` for you.
+`airfly` aims to alleviate such challenges and streamline the development lifecycle. It assumes that tasks are encapsulated in a specific structure, with dependencies defined as part of the task attributes. During deployment, `airfly` recursively collects all tasks, resolves the dependency tree, and automatically generates the DAG.
 
 <img src="https://github.com/ryanchao2012/airfly/blob/main/assets/graph-view.png?raw=true" width="800"></img>
 ***airfly** helps you build complex dags*
 
 
-
 ## Key Features
 
-* `dag.py` automation: focus on your task, let airfly handle the rest.
-* No need to install Airflow: keep your environment lean.
-* support task group: a nice feature from Airflow 2.0+
-* support duck typing: flexible class inheritance.
+* `dag.py` Automation: focus on your tasks and let airfly handle the rest.
+* No Airflow Installation Required: keep your environment lean without the need for Airflow.
+* Task Group Support: a nice feature from Airflow 2.0+.
+* Duck Typing Support: flexible class inheritance for greater customization.
+
 
 ## Install
 
@@ -64,7 +64,8 @@ Options:
 
 ## How It Works
 
-`airfly` assumes the tasks are populated in a Python module(or a package, e.g., `man_dag` in the below example), the dependencies are declared by assigning `upstream` or `downstream` attributes to each task. A task holds some attributes corresponding to an airflow operator, when `airfly` walks through the entire module, all tasks are discovered and collected, the dependency tree and the `DAG` context are auto-built, with some `ast` helpers, `airfly` can wrap the information, convert it into python code, and finally save them to `dag.py`.
+`airfly` assumes that tasks are defined within a Python module (or package, such as `main_dag` in the example below). Each task holds attributes corresponding to an Airflow operator, and the dependencies are declared by assigning `upstream` or `downstream`. As `airfly` walks through the module, it discovers and collects all tasks, resolves the dependency tree, and generates the `DAG` in Python code, which can then be saved as `dag.py`.
+
 
 ```sh
 main_dag
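
A minimal sketch of the convention this paragraph describes, assuming a module layout like `main_dag`; the task names, commands, and the exact form accepted by `upstream` (shown here as a single class) are illustrative rather than taken from the repository:

```python
# Hypothetical contents of a task module, e.g. main_dag/tasks.py
from airfly.model import AirFly


class extract(AirFly):
    op_class = "BashOperator"
    op_params = dict(bash_command="echo extract")


class report(AirFly):
    op_class = "BashOperator"
    op_params = dict(bash_command="echo report")
    upstream = extract  # declares that `report` runs downstream of `extract`
```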
@@ -85,7 +86,7 @@ main_dag
 
 ### Define your task with `AirFly`
 
-Declare a task as following(see [demo](https://github.com/ryanchao2012/airfly/blob/main/examples/tutorial/demo.py)):
+Declare a task as follows (see [demo](https://github.com/ryanchao2012/airfly/blob/main/examples/tutorial/demo.py)):
 
 ```python
 # in demo.py
@@ -97,9 +98,8 @@ class print_date(AirFly):
     op_params = dict(bash_command="date")
 
 
-# during dag generation,
-# this class will be converted to airflow operator
-print_date._to_ast(print_date).show()
+# During DAG generation,
+# this class will be auto-converted to the following code:
 # examples_tutorial_demo_print_date = BashOperator(
 #     task_id='examples.tutorial.demo.print_date',
 #     bash_command='date',
@@ -109,9 +109,10 @@ print_date._to_ast(print_date).show()
 ```
 
 * `op_class (str)`: specifies the airflow operator for this task.
-* `op_params`: keyword arguments which will be passed to the airflow operator(`op_class`), a parameter (i.e., value in the dictionary) could be one of the [primitive types](https://docs.python.org/3/library/stdtypes.html), a function or a class.
+* `op_params`: keyword arguments that will be passed to the operator (`op_class`); a parameter (i.e., a value in the dictionary) can be one of the [primitive types](https://docs.python.org/3/library/stdtypes.html), a function, or a class.
+
+You can also define the attributes using `property`:
 
-You can also define the attributes by `property`:
 ```python
 from airfly.model import AirFly
@@ -122,40 +123,44 @@ class print_date(AirFly):
     def op_class(self):
         return "BashOperator"
 
+    def greeting(self) -> str:
+        return "Hello World"
+
     @property
     def op_params(self):
-        return dict(bash_command="date")
+        return dict(bash_command=f"echo {self.greeting()}")
 
-print_date._to_ast(print_date).show()
+# Corresponding generated code:
 # examples_tutorial_demo_print_date = BashOperator(
 #     task_id='examples.tutorial.demo.print_date',
-#     bash_command='date',
+#     bash_command='echo Hello World',
 #     task_group=group_examples_tutorial_demo
 # )
 
 ```
 
-By default, the class name(`print_date`) maps to `task_id` to the applied operator after dag generation. You can change this behavior by overriding `_get_taskid` as a classmethod, you have to make sure the task id is globally unique:
+By default, the class name (`print_date`) is used as the `task_id` for the applied operator after DAG generation. You can change this behavior by overriding `_get_taskid` as a class method; make sure the resulting `task_id` is globally unique:
 
 ```python
-
+import re
 from airfly.model import AirFly
 
 
-class print_date(AirFly):
+class PrintDate(AirFly):
     @classmethod
     def _get_taskid(cls):
         # customize the task id
-        return f"my_task_{cls.__qualname__}"
+        return "task_" + re.sub(r'(?<!^)(?=[A-Z])', '_', cls.__name__).lower()
+
     op_class = "BashOperator"
     op_params = dict(bash_command="date")
 
 
-print_date._to_ast(print_date).show()
-# my_task_print_date = BashOperator(
-#     task_id='my_task_print_date',
+# Corresponding generated code:
+# task_print_date = BashOperator(
+#     task_id='task_print_date',
 #     bash_command='date',
-#     task_group=group_my_task_print_date
+#     task_group=group_task_print_date
 # )
 
 ```
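
The regex in the new `_get_taskid` converts a CamelCase class name to snake_case by inserting an underscore before each interior capital. A standalone check of that conversion (purely illustrative):

```python
import re


def camel_to_snake(name: str) -> str:
    # Insert "_" before every capital letter that is not at the start, then lowercase.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


print(camel_to_snake("PrintDate"))  # "print_date" -> the task id becomes "task_print_date"
```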
@@ -372,10 +377,11 @@ with DAG("demo_dag", **dag_kwargs) as dag:
 
 ```
 
-`airfly` wraps required information including variables and imports into output python script, and pass the specified value to `DAG` object.
+As you can see, `airfly` wraps the required information, including variables and import dependencies, into the output code and passes the specified values to the `DAG` object.
 
 
 ## Exclude tasks from codegen
+
 Pass `--exclude-pattern` to match unwanted objects against their `__qualname__` and filter them out.
 
 ```
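
A rough illustration of the matching this option performs (hypothetical names; the actual filtering happens inside `airfly` during codegen):

```python
import re

# Task classes are matched against their __qualname__; matches are excluded.
pattern = re.compile(r"templated")
qualnames = ["demo.print_date", "demo.templated", "demo.sleep"]
kept = [q for q in qualnames if not pattern.search(q)]
print(kept)  # ['demo.print_date', 'demo.sleep']
```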
@@ -410,12 +416,99 @@ with DAG("demo_dag") as dag:
 The `templated` task is gone.
 
 
+### Operators Support
+
+#### Built-in Operators
+
+Operators defined in the official Airflow package, such as `BashOperator`, `PythonOperator`, and `KubernetesPodOperator`, are considered built-in, including those contributed by the community through various providers (e.g., Google, Facebook, OpenAI).
+
+To use a built-in operator, assign `op_class` to its name and specify corresponding parameters using `op_params`:
+
+```python
+from airfly.model import AirFly
+
+
+def log_sql(**kwargs):
+    print("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))
+
+
+class Task1(AirFly):
+    op_class = "KubernetesPodOperator"
+    op_params = dict(
+        image="debian",
+        cmds=["bash", "-cx"],
+        arguments=["echo", "10"],
+        labels={"foo": "bar"},
+    )
+
+
+class Task2(AirFly):
+    op_class = "PythonOperator"
+    op_params = dict(
+        python_callable=log_sql,
+        templates_dict={"query": "sql/sample.sql"},
+        templates_exts=[".sql"],
+    )
+```
+
+Sometimes, operators may have a naming ambiguity. For instance, `EmailOperator` could refer to either [`airflow.operators.email.EmailOperator`](https://github.com/apache/airflow/blob/2.10.4/airflow/operators/email.py#L29) or [`airflow.providers.smtp.operators.smtp.EmailOperator`](https://github.com/apache/airflow/blob/2.10.4/airflow/providers/smtp/operators/smtp.py#L29). To resolve such ambiguities, specify the correct module using `op_module`:
+
+```python
+from airfly.model import AirFly
+
+
+class Task3(AirFly):
+    op_class = "EmailOperator"
+    op_module = "airflow.providers.smtp.operators.smtp"
+    op_params = dict(
+        subject="Hello World",
+        from_email="[email protected]",
+    )
+```
+
+This approach ensures that `Task3` explicitly references the `EmailOperator` from the `airflow.providers.smtp.operators.smtp` module, avoiding conflicts with similarly named operators.
+
+
+#### Private Operators
+
+Operators not included in the official Airflow package are considered private. Developers often create custom operators by extending existing built-in ones to meet their use cases. Since these custom operators are not registered within Airflow, `airfly` cannot automatically infer them by name.
+
+To use a private operator, provide its class definition directly in `op_class`:
+
+```python
+# in my_package/operators.py
+from airflow.operators.bash import BashOperator
+
+
+class EchoOperator(BashOperator):
+    def __init__(self, text: str, **kwargs):
+        cmd = f"echo {text}"
+        super().__init__(bash_command=cmd, **kwargs)
+
+
+# in my_package/tasks.py
+from airfly.model import AirFly
+from my_package.operators import EchoOperator
+
+
+class Task4(AirFly):
+    op_class = EchoOperator
+    op_params = dict(text="Hello World")
+```
+
+This approach enables seamless integration of private, custom-built operators with `airfly`.
+
+
 ### Task Group
 
 `airfly` defines `TaskGroup` in the DAG context and assigns `task_group` to each operator for you.
 It maps the module hierarchy to the nested group structure,
 so the tasks in the same python module will be grouped closer.
-If you don't like this feature, pass `--task-group`/`-g` with `False` to disable it.
+If you don't like this feature, pass `--task-group`/`-g` with `0` to disable it.
 
 
 ## Duck Typing

pyproject.toml (+2 −2)

@@ -1,6 +1,6 @@
 [project]
 name = "airfly"
-version = "1.0.0"
+version = "1.1.0"
 description = "Auto generate Airflow's dag.py on the fly"
 authors = [
     {name = "ryanchao2012", email = "[email protected]"},
@@ -33,7 +33,7 @@ dependencies = [
 [tool.pdm]
 distribution = true
 
-[tool.pdm.dev-dependencies]
+[project.optional-dependencies]
 dev = [
     "ipython>=8.12.3",
     "pytest>=8.1.1",

src/airfly/_meta.py (+16 −1)

@@ -2,7 +2,7 @@
 
 
 name = "airfly"
-version = "1.0.0"
+version = "1.1.0"
 description = "Auto generate Airflow's dag.py on the fly"
 authors = [{"name": "ryanchao2012", "email": "[email protected]"}]
 readme = "README.md"
@@ -11,7 +11,10 @@
     "Programming Language :: Python :: 3.8",
     "Programming Language :: Python :: 3.9",
     "Programming Language :: Python :: 3.10",
+    "Programming Language :: Python :: 3.11",
+    "Programming Language :: Python :: 3.12",
 ]
+requires_python = ">=3.8"
 dependencies = [
     "attrs",
     "cattrs",
@@ -22,5 +25,17 @@
     "click",
     "libcst",
     "asttrs",
+    "loguru",
 ]
+optional_dependencies = {
+    "dev": [
+        "ipython>=8.12.3",
+        "pytest>=8.1.1",
+        "invoke>=2.2.0",
+        "pytest-cov>=4.1.0",
+        "gutt>=1.1.0",
+        "toml>=0.10.2",
+        "pytest-mock>=3.14.0",
+    ]
+}
 scripts = {"airfly": "airfly.cli.main:main"}

src/airfly/_vendor/abc.py (+5)

@@ -0,0 +1,5 @@
+# Auto generated by 'inv collect-airflow'
+
+
+class ABC:
+    pass

src/airfly/_vendor/airflow/decorators/base.py (−1)

@@ -7,5 +7,4 @@ class DecoratedOperator(BaseOperator):
     task_id: "str"
     op_args: "Collection[Any] | None"
     op_kwargs: "Mapping[str, Any] | None"
-    multiple_outputs: "bool"
     kwargs_to_upstream: "dict[str, Any] | None"
src/airfly/_vendor/airflow/decorators/bash.py (+9)

@@ -0,0 +1,9 @@
+# Auto generated by 'inv collect-airflow'
+from airfly._vendor.airflow.decorators.base import DecoratedOperator
+from airfly._vendor.airflow.operators.bash import BashOperator
+
+
+class _BashDecoratedOperator(DecoratedOperator, BashOperator):
+    python_callable: "Callable"
+    op_args: "Collection[Any] | None"
+    op_kwargs: "Mapping[str, Any] | None"

src/airfly/_vendor/airflow/decorators/branch_virtualenv.py (+2 −1)

@@ -4,7 +4,8 @@
 
 
 class _BranchPythonVirtualenvDecoratedOperator(
-    _PythonDecoratedOperator, BranchPythonVirtualenvOperator
+    _PythonDecoratedOperator,
+    BranchPythonVirtualenvOperator,
 ):
     python_callable: "_empty"
     op_args: "_empty"

src/airfly/_vendor/airflow/example_dags/example_skip_dag.py (+9 −1)

@@ -22,7 +22,7 @@ class EmptySkipOperator(BaseOperator):
     params: "collections.abc.MutableMapping | None"
     default_args: "dict | None"
     priority_weight: "int"
-    weight_rule: "str"
+    weight_rule: "str | PriorityWeightStrategy"
     queue: "str"
     pool: "str | None"
     pool_slots: "int"
@@ -38,16 +38,22 @@ class EmptySkipOperator(BaseOperator):
         "None | TaskStateChangeCallback | list[TaskStateChangeCallback]"
     )
     on_retry_callback: "None | TaskStateChangeCallback | list[TaskStateChangeCallback]"
+    on_skipped_callback: (
+        "None | TaskStateChangeCallback | list[TaskStateChangeCallback]"
+    )
     pre_execute: "TaskPreExecuteHook | None"
     post_execute: "TaskPostExecuteHook | None"
     trigger_rule: "str"
     resources: "dict[str, Any] | None"
     run_as_user: "str | None"
     task_concurrency: "int | None"
+    map_index_template: "str | None"
     max_active_tis_per_dag: "int | None"
     max_active_tis_per_dagrun: "int | None"
+    executor: "str | None"
     executor_config: "dict | None"
     do_xcom_push: "bool"
+    multiple_outputs: "bool"
     inlets: "Any | None"
     outlets: "Any | None"
     task_group: "TaskGroup | None"
@@ -56,4 +62,6 @@ class EmptySkipOperator(BaseOperator):
     doc_json: "str | None"
     doc_yaml: "str | None"
     doc_rst: "str | None"
+    task_display_name: "str | None"
     logger_name: "str | None"
+    allow_nested_operators: "bool"
