3
3
from datetime import datetime
4
4
from shutil import copyfile
5
5
6
- class Data_logger ():
7
- '''Class for logging data from a pyControl setup to disk'''
6
+
7
+ class Data_logger :
8
+ """Class for logging data from a pyControl setup to disk"""
8
9
9
10
def __init__ (self , sm_info = None , print_func = None , data_consumers = []):
10
11
self .data_file = None
@@ -16,55 +17,58 @@ def __init__(self, sm_info=None, print_func=None, data_consumers=[]):
16
17
17
18
def set_state_machine (self , sm_info ):
18
19
self .sm_info = sm_info
19
- self .ID2name_fw = self .sm_info [' ID2name' ] # Dict mapping framework IDs to names.
20
-
20
+ self .ID2name_fw = self .sm_info [" ID2name" ] # Dict mapping framework IDs to names.
21
+
21
22
def open_data_file (self , data_dir , experiment_name , setup_ID , subject_ID , datetime_now = None ):
22
- ''' Open file tsv/txt file for event data and write header information.
23
- If state machine uses analog inputs instantiate analog data writers.'''
23
+ """ Open file tsv/txt file for event data and write header information.
24
+ If state machine uses analog inputs instantiate analog data writers."""
24
25
self .data_dir = data_dir
25
26
self .experiment_name = experiment_name
26
27
self .subject_ID = subject_ID
27
28
self .setup_ID = setup_ID
28
- if datetime_now is None : datetime_now = datetime .now ()
29
+ if datetime_now is None :
30
+ datetime_now = datetime .now ()
29
31
self .end_timestamp = - 1
30
- file_name = self .subject_ID + datetime_now .strftime (' -%Y-%m-%d-%H%M%S' ) + ' .tsv'
32
+ file_name = self .subject_ID + datetime_now .strftime (" -%Y-%m-%d-%H%M%S" ) + " .tsv"
31
33
self .file_path = os .path .join (self .data_dir , file_name )
32
- self .data_file = open (self .file_path , 'w' , newline = '\n ' )
33
- self .data_file .write (self .tsv_row_str ( # Write header with row names.
34
- rtype = 'type' , time = 'time' , name = 'name' , value = 'value' ))
35
- self .write_info_line ('experiment_name' , self .experiment_name )
36
- self .write_info_line ('task_name' , self .sm_info ['name' ])
37
- self .write_info_line ('task_file_hash' , self .sm_info ['task_hash' ])
38
- self .write_info_line ('setup_id' , self .setup_ID )
39
- self .write_info_line ('framework_version' , self .sm_info ['framework_version' ])
40
- self .write_info_line ('micropython_version' , self .sm_info ['micropython_version' ])
41
- self .write_info_line ('subject_id' , self .subject_ID )
42
- self .write_info_line ('start_time' , datetime .utcnow ().isoformat (timespec = 'milliseconds' ))
43
- self .analog_writers = {ID :
44
- Analog_writer (ai ['name' ], ai ['fs' ], ai ['dtype' ], self .file_path )
45
- for ID , ai in self .sm_info ['analog_inputs' ].items ()}
34
+ self .data_file = open (self .file_path , "w" , newline = "\n " )
35
+ self .data_file .write (
36
+ self .tsv_row_str (rtype = "type" , time = "time" , name = "name" , value = "value" ) # Write header with row names.
37
+ )
38
+ self .write_info_line ("experiment_name" , self .experiment_name )
39
+ self .write_info_line ("task_name" , self .sm_info ["name" ])
40
+ self .write_info_line ("task_file_hash" , self .sm_info ["task_hash" ])
41
+ self .write_info_line ("setup_id" , self .setup_ID )
42
+ self .write_info_line ("framework_version" , self .sm_info ["framework_version" ])
43
+ self .write_info_line ("micropython_version" , self .sm_info ["micropython_version" ])
44
+ self .write_info_line ("subject_id" , self .subject_ID )
45
+ self .write_info_line ("start_time" , datetime .utcnow ().isoformat (timespec = "milliseconds" ))
46
+ self .analog_writers = {
47
+ ID : Analog_writer (ai ["name" ], ai ["fs" ], ai ["dtype" ], self .file_path )
48
+ for ID , ai in self .sm_info ["analog_inputs" ].items ()
49
+ }
46
50
47
51
def write_info_line (self , name , value , time = 0 ):
48
- self .data_file .write (self .tsv_row_str (' info' , time = time , name = name , value = value ))
52
+ self .data_file .write (self .tsv_row_str (" info" , time = time , name = name , value = value ))
49
53
50
- def tsv_row_str (self , rtype , time = '' , name = '' , value = '' ):
51
- time_str = f' { time / 1000 :.3f} ' if type (time ) == int else time
52
- return f' { time_str } \t { rtype } \t { name } \t { value } \n '
54
+ def tsv_row_str (self , rtype , time = "" , name = "" , value = "" ):
55
+ time_str = f" { time / 1000 :.3f} " if type (time ) == int else time
56
+ return f" { time_str } \t { rtype } \t { name } \t { value } \n "
53
57
54
- def copy_task_file (self , data_dir , tasks_dir , dir_name = ' task_files' ):
55
- ''' If not already present, copy task file to data_dir/dir_name
56
- appending the files djb2 hash to the file name.'''
58
+ def copy_task_file (self , data_dir , tasks_dir , dir_name = " task_files" ):
59
+ """ If not already present, copy task file to data_dir/dir_name
60
+ appending the files djb2 hash to the file name."""
57
61
exp_tasks_dir = os .path .join (data_dir , dir_name )
58
62
if not os .path .exists (exp_tasks_dir ):
59
63
os .mkdir (exp_tasks_dir )
60
- task_file_path = os .path .join (tasks_dir , self .sm_info [' name' ] + ' .py' )
61
- task_save_name = os .path .split (self .sm_info [' name' ])[1 ] + ' _{}.py' .format (self .sm_info [' task_hash' ])
64
+ task_file_path = os .path .join (tasks_dir , self .sm_info [" name" ] + " .py" )
65
+ task_save_name = os .path .split (self .sm_info [" name" ])[1 ] + " _{}.py" .format (self .sm_info [" task_hash" ])
62
66
if not task_save_name in os .listdir (exp_tasks_dir ):
63
67
copyfile (task_file_path , os .path .join (exp_tasks_dir , task_save_name ))
64
-
68
+
65
69
def close_files (self ):
66
70
if self .data_file :
67
- self .write_info_line (' end_time' , self .end_datetime .isoformat (timespec = ' milliseconds' ), self .end_timestamp )
71
+ self .write_info_line (" end_time" , self .end_datetime .isoformat (timespec = " milliseconds" ), self .end_timestamp )
68
72
self .data_file .close ()
69
73
self .data_file = None
70
74
self .file_path = None
@@ -73,12 +77,12 @@ def close_files(self):
73
77
self .analog_writers = {}
74
78
75
79
def process_data (self , new_data ):
76
- ''' If data _file is open new data is written to file. If print_func is specified
77
- human readable data strings are passed to it.'''
80
+ """ If data _file is open new data is written to file. If print_func is specified
81
+ human readable data strings are passed to it."""
78
82
if self .data_file :
79
83
self .write_to_file (new_data )
80
84
if self .print_func :
81
- self .print_func (self .data_to_string (new_data ).replace (' \t \t ' , ' \t ' ), end = '' )
85
+ self .print_func (self .data_to_string (new_data ).replace (" \t \t " , " \t " ), end = "" )
82
86
if self .data_consumers :
83
87
for data_consumer in self .data_consumers :
84
88
data_consumer .process_data (new_data )
@@ -89,35 +93,35 @@ def write_to_file(self, new_data):
89
93
self .data_file .write (data_string )
90
94
self .data_file .flush ()
91
95
for nd in new_data :
92
- if nd .type == 'A' :
96
+ if nd .type == "A" :
93
97
self .analog_writers [nd .ID ].save_analog_chunk (timestamp = nd .time , data_array = nd .data )
94
98
95
99
def data_to_string (self , new_data ):
96
- ''' Convert list of data tuples into a string. If verbose=True state and event names are used,
97
- if verbose=False state and event IDs are used.'''
98
- data_string = ''
100
+ """ Convert list of data tuples into a string. If verbose=True state and event names are used,
101
+ if verbose=False state and event IDs are used."""
102
+ data_string = ""
99
103
for nd in new_data :
100
- if nd .type == 'D' : # State entry or event.
101
- if nd .ID in self .sm_info [' states' ].values ():
102
- data_string += self .tsv_row_str (' state' , time = nd .time , name = self .ID2name_fw [nd .ID ])
104
+ if nd .type == "D" : # State entry or event.
105
+ if nd .ID in self .sm_info [" states" ].values ():
106
+ data_string += self .tsv_row_str (" state" , time = nd .time , name = self .ID2name_fw [nd .ID ])
103
107
else :
104
- data_string += self .tsv_row_str (' event' , time = nd .time , name = self .ID2name_fw [nd .ID ])
105
- elif nd .type == 'P' : # User print output.
106
- data_string += self .tsv_row_str (' print' , time = nd .time , value = nd .data )
107
- elif nd .type == 'V' : # Variable.
108
- data_string += self .tsv_row_str (' variable' , time = nd .time , name = nd .ID , value = nd .data )
109
- elif nd .type == '!' : # Warning
110
- data_string += self .tsv_row_str (' warning' , value = nd .data )
111
- elif nd .type == '!!' : # Error
112
- data_string += self .tsv_row_str (' error' , value = nd .data .replace (' \n ' , '|' ).replace (' \r ' , '|' ))
113
- elif nd .type == 'S' : # Framework stop.
108
+ data_string += self .tsv_row_str (" event" , time = nd .time , name = self .ID2name_fw [nd .ID ])
109
+ elif nd .type == "P" : # User print output.
110
+ data_string += self .tsv_row_str (" print" , time = nd .time , value = nd .data )
111
+ elif nd .type == "V" : # Variable.
112
+ data_string += self .tsv_row_str (" variable" , time = nd .time , name = nd .ID , value = nd .data )
113
+ elif nd .type == "!" : # Warning
114
+ data_string += self .tsv_row_str (" warning" , value = nd .data )
115
+ elif nd .type == "!!" : # Error
116
+ data_string += self .tsv_row_str (" error" , value = nd .data .replace (" \n " , "|" ).replace (" \r " , "|" ))
117
+ elif nd .type == "S" : # Framework stop.
114
118
self .end_datetime = datetime .utcnow ()
115
119
self .end_timestamp = nd .time
116
120
return data_string
117
121
118
122
119
- class Analog_writer () :
120
- ''' Class for writing data from one analog input to disk.'''
123
+ class Analog_writer :
124
+ """ Class for writing data from one analog input to disk."""
121
125
122
126
def __init__ (self , name , sampling_rate , data_type , session_filepath ):
123
127
self .name = name
@@ -127,36 +131,35 @@ def __init__(self, name, sampling_rate, data_type, session_filepath):
127
131
128
132
def open_data_files (self , session_filepath ):
129
133
ses_path_stem , file_ext = os .path .splitext (session_filepath )
130
- self .path_stem = ses_path_stem + f' _{ self .name } '
131
- self .t_tempfile_path = self .path_stem + ' .time.temp'
132
- self .d_tempfile_path = self .path_stem + f' .data-1{ self .data_type } .temp'
133
- self .time_tempfile = open (self .t_tempfile_path , 'wb' )
134
- self .data_tempfile = open (self .d_tempfile_path , 'wb' )
134
+ self .path_stem = ses_path_stem + f" _{ self .name } "
135
+ self .t_tempfile_path = self .path_stem + " .time.temp"
136
+ self .d_tempfile_path = self .path_stem + f" .data-1{ self .data_type } .temp"
137
+ self .time_tempfile = open (self .t_tempfile_path , "wb" )
138
+ self .data_tempfile = open (self .d_tempfile_path , "wb" )
135
139
self .next_chunk_start_time = 0
136
140
137
141
def close_files (self ):
138
- ''' Close data files. Convert temp files to numpy.'''
142
+ """ Close data files. Convert temp files to numpy."""
139
143
self .time_tempfile .close ()
140
144
self .data_tempfile .close ()
141
- with open (self .t_tempfile_path , 'rb' ) as f :
142
- times = np .frombuffer (f .read (), dtype = ' float64' )
143
- np .save (self .path_stem + ' .time.npy' , times )
144
- with open (self .d_tempfile_path , 'rb' ) as f :
145
+ with open (self .t_tempfile_path , "rb" ) as f :
146
+ times = np .frombuffer (f .read (), dtype = " float64" )
147
+ np .save (self .path_stem + " .time.npy" , times )
148
+ with open (self .d_tempfile_path , "rb" ) as f :
145
149
data = np .frombuffer (f .read (), dtype = self .data_type )
146
- np .save (self .path_stem + ' .data.npy' , data )
150
+ np .save (self .path_stem + " .data.npy" , data )
147
151
os .remove (self .t_tempfile_path )
148
152
os .remove (self .d_tempfile_path )
149
153
150
154
def save_analog_chunk (self , timestamp , data_array ):
151
- ''' Save a chunk of analog data to .pca data file.'''
152
- if np .abs (self .next_chunk_start_time - timestamp / 1000 )< 0.001 :
155
+ """ Save a chunk of analog data to .pca data file."""
156
+ if np .abs (self .next_chunk_start_time - timestamp / 1000 ) < 0.001 :
153
157
chunk_start_time = self .next_chunk_start_time
154
158
else :
155
- chunk_start_time = timestamp / 1000
156
- times = (np .arange (len (data_array ), dtype = 'float64' )
157
- / self .sampling_rate ) + chunk_start_time # Seconds
159
+ chunk_start_time = timestamp / 1000
160
+ times = (np .arange (len (data_array ), dtype = "float64" ) / self .sampling_rate ) + chunk_start_time # Seconds
158
161
self .time_tempfile .write (times .tobytes ())
159
162
self .data_tempfile .write (data_array .tobytes ())
160
163
self .time_tempfile .flush ()
161
164
self .data_tempfile .flush ()
162
- self .next_chunk_start_time = chunk_start_time + len (data_array )/ self .sampling_rate
165
+ self .next_chunk_start_time = chunk_start_time + len (data_array ) / self .sampling_rate
0 commit comments