Skip to content

Commit 6a1fab2

Browse files
committed
workflow progress bar.
1 parent 7c178bb commit 6a1fab2

13 files changed

+1499
-69
lines changed

planemo/commands/cmd_workflow_test_on_invocation.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515

1616
@click.command("workflow_test_on_invocation")
1717
@options.optional_tools_arg(multiple=False, allow_uris=False, metavar="TEST.YML")
18-
@options.required_invocation_id_arg()
19-
@options.galaxy_url_option(required=True)
20-
@options.galaxy_user_key_option(required=True)
18+
@options.invocation_target_options()
2119
@options.test_index_option()
2220
@options.test_output_options()
2321
@command_function
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
"""Module describing the planemo ``workflow_track`` command."""
2+
3+
import click
4+
5+
from planemo import options
6+
from planemo.cli import command_function
7+
from planemo.engine.factory import engine_context
8+
from planemo.galaxy.workflow_progress import WorkflowProgress
9+
10+
11+
@click.command("workflow_track")
12+
@options.invocation_target_options()
13+
@command_function
14+
def cli(ctx, invocation_id, **kwds):
15+
"""Run defined tests against existing workflow invocation."""
16+
with WorkflowProgress() as workflow_progress:
17+
workflow_progress.add_bars()
18+
import time
19+
20+
time.sleep(1)
21+
new_step = {"state": "new"}
22+
scheduled_step = {"state": "scheduled"}
23+
new_steps = [new_step, new_step, new_step]
24+
one_scheduled_steps = [scheduled_step, new_step, new_step]
25+
two_scheduled_steps = [scheduled_step, scheduled_step, new_step]
26+
all_scheduled_steps = [scheduled_step, scheduled_step, scheduled_step]
27+
state_pairs = [
28+
({"state": "new"}, {}),
29+
({"state": "ready", "steps": new_steps}, {}),
30+
({"state": "ready", "steps": one_scheduled_steps}, {"states": {"new": 1}}),
31+
({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 2}}),
32+
({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 1, "running": 1}}),
33+
({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 1, "ok": 1}}),
34+
({"state": "ready", "steps": two_scheduled_steps}, {"states": {"ok": 2}}),
35+
({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 2, "new": 3}}),
36+
({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 2, "running": 1, "new": 2}}),
37+
({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 3, "running": 1, "new": 1}}),
38+
({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 4, "running": 1}}),
39+
({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 5}}),
40+
]
41+
for invocation, job_states_summary in state_pairs:
42+
workflow_progress.handle_invocation(invocation, job_states_summary)
43+
time.sleep(1)
44+
45+
with engine_context(ctx, engine="external_galaxy", **kwds) as engine, engine.ensure_runnables_served([]) as config:
46+
user_gi = config.user_gi
47+
invocation = user_gi.invocations.show_invocation(invocation_id)
48+
# https://stackoverflow.com/questions/23113494/double-progress-bar-in-python
49+
50+
ctx.exit(0)

planemo/galaxy/activity.py

Lines changed: 15 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import os
44
import sys
55
import tempfile
6-
import time
76
import traceback
87
from datetime import datetime
98
from typing import (
@@ -42,6 +41,12 @@
4241
retry_on_timeouts,
4342
summarize_history,
4443
)
44+
from planemo.galaxy.invocations.api import BioblendInvocationApi
45+
from planemo.galaxy.invocations.polling import (
46+
PollingTrackerImpl,
47+
)
48+
from planemo.galaxy.invocations.polling import wait_for_invocation_and_jobs as polling_wait_for_invocation_and_jobs
49+
from planemo.galaxy.invocations.progress import WorkflowProgressDisplay
4550
from planemo.io import wait_on
4651
from planemo.runnable import (
4752
ErrorRunResponse,
@@ -770,58 +775,15 @@ def _history_id(gi, **kwds) -> str:
770775
def wait_for_invocation_and_jobs(
771776
ctx, invocation_id: str, history_id: str, user_gi: GalaxyInstance, polling_backoff: int
772777
):
773-
ctx.vlog("Waiting for invocation [%s]" % invocation_id)
774-
final_invocation_state = "new"
775-
776-
# TODO: hook in invocation["messages"]
777-
error_message = ""
778-
job_state = "ok"
779-
try:
780-
final_invocation_state = _wait_for_invocation(ctx, user_gi, invocation_id, polling_backoff)
781-
assert final_invocation_state == "scheduled"
782-
except Exception as e:
783-
ctx.vlog(f"Problem waiting on invocation: {str(e)}")
784-
summarize_history(ctx, user_gi, history_id)
785-
error_message = f"Final state of invocation {invocation_id} is [{final_invocation_state}]"
786-
787-
ctx.vlog(f"Final state of invocation {invocation_id} is [{final_invocation_state}]")
788-
789-
job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff)
790-
if job_state not in ("ok", "skipped"):
791-
msg = f"Failed to run workflow, at least one job is in [{job_state}] state."
792-
error_message = msg if not error_message else f"{error_message}. {msg}"
793-
else:
794-
# wait for possible subworkflow invocations
795-
invocation = user_gi.invocations.show_invocation(invocation_id)
796-
for step in invocation["steps"]:
797-
if step.get("subworkflow_invocation_id") is not None:
798-
final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
799-
ctx,
800-
invocation_id=step["subworkflow_invocation_id"],
801-
history_id=history_id,
802-
user_gi=user_gi,
803-
polling_backoff=polling_backoff,
804-
)
805-
if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"):
806-
return final_invocation_state, job_state, error_message
807-
808-
ctx.vlog(f"The final state of all jobs and subworkflow invocations for invocation [{invocation_id}] is 'ok'")
809-
return final_invocation_state, job_state, error_message
810-
811-
812-
def _wait_for_invocation(ctx, gi, invocation_id, polling_backoff=0):
813-
def state_func():
814-
return retry_on_timeouts(ctx, gi, lambda gi: gi.invocations.show_invocation(invocation_id))
815-
816-
return _wait_on_state(state_func, polling_backoff)
817-
818-
819-
def has_jobs_in_states(ctx, gi, history_id, states):
820-
params = {"history_id": history_id}
821-
jobs_url = gi.url + "/jobs"
822-
jobs = gi.jobs._get(url=jobs_url, params=params)
823-
target_jobs = [j for j in jobs if j["state"] in states]
824-
return len(target_jobs) > 0
778+
polling_tracker = PollingTrackerImpl(polling_backoff)
779+
invocation_api = BioblendInvocationApi(ctx, user_gi)
780+
with WorkflowProgressDisplay(invocation_id) as workflow_progress_display:
781+
final_invocation_state, job_state, error_message = polling_wait_for_invocation_and_jobs(
782+
ctx, invocation_id, invocation_api, polling_tracker, workflow_progress_display
783+
)
784+
if error_message:
785+
summarize_history(ctx, user_gi, history_id)
786+
return final_invocation_state, job_state, error_message
825787

