-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathetl_pipeline.py
76 lines (66 loc) · 2.16 KB
/
etl_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import psycopg2
import json
# Arguments handed to the DAG as `default_args`, i.e. defaults for its tasks.
default_args = {
    "owner": "airflow",
    # Each run is independent of the outcome of the previous one.
    "depends_on_past": False,
    # No e-mail notifications on failure or on retry.
    "email_on_failure": False,
    "email_on_retry": False,
    # One retry, attempted five minutes after the failed try.
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}
def extract(output_path='/tmp/weather_data.json'):
    """Fetch the hourly temperature forecast from the Open-Meteo API.

    The request asks for the ``temperature_2m`` hourly variable at
    latitude 35 / longitude 139. The decoded JSON payload is written
    unchanged to *output_path*.

    Args:
        output_path: File the raw JSON response is written to. Defaults to
            the location the original pipeline used.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
        ValueError: If the response body is not valid JSON.
    """
    url = "https://api.open-meteo.com/v1/forecast?latitude=35&longitude=139&hourly=temperature_2m"
    # requests has no default timeout; without one a stalled connection
    # would block the task (and its worker slot) indefinitely.
    response = requests.get(url, timeout=30)
    # Fail loudly on an HTTP error rather than saving an error body as data.
    response.raise_for_status()
    data = response.json()
    with open(output_path, 'w') as f:
        json.dump(data, f)
def transform(input_path='/tmp/weather_data.json',
              output_path='/tmp/transformed_weather_data.json'):
    """Reshape the raw forecast JSON into a list of per-hour records.

    Reads ``data["hourly"]["time"]`` and ``data["hourly"]["temperature_2m"]``
    from *input_path*, pairs them element by element, and writes a JSON list
    of ``{"timestamp": ..., "temperature": ...}`` objects to *output_path*.

    Args:
        input_path: Raw JSON file to read.
        output_path: File the transformed records are written to.

    Raises:
        KeyError: If the expected ``hourly`` keys are missing.
        ValueError: If the two hourly lists differ in length.
    """
    with open(input_path, 'r') as f:
        data = json.load(f)

    hourly = data["hourly"]
    timestamps = hourly["time"]
    temperatures = hourly["temperature_2m"]
    # zip() would silently drop trailing values on a length mismatch, so
    # reject inconsistent input explicitly.
    if len(timestamps) != len(temperatures):
        raise ValueError(
            "hourly 'time' and 'temperature_2m' differ in length: "
            f"{len(timestamps)} != {len(temperatures)}"
        )

    # The two lists are treated as parallel arrays (one entry per hour).
    # Indexing each timestamp string with ["time"], as the previous
    # implementation did, raises TypeError.
    transformed_data = [
        {"timestamp": timestamp, "temperature": temperature}
        for timestamp, temperature in zip(timestamps, temperatures)
    ]

    with open(output_path, 'w') as f:
        json.dump(transformed_data, f)
def load(input_path='/tmp/transformed_weather_data.json'):
    """Insert the transformed weather records into PostgreSQL.

    Reads a JSON list of ``{"timestamp": ..., "temperature": ...}`` objects
    from *input_path*, creates the ``weather`` table if it does not exist,
    and inserts one row per record. Table creation and all inserts share a
    single transaction.

    Args:
        input_path: JSON file holding the transformed records.

    Raises:
        KeyError: If a record lacks ``timestamp`` or ``temperature``.
        psycopg2.Error: On connection or SQL failure; the transaction is
            rolled back and the connection closed before it propagates.
    """
    with open(input_path, 'r') as f:
        data = json.load(f)
    # Build the parameter tuples before connecting so malformed records
    # fail without opening a database connection.
    rows = [(record["timestamp"], record["temperature"]) for record in data]

    # NOTE(review): connection settings are hard-coded; consider moving them
    # to configuration or an Airflow connection.
    conn = psycopg2.connect(
        host="postgres",
        database="airflow",
        user="airflow",
        password="airflow",
    )
    try:
        # `with conn` commits on success and rolls back on an exception, but
        # does not close the connection; that is what the `finally` is for.
        with conn:
            with conn.cursor() as cursor:
                cursor.execute("CREATE TABLE IF NOT EXISTS weather (timestamp TEXT, temperature FLOAT);")
                # NOTE(review): plain INSERTs mean a re-run over the same
                # file adds the same rows again.
                cursor.executemany(
                    "INSERT INTO weather (timestamp, temperature) VALUES (%s, %s)",
                    rows,
                )
    finally:
        conn.close()
# DAG definition: three Python tasks run in sequence, extract -> transform -> load.
with DAG(
    dag_id='etl_pipeline',
    description='ETL pipeline for weather data',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    # One run per hour.
    schedule_interval=timedelta(hours=1),
    catchup=False,
) as dag:
    extract_task = PythonOperator(python_callable=extract, task_id='extract')
    transform_task = PythonOperator(python_callable=transform, task_id='transform')
    load_task = PythonOperator(python_callable=load, task_id='load')

    # Linear dependency chain, declared one edge at a time.
    extract_task >> transform_task
    transform_task >> load_task