This repository was archived by the owner on May 19, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathheatmap.py
201 lines (174 loc) · 6.5 KB
/
heatmap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
'''
Genera un mapa de calor basado en datos de contaminantes,
viento y coordenadas de estaciones de calidad del aire.
'''
from math import copysign
import pandas as pd
import numpy as np
import plotly
import plotly.graph_objects as go
from kriging import interpolate
def plot_heatmap(pollutant: str, date: str, tokenfile: str, output: str = '') -> None:
'''
Genera un mapa de calor de un contaminante con marcadores
de dirección y velocidad del viento del día especificado.
:param pollutant: Nombre del contaminante.
:param date: Fecha en formato `<día>-<mes corto>-<año corto>` (`'d-b-y'`), ejemplo:
'1-Dec-18'
:param tokenfile: Archivo con token de Mapbox.
:param output: Ruta relativa del archivo HTML para guardar el mapa de calor.
'''
# si no se especificó nombre de archivo, generar uno
filepath = output if output else f'{pollutant}_{date}.html'
print(f'{filepath}: Preparando datos...', flush=True)
# columnas a extraer del CSV
columns = ['timestamp', 'station', pollutant, 'velocity', 'direction']
dataframe = pd.read_csv('resources/filled.csv', usecols=columns).dropna()
# leer coordenadas de estaciones
coords = pd.read_csv('resources/estaciones.dat')
# iterar sobre índice y datos de cada fila
for i, r in coords.iterrows():
# signo de latitud: + Norte, - Sur
sign = copysign(1, r[0])
# calcular coordenadas decimales a partir de GMS
coords.loc[i, 'lat'] = sign * (abs(r[0]) + r[1] / 60 + r[2] / 3600)
# signo de longitud: + Este, - Oeste
sign = copysign(1, r[3])
# calcular coordenadas decimales a partir de GMS
coords.loc[i, 'lon'] = sign * (abs(r[3]) + r[4] / 60 + r[5] / 3600)
dataset = coords.merge(
# filtrar registros del día elegido
dataframe.loc[dataframe['timestamp'].str.startswith(date)],
# unir DataFrames de datos con coordenadas
on='station'
# eliminar columnas GMS
).drop(coords.columns[range(6)], axis=1)
# convertir strings a objeto datetime
strfdt = '%d-%b-%y %H'
dataset['timestamp'] = pd.to_datetime(dataset['timestamp'], format=strfdt)
# escala de densidad
pollutionmin, pollutionmax = min(dataset[pollutant]), max(dataset[pollutant])
velocitymin, velocitymax = min(dataset['velocity']), max(dataset['velocity'])
frames, steps = [], []
# filtrar horas del día elegido
hours = dataset.timestamp.unique()
hours.sort()
print(f'{filepath}: Generando mapa de calor...', flush=True)
for hour in hours:
# datos leídos en la hora epecífica
data = dataset.loc[dataset['timestamp'] == hour]
## método de kringing
# interpolar contaminante
xpollution, ypollution, zpollution = interpolate(data.lon, data.lat, data[pollutant], range(5, 41, 5))
# rango para interpolación recursiva para valores de viento (20x20 puntos)
grid = range(5, 21, 5)
# interpolar velocidad de viento
xvelocity, yvelocity, zvelocity = interpolate(data.lon, data.lat, data['velocity'], grid)
# interpolar dirección de viento
xdirection, ydirection, zdirection = interpolate(data.lon, data.lat, data['direction'], grid)
strhour = pd.to_datetime(hour).strftime(strfdt)
frames.append({
'name': f'frame_{strhour}',
'data': [
# mapa de dirección de viento
dict(
type='scattermapbox',
lon=xdirection,
lat=ydirection,
mode='markers',
marker=dict(
symbol='marker',
size=12,
allowoverlap=True,
angle=[angle + 180 for angle in zdirection]
),
text=zdirection
),
# mapa de velocidad de viento
dict(
type='scattermapbox',
lon=xvelocity,
lat=yvelocity,
mode='markers',
marker=dict(
symbol='circle',
size=12,
allowoverlap=True,
color='white',
opacity=np.interp(zvelocity, (velocitymin, velocitymax), (0, 1)),
),
text=zvelocity
),
# mapa de calor de densidad de contaminante
dict(
type='densitymapbox',
lon=xpollution,
lat=ypollution,
z=zpollution,
opacity=0.5,
zmin=pollutionmin,
zmax=pollutionmax
)
]
})
steps.append({
'label': strhour,
'method': 'animate',
'args': [
[f'frame_{strhour}'],
{
'mode': 'immediate',
'frame': {
'duration': 500,
'redraw': True
},
'transition': {'duration': 300}
}
]
})
sliders = [{
'transition': {'duration': 300},
'x': 0.08,
'len': 0.88,
'currentvalue': {'xanchor': 'center'},
'steps': steps
}]
playbtn = [{
'type': 'buttons',
'showactive': True,
'x': 0.045, 'y': -0.08,
'buttons': [{
'label': 'Play',
'method': 'animate',
'args': [
None,
{
'mode': 'immediate',
'frame': {
'duration': 500,
'redraw': True
},
'transition': {'duration': 300},
'fromcurrent': True
}
]
}]
}]
with open(tokenfile, 'r') as file:
token = file.read()
layout = go.Layout(
sliders=sliders,
updatemenus=playbtn,
autosize=True,
mapbox=dict(
accesstoken=token,
center=dict(lat=25.67, lon=-100.338),
zoom=9.3
)
)
data = frames[0]['data']
figure = go.Figure(data=data, layout=layout, frames=frames)
print(f'{filepath}: Guardando mapa en archivo...', flush=True)
# guardar mapa en archivo
plotly.offline.plot(figure, filename=filepath)
plot_heatmap('PM10', '31-Dec-18', '')