Workflow progress bar. #1510

Open: wants to merge 13 commits into base: master

3 changes: 3 additions & 0 deletions Makefile
@@ -73,6 +73,9 @@ lint: ## check style using tox and flake8 for Python 2 and Python 3
test: ## run tests with the default Python (faster than tox)
$(IN_VENV) pytest $(TESTS)

format: ## format Python code with black
$(IN_VENV) black planemo tests

quick-test: ## run quickest tests with the default Python
$(IN_VENV) PLANEMO_SKIP_SLOW_TESTS=1 PLANEMO_SKIP_GALAXY_TESTS=1 pytest $(TESTS)

1 change: 0 additions & 1 deletion planemo/commands/cmd_run.py
@@ -30,7 +30,6 @@
@options.run_download_outputs_option()
@options.engine_options()
@options.test_options()
@options.no_early_termination_option()
@command_function
def cli(ctx, runnable_identifier, job_path, **kwds):
"""Planemo command for running tools and jobs.
4 changes: 1 addition & 3 deletions planemo/commands/cmd_workflow_test_on_invocation.py
@@ -15,9 +15,7 @@

@click.command("workflow_test_on_invocation")
@options.optional_tools_arg(multiple=False, allow_uris=False, metavar="TEST.YML")
@options.required_invocation_id_arg()
@options.galaxy_url_option(required=True)
@options.galaxy_user_key_option(required=True)
@options.invocation_target_options()
@options.test_index_option()
@options.test_output_options()
@command_function
26 changes: 26 additions & 0 deletions planemo/commands/cmd_workflow_track.py
@@ -0,0 +1,26 @@
"""Module describing the planemo ``workflow_track`` command."""

import click

from planemo import options
from planemo.cli import command_function
from planemo.engine.factory import engine_context
from planemo.galaxy.activity import wait_for_invocation_and_jobs


@click.command("workflow_track")
@options.invocation_target_options()
@command_function
def cli(ctx, invocation_id, **kwds):
"""Follow the progress of a workflow invocation."""
with engine_context(ctx, engine="external_galaxy", **kwds) as engine, engine.ensure_runnables_served([]) as config:
user_gi = config.user_gi
wait_for_invocation_and_jobs(
ctx,
invocation_id,
history_id=None,
user_gi=user_gi,
polling_backoff=5,
)

ctx.exit(0)
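
For orientation, a minimal sketch of exercising the new command with click's test runner; everything below is illustrative and not part of the PR. The Galaxy URL, API key, and invocation id are placeholders, and the flag names assume that options.invocation_target_options() bundles the invocation id argument together with --galaxy_url and --galaxy_user_key, as the workflow_test_on_invocation change above suggests.

# Hypothetical usage sketch (not part of this PR); assumes a reachable Galaxy
# and that the option names below match what invocation_target_options() adds.
from click.testing import CliRunner

from planemo.commands.cmd_workflow_track import cli

result = CliRunner().invoke(
    cli,
    [
        "0123456789abcdef",  # placeholder invocation id
        "--galaxy_url",
        "https://usegalaxy.example.org",  # placeholder Galaxy URL
        "--galaxy_user_key",
        "<api-key>",  # placeholder API key
    ],
)
print(result.exit_code, result.output)
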
159 changes: 44 additions & 115 deletions planemo/galaxy/activity.py
@@ -3,7 +3,6 @@
import os
import sys
import tempfile
import time
import traceback
from datetime import datetime
from typing import (
@@ -36,12 +35,16 @@
unicodify,
)
from pathvalidate import sanitize_filename
from requests.exceptions import (
HTTPError,
RequestException,
)
from requests.exceptions import HTTPError

from planemo.galaxy.api import summarize_history
from planemo.galaxy.api import (
retry_on_timeouts,
summarize_history,
)
from planemo.galaxy.invocations.api import BioblendInvocationApi
from planemo.galaxy.invocations.polling import PollingTrackerImpl
from planemo.galaxy.invocations.polling import wait_for_invocation_and_jobs as polling_wait_for_invocation_and_jobs
from planemo.galaxy.invocations.progress import WorkflowProgressDisplay
from planemo.io import wait_on
from planemo.runnable import (
ErrorRunResponse,
@@ -215,7 +218,6 @@ def _execute( # noqa C901
no_wait=kwds.get("no_wait", False),
start_datetime=start_datetime,
log=log_contents_str(config),
early_termination=not kwds.get("no_early_termination", False),
)

else:
@@ -249,7 +251,6 @@ def invocation_to_run_response(
no_wait=False,
start_datetime=None,
log=None,
early_termination=True,
):
start_datetime = start_datetime or datetime.now()
invocation_id = invocation["id"]
@@ -258,19 +259,22 @@

ctx.vlog("Waiting for invocation [%s]" % invocation_id)

final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
ctx,
invocation_id=invocation_id,
history_id=history_id,
user_gi=user_gi,
no_wait=no_wait,
polling_backoff=polling_backoff,
early_termination=early_termination,
)
if final_invocation_state not in ("ok", "skipped", "scheduled"):
msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state."
ctx.vlog(msg)
summarize_history(ctx, user_gi, history_id)
if not no_wait:
final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
ctx,
invocation_id=invocation_id,
history_id=history_id,
user_gi=user_gi,
polling_backoff=polling_backoff,
)
if final_invocation_state not in ("ok", "skipped", "scheduled"):
msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state."
ctx.vlog(msg)
summarize_history(ctx, user_gi, history_id)
else:
final_invocation_state = invocation["state"]
job_state = None
error_message = None

return GalaxyWorkflowRunResponse(
ctx,
@@ -776,86 +780,26 @@ def _history_id(gi, **kwds) -> str:
def wait_for_invocation_and_jobs(
ctx,
invocation_id: str,
history_id: str,
history_id: Optional[str],
user_gi: GalaxyInstance,
no_wait: bool,
polling_backoff: int,
early_termination: bool,
):
ctx.vlog("Waiting for invocation [%s]" % invocation_id)
final_invocation_state = "new"

# TODO: hook in invocation["messages"]
error_message = ""
job_state = "ok"
try:
final_invocation_state = _wait_for_invocation(ctx, user_gi, invocation_id, polling_backoff)
assert final_invocation_state == "scheduled"
except Exception as e:
ctx.vlog(f"Problem waiting on invocation: {str(e)}")
summarize_history(ctx, user_gi, history_id)
error_message = f"Final state of invocation {invocation_id} is [{final_invocation_state}]"

ctx.vlog(f"Final state of invocation {invocation_id} is [{final_invocation_state}]")

if not no_wait:
job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff, early_termination)
if job_state not in ("ok", "skipped"):
msg = f"Failed to run workflow, at least one job is in [{job_state}] state."
error_message = msg if not error_message else f"{error_message}. {msg}"
else:
# wait for possible subworkflow invocations
invocation = user_gi.invocations.show_invocation(invocation_id)
for step in invocation["steps"]:
if step.get("subworkflow_invocation_id") is not None:
final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
ctx,
invocation_id=step["subworkflow_invocation_id"],
history_id=history_id,
user_gi=user_gi,
no_wait=no_wait,
polling_backoff=polling_backoff,
early_termination=early_termination,
)
if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"):
return final_invocation_state, job_state, error_message

ctx.vlog(f"The final state of all jobs and subworkflow invocations for invocation [{invocation_id}] is 'ok'")
return final_invocation_state, job_state, error_message


def _wait_for_invocation(ctx, gi, invocation_id, polling_backoff=0):
def state_func():
return _retry_on_timeouts(ctx, gi, lambda gi: gi.invocations.show_invocation(invocation_id))

return _wait_on_state(state_func, polling_backoff)


def _retry_on_timeouts(ctx, gi, f):
gi.timeout = 60
try_count = 5
try:
for try_num in range(try_count):
start_time = time.time()
try:
return f(gi)
except RequestException:
end_time = time.time()
if end_time - start_time > 45 and (try_num + 1) < try_count:
ctx.vlog("Galaxy seems to have timed out, retrying to fetch status.")
continue
else:
raise
finally:
gi.timeout = None


def has_jobs_in_states(ctx, gi, history_id, states):
params = {"history_id": history_id}
jobs_url = gi.url + "/jobs"
jobs = gi.jobs._get(url=jobs_url, params=params)
target_jobs = [j for j in jobs if j["state"] in states]
return len(target_jobs) > 0
polling_tracker = PollingTrackerImpl(polling_backoff)
invocation_api = BioblendInvocationApi(ctx, user_gi)
with WorkflowProgressDisplay(invocation_id, galaxy_url=user_gi.base_url) as workflow_progress_display:
final_invocation_state, job_state, error_message = polling_wait_for_invocation_and_jobs(
ctx,
invocation_id,
invocation_api,
polling_tracker,
workflow_progress_display,
)
if error_message:
if not history_id:
invocation = invocation_api.get_invocation(invocation_id)
history_id = invocation["history_id"]
summarize_history(ctx, user_gi, history_id)
return final_invocation_state, job_state, error_message


def _wait_for_history(ctx, gi, history_id, polling_backoff=0):
@@ -864,32 +808,19 @@ def _wait_for_history(ctx, gi, history_id, polling_backoff=0):
# no need to wait for active jobs anymore I think.

def state_func():
return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))
return retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))

return _wait_on_state(state_func, polling_backoff)


def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0, early_termination=True):
# Wait for invocation jobs to finish. Less brittle than waiting for a history to finish,
# as you could have more than one invocation in a history, or an invocation without
# steps that produce history items.

ctx.log(f"waiting for invocation {invocation_id}")

def state_func():
return _retry_on_timeouts(ctx, gi, lambda gi: gi.jobs.get_jobs(invocation_id=invocation_id))

return _wait_on_state(state_func, polling_backoff, early_termination=early_termination)


def _wait_for_job(gi, job_id, timeout=None):
def state_func():
return gi.jobs.show_job(job_id, full_details=True)

return _wait_on_state(state_func, timeout=timeout)


def _wait_on_state(state_func, polling_backoff=0, timeout=None, early_termination=True):
def _wait_on_state(state_func, polling_backoff=0, timeout=None):
def get_state():
response = state_func()
if not isinstance(response, list):
@@ -911,8 +842,6 @@ def get_state():
"cancelled",
"failed",
]
if not early_termination and current_non_terminal_states:
return None
for terminal_state in hierarchical_fail_states:
if terminal_state in current_states:
# If we got here something has failed and we can return (early)
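
To show what the slimmed-down entry point looks like from the caller's side, a hedged sketch of invoking the refactored wait_for_invocation_and_jobs directly; the stub context, Galaxy URL, API key, and invocation id are placeholders, and the sketch assumes the polling and progress helpers only need the logging methods stubbed here.

# Hypothetical sketch (not part of this PR): the caller now supplies only a
# context, an invocation id, and a GalaxyInstance; PollingTrackerImpl,
# BioblendInvocationApi and WorkflowProgressDisplay are wired up internally.
from bioblend.galaxy import GalaxyInstance

from planemo.galaxy.activity import wait_for_invocation_and_jobs


class _StubCtx:
    """Placeholder for the planemo context; assumes only logging is used."""

    def log(self, message, *args):
        print(message % args if args else message)

    vlog = log


user_gi = GalaxyInstance("https://usegalaxy.example.org", key="<api-key>")
state, job_state, error = wait_for_invocation_and_jobs(
    _StubCtx(),
    invocation_id="0123456789abcdef",  # placeholder
    history_id=None,  # looked up from the invocation only if an error needs summarizing
    user_gi=user_gi,
    polling_backoff=5,
)
print(state, job_state, error)
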
21 changes: 21 additions & 0 deletions planemo/galaxy/api.py
@@ -1,9 +1,11 @@
"""A high-level interface to local Galaxy instances using bioblend."""

import time
from io import StringIO
from typing import Optional

from bioblend.galaxy import GalaxyInstance
from requests.exceptions import RequestException

DEFAULT_ADMIN_API_KEY = "test_key"

@@ -136,6 +138,25 @@ def _dataset_provenance(gi, history_id, id):
return provenance


def retry_on_timeouts(ctx, gi, f):
gi.timeout = 60
try_count = 5
try:
for try_num in range(try_count):
start_time = time.time()
try:
return f(gi)
except RequestException:
end_time = time.time()
if end_time - start_time > 45 and (try_num + 1) < try_count:
ctx.vlog("Galaxy seems to have timed out, retrying to fetch status.")
continue
else:
raise
finally:
gi.timeout = None


__all__ = (
"DEFAULT_ADMIN_API_KEY",
"gi",
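
A short usage sketch for the relocated retry helper; the stub context, Galaxy URL, and API key are placeholders (the helper only calls ctx.vlog on the context it is given).

# Hypothetical usage (not part of this PR): runs a bioblend call with
# gi.timeout temporarily set to 60 s; a request that fails after more than
# 45 s (i.e. looks like a server-side timeout) is retried, up to 5 attempts.
from bioblend.galaxy import GalaxyInstance

from planemo.galaxy.api import retry_on_timeouts


class _StubCtx:
    """Placeholder context; retry_on_timeouts only needs ctx.vlog()."""

    def vlog(self, message, *args):
        print(message % args if args else message)


gi = GalaxyInstance("https://usegalaxy.example.org", key="<api-key>")
histories = retry_on_timeouts(_StubCtx(), gi, lambda gi: gi.histories.get_histories())
print(len(histories))
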
Empty file.
62 changes: 62 additions & 0 deletions planemo/galaxy/invocations/api.py
@@ -0,0 +1,62 @@
"""API interaction for Galaxy's workflow invocation API.

Gives a mockable surface for testing, type contract consumed by Planemo,
and builtin utilities around bioblend for working around transient request
issues that have been observed in practice.
"""

from typing import (
Dict,
List,
Optional,
Protocol,
)

from typing_extensions import TypedDict

from planemo.galaxy.api import retry_on_timeouts


class InvocationStep(TypedDict, total=False):
state: Optional[str]
subworkflow_invocation_id: Optional[str]


class Invocation(TypedDict, total=False):
id: str
state: str
steps: List[InvocationStep]
history_id: Optional[str]


class InvocationJobsSummary(TypedDict, total=False):
states: Dict[str, int]


class InvocationApi(Protocol):

def get_invocation(self, invocation_id: str) -> Invocation: ...

def get_invocation_summary(self, invocation_id: str) -> InvocationJobsSummary: ...


class BioblendInvocationApi(InvocationApi):

def __init__(self, ctx, user_gi):
self._ctx = ctx
self._user_gi = user_gi

def get_invocation(self, invocation_id: str) -> Invocation:
return retry_on_timeouts(self._ctx, self._user_gi, lambda gi: gi.invocations.show_invocation(invocation_id))

def get_invocation_summary(self, invocation_id: str) -> InvocationJobsSummary:
return retry_on_timeouts(
self._ctx, self._user_gi, lambda gi: gi.invocations.get_invocation_summary(invocation_id)
)


def invocation_state_terminal(state: str):
return state in ["scheduled", "cancelled", "failed"]


JOB_ERROR_STATES = ["error", "deleted", "failed", "stopped", "stop", "deleting"]
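
Since the module docstring advertises a mockable surface, here is a hedged sketch of a test double satisfying the InvocationApi protocol; the class name and canned data are illustrative, not part of the PR.

# Hypothetical test double (not part of this PR): a minimal InvocationApi
# returning canned data, so polling logic can be exercised without a live Galaxy.
from planemo.galaxy.invocations.api import (
    Invocation,
    InvocationApi,
    InvocationJobsSummary,
    invocation_state_terminal,
)


class FakeInvocationApi(InvocationApi):
    def __init__(self, invocation: Invocation, summary: InvocationJobsSummary):
        self._invocation = invocation
        self._summary = summary

    def get_invocation(self, invocation_id: str) -> Invocation:
        return self._invocation

    def get_invocation_summary(self, invocation_id: str) -> InvocationJobsSummary:
        return self._summary


api = FakeInvocationApi(
    {"id": "abc123", "state": "scheduled", "steps": [], "history_id": None},
    {"states": {"ok": 3}},
)
assert invocation_state_terminal(api.get_invocation("abc123")["state"])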