826788

827789
def _wait_for_history(ctx, gi, history_id, polling_backoff=0):
@@ -835,19 +797,6 @@ def state_func():
835797
return _wait_on_state(state_func, polling_backoff)
836798

837799

838-
def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0):
839-
# Wait for invocation jobs to finish. Less brittle than waiting for a history to finish,
840-
# as you could have more than one invocation in a history, or an invocation without
841-
# steps that produce history items.
842-
843-
ctx.log(f"waiting for invocation {invocation_id}")
844-
845-
def state_func():
846-
return retry_on_timeouts(ctx, gi, lambda gi: gi.jobs.get_jobs(invocation_id=invocation_id))
847-
848-
return _wait_on_state(state_func, polling_backoff)
849-
850-
851800
def _wait_for_job(gi, job_id, timeout=None):
852801
def state_func():
853802
return gi.jobs.show_job(job_id, full_details=True)

planemo/galaxy/invocations/__init__.py

Whitespace-only changes.

planemo/galaxy/invocations/api.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
"""API interaction for Galaxy's workflow invocation API.
2+
3+
Gives a mockable surface for testing, type contract consumed by Planemo,
4+
and builtin utilities around bioblend for working around transient request
5+
issues that have been observed in practice.
6+
"""
7+
8+
from typing import (
9+
Dict,
10+
List,
11+
Optional,
12+
Protocol,
13+
)
14+
15+
from typing_extensions import TypedDict
16+
17+
from planemo.galaxy.api import retry_on_timeouts
18+
19+
20+
class InvocationStep(TypedDict, total=False):
21+
state: Optional[str]
22+
subworkflow_invocation_id: Optional[str]
23+
24+
25+
class Invocation(TypedDict, total=False):
26+
id: str
27+
state: str
28+
steps: List[InvocationStep]
29+
30+
31+
class InvocationJobsSummary(TypedDict, total=False):
32+
states: Dict[str, int]
33+
34+
35+
class InvocationApi(Protocol):
36+
37+
def get_invocation(self, invocation_id: str) -> Invocation: ...
38+
39+
def get_invocation_summary(self, invocation_id: str) -> InvocationJobsSummary: ...
40+
41+
42+
class BioblendInvocationApi(InvocationApi):
43+
44+
def __init__(self, ctx, user_gi):
45+
self._ctx = ctx
46+
self._user_gi = user_gi
47+
48+
def get_invocation(self, invocation_id: str) -> Invocation:
49+
return retry_on_timeouts(self._ctx, self._user_gi, lambda gi: gi.invocations.show_invocation(invocation_id))
50+
51+
def get_invocation_summary(self, invocation_id: str) -> InvocationJobsSummary:
52+
return retry_on_timeouts(
53+
self._ctx, self._user_gi, lambda gi: gi.invocations.get_invocation_summary(invocation_id)
54+
)
55+
56+
57+
def invocation_state_terminal(state: str):
58+
return state in ["scheduled", "cancelled", "failed"]
59+
60+
61+
JOB_ERROR_STATES = ["error", "deleted", "failed", "stopped", "stop", "deleting"]

0 commit comments

Comments
 (0)