@@ -27,7 +27,8 @@ class JobState(Enum):
27
27
FINISHED = "finished"
28
28
29
29
30
- class JobData ():
30
+ class JobData :
31
+
31
32
def __init__ (self , cache = True ):
32
33
self .MAX_ROWS = 1000
33
34
@@ -66,7 +67,7 @@ def cache(self):
66
67
67
68
location = path .join (self .location , str (self .cache_count ))
68
69
try :
69
- with open (location , 'wb' ) as f :
70
+ with open (location , "wb" ) as f :
70
71
dump (self .data , f )
71
72
self .data = []
72
73
self .cache_count += 1
@@ -76,7 +77,8 @@ def cache(self):
76
77
def read (self ):
77
78
return self .JobDataIter (self )
78
79
79
- class JobDataIter ():
80
+ class JobDataIter :
81
+
80
82
def __init__ (self , job_data ):
81
83
self .job_data = job_data
82
84
self .cache_count = 0
@@ -92,7 +94,7 @@ def read_cache_i(self, i):
92
94
location = path .join (self .job_data .location , str (i ))
93
95
94
96
try :
95
- with open (location , 'rb' ) as f :
97
+ with open (location , "rb" ) as f :
96
98
self .data = load (f )
97
99
self .i = 0
98
100
self .cache_count += 1
@@ -111,7 +113,7 @@ def read_row(self):
111
113
112
114
for key in self .job_data .keys :
113
115
if not row .get (key ):
114
- row [key ] = ''
116
+ row [key ] = ""
115
117
116
118
return row
117
119
@@ -120,7 +122,9 @@ def clean_up(self):
120
122
121
123
def __next__ (self ):
122
124
if not self .finished_cache :
123
- if (self .cache_count <= self .job_data .cache_count ) and self .job_data .cache_count != 0 :
125
+ if (
126
+ self .cache_count <= self .job_data .cache_count
127
+ ) and self .job_data .cache_count != 0 :
124
128
if self .i < len (self .data ):
125
129
return self .read_row ()
126
130
else :
@@ -140,11 +144,23 @@ def __next__(self):
140
144
raise StopIteration
141
145
142
146
143
- class Job () :
147
+ class Job :
144
148
error_log = QtCore .pyqtSignal (str )
145
149
146
- def __init__ (self , outputPath , sourceName , sourceFunction , functionArgs , sourceKeys , append , keyColumn , encoding ,
147
- cache , job_update , job_error_log ):
150
+ def __init__ (
151
+ self ,
152
+ outputPath ,
153
+ sourceName ,
154
+ sourceFunction ,
155
+ functionArgs ,
156
+ sourceKeys ,
157
+ append ,
158
+ keyColumn ,
159
+ encoding ,
160
+ cache ,
161
+ job_update ,
162
+ job_error_log ,
163
+ ):
148
164
self .source = eval (f"socialreaper.{ sourceName } (**{ sourceKeys } )" )
149
165
self .source .api .log_function = self .log
150
166
self .log_function = job_error_log
@@ -189,9 +205,16 @@ def inc_data(self):
189
205
def end_job (self ):
190
206
self .state = JobState .SAVING
191
207
self .job_update .emit (self )
192
- socialreaper .tools .CSV (self .data .read (), file_name = self .outputPath , flat = False , append = self .append ,
193
- key_column = self .keyColumn , encoding = self .encoding , fill_gaps = False ,
194
- field_names = sorted (self .data .keys ))
208
+ socialreaper .tools .CSV (
209
+ self .data .read (),
210
+ file_name = self .outputPath ,
211
+ flat = False ,
212
+ append = self .append ,
213
+ key_column = self .keyColumn ,
214
+ encoding = self .encoding ,
215
+ fill_gaps = False ,
216
+ field_names = sorted (self .data .keys ),
217
+ )
195
218
self .state = JobState .FINISHED
196
219
self .job_update .emit (self )
197
220
return False
@@ -214,7 +237,7 @@ def pickle(self):
214
237
if not path .exists (dir ):
215
238
makedirs (dir )
216
239
217
- with open (f"{ dir } /out.pickle" , 'wb' ) as f :
240
+ with open (f"{ dir } /out.pickle" , "wb" ) as f :
218
241
dump (self , f )
219
242
220
243
@@ -289,7 +312,14 @@ def add_jobs(self, details):
289
312
try :
290
313
for params in details :
291
314
self .jobs .append (
292
- Job (* params , self .window .encoding , self .window .cache_enabled , self .job_update , self .job_error_log ))
315
+ Job (
316
+ * params ,
317
+ self .window .encoding ,
318
+ self .window .cache_enabled ,
319
+ self .job_update ,
320
+ self .job_error_log ,
321
+ )
322
+ )
293
323
except Exception as e :
294
324
self .job_error_log .emit (format_exc ())
295
325
@@ -329,7 +359,6 @@ def inc_job(self):
329
359
self .currentJobState = None
330
360
self .queue_update .emit (self .jobs )
331
361
332
-
333
362
else :
334
363
self .state = QueueState .STOPPED
335
364
self .queue_update .emit (self .jobs )
0 commit comments