# -*- coding: utf-8 -*-
"""Parse NMEA logs from Alliance."""
from datetime import datetime, timedelta
from pathlib import Path

import metpy.calc as mcalc
import metpy.units as metunits
import numpy as np
import pandas as pd
import pynmea2
import xarray as xr
try:
    # If tqdm is installed
    try:
        # Check if it's Jupyter Notebook
        ipy_str = str(type(get_ipython()))
        if 'zmqshell' in ipy_str.lower():
            from tqdm import tqdm_notebook as tqdm
        else:
            from tqdm import tqdm
    except NameError:
        from tqdm import tqdm
    from functools import partial
    pbar = partial(tqdm, leave=False)
except ImportError:
    def pbar(obj, **tqdm_kw):
        """Empty progress bar that passes the iterable through."""
        return obj
# TODO: docstrings!!!
# Typical message list
MSG_LIST = [
    dict(talker='GGA', fields=(('datetime_str',), ('longitude',), ('latitude',))),
    dict(talker='HDT', fields=(('datetime_str',), ('heading', float))),
    dict(talker='MWV', fields=(('datetime_str',), ('status', str), ('reference', str),
                               ('wind_speed', float), ('wind_angle', float))),
    dict(talker='VHW', fields=(('datetime_str',), ('water_speed_knots', float),)),
]
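# Other message types can be requested with the same structure. For example,
# a depth sentence could (hypothetically, assuming the logs attach the same
# datetime_str field to $--DPT messages) be added as:
#     dict(talker='DPT', fields=(('datetime_str',), ('depth', float))),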


class AllianceComposite:
    """
    Class for processing Alliance NMEA logs.

    Useful for combining several messages together, aligning data along the
    time axis, averaging over time, and saving to netCDF files.

    Typical workflow:

    >>> ac = AllianceComposite(Path('/path/to/file.log'), datetime(2018, 3, 1))
    >>> ac.process(MSG_LIST)
    >>> averaged_dataset = ac.time_ave(freq='1H')
    >>> ac.to_netcdf(averaged_dataset, path='myfile.nc')
    """

    TSTART = datetime(1970, 1, 1)

    def __init__(self, fname, date):
        """
        Initialise the AllianceComposite object

        Arguments
        ---------
        fname: pathlib.Path
            Path to the log file to process
        date: datetime.datetime
            Log file date
        """
        assert isinstance(fname, Path), 'fname should be a Path object!'
        self.fname = fname
        self.date = date
        # Assume 1 second frequency of the log files
        # TODO: make it flexible
        self.time_range = (pd.date_range(start=date,
                                         freq='S',
                                         periods=86400)
                           .to_series()
                           .to_frame(name='time'))
        self.data_d = dict()

    def read(self, msg_req_list):
        """
        Read the log file and store the result as the `.ds` attribute (xarray.Dataset).

        Arguments
        ---------
        msg_req_list: list
            List of dictionaries with fields to extract from NMEA messages
        """
        # Prepare a dictionary of empty lists for each requested talker/field
        for msg_req in msg_req_list:
            self.data_d[msg_req['talker']] = dict()
            for fld in msg_req['fields']:
                self.data_d[msg_req['talker']][fld[0]] = []

        with self.fname.open('r') as f:
            # Use pbar (not tqdm directly), so this also works without tqdm installed
            for line in pbar(f.readlines()):
                try:
                    msg = pynmea2.NMEASentence.parse(line)
                    for msg_req in msg_req_list:
                        if isinstance(msg, getattr(pynmea2.talker,
                                                   msg_req['talker'])):
                            for fld in msg_req['fields']:
                                assert isinstance(fld, tuple), 'Each field must be a tuple!'
                                value = getattr(msg, fld[0])
                                if len(fld) == 2:
                                    # If the tuple contains two elements, assume the
                                    # second one is a function to convert the field value
                                    try:
                                        value = fld[1](value)
                                    except (ValueError, TypeError):
                                        value = np.nan
                                self.data_d[msg_req['talker']][fld[0]].append(value)
                except pynmea2.ParseError:
                    # Skip lines that are not valid NMEA sentences
                    pass

        # Convert dictionaries of lists to dataframes and merge them together
        # using the time_range dataframe of 86400 seconds
        df = self.time_range
        for val in self.data_d.values():
            msg_df = pd.DataFrame(data=val)
            msg_df.rename(dict(datetime_str='datetime'), axis=1, inplace=True)
            msg_df['datetime'] = (msg_df['datetime']
                                  .astype(int)
                                  .apply(lambda x: self.TSTART + timedelta(seconds=x)))
            msg_df = msg_df.drop_duplicates('datetime').set_index('datetime')
            df = pd.merge(df, msg_df,
                          how='outer', left_index=True, right_index=True)
        self.ds = df.to_xarray()

    def clean_up(self, drop_time=True,
                 mask_invalid_wind=True, mask_relative_wind=True,
                 convert_to_uv=True, convert_to_mps=True):
        """
        Clean up the dataset and add essential attributes.

        Arguments
        ---------
        drop_time: bool
            Remove the additional time variable (and leave only the index)
        mask_invalid_wind: bool
            Mask out wind speed and wind angle values if $INMWV Status is not "A"
        mask_relative_wind: bool
            Mask out wind speed and wind angle values if $INMWV Reference is not "T"
        convert_to_uv: bool
            Convert wind speed and wind angle to u- and v-components
        convert_to_mps: bool
            Convert units of wind speed and water speed from knots to m/s
        """
        self.ds.longitude.attrs['units'] = 'degrees_east'
        self.ds.latitude.attrs['units'] = 'degrees_north'
        if drop_time:
            self.ds = self.ds.drop(labels=['time'])
            self.ds = self.ds.rename(dict(index='time'))
        if mask_invalid_wind:
            self.ds.wind_angle.values[self.ds.status != 'A'] = np.nan
            self.ds.wind_speed.values[self.ds.status != 'A'] = np.nan
            self.ds = self.ds.drop(labels=['status'])
        if mask_relative_wind:
            self.ds.wind_angle.values[self.ds.reference != 'T'] = np.nan
            self.ds.wind_speed.values[self.ds.reference != 'T'] = np.nan
            self.ds = self.ds.drop(labels=['reference'])
        if convert_to_mps:
            # Scale by the plain conversion factor and keep track of the
            # resulting units, so the speeds are not converted twice below
            kt2mps = metunits.units('knots').to('m/s').magnitude
            self.ds['wind_speed'] *= kt2mps
            self.ds['water_speed_knots'] *= kt2mps
            self.ds = self.ds.rename(dict(water_speed_knots='water_speed'))
            wind_units = metunits.units('m/s')
        else:
            wind_units = metunits.units('knots')
        if convert_to_uv:
            # NB: get_wind_components was renamed to wind_components in later MetPy
            u, v = mcalc.get_wind_components(self.ds.wind_speed.values * wind_units,
                                             self.ds.wind_angle.values
                                             * metunits.units('degrees'))
            # Ensure the components are in m/s regardless of convert_to_mps
            u, v = u.to('m/s').magnitude, v.to('m/s').magnitude
            self.ds = self.ds.drop(labels=['wind_speed', 'wind_angle'])
            self.ds = self.ds.assign(u=xr.Variable(dims='time', data=u,
                                                   attrs=dict(units='m s**-1',
                                                              long_name='U component of wind',
                                                              short_name='eastward_wind')),
                                     v=xr.Variable(dims='time', data=v,
                                                   attrs=dict(units='m s**-1',
                                                              long_name='V component of wind',
                                                              short_name='northward_wind')))

    def process(self, msg_req_list, **kwargs):
        """Shortcut for the read() and clean_up() methods."""
        self.read(msg_req_list)
        self.clean_up(**kwargs)

    def time_ave(self, freq):
        """
        Average the dataset over constant periods of time

        Arguments
        ---------
        freq: string or pandas.DateOffset
            Size of time chunks. E.g. 10T is 10 minutes

        Returns
        -------
        ave_ds: xarray.Dataset
            Dataset of averaged data
        """
        return self.ds.resample(time=freq).reduce(np.nanmean)

    @classmethod
    def to_netcdf(cls, ds, path, encoding=None, **kwargs):
        """Save an xarray dataset to a netCDF file with the calendar counted from TSTART."""
        if encoding is None:
            encoding = dict(time=dict(units=f'seconds since {cls.TSTART}',
                                      calendar='gregorian'))
        ds.to_netcdf(path=path, encoding=encoding, **kwargs)


def average_ds_over_time(ds, date, freq, mark='end', time_res='S'):
    """
    Average the dataset over constant periods of time

    Arguments
    ---------
    ds: xarray.Dataset
        Dataset to average
    date: datetime.datetime
        Start date
    freq: string or pandas.DateOffset
        Size of time chunks. E.g. 10T is 10 minutes
    mark: string, optional
        Time index mark. Can be one of "start" or "end", i.e. the start
        or the end of time chunks.
    time_res: string, optional
        Resolution of the original time index: seconds (S) or minutes (M)

    Returns
    -------
    ave_ds: xarray.Dataset
        Dataset of averaged data
    """
    # create time index with the given frequency
    new_time = pd.date_range(start=date,
                             end=date + timedelta(hours=23, minutes=59, seconds=59),
                             freq=freq)
    # define tstep before the mark check so it is available for both options
    tstep = new_time[1] - new_time[0]
    if mark == 'end':
        new_time += tstep
    # TODO: add "middle" option
    # TODO: rewrite this
    if time_res == 'S':
        # number of original time steps per chunk
        ts = tstep.total_seconds()
    elif time_res == 'M':
        ts = tstep.total_seconds() / 60
    else:
        raise ValueError(f'Unknown time_res value: {time_res}')
    # save attributes before averaging
    _attrs = {k: ds[k].attrs for k in ds.data_vars}
    # average over time chunks
    ave_ds = (ds.groupby(xr.IndexVariable(dims='time',
                                          data=np.arange(len(ds.time)) // ts))
              .mean())
    # reset time index
    ave_ds['time'] = new_time
    # after the groupby operation the attributes are lost, so restore the saved ones
    for k in ds.data_vars:
        ave_ds[k].attrs.update(_attrs[k])
    return ave_ds
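

if __name__ == '__main__':
    # Minimal usage sketch tying the pieces together; the log file name,
    # date, and output path below are hypothetical placeholders.
    log_file = Path('alliance_20180301.log')
    log_date = datetime(2018, 3, 1)

    ac = AllianceComposite(log_file, log_date)
    ac.process(MSG_LIST)

    # Average to 10-minute means, marking each chunk by its end time
    ave = average_ds_over_time(ac.ds, log_date, freq='10T', mark='end')
    AllianceComposite.to_netcdf(ave, path='alliance_20180301_10min.nc')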