-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgenerator.py
147 lines (121 loc) · 4.3 KB
/
generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""
This function generates the individual (or parallel) operators
to be used in a flow.
It returns the following dictionary, with information
about the single operator that has been computed.
{
'operator': Operator[] | Operator,
'operator_id': str,
'entities': {}<string, Operator>
}
Params:
- dag: The Dag object to be used.
- operator: The operator to be computed. This operator object has
the following structure:
{
'id': str, # Operator identifier.
'type': Operator, # Operator to be used.
'params': {} # Operator specific parameters.
}
"""
def generate_operator(dag, operator):
# This operator is parallel.
if 'parallel' in operator:
# Output object.
output = {
# List of parallel operators.
'operator': [],
# Operator ID, so it can be referenced later.
'operator_id': operator['id'],
# Dict containing all the different operators used in this parallel.
'entities': {}
}
# Compute all the operators in the parallel.
for op in operator['parallel']:
# Compute the operator.
computed = generate_operator(dag, op)
# Update the parallel operators list and the entities.
output['operator'].append(computed['operator'])
output['entities'].update(computed['entities'])
return output
# Operator default parameters.
params = {
'dag': dag,
'task_id': operator['id']
}
# Update default parameters with individual operator parameters.
params.update(operator['params'])
# Generate the operator.
computed_operator = operator['type'](**params)
# Return operator information.
return {
'operator': computed_operator,
'operator_id': operator['id'],
'entities': {
operator['id']: computed_operator
}
}
"""
This function generates a flow by the given tasks.
In this case this function returns the following dictionary,
with all the related information.
{
'flow': Operator, # Generated flow.
'entities': {} # All the Operator entities used.
}
"""
def generate_flow(dag, tasks, dependencies):
n_tasks = len(tasks)
# There are no tasks, return.
if n_tasks == 0:
return
# There is only one task, compute it and return it.
elif n_tasks == 1:
computed_operator = generate_operator(dag, tasks[0])
flow = computed_operator['operator']
if len(dependencies) > 0:
return {
'flow': dependencies[0] >> flow,
'entities': computed_operator['entities']
}
return {
'flow': flow,
'entities': computed_operator['entities']
}
else:
entities = {}
generated_operator = generate_operator(dag, tasks[0])
flow = generated_operator['operator']
if len(dependencies) > 0:
flow = dependencies[0] >> flow
entities.update(generated_operator['entities'])
del tasks[0]
for operator in tasks:
generated_operator = generate_operator(dag, operator)
entities.update(generated_operator['entities'])
flow = flow >> generated_operator['operator']
return {
'flow': flow,
'entities': entities
}
"""
This function generates the airflow flows.
"""
def generate_airflows(dag, flows):
n_flows = len(flows)
if n_flows == 0:
yield None
elif n_flows == 1:
generated_flow = generate_flow(dag, flows[0], [])
yield generated_flow['flow']
else:
entities = {}
generated_flows = []
for flow in flows:
dependencies = []
if 'depends_on' in flow:
dependencies.append(entities[flow['depends_on']])
generated_flow = generate_flow(dag, flow['tasks'], dependencies)
current_flow = generated_flow['flow']
entities.update(generated_flow['entities'])
yield current_flow