Skip to content

Commit 398c7d2

Browse files
committed
Merged new_data_file_format branch into dev branch. This changes the output data file formats from .txt and .pca files to .tsv and .npy files.
2 parents 67abe71 + 54a9241 commit 398c7d2

15 files changed

+476
-284
lines changed

com/data_logger.py

+97-54
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import os
2-
import json
2+
import numpy as np
33
from datetime import datetime
44
from shutil import copyfile
55

@@ -8,39 +8,48 @@ class Data_logger():
88

99
def __init__(self, sm_info=None, print_func=None, data_consumers=[]):
1010
self.data_file = None
11-
self.analog_files = {}
11+
self.analog_writers = {}
1212
self.print_func = print_func
1313
self.data_consumers = data_consumers
1414
if sm_info:
1515
self.set_state_machine(sm_info)
1616

1717
def set_state_machine(self, sm_info):
1818
self.sm_info = sm_info
19-
self.ID2name_fw = self.sm_info['ID2name'] # Dict mapping framework IDs to names.
20-
self.ID2name_hw = {ai['ID']: name for name, ai # Dict mapping hardware IDs to names.
21-
in self.sm_info['analog_inputs'].items()}
22-
self.analog_files = {ai['ID']: None for ai in self.sm_info['analog_inputs'].values()}
23-
19+
self.ID2name_fw = self.sm_info['ID2name'] # Dict mapping framework IDs to names.
20+
2421
def open_data_file(self, data_dir, experiment_name, setup_ID, subject_ID, datetime_now=None):
    '''Open a .tsv file for event data, write its header rows, and create
    one Analog_writer per analog input declared by the state machine.'''
    self.data_dir = data_dir
    self.experiment_name = experiment_name
    self.subject_ID = subject_ID
    self.setup_ID = setup_ID
    if datetime_now is None:
        datetime_now = datetime.now()
    self.end_timestamp = -1
    session_file_name = self.subject_ID + datetime_now.strftime('-%Y-%m-%d-%H%M%S') + '.tsv'
    self.file_path = os.path.join(self.data_dir, session_file_name)
    self.data_file = open(self.file_path, 'w', newline='\n')
    # First row names the tsv columns.
    self.data_file.write(self.tsv_row_str(rtype='type', time='time', name='name', value='value'))
    # Session metadata as 'info' rows, in fixed order.
    header_info = [
        ('experiment_name', self.experiment_name),
        ('task_name', self.sm_info['name']),
        ('task_file_hash', self.sm_info['task_hash']),
        ('setup_id', self.setup_ID),
        ('framework_version', self.sm_info['framework_version']),
        ('micropython_version', self.sm_info['micropython_version']),
        ('subject_id', self.subject_ID),
        ('start_time', datetime.utcnow().isoformat(timespec='milliseconds')),
    ]
    for info_name, info_value in header_info:
        self.write_info_line(info_name, info_value)
    # One analog writer per analog input, keyed by hardware ID.
    self.analog_writers = {}
    for ID, ai in self.sm_info['analog_inputs'].items():
        self.analog_writers[ID] = Analog_writer(ai['name'], ai['fs'], ai['dtype'], self.file_path)
46+
47+
def write_info_line(self, name, value, time=0):
48+
self.data_file.write(self.tsv_row_str('info', time=time, name=name, value=value))
49+
50+
def tsv_row_str(self, rtype, time='', name='', value=''):
    '''Return one tab separated row string for the event data file.

    Args:
        rtype: Row type, e.g. 'info', 'state', 'event', 'print'.
        time: Timestamp in milliseconds (int, converted to seconds with
            3 decimal places) or a pre-formatted string written unchanged.
        name: Name field for the row.
        value: Value field for the row.
    '''
    # isinstance() rather than type(...) == int is the idiomatic type check.
    time_str = f'{time/1000:.3f}' if isinstance(time, int) else time
    return f'{time_str}\t{rtype}\t{name}\t{value}\n'
