Commit 4b79339

Add dfreader_pandas
1 parent 2d2d811 commit 4b79339

2 files changed: +212 -0 lines changed

.gitignore

Lines changed: 1 addition & 0 deletions

@@ -28,3 +28,4 @@ include/mavlink/v1.0
 dfindexer/dfindexer_cy.c
 dfindexer/*.so
 dfindexer/*.pyd
+.dfreader_pandas_cache

dfreader_pandas.py

Lines changed: 211 additions & 0 deletions

@@ -0,0 +1,211 @@
"""
Module to parse ArduPilot logs into pandas DataFrames with optional caching.

- Accepts a log file, list of messages/fields, and resample frequency.
- Aligns output rows to clean time intervals relative to the start of the log.
- Uses a lightweight caching mechanism keyed by file content and module hash.
- Automatically invalidates cache on module updates or parameter changes.

Intended for efficient, repeatable log analysis workflows.
"""

import os
import json
import hashlib
import pandas as pd
from pymavlink.DFReader import DFReader_binary

HASH_SIZE_BYTES = 1024 * 1024  # 1MB


def parse_log_to_df(path, specs, frequency, cache_dir=None):
    manager = LogCacheManager(cache_dir)

    if manager:
        df = manager.try_load_dataframe(path, specs, frequency)
        if df is not None:
            return df

    reader = DFReader_binary(path)
    fields = expand_field_specs(specs, reader)
    # Rewind to dump the messages dict, so rows hold NaNs until the first
    # valid message of each type has been seen
    reader.rewind()

    period = 1.0 / frequency
    first_timestamp = None
    next_emit_time = None

    rows = []
    while True:
        msg = reader.recv_match(type=list(fields.keys()))
        if msg is None:
            break
        if first_timestamp is None:
            first_timestamp = reader.clock.timestamp
            next_emit_time = first_timestamp
        if reader.clock.timestamp >= next_emit_time:
            # Advance the emit time to the next grid point past the current
            # timestamp, so at most one row is emitted per period
            n_period = (reader.clock.timestamp - first_timestamp) // period
            n_period += 1
            next_emit_time = first_timestamp + n_period * period
            rows.append(new_row(reader, fields))

    df = pd.DataFrame(rows)
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
    df.set_index("timestamp", inplace=True)
    df = df[[f"{m}.{f}" for m in fields.keys() for f in fields[m]]]

    if manager:
        manager.save_dataframe(path, df, specs, frequency)

    return df
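

# Worked example of the emit-time alignment above (illustrative numbers, not
# from a real log): with frequency = 10 Hz (period = 0.1 s) and
# first_timestamp = 100.00, a message arriving at t = 100.27 gives
# n_period = (100.27 - 100.00) // 0.1 + 1 = 3, hence
# next_emit_time = 100.00 + 3 * 0.1 = 100.30. Rows therefore land on a grid
# aligned to the start of the log, at most one per period.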


def new_row(reader: DFReader_binary, fields):
    row = {"timestamp": reader.clock.timestamp}
    for msg in fields.keys():
        if msg not in reader.messages:
            # No instance of this message seen yet; its columns stay NaN
            continue
        m = reader.messages[msg]
        for field in fields[msg]:
            row[f"{msg}.{field}"] = getattr(m, field, None)
    return row


def expand_field_specs(specs, reader: DFReader_binary):
    out = {}
    for spec in specs:
        if "." in spec:
            # "MSG.field" selects a single field of a message
            msg, field = spec.split(".")
            if msg not in reader.name_to_id:
                raise ValueError(f"Message {msg} not found in log file")
            fmt = reader.formats[reader.name_to_id[msg]]
            if field not in fmt.columns:
                raise ValueError(f"Field {field} not found in message {msg}")
            out.setdefault(msg, []).append(field)
        else:
            # A bare message name selects all of its fields
            msg = spec
            if msg not in reader.name_to_id:
                raise ValueError(f"Message {msg} not found in log file")
            fmt = reader.formats[reader.name_to_id[msg]]
            out.setdefault(msg, []).extend(fmt.columns)
    return out
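

# For example, with a hypothetical log whose FMT definitions include TECS and
# CTUN, expand_field_specs(["TECS", "CTUN.E2T"], reader) returns
# {"TECS": [...all TECS columns...], "CTUN": ["E2T"]}.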


class LogCacheManager:
    def __init__(self, cache_dir):
        self.cache_dir = cache_dir
        if cache_dir is not None:
            os.makedirs(cache_dir, exist_ok=True)

    def __bool__(self):
        return self.cache_dir is not None

    def _compute_key(self, path):
        # Key on the file size plus a hash of the first 1MB of content, so
        # multi-GB logs don't have to be read in full just to hit the cache
        stat = os.stat(path)
        size = stat.st_size
        with open(path, "rb") as f:
            data = f.read(HASH_SIZE_BYTES)
        h = hashlib.sha256(data).hexdigest()
        h = h[:16]  # 16 characters is plenty to prevent collisions
        return f"{h}_{size}"

    def _module_hash(self):
        # Hash this module's own source, so cache entries are invalidated
        # whenever the parsing code changes
        with open(__file__, "rb") as f:
            return hashlib.sha256(f.read()).hexdigest()[:8]

    @staticmethod
    def _specs_equal(a, b):
        return set(a) == set(b)

    def try_load_dataframe(self, path, specs, frequency):
        key = self._compute_key(path)
        cache_path = os.path.join(self.cache_dir, f"{key}.feather")
        meta_path = os.path.join(self.cache_dir, f"{key}.meta.json")

        if os.path.exists(cache_path) and os.path.exists(meta_path):
            with open(meta_path, "r") as f:
                meta = json.load(f)
            if (
                self._specs_equal(meta.get("specs", []), specs)
                and meta.get("frequency") == frequency
                and meta.get("module_hash") == self._module_hash()
            ):
                # Restore the timestamp index that was flattened to a column
                # for feather storage
                return pd.read_feather(cache_path).set_index("timestamp")
        return None

    def save_dataframe(self, path, df, specs, frequency):
        key = self._compute_key(path)
        cache_path = os.path.join(self.cache_dir, f"{key}.feather")
        meta_path = os.path.join(self.cache_dir, f"{key}.meta.json")

        # Feather can't store a DatetimeIndex, so keep the timestamp as a
        # regular column (reset_index(drop=True) would discard it and make
        # cached loads differ from fresh parses)
        df.reset_index().to_feather(cache_path)
        meta = {
            "specs": specs,
            "frequency": frequency,
            "module_hash": self._module_hash(),
        }
        with open(meta_path, "w") as f:
            json.dump(meta, f)
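

# Cache layout sketch (hypothetical key): a log whose first 1MB hashes to
# "0123456789abcdef" and whose size is 4096 bytes is cached as
# 0123456789abcdef_4096.feather, with specs/frequency/module_hash stored in
# 0123456789abcdef_4096.meta.json.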


def main():
    import argparse

    parser = argparse.ArgumentParser(
        description="Parse a log file to a DataFrame.",
        usage="python -m pymavlink.dfreader_pandas <log_file> [options]",
        epilog="Example usage: python -m pymavlink.dfreader_pandas log.bin --fields TECS EFI CTUN.E2T --frequency 10.0",
    )
    parser.add_argument("path", type=str, help="Path to the log file")
    parser.add_argument(
        "--fields",
        type=str,
        nargs="+",
        default=[],
        help="Space-separated list of message types and fields to include in the DataFrame",
    )
    parser.add_argument(
        "--frequency",
        type=float,
        default=10.0,
        help="Frequency in Hz for sampling the log file",
    )
    parser.add_argument(
        "--cache_dir",
        type=str,
        default=None,
        help="Directory to cache the DataFrame",
    )
    parser.add_argument(
        "--profile",
        action="store_true",
        help="Enable profiling of the parsing process",
    )
    args = parser.parse_args()

    if args.profile:
        from line_profiler import LineProfiler

        profiler = LineProfiler()
        profiler.add_function(parse_log_to_df)
        profiler.add_function(new_row)
        profiler.enable_by_count()

    if not args.fields:
        raise ValueError(
            "No fields provided. Use --fields to specify message types and/or fields."
        )

    df = parse_log_to_df(args.path, args.fields, args.frequency, args.cache_dir)
    print(df.head())
    print("...")
    print(df.tail())

    if args.profile:
        profiler.print_stats()


if __name__ == "__main__":
    main()
