Skip to content

Commit 23bd828

Browse files
committed
Defer using toil status for now and use the log to determine toil's state.
1 parent f64b3bc commit 23bd828

File tree

1 file changed

+31
-38
lines changed

1 file changed

+31
-38
lines changed

wes_service/toil_wes.py

Lines changed: 31 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -217,59 +217,52 @@ def run(self, request, tempdir, opts):
217217

218218
def getstate(self):
219219
"""
220-
Returns INITIALIZING, -1
221-
RUNNING, -1
222-
COMPLETE, 0
220+
Returns QUEUED, -1
221+
INITIALIZING, -1
222+
RUNNING, -1
223+
COMPLETE, 0
223224
or
224225
EXECUTOR_ERROR, 255
225226
"""
226-
state = "RUNNING"
227-
exit_code = -1
228-
229227
# the jobstore never existed
230228
if not os.path.exists(self.jobstorefile):
231-
logging.info('Workflow ' + self.run_id + ': ' + "QUEUED")
229+
logging.info('Workflow ' + self.run_id + ': QUEUED')
232230
return "QUEUED", -1
233231

234232
# completed earlier
235233
if os.path.exists(self.statcompletefile):
236-
logging.info('Workflow ' + self.run_id + ': ' + "COMPLETE")
234+
logging.info('Workflow ' + self.run_id + ': COMPLETE')
237235
return "COMPLETE", 0
238236

239237
# errored earlier
240238
if os.path.exists(self.staterrorfile):
241-
logging.info('Workflow ' + self.run_id + ': ' + "EXECUTOR_ERROR")
239+
logging.info('Workflow ' + self.run_id + ': EXECUTOR_ERROR')
242240
return "EXECUTOR_ERROR", 255
243241

244-
# query toil for status
245-
with open(self.jobstorefile, 'r') as f:
246-
self.jobstore = f.read()
247-
p = subprocess.Popen(['toil', 'status', self.jobstore, '--printLogs'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
248-
logs, stderr = p.communicate()
249-
250-
if 'ERROR:toil.worker:Exiting' in logs or \
251-
'ERROR:toil.worker:Exiting' in stderr:
252-
open(self.staterrorfile, 'a').close()
253-
state = "EXECUTOR_ERROR"
254-
exit_code = 255
255-
# the jobstore existed once, but was deleted
256-
elif 'No job store found.' in logs or \
257-
'No job store found.' in stderr:
258-
with open(self.errfile, 'r') as f:
259-
for line in f:
260-
if 'Finished toil run successfully.' in line:
261-
logging.info('Workflow ' + self.run_id + ': ' + "COMPLETE")
262-
open(self.statcompletefile, 'a').close()
263-
return "COMPLETE", 0
264-
if 'returned non-zero exit status' in line:
265-
logging.info('Workflow ' + self.run_id + ': ' + "COMPLETE")
266-
open(self.staterrorfile, 'a').close()
267-
return "EXECUTOR_ERROR", 255
268-
state = "INITIALIZING"
269-
exit_code = -1
270-
271-
logging.info('Workflow ' + self.run_id + ': ' + state)
272-
return state, exit_code
242+
# the workflow is staged but has not run yet
243+
if not os.path.exists(self.stderr):
244+
logging.info('Workflow ' + self.run_id + ': INITIALIZING')
245+
return "INITIALIZING", -1
246+
247+
# TODO: Query with "toil status"
248+
completed = False
249+
with open(self.errfile, 'r') as f:
250+
for line in f:
251+
if 'Traceback (most recent call last)' in line:
252+
logging.info('Workflow ' + self.run_id + ': EXECUTOR_ERROR')
253+
open(self.staterrorfile, 'a').close()
254+
return "EXECUTOR_ERROR", 255
255+
# run can complete successfully but fail to upload outputs to cloud buckets
256+
# so save the completed status and make sure there was no error elsewhere
257+
if 'Finished toil run successfully.' in line:
258+
completed = True
259+
if completed:
260+
logging.info('Workflow ' + self.run_id + ': COMPLETE')
261+
open(self.statcompletefile, 'a').close()
262+
return "COMPLETE", 0
263+
264+
logging.info('Workflow ' + self.run_id + ': RUNNING')
265+
return "RUNNING", -1
273266

274267
def getstatus(self):
275268
state, exit_code = self.getstate()

0 commit comments

Comments
 (0)