TES batch system prototype (DataBiosphere#3821)
* Start on a stub TES batch system

* Fill in TES stub with code that should mostly work

* Get jobs issued but be unable to poll them because of ohsu-comp-bio/py-tes#31

* Turn on (bring back?) Python 3.8 appliance since that matches my Python

* Update to unreleased pytes that can understand an empty log

* Accept the py-tes issues we cannot change

* Change to testing py-tes that might tolerate newer dateutil

* Fix all the MyPy errors except for py-tes not having stubs

* Add hacked-up autogenerated stubs for py-tes public API

* Fix missing TypeVar

* Accept falsey responses because they are errors

* Actually import URLError

* Call test teardown and setup to fix DataBiosphere#3815

* Go find base class setup and teardown methods

* Stop showing canceled TES tasks as updated jobs

* Allow waiting and synthesize exit codes to get job running test to pass

* Add classifiers and point at py-tes 0.4.2 code

* Handle not having a prepare argument

* Don't del from a set

* Redesign batch system option parsing

* Add TES variables to docs

* Set up Funnel for the test stage that will need it

* Actually fill in base Config from the environment

* Lower logging levels

* Cache Docker images we know to exist

* Break off the Kubernetes executor for re-use

* Use released py-tes

* Don't clobber Config defaults

* Parse whatever object type is there

* Correct docstring

* Prevent batch system plugin test from permanently adding a batch system

* Tolerate new default of linkImports true whether we used argparse or not

* Limit pool size to effective CPU count

* Reorganize shared batch system options into one place

* Fix other multiprocessing pools to not swamp the system

* Clean up background server processes in a better way

* Fix some pylint errors

* Accept old names in both option setting functions

* Emit a DeprecationWarning when using an old option field

* Fix whitespace

* Don't claim to work with Python 3.9

* Don't obey Funnel environment variables, only Toil ones

* Rename batch system utility modules

* Document when --statePollingWait ought to work

* Explain why someone might --scale

* Explain how someone might --scale

* Drop toil- from job prefix

* Don't require TES server to 404

* Type the environment

* Only mount existing local paths

* Stop early when looking for runtime when there's no start_time

* Improve spelling

* Actually save --statePollingWait docs

* snake_case another batch system utility

* Complete merge with 48db82e

* Require pytz type hints which MyPy suddenly demands

* Add back type: ignore somehow lost in f8d6778

* Don't add duplicate Python 3.8 tests

* Revert "Add back type: ignore somehow lost in f8d6778"

This reverts commit e5dc8b4.

* Revert "Require pytz type hints which MyPy suddenly demands"

This reverts commit 3baa621.

* Add types-pytz again but as a dev dependency

* Make sure to report tracebacks in the worker

* Use more descriptive TypeVars

* Stop making per-TES-workflow IDs and just use jobs' self-identified names
adamnovak authored Oct 25, 2021
1 parent 9126a4e commit 312b6e1
Showing 40 changed files with 1,527 additions and 451 deletions.
27 changes: 27 additions & 0 deletions .gitlab-ci.yml
@@ -102,8 +102,33 @@ batch_systems:
script:
- pwd
- virtualenv -p ${MAIN_PYTHON_PKG} venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[all] packages='htcondor awscli'
- wget https://github.com/ohsu-comp-bio/funnel/releases/download/0.10.1/funnel-linux-amd64-0.10.1.tar.gz
- tar -xvf funnel-linux-amd64-0.10.1.tar.gz funnel
- export FUNNEL_SERVER_USER=toil
- export FUNNEL_SERVER_PASSWORD=$(openssl rand -hex 256)
- |
cat >funnel.conf <<EOF
Server:
BasicAuth:
- User: ${FUNNEL_SERVER_USER}
Password: ${FUNNEL_SERVER_PASSWORD}
RPCClient:
User: ${FUNNEL_SERVER_USER}
Password: ${FUNNEL_SERVER_PASSWORD}
LocalStorage:
AllowedDirs:
- $HOME/.aws
- ./
Compute: manual
EOF
- ./funnel server run -c funnel.conf &
- ./funnel node run -c funnel.conf &
- export TOIL_TES_ENDPOINT="http://127.0.0.1:8000"
- export TOIL_TES_USER="${FUNNEL_SERVER_USER}"
- export TOIL_TES_PASSWORD="${FUNNEL_SERVER_PASSWORD}"
- make test tests=src/toil/test/batchSystems/batchSystemTest.py
- make test tests=src/toil/test/mesos/MesosDataStructuresTest.py
- kill $(jobs -p) || true
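The CI stage above authenticates to the local Funnel server with HTTP Basic Auth via `TOIL_TES_USER` and `TOIL_TES_PASSWORD`. As a rough sketch of what that amounts to on the wire (the endpoint, username, and password here are placeholders, not the CI values), the `Authorization` header a TES client sends can be built with the standard library alone:

```python
import base64
import urllib.request

# Placeholders standing in for TOIL_TES_ENDPOINT / TOIL_TES_USER / TOIL_TES_PASSWORD.
endpoint = "http://127.0.0.1:8000"
user, password = "toil", "secret"

# HTTP Basic Auth is just base64("user:password") in an Authorization header.
credentials = base64.b64encode(f"{user}:{password}".encode()).decode()
request = urllib.request.Request(
    f"{endpoint}/v1/tasks",  # illustrative TES path; not sent anywhere here
    headers={"Authorization": f"Basic {credentials}"},
)
print(request.get_header("Authorization"))  # → Basic dG9pbDpzZWNyZXQ=
```

No request is actually issued; the sketch only shows how the credentials exported in the CI script end up encoded.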

cwl_v1.0:
stage: main_tests
@@ -284,6 +309,8 @@ py37_appliance_build:
- python setup_gitlab_docker.py
- make push_docker

# Python 3.8 is the "main" python already and doesn't need its own tests.

# Cactus-on-Kubernetes integration (as a script and not a pytest test)
cactus_integration:
stage: integration
1 change: 0 additions & 1 deletion contrib/admin/mypy-with-ignore.py
@@ -52,7 +52,6 @@ def main():
'src/toil/batchSystems/slurm.py',
'src/toil/batchSystems/gridengine.py',
'src/toil/batchSystems/singleMachine.py',
'src/toil/batchSystems/abstractBatchSystem.py',
'src/toil/batchSystems/parasol.py',
'src/toil/batchSystems/kubernetes.py',
'src/toil/batchSystems/torque.py',
Empty file.
19 changes: 19 additions & 0 deletions contrib/typeshed/tes/client.pyi
@@ -0,0 +1,19 @@
from tes.models import CancelTaskRequest, CreateTaskResponse, GetTaskRequest, ListTasksRequest, ListTasksResponse, ServiceInfo, Task, strconv
from tes.utils import TimeoutError, raise_for_status, unmarshal
from typing import Any, Optional, Union

def process_url(value: str) -> str: ...

class HTTPClient:
url: str = ...
timeout: int = ...
user: Optional[str] = ...
password: Optional[str] = ...
token: Optional[str] = ...
def get_service_info(self) -> ServiceInfo: ...
def create_task(self, task: Task) -> CreateTaskResponse: ...
def get_task(self, task_id: str, view: str = ...) -> Task: ...
def cancel_task(self, task_id: str) -> None: ...
def list_tasks(self, view: str = ..., page_size: Optional[int] = ..., page_token: Optional[str] = ...) -> ListTasksResponse: ...
def wait(self, task_id: str, timeout: Union[int, float, None] = ...) -> None: ...
def __init__(self, url: str, timeout: Optional[int], user: Optional[str], password: Optional[str], token: Optional[str]) -> None: ...
118 changes: 118 additions & 0 deletions contrib/typeshed/tes/models.pyi
@@ -0,0 +1,118 @@
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union

class Base:
def as_dict(self, drop_empty: bool = ...) -> Dict[str, Any]: ...
def as_json(self, drop_empty: bool = ...) -> str: ...
def __init__(self) -> None: ...

class Input(Base):
url: Optional[str] = ...
path: Optional[str] = ...
type: str = ...
name: Optional[str] = ...
description: Optional[str] = ...
content: Any = ...
def __init__(self, url: Optional[str], path: Optional[str], type: Optional[str], name: Optional[str], description: Optional[str], content: Any) -> None: ...

class Output(Base):
url: Optional[str] = ...
path: Optional[str] = ...
type: str = ...
name: Optional[str] = ...
description: Optional[str] = ...
def __init__(self, url: Optional[str], path: Optional[str], type: Optional[str], name: Optional[str], description: Optional[str]) -> None: ...

class Resources(Base):
cpu_cores: int = ...
ram_gb: Union[float, int, None] = ...
disk_gb: Union[float, int, None] = ...
preemptible: Optional[bool] = ...
zones: Optional[List[str]] = ...
def __init__(self, cpu_cores: int, ram_gb: Union[float, int, None], disk_gb: Union[float, int, None], preemptible: Optional[bool], zones: Optional[List[str]]) -> None: ...

class Executor(Base):
image: str = ...
command: str = ...
workdir: Optional[str] = ...
stdin: Optional[str] = ...
stdout: Optional[str] = ...
stderr: Optional[str] = ...
env: Optional[Dict[str, str]] = ...
def __init__(self, image: str, command: str, workdir: Optional[str], stdin: Optional[str], stdout: Optional[str], stderr: Optional[str], env: Optional[Dict[str, str]]) -> None: ...

class ExecutorLog(Base):
start_time: Optional[datetime] = ...
end_time: Optional[datetime] = ...
stdout: Optional[str] = ...
stderr: Optional[str] = ...
exit_code: Optional[int] = ...
def __init__(self, start_time: Optional[datetime], end_time: Optional[datetime], stdout: Optional[str], stderr: Optional[str], exit_code: Optional[int]) -> None: ...

class OutputFileLog(Base):
url: Optional[str] = ...
path: Optional[str] = ...
size_bytes: Optional[int] = ...
def __init__(self, url: Optional[str], path: Optional[str], size_bytes: Optional[int]) -> None: ...

class TaskLog(Base):
start_time: Optional[datetime] = ...
end_time: Optional[datetime] = ...
metadata: Optional[Dict[str, Any]] = ...
logs: Optional[List[ExecutorLog]] = ...
outputs: Optional[List[OutputFileLog]] = ...
system_logs: Optional[List[str]] = ...
def __init__(self, start_time: Optional[datetime], end_time: Optional[datetime], metadata: Optional[Dict[str, Any]], logs: Optional[List[ExecutorLog]], outputs: Optional[List[OutputFileLog]], system_logs: Optional[List[str]]) -> None: ...

class Task(Base):
id: Optional[str] = ...
state: Optional[str] = ...
name: Optional[str] = ...
description: Optional[str] = ...
inputs: Optional[List[Input]] = ...
outputs: Optional[List[Output]] = ...
resources: Optional[Resources] = ...
executors: Optional[List[Executor]] = ...
volumes: Optional[List[str]] = ...
tags: Optional[List[str]] = ...
logs: Optional[List[TaskLog]] = ...
creation_time: Optional[datetime] = ...
def is_valid(self) -> Tuple[bool, Optional[BaseException]]: ...
def __init__(self, id: Optional[str], state: Optional[str], name: Optional[str], description: Optional[str], inputs: Optional[List[Input]], outputs: Optional[List[Output]], resources: Optional[Resources], executors: Optional[List[Executor]], volumes: Optional[List[str]], tags: Optional[List[str]], logs: Optional[List[TaskLog]], creation_time: Optional[datetime]) -> None: ...

class GetTaskRequest(Base):
id: Optional[str] = ...
view: Optional[str] = ...
def __init__(self, id: Optional[str], view: Optional[str]) -> None: ...

class CreateTaskResponse(Base):
id: Optional[str] = ...
def __init__(self, id: Optional[str]) -> None: ...

class ServiceInfoRequest(Base):
def __init__(self) -> None: ...

class ServiceInfo(Base):
name: Optional[str] = ...
doc: Optional[str] = ...
storage: Optional[List[str]] = ...
def __init__(self, name: Optional[str], doc: Optional[str], storage: Optional[List[str]]) -> None: ...

class CancelTaskRequest(Base):
id: Optional[str] = ...
def __init__(self, id: Optional[str]) -> None: ...

class CancelTaskResponse(Base):
def __init__(self) -> None: ...

class ListTasksRequest(Base):
project: Optional[str] = ...
name_prefix: Optional[str] = ...
page_size: Optional[int] = ...
page_token: Optional[str] = ...
view: Optional[str] = ...
def __init__(self, project: Optional[str], name_prefix: Optional[str], page_size: Optional[int], page_token: Optional[str], view: Optional[str]) -> None: ...

class ListTasksResponse(Base):
tasks: Optional[List[Task]] = ...
next_page_token: Optional[str] = ...
def __init__(self, tasks: Optional[List[Task]], next_page_token: Optional[str]) -> None: ...
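The stub above mirrors the GA4GH TES schema that py-tes marshals to and from. As a rough illustration of the wire format those `Task`/`Executor`/`Resources` fields correspond to (the image, command, and resource values are made up), a minimal task document looks like:

```python
import json

# Minimal TES task payload; field names follow the Task/Executor/Resources
# stubs above, values are purely illustrative.
task = {
    "name": "example-job",
    "resources": {"cpu_cores": 1, "ram_gb": 2.0, "disk_gb": 4.0},
    "executors": [
        {"image": "ubuntu:20.04", "command": ["echo", "hello"]}
    ],
}
payload = json.dumps(task, sort_keys=True)
print(payload)
```

A TES server assigns the `id` and `state` fields itself, which is why they are `Optional` on the `Task` stub.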
11 changes: 11 additions & 0 deletions docs/appendices/environment_vars.rst
@@ -48,6 +48,17 @@ There are several environment variables that affect the way Toil runs.
| | instead of polling for running jobs. Default |
| | value is set to False. |
+----------------------------------+----------------------------------------------------+
| TOIL_TES_ENDPOINT | URL to the TES server to run against when using |
| | the ``tes`` batch system. |
+----------------------------------+----------------------------------------------------+
| TOIL_TES_USER | Username to use with HTTP Basic Authentication to |
| | log into the TES server. |
+----------------------------------+----------------------------------------------------+
| TOIL_TES_PASSWORD | Password to use with HTTP Basic Authentication to |
| | log into the TES server. |
+----------------------------------+----------------------------------------------------+
| TOIL_TES_BEARER_TOKEN | Token to use to authenticate to the TES server. |
+----------------------------------+----------------------------------------------------+
| TOIL_APPLIANCE_SELF | The fully qualified reference for the Toil |
| | Appliance you wish to use, in the form |
| | ``REPO/IMAGE:TAG``. |
42 changes: 29 additions & 13 deletions docs/running/cliOptions.rst
@@ -117,9 +117,11 @@ the logging module:

--batchSystem BATCHSYSTEM
The type of batch system to run the job(s) with,
currently can be one of lsf, Mesos, slurm, torque,
htcondor, single_machine, parasol, grid_engine', kubernetes.
currently can be one of lsf, mesos, slurm, torque,
htcondor, single_machine, parasol, grid_engine,
kubernetes, tes.
(default: single_machine)

--parasolCommand PARASOLCOMMAND
The name or path of the parasol program. Will be
looked up on PATH unless it starts with a
@@ -128,19 +130,31 @@
Maximum number of job batches the Parasol batch is
allowed to create. One batch is created for jobs with
a unique set of resource requirements. (default: 1000)

--mesosEndpoint MESOSENDPOINT
The host and port of the Mesos server separated by a
colon. (default: <leader IP>:5050)

--tesEndpoint TES_ENDPOINT
The http(s) URL of the TES server.
(default: http://<leader IP>:8000)
--tesUser TES_USER User name to use for basic authentication to TES server.
--tesPassword TES_PASSWORD
Password to use for basic authentication to TES server.
--tesBearerToken TES_BEARER_TOKEN
Bearer token to use for authentication to TES server.

--scale SCALE A scaling factor to change the value of all submitted
tasks' submitted cores. Used in singleMachine batch
system. (default: 1)
system. Useful for running workflows on smaller
machines than they were designed for, by setting a
value less than 1. (default: 1)
--linkImports When using Toil's importFile function for staging,
input files are copied to the job store. Specifying
this option saves space by sym-linking imported files.
As long as caching is enabled Toil will protect the
file automatically by changing the permissions to
read-only.
--mesosMaster MESOSMASTERADDRESS
The host and port of the Mesos master separated by a
colon. (default: 169.233.147.202:5050)

--coalesceStatusCalls Coalesce status calls to prevent the batch system from
being overloaded. Currently only supported for LSF.

@@ -151,10 +165,10 @@ the logging module:
currently supported choices are 'aws' or 'gce'. The
default is None.
--nodeTypes NODETYPES
Specifies a list of comma-separated node types, each of which is
composed of slash-separated instance types, and an optional spot
bid set off by a colon, making the node type preemptable. Instance
types may appear in multiple node types, and the same node type
may appear as both preemptable and non-preemptable.
Valid argument specifying two node types:
c5.4xlarge/c5a.4xlarge:0.42,t2.large
@@ -265,7 +279,9 @@ the logging module:
--statePollingWait STATEPOLLINGWAIT
Time, in seconds, to wait before doing a scheduler
query for job state. Return cached results if within
the waiting period.
the waiting period. Only works for grid engine batch
systems such as gridengine, htcondor, torque, slurm,
and lsf.

**Miscellaneous Options**

@@ -295,7 +311,7 @@ the logging module:
--writeLogsGzip FILEPATH
Identical to -\\-writeLogs except the logs files are
gzipped on the leader.
--realTimeLogging Enable real-time logging from workers to masters.
--realTimeLogging Enable real-time logging from workers to leader.
--sseKey SSEKEY Path to file containing 32 character key to be used
for server-side encryption on awsJobStore or
googleJobStore. SSE will not be used if this flag is
1 change: 1 addition & 0 deletions requirements-dev.txt
@@ -10,6 +10,7 @@ mypy==0.910
types-requests
types-setuptools
types-boto
types-pytz
flake8>=3.8.4,<5
flake8-bugbear>=20.11.1,<21
black
1 change: 1 addition & 0 deletions requirements.txt
@@ -3,6 +3,7 @@ requests>=2, <3
docker>=3.7.2, <6
python-dateutil
psutil >= 3.0.1, <6
py-tes>=0.4.2,<1
addict>=2.2.1, <2.5
pytz>=2012
enlighten>=1.5.2, <2
4 changes: 3 additions & 1 deletion setup.py
@@ -80,6 +80,8 @@ def run_setup():
'Operating System :: POSIX',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Topic :: Scientific/Engineering',
'Topic :: Scientific/Engineering :: Bio-Informatics',
'Topic :: Scientific/Engineering :: Astronomy',
@@ -111,7 +113,7 @@ def run_setup():
'toil-cwl-runner = toil.cwl.cwltoil:main [cwl]',
'toil-wdl-runner = toil.wdl.toilwdl:main',
'_toil_mesos_executor = toil.batchSystems.mesos.executor:main [mesos]',
'_toil_kubernetes_executor = toil.batchSystems.kubernetes:executor [kubernetes]']})
'_toil_contained_executor = toil.batchSystems.contained_executor:executor']})


def import_version():
15 changes: 15 additions & 0 deletions src/toil/__init__.py
@@ -357,6 +357,9 @@ def __init__(self, origAppliance, url, statusCode):
"" % (origAppliance, url, str(statusCode)))
super().__init__(msg)

# Cache images we know exist so we don't have to ask the registry about them
# all the time.
KNOWN_EXTANT_IMAGES = set()

def requestCheckRegularDocker(origAppliance, registryName, imageName, tag):
"""
@@ -379,12 +382,18 @@ def requestCheckRegularDocker(origAppliance, registryName, imageName, tag):
:param str tag: The tag used at that docker image's registry. e.g. "latest"
:return: Return True if match found. Raise otherwise.
"""

if origAppliance in KNOWN_EXTANT_IMAGES:
# Check the cache first
return origAppliance

ioURL = 'https://{webhost}/v2/{pathName}/manifests/{tag}' \
''.format(webhost=registryName, pathName=imageName, tag=tag)
response = requests.head(ioURL)
if not response.ok:
raise ApplianceImageNotFound(origAppliance, ioURL, response.status_code)
else:
KNOWN_EXTANT_IMAGES.add(origAppliance)
return origAppliance


@@ -400,6 +409,11 @@ def requestCheckDockerIo(origAppliance, imageName, tag):
:param str tag: The tag used at that docker image's registry. e.g. "latest"
:return: Return True if match found. Raise otherwise.
"""

if origAppliance in KNOWN_EXTANT_IMAGES:
# Check the cache first
return origAppliance

# only official images like 'busybox' or 'ubuntu'
if '/' not in imageName:
imageName = 'library/' + imageName
@@ -415,6 +429,7 @@
if not response.ok:
raise ApplianceImageNotFound(origAppliance, requests_url, response.status_code)
else:
KNOWN_EXTANT_IMAGES.add(origAppliance)
return origAppliance

