etl_1.py
import glob
import pandas as pd
import xml.etree.ElementTree as ET
from datetime import datetime
"""Funzione che cicla su file csv, json e XML in una cartella per estrarre e trasformare informazioni e caricarle
in file csv.
Unici parametri da compilare sono i seguenti."""
#PARAMETRI DA COMPILARE
log_file = "log_file.txt"
source_files_folder="./source/"
target_file = "transformed_data.csv"
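# Illustrative folder layout (assumed, not part of the original project): the extract step
# below picks up every *.csv, *.json and *.xml file placed directly inside
# source_files_folder, e.g.
#   ./source/source1.csv
#   ./source/source1.json
#   ./source/source1.xml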
"""DEFINIZIONE FUNZIONI"""
"""Funzioni per fase di estrazione informazioni"""
#Funzione per leggere CSV
def extract_from_csv(file_to_process):
dataframe = pd.read_csv(file_to_process)
return dataframe
#Funzione per leggere JSON
def extract_from_json(file_to_process):
dataframe = pd.read_json(file_to_process, lines=True)
return dataframe
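# Note: lines=True means extract_from_json expects JSON Lines input, i.e. one JSON object
# per line. A minimal sketch of the assumed file contents (values are illustrative, field
# names match the columns used elsewhere in this script):
#   {"name": "alice", "height": 65.2, "weight": 142.5}
#   {"name": "bob", "height": 70.1, "weight": 185.0}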
# Function to read an XML file
# Note: you must know the headers of the extracted data to write this function.
# In this data, you extract the "name", "height", and "weight" headers for different persons.
def extract_from_xml(file_to_process):
    dataframe = pd.DataFrame(columns=["name", "height", "weight"])
    tree = ET.parse(file_to_process)
    root = tree.getroot()
    for person in root:
        name = person.find("name").text
        height = float(person.find("height").text)
        weight = float(person.find("weight").text)
        dataframe = pd.concat([dataframe, pd.DataFrame([{"name": name, "height": height, "weight": weight}])], ignore_index=True)
    return dataframe
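# The parser above assumes an XML layout like the following (illustrative only, inferred
# from the tags queried in extract_from_xml):
#   <persons>
#       <person>
#           <name>alice</name>
#           <height>65.2</height>
#           <weight>142.5</weight>
#       </person>
#   </persons>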
# Function that walks through all the files and returns a single dataframe
def extract(folder):
    extracted_data = pd.DataFrame(columns=['name', 'height', 'weight'])  # create an empty data frame to hold extracted data
    # process all csv files
    for csvfile in glob.glob(folder + "*.csv"):
        extracted_data = pd.concat([extracted_data, extract_from_csv(csvfile)], ignore_index=True)
    # process all json files
    for jsonfile in glob.glob(folder + "*.json"):
        extracted_data = pd.concat([extracted_data, extract_from_json(jsonfile)], ignore_index=True)
    # process all xml files
    for xmlfile in glob.glob(folder + "*.xml"):
        extracted_data = pd.concat([extracted_data, extract_from_xml(xmlfile)], ignore_index=True)
    return extracted_data
"""Funzioni per fase di trasformazione informazioni"""
def transform(data):
'''Convert inches to meters and round off to two decimals
1 inch is 0.0254 meters '''
data['height'] = round(data.height * 0.0254,2)
'''Convert pounds to kilograms and round off to two decimals
1 pound is 0.45359237 kilograms '''
data['weight'] = round(data.weight * 0.45359237,2)
return data
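# Worked example for transform (illustrative values, not taken from the source data):
#   height 68.0 in  -> 68.0 * 0.0254      = 1.7272    -> rounded to 1.73 m
#   weight 150.0 lb -> 150.0 * 0.45359237 = 68.038856 -> rounded to 68.04 kg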
"""Funzioni per fase di load informazioni"""
#Funzione per caricare dati su file csv
def load_data(target_file, transformed_data):
transformed_data.to_csv(target_file)
#Funzione che scrive un file di log
def log_progress(message):
timestamp_format = '%Y-%h-%d-%H:%M:%S' # Year-Monthname-Day-Hour-Minute-Second
now = datetime.now() # get current timestamp
timestamp = now.strftime(timestamp_format)
with open(log_file,"a") as f:
f.write(timestamp + ',' + message + '\n')
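# Each log_progress call appends one comma-separated line per event to log_file,
# e.g. (timestamp is illustrative):
#   2024-Mar-01-09:15:42,ETL Job Started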
"""SEQUENZA FUNZIONI"""
# Log the initialization of the ETL process
log_progress("ETL Job Started")
# Log the beginning of the Extraction process
log_progress("Extract phase Started")
extracted_data = extract(source_files_folder)
# Log the completion of the Extraction process
log_progress("Extract phase Ended")
# Log the beginning of the Transformation process
log_progress("Transform phase Started")
transformed_data = transform(extracted_data)
print("Transformed Data")
print(transformed_data)
# Log the completion of the Transformation process
log_progress("Transform phase Ended")
# Log the beginning of the Loading process
log_progress("Load phase Started")
load_data(target_file,transformed_data)
# Log the completion of the Loading process
log_progress("Load phase Ended")
# Log the completion of the ETL process
log_progress("ETL Job Ended")