4453

4554
def copy_task_file(self, data_dir, tasks_dir, dir_name='task_files'):
4655
'''If not already present, copy task file to data_dir/dir_name
@@ -55,21 +64,21 @@ def copy_task_file(self, data_dir, tasks_dir, dir_name='task_files'):
5564

5665
def close_files(self):
5766
if self.data_file:
67+
self.write_info_line('end_time', self.end_datetime.isoformat(timespec='milliseconds'), self.end_timestamp)
5868
self.data_file.close()
5969
self.data_file = None
6070
self.file_path = None
61-
for analog_file in self.analog_files.values():
62-
if analog_file:
63-
analog_file.close()
64-
analog_file = None
71+
for analog_writer in self.analog_writers.values():
72+
analog_writer.close_files()
73+
self.analog_writers = {}
6574

6675
def process_data(self, new_data):
6776
'''If data _file is open new data is written to file. If print_func is specified
6877
human readable data strings are passed to it.'''
6978
if self.data_file:
7079
self.write_to_file(new_data)
7180
if self.print_func:
72-
self.print_func(self.data_to_string(new_data, verbose=True), end='')
81+
self.print_func(self.data_to_string(new_data).replace('\t\t', '\t'), end='')
7382
if self.data_consumers:
7483
for data_consumer in self.data_consumers:
7584
data_consumer.process_data(new_data)
@@ -80,40 +89,74 @@ def write_to_file(self, new_data):
8089
self.data_file.write(data_string)
8190
self.data_file.flush()
8291
for nd in new_data:
83-
if nd[0] == 'A':
84-
self.save_analog_chunk(*nd[1:])
92+
if nd.type == 'A':
93+
self.analog_writers[nd.ID].save_analog_chunk(timestamp=nd.time, data_array=nd.data)
8594

86-
def data_to_string(self, new_data, verbose=False):
95+
def data_to_string(self, new_data):
8796
'''Convert list of data tuples into a string. If verbose=True state and event names are used,
8897
if verbose=False state and event IDs are used.'''
8998
data_string = ''
9099
for nd in new_data:
91-
if nd[0] == 'D': # State entry or event.
92-
if verbose: # Print state or event name.
93-
data_string += 'D {} {}\n'.format(nd[1], self.ID2name_fw[nd[2]])
94-
else: # Print state or event ID.
95-
data_string += 'D {} {}\n'.format(nd[1], nd[2])
96-
elif nd[0] in ('P', 'V'): # User print output or set variable.
97-
data_string += '{} {} {}\n'.format(*nd)
98-
elif nd[0] == '!': # Warning
99-
data_string = '! {}\n'.format(nd[1])
100-
elif nd[0] == '!!': # Crash traceback.
101-
error_string = nd[1]
102-
if not verbose: # In data files multi-line tracebacks have ! prepended to all lines aid parsing data file.
103-
error_string = '! ' + error_string.replace('\n', '\n! ')
104-
data_string += '\n' + error_string + '\n'
100+
if nd.type == 'D': # State entry or event.
101+
if nd.ID in self.sm_info['states'].values():
102+
data_string += self.tsv_row_str('state', time=nd.time, name=self.ID2name_fw[nd.ID])
103+
else:
104+
data_string += self.tsv_row_str('event', time=nd.time, name=self.ID2name_fw[nd.ID])
105+
elif nd.type == 'P': # User print output.
106+
data_string += self.tsv_row_str('print', time=nd.time, value=nd.data)
107+
elif nd.type == 'V': # Variable.
108+
data_string += self.tsv_row_str('variable', time=nd.time, name=nd.ID, value=nd.data)
109+
elif nd.type == '!': # Warning
110+
data_string += self.tsv_row_str('warning', value=nd.data)
111+
elif nd.type == '!!': # Error
112+
data_string += self.tsv_row_str('error', value=nd.data.replace('\n','|').replace('\r','|'))
113+
elif nd.type == 'S': # Framework stop.
114+
self.end_datetime = datetime.utcnow()
115+
self.end_timestamp = nd.time
105116
return data_string
106117

107-
def save_analog_chunk(self, ID, sampling_rate, timestamp, data_array):
108-
'''Save a chunk of analog data to .pca data file. File is created if not
109-
already open for that analog input.'''
110-
if not self.analog_files[ID]:
111-
file_name = os.path.splitext(self.file_path)[0] + '_' + \
112-
self.ID2name_hw[ID] + '.pca'
113-
self.analog_files[ID] = open(file_name, 'wb')
114-
ms_per_sample = 1000 / sampling_rate
115-
for i, x in enumerate(data_array):
116-
t = int(timestamp + i*ms_per_sample)
117-
self.analog_files[ID].write(t.to_bytes(4,'little', signed=True))
118-
self.analog_files[ID].write(x.to_bytes(4,'little', signed=True))
119-
self.analog_files[ID].flush()
118+
119+
class Analog_writer():
    '''Class for writing data from one analog input to disk.

    Sample values and timestamps are streamed to temporary binary files
    while the session runs, then converted to .npy files on close.'''

    def __init__(self, name, sampling_rate, data_type, session_filepath):
        '''Args:
            name: Name of the analog input, used in output file names.
            sampling_rate: Sampling rate in Hz.
            data_type: Numpy dtype string of the samples, e.g. 'int16'.
            session_filepath: Path of the session's .tsv event data file,
                used as the stem for the analog data file paths.
        '''
        self.name = name
        self.sampling_rate = sampling_rate
        self.data_type = data_type
        self.open_data_files(session_filepath)

    def open_data_files(self, session_filepath):
        '''Open temporary binary files for sample times and sample data.'''
        ses_path_stem, file_ext = os.path.splitext(session_filepath)
        self.path_stem = ses_path_stem + f'_{self.name}'
        self.t_tempfile_path = self.path_stem + '.time.temp'
        # Data temp file name encodes the dtype so it can be recovered.
        self.d_tempfile_path = self.path_stem + f'.data-1{self.data_type}.temp'
        self.time_tempfile = open(self.t_tempfile_path, 'wb')
        self.data_tempfile = open(self.d_tempfile_path, 'wb')
        self.next_chunk_start_time = 0

    def close_files(self):
        '''Close data files. Convert temp files to .npy files and delete
        the temp files.'''
        self.time_tempfile.close()
        self.data_tempfile.close()
        with open(self.t_tempfile_path, 'rb') as f:
            times = np.frombuffer(f.read(), dtype='float64')
            np.save(self.path_stem + '.time.npy', times)
        with open(self.d_tempfile_path, 'rb') as f:
            data = np.frombuffer(f.read(), dtype=self.data_type)
            np.save(self.path_stem + '.data.npy', data)
        os.remove(self.t_tempfile_path)
        os.remove(self.d_tempfile_path)

    def save_analog_chunk(self, timestamp, data_array):
        '''Save a chunk of analog data to the temporary data files.

        Args:
            timestamp: Time of the chunk's first sample in milliseconds.
            data_array: Numpy array of sample values.
        '''
        # If the chunk starts where the previous one ended (within 1 ms),
        # stitch it on seamlessly to avoid timestamp-rounding jitter.
        if np.abs(self.next_chunk_start_time - timestamp/1000) < 0.001:
            chunk_start_time = self.next_chunk_start_time
        else:
            chunk_start_time = timestamp/1000
        times = (np.arange(len(data_array), dtype='float64')
                 / self.sampling_rate) + chunk_start_time  # Seconds.
        self.time_tempfile.write(times.tobytes())
        self.data_tempfile.write(data_array.tobytes())
        self.time_tempfile.flush()
        self.data_tempfile.flush()
        self.next_chunk_start_time = chunk_start_time + len(data_array)/self.sampling_rate

0 commit comments

Comments
 (0)