Commit 6379437

Author: Martin
Message: Various changes to utils.py, e.g. to optimize for larger log files (> 5 MB) on S3
Parent: cf64bc0

File tree: 6 files changed (44 additions & 35 deletions)

One of the changed files is binary and is not shown.

examples/data-processing/README.md
Lines changed: 2 additions & 0 deletions

@@ -22,7 +22,9 @@ Download this folder, enter it, open your command prompt and run below:
 ### Regarding local disk vs S3
 The examples load data from local disk by default. If you want to load data from your S3 server, modify `devices` to include a list of S3 device paths (e.g. `"my_bucket/device_id"`). In addition, you'll modify the `fs` initialization to include your S3 details as below:

+```
 fs = setup_fs(s3=True, key="access_key", secret="secret_key", endpoint="endpoint")
+```

 If you're using AWS S3, your endpoint would e.g. be `https://s3.us-east-2.amazonaws.com` (if your region is `us-east-2`). A MinIO S3 endpoint would e.g. be `http://192.168.0.1:9000`.
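For the MinIO-with-TLS case, the `cert` parameter of `setup_fs` (see the utils.py changes below) can point to the server certificate. A minimal sketch with placeholder credentials and a hypothetical certificate path:

```
fs = setup_fs(
    s3=True,
    key="access_key",  # placeholder credentials
    secret="secret_key",
    endpoint="https://192.168.0.1:9000",  # example MinIO endpoint with TLS
    cert="certs/public.crt",  # hypothetical path to the server certificate
)
```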

examples/data-processing/process_data.py
Lines changed: 1 addition & 1 deletion

@@ -39,6 +39,6 @@ def ratio(s1, s2):

 # --------------------------------------------
 # example: resample and restructure data (parameters in columns)
-df_phys_join = restructure_data(df_phys=df_phys_all, res="1S")
+df_phys_join = restructure_data(df_phys=df_phys_all, res="1S", full_col_names=True)
 df_phys_join.to_csv("output_joined.csv")
 print(df_phys_join)
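Since `restructure_data` now defaults `full_col_names` to `False` (see utils.py below), the example passes it explicitly to keep the prefixed column names. An illustration of the effect, using the standard J1939 EEC1 frame 0x0CF00400 as a hypothetical input (signal names are illustrative):

```
# full_col_names=True prefixes each signal with the hex CAN ID
df = restructure_data(df_phys=df_phys_all, res="1S", full_col_names=True)
print(df.columns)  # e.g. Index(['CF00400.EngineSpeed', ...])

# with the new default, columns are bare signal names
df = restructure_data(df_phys=df_phys_all, res="1S")
print(df.columns)  # e.g. Index(['EngineSpeed', ...])
```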

examples/data-processing/process_tp_data.py
Lines changed: 4 additions & 4 deletions

@@ -23,7 +23,7 @@ def process_tp_example(devices, dbc_path, tp_type):
     df_raw.to_csv(f"{output_folder}/tp_raw_data_combined.csv")

     # extract physical values as normal, but add tp_type
-    df_phys = proc.extract_phys(df_raw, tp_type=tp_type)
+    df_phys = proc.extract_phys(df_raw)
     df_phys.to_csv(f"{output_folder}/tp_physical_values.csv")

     print("Finished saving CSV output for devices:", devices)

@@ -43,9 +43,9 @@ def process_tp_example(devices, dbc_path, tp_type):
 process_tp_example(devices, dbc_paths, "j1939")

 # NMEA 2000 fast packet data (with GNSS position)
-# devices = ["LOG_TP/94C49784"]
-# dbc_paths = [r"dbc_files/tp_nmea_2.dbc"]
-# process_tp_example(devices, dbc_paths, "nmea")
+devices = ["LOG_TP/94C49784"]
+dbc_paths = [r"dbc_files/tp_nmea_2.dbc"]
+process_tp_example(devices, dbc_paths, "nmea")

 # UDS data across two CAN channels
 devices = ["LOG_TP/FE34E37D"]

examples/data-processing/requirements.txt
Lines changed: 2 additions & 1 deletion

@@ -29,4 +29,5 @@ typing-extensions==3.10.0.0
 urllib3==1.26.5
 wrapt==1.12.1
 yarl==1.6.3
-zipp==3.4.1
+zipp==3.4.1
+j1939_pgn==0.4
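The new `j1939_pgn` dependency backs the PGN-based column naming added in utils.py below. A quick sanity check, assuming `.pgn` returns the integer PGN as it is used in `restructure_data`:

```
from J1939_PGN import J1939_PGN

# the PGN occupies bits 8-25 of a 29-bit J1939 CAN ID
print(J1939_PGN(0x0CF00400).pgn)  # expect 61444 (0xF004, EEC1)
```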

examples/data-processing/utils.py
Lines changed: 35 additions & 29 deletions

@@ -1,18 +1,26 @@
 def setup_fs(s3, key="", secret="", endpoint="", cert=""):
     """Given a boolean specifying whether to use local disk or S3, setup filesystem
     Syntax examples: AWS (http://s3.us-east-2.amazonaws.com), MinIO (http://192.168.0.1:9000)
-    The cert input is relevant if you're using MinIO with TLS enabled, for specifying the path to the certficiate
+    The cert input is relevant if you're using MinIO with TLS enabled, for specifying the path to the certificate.
+
+    The block_size is set to accommodate files up to 55 MB in size. If your log files are larger, adjust this value accordingly.
     """

     if s3:
         import s3fs

+        block_size = 55 * 1024 * 1024
+
         if "amazonaws" in endpoint:
-            fs = s3fs.S3FileSystem(key=key, secret=secret)
+            fs = s3fs.S3FileSystem(key=key, secret=secret, default_block_size=block_size)
         elif cert != "":
-            fs = s3fs.S3FileSystem(key=key, secret=secret, client_kwargs={"endpoint_url": endpoint, "verify": cert})
+            fs = s3fs.S3FileSystem(
+                key=key, secret=secret, client_kwargs={"endpoint_url": endpoint, "verify": cert}, default_block_size=block_size
+            )
         else:
-            fs = s3fs.S3FileSystem(key=key, secret=secret, client_kwargs={"endpoint_url": endpoint},)
+            fs = s3fs.S3FileSystem(
+                key=key, secret=secret, client_kwargs={"endpoint_url": endpoint}, default_block_size=block_size
+            )

     else:
         from pathlib import Path
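For context: s3fs fetches objects in chunks of `default_block_size` bytes (5 MB by default), so log files above that size previously triggered several ranged GET requests. A hedged sketch of raising the limit further, with placeholder credentials and a hypothetical size:

```
import s3fs

# hypothetical value for log files up to ~100 MB; one block per file
# means each log file can be pulled in a single S3 request
block_size = 100 * 1024 * 1024

fs = s3fs.S3FileSystem(
    key="access_key",  # placeholder credentials
    secret="secret_key",
    client_kwargs={"endpoint_url": "http://192.168.0.1:9000"},  # example MinIO endpoint
    default_block_size=block_size,
)
```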
@@ -52,37 +60,31 @@ def list_log_files(fs, devices, start_times, verbose=True):
         for idx, device in enumerate(devices):
             start = start_times[idx]
             log_files_device = canedge_browser.get_log_files(fs, [device], start_date=start)
-
-            # exclude the 1st log file if the last timestamp is before the start timestamp
-            if len(log_files_device) > 0:
-                with fs.open(log_files_device[0], "rb") as handle:
-                    mdf_file = mdf_iter.MdfFile(handle)
-                    df_raw_lin = mdf_file.get_data_frame_lin()
-                    df_raw_lin["IDE"] = 0
-                    df_raw_can = mdf_file.get_data_frame()
-                    df_raw = df_raw_can.append(df_raw_lin)
-                    end_time = df_raw.index[-1]
-
-                    if end_time < start:
-                        log_files_device = log_files_device[1:]
-
-                log_files.extend(log_files_device)
+            log_files.extend(log_files_device)

     if verbose:
         print(f"Found {len(log_files)} log files\n")

     return log_files


-def restructure_data(df_phys, res, full_col_names=True):
+def restructure_data(df_phys, res, full_col_names=False, pgn_names=False):
     import pandas as pd
+    from J1939_PGN import J1939_PGN

     df_phys_join = pd.DataFrame({"TimeStamp": []})
     if not df_phys.empty:
         for message, df_phys_message in df_phys.groupby("CAN ID"):
             for signal, data in df_phys_message.groupby("Signal"):
-                if full_col_names == True:
-                    col_name = str(hex(int(message))).upper()[2:] + "." + str(signal)
+
+                pgn = J1939_PGN(int(message)).pgn
+
+                if full_col_names == True and pgn_names == False:
+                    col_name = str(hex(int(message))).upper()[2:] + "." + signal
+                elif full_col_names == True and pgn_names == True:
+                    col_name = str(hex(int(message))).upper()[2:] + "." + str(pgn) + "." + signal
+                elif full_col_names == False and pgn_names == True:
+                    col_name = str(pgn) + "." + signal
                 else:
                     col_name = signal
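A usage sketch of the new `pgn_names` flag, assuming a `df_phys_all` built as in the examples and J1939 data containing the EEC1 frame 0x0CF00400 (signal names are illustrative):

```
# pgn_names=True alone: columns keyed by the decimal PGN
df = restructure_data(df_phys=df_phys_all, res="1S", pgn_names=True)
print(df.columns)  # e.g. Index(['61444.EngineSpeed', ...])

# both flags: hex CAN ID, then PGN, then signal
df = restructure_data(df_phys=df_phys_all, res="1S", full_col_names=True, pgn_names=True)
print(df.columns)  # e.g. Index(['CF00400.61444.EngineSpeed', ...])
```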

@@ -178,18 +180,23 @@ def filter_signals(self, df_phys):

         return df_phys

-    def get_raw_data(self, log_file):
-        """Extract a df of raw data and device ID from log file
+    def get_raw_data(self, log_file, lin=False):
+        """Extract a df of raw data and device ID from log file.
+        Optionally include LIN bus data by setting lin=True
         """
         import mdf_iter

         with self.fs.open(log_file, "rb") as handle:
             mdf_file = mdf_iter.MdfFile(handle)
             device_id = self.get_device_id(mdf_file)
-            df_raw_lin = mdf_file.get_data_frame_lin()
-            df_raw_lin["IDE"] = 0
-            df_raw_can = mdf_file.get_data_frame()
-            df_raw = df_raw_can.append(df_raw_lin)
+
+            if lin:
+                df_raw_lin = mdf_file.get_data_frame_lin()
+                df_raw_lin["IDE"] = 0
+                df_raw_can = mdf_file.get_data_frame()
+                df_raw = df_raw_can.append(df_raw_lin)
+            else:
+                df_raw = mdf_file.get_data_frame()

         return df_raw, device_id
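A usage sketch, assuming `proc` is the ProcessData instance from the examples; LIN extraction is now opt-in:

```
df_raw, device_id = proc.get_raw_data(log_file)            # CAN frames only (new default)
df_raw, device_id = proc.get_raw_data(log_file, lin=True)  # CAN plus LIN frames
```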

@@ -342,7 +349,6 @@ def construct_new_tp_frame(self, base_frame, payload_concatenated, can_id):

     def combine_tp_frames(self, df_raw):
         import pandas as pd
-        import sys

         bam_pgn_hex = self.frame_struct["bam_pgn_hex"]
         res_id_list = [int(res_id, 16) for res_id in self.frame_struct["res_id_list_hex"]]
