-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathextract_csv_files.py
135 lines (119 loc) · 6.32 KB
/
extract_csv_files.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import os
import zipfile
import xml.etree.ElementTree as ET
import csv
# Working directories: inputs are read from gps/ and locations/, and
# results are written to csv/, all relative to the current directory.
root_dir = '.'
gps_dir, csv_dir, locations_dir = (
    os.path.join(root_dir, folder) for folder in ('gps', 'csv', 'locations')
)

# Column layout handed to csv.DictWriter by the GPX and KML converters.
csv_headers = [
    "time",
    "lat",
    "lon",
    "elevation",
    "accuracy",
    "bearing",
    "speed",
    "satellites",
    "provider",
    "hdop",
    "vdop",
    "pdop",
    "geoidheight",
    "ageofdgpsdata",
    "dgpsid",
    "activity",
    "battery",
    "annotation",
    "timestamp_ms",
    "time_offset",
    "distance",
    "starttimestamp_ms",
    "profile_name",
    "battery_charging",
]

# When False, a CSV file that already exists is left untouched.
overwrite = False

# Make sure the output directory exists before anything is written to it.
os.makedirs(csv_dir, exist_ok=True)
# Function to parse GPX and create CSV
def parse_gpx_to_csv(gpx_file, csv_file, *, fieldnames=None):
    """Write one CSV row per ``<trkpt>`` element found in a GPX document.

    The ``lat``/``lon`` attributes and the text of the ``time``, ``ele``,
    ``src``, ``speed``, ``hdop``, ``vdop``, ``pdop`` and ``geoidheight``
    children are exported; every other column is left empty.

    Args:
        gpx_file: Path (or file object) handed to ``ET.parse``.
        csv_file: Path of the CSV file to create; an existing file is
            replaced.
        fieldnames: Optional list of CSV columns. Defaults to the
            module-level ``csv_headers``. It must include every column this
            function fills in, otherwise ``csv.DictWriter`` raises
            ``ValueError``.

    Raises:
        xml.etree.ElementTree.ParseError: If the input is not well-formed XML.
    """
    columns = csv_headers if fieldnames is None else fieldnames
    root = ET.parse(gpx_file).getroot()

    # ElementTree spells a namespaced tag as "{uri}name".  Re-use the root's
    # "{uri}" prefix for every lookup; a document without a namespace gets an
    # empty prefix instead of a made-up namespace that matches nothing.
    qualifier = ''
    if root.tag.startswith('{'):
        qualifier = root.tag[:root.tag.index('}') + 1]

    # (CSV column, GPX child element) pairs copied from each track point.
    child_columns = (
        ("time", "time"),
        ("elevation", "ele"),
        ("provider", "src"),
        ("speed", "speed"),
        ("hdop", "hdop"),
        ("vdop", "vdop"),
        ("pdop", "pdop"),
        ("geoidheight", "geoidheight"),
    )

    # Explicit UTF-8 keeps the output identical across platforms.
    with open(csv_file, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=columns)
        writer.writeheader()
        for trkpt in root.findall(f'.//{qualifier}trkpt'):
            row = {"lat": trkpt.get('lat'), "lon": trkpt.get('lon')}
            for column, tag in child_columns:
                # findtext returns the default when the child is missing and
                # '' when the child exists but has no text.
                row[column] = trkpt.findtext(f'{qualifier}{tag}', default='')
            writer.writerow(row)
