-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathupstream_dag_1.py
229 lines (198 loc) · 7.26 KB
/
upstream_dag_1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
"""
Retrieve weather information for a list of cities.
This DAG retrieves the latitude and longitude coordinates for each city provided
in a DAG param and uses these coordinates to get weather information from
a weather API.
"""
from airflow.decorators import dag, task
from airflow.providers.http.operators.http import HttpOperator
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.datasets import Dataset
from pendulum import datetime, duration
import logging
# Task-level logger shared by every task in this DAG file; messages go to the
# "airflow.task" logger.
t_log = logging.getLogger(name="airflow.task")
@dag(
    dag_display_name="Solution upstream DAG 1 🌤️",
    start_date=datetime(2024, 6, 1),
    schedule=None,
    max_consecutive_failed_dag_runs=10,
    catchup=False,
    doc_md=__doc__,
    description="Retrieve weather information for a list of cities.",
    default_args={
        "owner": "Astro",
        "retries": 3,
        "retry_delay": duration(minutes=1),
        "retry_exponential_backoff": True,
    },
    params={
        "my_cities": Param(
            ["Bern", "Zurich", "Lausanne"],
            type="array",
            title="Cities of interest:",
            description="Enter the cities you want to retrieve weather info for. One city per line.",
        ),
        "weather_parameter": Param(
            "temperature_2m",
            type="string",
            enum=[
                "temperature_2m",
                "relative_humidity_2m",
                "precipitation",
                "cloud_cover",
                "wind_speed_10m",
            ],
        ),
        "timeframe": Param(
            "today",
            type="string",
            enum=["today", "yesterday", "today_and_tomorrow"],
            title="Forecast vs. Past Data",
            description="Choose whether you want to retrieve weather data/forecasts for yesterday, today or tomorrow.",
        ),
        "simulate_api_failure": Param(
            False,
            type="boolean",
            title="Simulate API failure",
            description="Set to true to simulate an API failure in the get_lat_long_for_one_city task.",
        ),
        "simulate_task_delay": Param(
            0,
            type="number",
            title="Simulate task delay",
            description="Set the number of seconds to delay the create_weather_table task of this DAG.",
        ),
    },
    tags=["solution"],
)
def upstream_dag_1_solution():
    """Build the DAG: geocode each city, fetch weather for one timeframe, tabulate it.

    Flow: get_cities -> get_lat_long_for_one_city (dynamically mapped, one
    instance per city) -> decide_timeframe (branch) -> exactly one of the three
    get_weather_* HTTP tasks -> get_weather_from_response ->
    create_weather_table, which declares the ``current_weather_data`` dataset
    as its outlet.
    """

    @task
    def get_cities(**context) -> list:
        """Return the list of city names supplied in the ``my_cities`` param."""
        return context["params"]["my_cities"]

    cities = get_cities()

    @task
    def get_lat_long_for_one_city(city: str, **context) -> dict:
        """Convert a city name into lat/long coordinates via the Photon geocoder.

        Args:
            city: Free-text city name, sent as the ``q`` query parameter.

        Returns:
            ``{"city": city, "lat": <latitude>, "long": <longitude>}`` taken
            from the first feature returned by the API.

        Raises:
            Exception: When the ``simulate_api_failure`` param is true.
            requests.HTTPError: When the geocoding request returns an error status.
            ValueError: When the API returns no match for ``city``.
        """
        import requests

        if context["params"]["simulate_api_failure"]:
            raise Exception("Simulated API failure.")

        # Pass the city via ``params`` so requests URL-encodes it. Names with
        # spaces, accents or reserved characters (&, #, ?) must still form a
        # valid query string. The timeout keeps a hung connection from blocking
        # the worker indefinitely; failures are retried via default_args.
        response = requests.get(
            "https://photon.komoot.io/api/",
            params={"q": city},
            timeout=30,
        )
        response.raise_for_status()
        features = response.json().get("features") or []
        if not features:
            raise ValueError(f"No coordinates found for city {city!r}.")
        # Point coordinates are ordered [longitude, latitude].
        long, lat = features[0]["geometry"]["coordinates"][:2]
        t_log.info("Coordinates for %s: %s/%s", city, lat, long)
        return {"city": city, "lat": lat, "long": long}

    # One dynamically mapped task instance per city in the list.
    cities_coordinates = get_lat_long_for_one_city.expand(city=cities)

    @task.branch
    def decide_timeframe(**context) -> str:
        """Return the task_id of the weather request matching ``timeframe``.

        Raises:
            ValueError: If the param holds a value outside the known timeframes.
        """
        timeframe_to_task_id = {
            "yesterday": "get_weather_yesterday",
            "today": "get_weather_today",
            "today_and_tomorrow": "get_weather_today_and_tomorrow",
        }
        task_id = timeframe_to_task_id.get(context["params"]["timeframe"])
        if task_id is None:
            raise ValueError("Invalid timeframe parameter.")
        return task_id

    def _make_weather_request(task_id: str, **time_window) -> HttpOperator:
        """Create a GET request to the weather API's ``forecast`` endpoint.

        ``time_window`` supplies the query arguments (``past_days`` or
        ``forecast_days``) that distinguish the three branches. All other
        arguments are shared.
        """
        return HttpOperator(
            task_id=task_id,
            endpoint="forecast",
            method="GET",
            http_conn_id="weather_api_conn",
            log_response=True,
            data={
                # XComArg references to the geocoding task's lat/long outputs,
                # resolved by Airflow when the request task runs.
                "latitude": cities_coordinates["lat"],
                "longitude": cities_coordinates["long"],
                "hourly": "{{ params.weather_parameter }}",
                **time_window,
            },
        )

    get_weather_yesterday = _make_weather_request(
        "get_weather_yesterday", past_days=1
    )
    get_weather_today = _make_weather_request("get_weather_today", forecast_days=1)
    get_weather_today_and_tomorrow = _make_weather_request(
        "get_weather_today_and_tomorrow", forecast_days=2
    )

    @task(
        # Two of the three upstream requests are always skipped by the branch,
        # so the default all_success rule would skip this task as well.
        trigger_rule="none_failed",
    )
    def get_weather_from_response(
        weather_yesterday: str | None,
        weather_today: str | None,
        weather_today_and_tomorrow: str | None,
        **context,
    ):
        """Return the payload of whichever weather request actually ran.

        Outputs of the skipped branches are expected to be empty/None.
        Candidates are checked in the order yesterday, today, today_and_tomorrow.

        Raises:
            ValueError: If every candidate payload is empty.
        """
        for weather in (weather_yesterday, weather_today, weather_today_and_tomorrow):
            if weather:
                return weather
        raise ValueError("No weather data found.")

    @task(outlets=[Dataset("current_weather_data")])
    def create_weather_table(
        weather: str | list | dict, cities_coordinates: list | dict, **context
    ):
        """Log a table of the weather for the cities of interest and return it.

        The result is also the task's return value, so it is pushed as an
        XCom. This task itself does not write a CSV file.

        Args:
            weather: The weather API response as a JSON string, or already
                parsed JSON. Either one object (single location) or a list of
                objects.
            cities_coordinates: The ``{"city", "lat", "long"}`` dict(s) from
                the geocoding task. Either one dict or a sequence of dicts from
                the mapped task.

        Returns:
            The per-city records produced by ``map_cities_to_weather``.
        """
        import json
        import time

        from tabulate import tabulate
        from include.helper_functions import (
            map_cities_to_weather,
        )

        # Negative delays would make time.sleep raise ValueError; treat them as 0.
        delay = context["params"]["simulate_task_delay"]
        if delay > 0:
            time.sleep(delay)

        # The HTTP task returns the raw response text; parse it unless an
        # upstream change already delivers parsed JSON.
        if isinstance(weather, (str, bytes, bytearray)):
            weather = json.loads(weather)
        weather_parameter = context["params"]["weather_parameter"]

        # Normalize both inputs to lists so the helper can zip them uniformly.
        # A mapped task's output arrives as a lazy sequence of dicts. Checking
        # for dict avoids relying on version-specific lazy-XCom class names.
        weather = weather if isinstance(weather, list) else [weather]
        cities_coordinates = (
            [cities_coordinates]
            if isinstance(cities_coordinates, dict)
            else list(cities_coordinates)
        )

        city_weather_info = map_cities_to_weather(
            weather, cities_coordinates, weather_parameter
        )
        t_log.info(
            tabulate(city_weather_info, headers="keys", tablefmt="grid", showindex=True)
        )
        return city_weather_info

    weather_data = get_weather_from_response(
        weather_today=get_weather_today.output,
        weather_today_and_tomorrow=get_weather_today_and_tomorrow.output,
        weather_yesterday=get_weather_yesterday.output,
    )
    create_weather_table(weather=weather_data, cities_coordinates=cities_coordinates)

    chain(
        cities_coordinates,
        decide_timeframe(),
        [get_weather_yesterday, get_weather_today, get_weather_today_and_tomorrow],
    )


upstream_dag_1_solution()