Skip to content

Commit 9371ae5

Browse files
authored
Move invocation of output callback for workflows (#396)
* Move invocation of the output callback for workflows from the job() method to the receive_output() method. This closes a gap in which all steps of a subworkflow have completed, but the subworkflow has not yet called its output callback. That gap can lead the parent workflow to incorrectly believe no progress has been made and return 'None' (which indicates a workflow deadlock if there are no jobs currently executing) instead of re-evaluating pending steps to see if they can be run.
* Restore the call to the output callback on workflow completion, for cases where the workflow exits early and returns partial results.
1 parent 3de4159 commit 9371ae5

File tree

1 file changed

+37
-20
lines changed

1 file changed

+37
-20
lines changed

cwltool/workflow.py

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,9 @@ def job(self, joborder, output_callback, **kwargs):
245245
# type: (Dict[Text, Text], functools.partial[None], **Any) -> Generator
246246
kwargs["part_of"] = self.name
247247
kwargs["name"] = shortname(self.id)
248+
249+
_logger.info(u"[%s] start", self.name)
250+
248251
for j in self.step.job(joborder, output_callback, **kwargs):
249252
yield j
250253

@@ -257,6 +260,8 @@ def __init__(self, workflow, **kwargs):
257260
self.steps = [WorkflowJobStep(s) for s in workflow.steps]
258261
self.state = None # type: Dict[Text, WorkflowStateItem]
259262
self.processStatus = None # type: Text
263+
self.did_callback = False
264+
260265
if "outdir" in kwargs:
261266
self.outdir = kwargs["outdir"]
262267
elif "tmp_outdir_prefix" in kwargs:
@@ -270,8 +275,27 @@ def __init__(self, workflow, **kwargs):
270275
_logger.debug(u"[%s] initialized from %s", self.name,
271276
self.tool.get("id", "workflow embedded in %s" % kwargs.get("part_of")))
272277

273-
def receive_output(self, step, outputparms, jobout, processStatus):
274-
# type: (WorkflowJobStep, List[Dict[Text,Text]], Dict[Text,Text], Text) -> None
278+
def do_output_callback(self, final_output_callback):
279+
# type: (Callable[[Any, Any], Any]) -> None
280+
281+
supportsMultipleInput = bool(self.workflow.get_requirement("MultipleInputFeatureRequirement")[0])
282+
283+
try:
284+
wo = object_from_state(self.state, self.tool["outputs"], True, supportsMultipleInput, "outputSource",
285+
incomplete=True)
286+
except WorkflowException as e:
287+
_logger.error(u"[%s] Cannot collect workflow output: %s", self.name, e)
288+
wo = {}
289+
self.processStatus = "permanentFail"
290+
291+
_logger.info(u"[%s] completed %s", self.name, self.processStatus)
292+
293+
self.did_callback = True
294+
295+
final_output_callback(wo, self.processStatus)
296+
297+
def receive_output(self, step, outputparms, final_output_callback, jobout, processStatus):
298+
# type: (WorkflowJobStep, List[Dict[Text,Text]], Callable[[Any, Any], Any], Dict[Text,Text], Text) -> None
275299

276300
for i in outputparms:
277301
if "id" in i:
@@ -295,8 +319,12 @@ def receive_output(self, step, outputparms, jobout, processStatus):
295319
step.completed = True
296320
self.made_progress = True
297321

298-
def try_make_job(self, step, **kwargs):
299-
# type: (WorkflowJobStep, **Any) -> Generator
322+
completed = sum(1 for s in self.steps if s.completed)
323+
if completed == len(self.steps):
324+
self.do_output_callback(final_output_callback)
325+
326+
def try_make_job(self, step, final_output_callback, **kwargs):
327+
# type: (WorkflowJobStep, Callable[[Any, Any], Any], **Any) -> Generator
300328
inputparms = step.tool["inputs"]
301329
outputparms = step.tool["outputs"]
302330

@@ -315,7 +343,7 @@ def try_make_job(self, step, **kwargs):
315343

316344
_logger.debug(u"[%s] starting %s", self.name, step.name)
317345

318-
callback = functools.partial(self.receive_output, step, outputparms)
346+
callback = functools.partial(self.receive_output, step, outputparms, final_output_callback)
319347

320348
valueFrom = {
321349
i["id"]: i["valueFrom"] for i in step.tool["inputs"]
@@ -394,7 +422,7 @@ def valueFromFunc(k, v): # type: (Any, Any) -> Any
394422
step.completed = True
395423

396424
def run(self, **kwargs):
397-
_logger.debug(u"[%s] workflow starting", self.name)
425+
_logger.info(u"[%s] start", self.name)
398426

399427
def job(self, joborder, output_callback, **kwargs):
400428
# type: (Dict[Text, Any], Callable[[Any, Any], Any], **Any) -> Generator
@@ -429,7 +457,7 @@ def job(self, joborder, output_callback, **kwargs):
429457

430458
if not step.submitted:
431459
try:
432-
step.iterable = self.try_make_job(step, **kwargs)
460+
step.iterable = self.try_make_job(step, output_callback, **kwargs)
433461
except WorkflowException as e:
434462
_logger.error(u"[%s] Cannot make job: %s", step.name, e)
435463
_logger.debug("", exc_info=True)
@@ -458,19 +486,8 @@ def job(self, joborder, output_callback, **kwargs):
458486
else:
459487
yield None
460488

461-
supportsMultipleInput = bool(self.workflow.get_requirement("MultipleInputFeatureRequirement")[0])
462-
463-
try:
464-
wo = object_from_state(self.state, self.tool["outputs"], True, supportsMultipleInput, "outputSource",
465-
incomplete=True)
466-
except WorkflowException as e:
467-
_logger.error(u"[%s] Cannot collect workflow output: %s", self.name, e)
468-
wo = {}
469-
self.processStatus = "permanentFail"
470-
471-
_logger.info(u"[%s] outdir is %s", self.name, self.outdir)
472-
473-
output_callback(wo, self.processStatus)
489+
if not self.did_callback:
490+
self.do_output_callback(output_callback)
474491

475492

476493
class Workflow(Process):

0 commit comments

Comments
 (0)