# Function to parse KML and create CSV
def parse_kml_to_csv(kml_file, csv_file, *, fieldnames=None):
    """Write the coordinates of every 'Walking' placemark in a KML file to CSV.

    A placemark is exported only when its ``Category`` extended-data value
    is exactly ``Walking``. Its ``Point`` coordinate (if any) is written
    first, followed by one row per ``LineString`` coordinate tuple. Each row
    carries the text of the placemark's first ``begin`` element as ``time``.
    Empty ``<coordinates>`` elements and tuples with fewer than two
    components are skipped.

    Args:
        kml_file: Path (or file object) handed to ``ET.parse``.
        csv_file: Path of the CSV file to create; an existing file is
            replaced.
        fieldnames: Optional list of CSV columns. Defaults to the
            module-level ``csv_headers``. It must include ``time``, ``lat``,
            ``lon``, ``elevation`` and ``activity``, otherwise
            ``csv.DictWriter`` raises ``ValueError``.

    Raises:
        xml.etree.ElementTree.ParseError: If the input is not well-formed XML.
    """
    columns = csv_headers if fieldnames is None else fieldnames
    root = ET.parse(kml_file).getroot()
    ns = {'kml': 'http://www.opengis.net/kml/2.2'}

    # Explicit UTF-8 keeps the output identical across platforms.
    with open(csv_file, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=columns)
        writer.writeheader()
        for placemark in root.findall('.//kml:Placemark', ns):
            # Extract activity from Category
            activity_value = placemark.findtext(
                './/kml:ExtendedData/kml:Data[@name="Category"]/kml:value',
                default='',
                namespaces=ns,
            )
            # Filter only 'Walking' activities
            if activity_value != 'Walking':
                continue

            # The start time is the same for every coordinate of the
            # placemark, so look it up once rather than once per row.
            begin_time = placemark.findtext('.//kml:begin', default='', namespaces=ns)

            # Collect coordinate tuples as lists of "lon,lat[,alt]" parts:
            # the single Point first, then each LineString vertex.
            tuples = []
            point = placemark.find('.//kml:Point/kml:coordinates', ns)
            if point is not None and point.text:
                tuples.append([part.strip() for part in point.text.split(',')])
            linestring = placemark.find('.//kml:LineString/kml:coordinates', ns)
            if linestring is not None and linestring.text:
                tuples.extend(chunk.split(',') for chunk in linestring.text.split())

            for parts in tuples:
                if len(parts) < 2:
                    # Malformed tuple: there is no latitude to report.
                    continue
                writer.writerow({
                    "time": begin_time,
                    "lat": parts[1],
                    "lon": parts[0],
                    "elevation": parts[2] if len(parts) > 2 else '',
                    "activity": activity_value,
                })
# Extract or convert the contents of every zip archive in the gps directory.
def _process_gps_archives():
    """Fill the csv directory from the ``.zip`` files found in ``gps_dir``.

    An archive that contains CSV members has those members extracted as-is.
    Otherwise each GPX member is extracted next to the archive and converted
    with ``parse_gpx_to_csv``. Existing CSV files are kept unless
    ``overwrite`` is true.
    """
    if not os.path.isdir(gps_dir):
        print(f"Skipping GPS archives: {gps_dir} does not exist")
        return

    # Sorted so that, when two sources map to the same CSV name and
    # overwrite is False, the winner does not depend on directory order.
    for filename in sorted(os.listdir(gps_dir)):
        if not filename.endswith('.zip'):
            continue
        zip_path = os.path.join(gps_dir, filename)
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                members = zip_ref.namelist()
                csv_members = [name for name in members if name.endswith('.csv')]
                if csv_members:
                    # Extract all CSV files to the csv directory
                    for member in csv_members:
                        if overwrite or not os.path.exists(os.path.join(csv_dir, member)):
                            zip_ref.extract(member, csv_dir)
                    continue

                # No CSV inside: fall back to converting the GPX members.
                for member in members:
                    if not member.endswith('.gpx'):
                        continue
                    stem = os.path.splitext(os.path.basename(member))[0]
                    csv_file_path = os.path.join(csv_dir, stem + '.csv')
                    if os.path.exists(csv_file_path) and not overwrite:
                        # Nothing to convert, so do not extract the GPX either.
                        continue
                    extracted_gpx_path = zip_ref.extract(member, gps_dir)
                    try:
                        parse_gpx_to_csv(extracted_gpx_path, csv_file_path)
                    except ET.ParseError as exc:
                        # One broken track must not abort the whole run.
                        print(f"Skipping {member} in {filename}: not valid XML ({exc})")
        except zipfile.BadZipFile:
            print(f"Skipping {filename}: not a valid zip file")


# Convert every KML file in the locations directory to a CSV file.
def _process_kml_files():
    """Run ``parse_kml_to_csv`` on each ``.kml`` file found in ``locations_dir``.

    The CSV name is the KML file name with 'history' and '-' removed and
    everything from the first '.' dropped. Existing CSV files are kept
    unless ``overwrite`` is true.
    """
    if not os.path.isdir(locations_dir):
        print(f"Skipping KML files: {locations_dir} does not exist")
        return

    for kml_file in sorted(os.listdir(locations_dir)):
        if not kml_file.endswith('.kml'):
            continue
        date_str = kml_file.replace('history', '').replace('-', '').split('.')[0]
        csv_file = os.path.join(csv_dir, f'{date_str}.csv')
        if os.path.exists(csv_file) and not overwrite:
            continue
        try:
            parse_kml_to_csv(os.path.join(locations_dir, kml_file), csv_file)
        except ET.ParseError as exc:
            # One broken file must not abort the whole run.
            print(f"Skipping {kml_file}: not valid XML ({exc})")


# Main function to process files
def main():
    """Process the zip archives in the gps directory, then the KML files."""
    _process_gps_archives()
    _process_kml_files()


if __name__ == "__main__":
    main()