diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 4aff012b58..98967f2f5f 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -109,7 +109,9 @@ quick_test_offline:
 
 py38_appliance_build:
   rules:
-    - if: $CI_PIPELINE_SOURCE == "schedule" || $CI_COMMIT_TAG || $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
+    - if: $CI_PIPELINE_SOURCE == "schedule"
+    - if: $CI_COMMIT_TAG
+    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
   stage: basic_tests
   script:
     - pwd
@@ -120,7 +122,9 @@ py38_appliance_build:
 
 py39_appliance_build:
   rules:
-    - if: $CI_PIPELINE_SOURCE == "schedule" || $CI_COMMIT_TAG || $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
+    - if: $CI_PIPELINE_SOURCE == "schedule"
+    - if: $CI_COMMIT_TAG
+    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
   stage: basic_tests
   script:
     - pwd
@@ -131,7 +135,9 @@ py39_appliance_build:
 
 py310_appliance_build:
   rules:
-    - if: $CI_PIPELINE_SOURCE == "schedule" || $CI_COMMIT_TAG || $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
+    - if: $CI_PIPELINE_SOURCE == "schedule"
+    - if: $CI_COMMIT_TAG
+    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
   stage: basic_tests
   script:
     - pwd
@@ -166,36 +172,11 @@ batch_systems:
   script:
     - pwd
    - ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[all] packages='htcondor==10.2.3'
-    - wget https://github.com/ohsu-comp-bio/funnel/releases/download/0.10.1/funnel-linux-amd64-0.10.1.tar.gz
-    - tar -xvf funnel-linux-amd64-0.10.1.tar.gz funnel
-    - export FUNNEL_SERVER_USER=toil
-    - export FUNNEL_SERVER_PASSWORD=$(openssl rand -hex 256)
-    - |
-      cat >funnel.conf <:8000)
-  --tesUser TES_USER    User name to use for basic authentication to TES server.
-  --tesPassword TES_PASSWORD
-                        Password to use for basic authentication to TES server.
-  --tesBearerToken TES_BEARER_TOKEN
-                        Bearer token to use for authentication to TES server.
   --awsBatchRegion AWS_BATCH_REGION
                         The AWS region containing the AWS Batch queue to submit to.
diff --git a/requirements-cwl.txt b/requirements-cwl.txt
index d978e83b16..0068351039 100644
--- a/requirements-cwl.txt
+++ b/requirements-cwl.txt
@@ -4,5 +4,5 @@ galaxy-tool-util<23
 galaxy-util<23
 ruamel.yaml>=0.15,<=0.18.3
 ruamel.yaml.clib>=0.2.6
-networkx>=2,<2.8.9
+networkx!=2.8.1,<4
 CacheControl[filecache]
diff --git a/requirements-wdl.txt b/requirements-wdl.txt
index 3ca204aaf6..314ee398b4 100644
--- a/requirements-wdl.txt
+++ b/requirements-wdl.txt
@@ -1,3 +1,3 @@
-miniwdl==1.10.0
+miniwdl==1.11.1
 wdlparse==0.1.0
 graphlib-backport==1.0 ; python_version < '3.9'
diff --git a/requirements.txt b/requirements.txt
index e79fe7ebcc..568f1ce1e4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,7 +5,6 @@ docker>=3.7.2, <7
 urllib3>=1.26.0,<3
 python-dateutil
 psutil >= 3.0.1, <6
-py-tes>=0.4.2,<1
 PyPubSub >=4.0.3, <5
 addict>=2.2.1, <2.5
 pytz>=2012
diff --git a/setup.cfg b/setup.cfg
index 8b4ba740ca..f603a662d0 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -34,7 +34,6 @@ markers =
     slow
     slurm
     singularity
-    tes
     torque
     wes_server
     cwl_small_log_dir
@@ -68,7 +67,3 @@ no_warn_no_return = True
 
 [mypy-toil.cwl.*]
 strict = True
-
-[mypy-tes]
-ignore_errors = True
-follow_imports=skip
diff --git a/src/toil/batchSystems/contained_executor.py b/src/toil/batchSystems/contained_executor.py
index b46ecefabf..699f78e332 100644
--- a/src/toil/batchSystems/contained_executor.py
+++ b/src/toil/batchSystems/contained_executor.py
@@ -14,7 +14,7 @@
 """
 Executor for running inside a container.
 
-Useful for Kubernetes and TES batch systems.
+Useful for the Kubernetes batch system and the TES batch system plugin.
""" import base64 import logging diff --git a/src/toil/batchSystems/options.py b/src/toil/batchSystems/options.py index f028a64da9..db9c07c097 100644 --- a/src/toil/batchSystems/options.py +++ b/src/toil/batchSystems/options.py @@ -21,8 +21,8 @@ else: from typing_extensions import Protocol -from toil.batchSystems.registry import (BATCH_SYSTEM_FACTORY_REGISTRY, - BATCH_SYSTEMS, +from toil.batchSystems.registry import (get_batch_system, + get_batch_systems, DEFAULT_BATCH_SYSTEM) from toil.lib.threading import cpu_count @@ -55,14 +55,14 @@ def set_batchsystem_options(batch_system: Optional[str], set_option: OptionSette """ if batch_system is not None: # Use this batch system - batch_system_type = BATCH_SYSTEM_FACTORY_REGISTRY[batch_system]() + batch_system_type = get_batch_system(batch_system) batch_system_type.setOptions(set_option) else: - for factory in BATCH_SYSTEM_FACTORY_REGISTRY.values(): + for name in get_batch_systems(): # All the batch systems are responsible for setting their own options # with their setOptions() class methods. try: - batch_system_type = factory() + batch_system_type = get_batch_system(name) except ImportError: # Skip anything we can't import continue @@ -86,9 +86,9 @@ def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) - "--batchSystem", dest="batchSystem", default=DEFAULT_BATCH_SYSTEM, - choices=BATCH_SYSTEMS, + choices=get_batch_systems(), help=f"The type of batch system to run the job(s) with, currently can be one " - f"of {', '.join(BATCH_SYSTEMS)}. default={DEFAULT_BATCH_SYSTEM}", + f"of {', '.join(get_batch_systems())}. default={DEFAULT_BATCH_SYSTEM}", ) parser.add_argument( "--disableHotDeployment", @@ -175,14 +175,14 @@ def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) - "systems such as gridengine, htcondor, torque, slurm, and lsf." ) - for factory in BATCH_SYSTEM_FACTORY_REGISTRY.values(): + for name in get_batch_systems(): # All the batch systems are responsible for adding their own options # with the add_options class method. try: - batch_system_type = factory() + batch_system_type = get_batch_system(name) except ImportError: # Skip anything we can't import continue # Ask the batch system to create its options in the parser - logger.debug('Add options for %s', batch_system_type) - batch_system_type.add_options(parser) \ No newline at end of file + logger.debug('Add options for %s batch system', name) + batch_system_type.add_options(parser) diff --git a/src/toil/batchSystems/registry.py b/src/toil/batchSystems/registry.py index 848c6d6166..ce4f1f36a9 100644 --- a/src/toil/batchSystems/registry.py +++ b/src/toil/batchSystems/registry.py @@ -12,14 +12,58 @@ # See the License for the specific language governing permissions and # limitations under the License. +import importlib +import pkgutil import logging -from typing import TYPE_CHECKING, Callable, Dict, List, Tuple, Type +import warnings +from typing import TYPE_CHECKING, Callable, Dict, List, Sequence, Tuple, Type + +from toil.lib.compatibility import deprecated +from toil.lib.memoize import memoize if TYPE_CHECKING: from toil.batchSystems.abstractBatchSystem import AbstractBatchSystem logger = logging.getLogger(__name__) +##### +# Plugin system/API +##### + +def add_batch_system_factory(key: str, class_factory: Callable[[], Type['AbstractBatchSystem']]): + """ + Adds a batch system to the registry for workflow or plugin-supplied batch systems. 
diff --git a/src/toil/batchSystems/registry.py b/src/toil/batchSystems/registry.py
index 848c6d6166..ce4f1f36a9 100644
--- a/src/toil/batchSystems/registry.py
+++ b/src/toil/batchSystems/registry.py
@@ -12,14 +12,58 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import importlib
+import pkgutil
 import logging
-from typing import TYPE_CHECKING, Callable, Dict, List, Tuple, Type
+import warnings
+from typing import TYPE_CHECKING, Callable, Dict, List, Sequence, Tuple, Type
+
+from toil.lib.compatibility import deprecated
+from toil.lib.memoize import memoize
 
 if TYPE_CHECKING:
     from toil.batchSystems.abstractBatchSystem import AbstractBatchSystem
 
 logger = logging.getLogger(__name__)
 
+#####
+# Plugin system/API
+#####
+
+def add_batch_system_factory(key: str, class_factory: Callable[[], Type['AbstractBatchSystem']]):
+    """
+    Adds a batch system to the registry for workflow- or plugin-supplied batch systems.
+
+    :param key: Name to register the batch system under, i.e. the value used to select it with --batchSystem.
+    :param class_factory: A function that returns a batch system class (NOT an instance), which implements :class:`toil.batchSystems.abstractBatchSystem.AbstractBatchSystem`.
+    """
+    _registry_keys.append(key)
+    _registry[key] = class_factory
+
+def get_batch_systems() -> Sequence[str]:
+    """
+    Get the names of all the available batch systems.
+    """
+    _load_all_plugins()
+
+    return _registry_keys
+
+def get_batch_system(key: str) -> Type['AbstractBatchSystem']:
+    """
+    Get a batch system class by name.
+
+    :raises: KeyError if the key is not the name of a batch system, and
+             ImportError if the batch system's class cannot be loaded.
+    """
+
+    return _registry[key]()
+
+
+DEFAULT_BATCH_SYSTEM = 'single_machine'
+
+#####
+# Built-in batch systems
+#####
+
 def aws_batch_batch_system_factory():
     from toil.batchSystems.awsBatch import AWSBatchBatchSystem
     return AWSBatchBatchSystem
@@ -53,10 +97,6 @@ def slurm_batch_system_factory():
     from toil.batchSystems.slurm import SlurmBatchSystem
     return SlurmBatchSystem
 
-def tes_batch_system_factory():
-    from toil.batchSystems.tes import TESBatchSystem
-    return TESBatchSystem
-
 def torque_batch_system_factory():
     from toil.batchSystems.torque import TorqueBatchSystem
     return TorqueBatchSystem
@@ -71,8 +111,11 @@ def kubernetes_batch_system_factory():
     from toil.batchSystems.kubernetes import KubernetesBatchSystem
     return KubernetesBatchSystem
 
+#####
+# Registry implementation
+#####
 
-BATCH_SYSTEM_FACTORY_REGISTRY: Dict[str, Callable[[], Type["AbstractBatchSystem"]]] = {
+_registry: Dict[str, Callable[[], Type["AbstractBatchSystem"]]] = {
     'aws_batch'      : aws_batch_batch_system_factory,
     'parasol'        : parasol_batch_system_factory,
     'single_machine' : single_machine_batch_system_factory,
@@ -80,20 +123,64 @@ def kubernetes_batch_system_factory():
     'lsf'            : lsf_batch_system_factory,
     'mesos'          : mesos_batch_system_factory,
     'slurm'          : slurm_batch_system_factory,
-    'tes'            : tes_batch_system_factory,
     'torque'         : torque_batch_system_factory,
     'htcondor'       : htcondor_batch_system_factory,
     'kubernetes'     : kubernetes_batch_system_factory
     }
 
-BATCH_SYSTEMS = list(BATCH_SYSTEM_FACTORY_REGISTRY.keys())
-DEFAULT_BATCH_SYSTEM = 'single_machine'
+_registry_keys = list(_registry.keys())
+
+# We will load any packages starting with this prefix and let them call
+# add_batch_system_factory()
+_PLUGIN_NAME_PREFIX = "toil_batch_system_"
+
+@memoize
+def _load_all_plugins() -> None:
+    """
+    Load all the batch system plugins that are installed.
+    """
+
+    for finder, name, is_pkg in pkgutil.iter_modules():
+        # For all installed packages
+        if name.startswith(_PLUGIN_NAME_PREFIX):
+            # If it is a Toil batch system plugin, import it
+            importlib.import_module(name)
+
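The loader above plus `add_batch_system_factory()` together define the plugin contract: any installed package whose name starts with `toil_batch_system_` is imported, and is expected to register itself at import time. A sketch of what a hypothetical plugin package `toil_batch_system_example` might do in its `__init__.py` (all `example` names here are illustrative, not part of this change):

```python
# toil_batch_system_example/__init__.py (hypothetical plugin package).
# The "toil_batch_system_" name prefix is what _load_all_plugins() scans for;
# importing this module is what performs the registration.
from typing import TYPE_CHECKING, Type

from toil.batchSystems.registry import add_batch_system_factory

if TYPE_CHECKING:
    from toil.batchSystems.abstractBatchSystem import AbstractBatchSystem


def example_batch_system_factory() -> Type["AbstractBatchSystem"]:
    # Import lazily, like the built-in factories above, so merely listing
    # the available batch systems never drags in optional dependencies.
    from toil_batch_system_example.batch_system import ExampleBatchSystem
    return ExampleBatchSystem


add_batch_system_factory("example", example_batch_system_factory)
```

Once such a package is installed, `get_batch_systems()` would include `example` and `--batchSystem example` would select it, with no change to Toil itself.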
+ """ + if name == "BATCH_SYSTEM_FACTORY_REGISTRY": + warnings.warn("BATCH_SYSTEM_FACTORY_REGISTRY is deprecated; use get_batch_system() or add_batch_system_factory()", DeprecationWarning) + return _registry + elif name == "BATCH_SYSTEMS": + warnings.warn("BATCH_SYSTEMS is deprecated; use get_batch_systems()", DeprecationWarning) + return _registry_keys + else: + raise AttributeError(f"Module {__name__} ahs no attribute {name}") + + +@deprecated(new_function_name="add_batch_system_factory") def addBatchSystemFactory(key: str, batchSystemFactory: Callable[[], Type['AbstractBatchSystem']]): """ - Adds a batch system to the registry for workflow-supplied batch systems. + Deprecated method to add a batch system. """ - BATCH_SYSTEMS.append(key) - BATCH_SYSTEM_FACTORY_REGISTRY[key] = batchSystemFactory + return add_batch_system_factory(key, batchSystemFactory) + + +##### +# Testing utilities +##### # We need a snapshot save/restore system for testing. We can't just tamper with # the globals because module-level globals are their own references, so we @@ -106,7 +193,7 @@ def save_batch_system_plugin_state() -> Tuple[List[str], Dict[str, Callable[[], tests. """ - snapshot = (list(BATCH_SYSTEMS), dict(BATCH_SYSTEM_FACTORY_REGISTRY)) + snapshot = (list(_registry_keys), dict(_registry)) return snapshot def restore_batch_system_plugin_state(snapshot: Tuple[List[str], Dict[str, Callable[[], Type['AbstractBatchSystem']]]]): @@ -118,7 +205,7 @@ def restore_batch_system_plugin_state(snapshot: Tuple[List[str], Dict[str, Calla # We need to apply the snapshot without rebinding the names, because that # won't affect modules that imported the names. wanted_batch_systems, wanted_registry = snapshot - BATCH_SYSTEMS.clear() - BATCH_SYSTEMS.extend(wanted_batch_systems) - BATCH_SYSTEM_FACTORY_REGISTRY.clear() - BATCH_SYSTEM_FACTORY_REGISTRY.update(wanted_registry) + _registry_keys.clear() + _registry_keys.extend(wanted_batch_systems) + _registry.clear() + _registry.update(wanted_registry) diff --git a/src/toil/batchSystems/tes.py b/src/toil/batchSystems/tes.py deleted file mode 100644 index 90a155e017..0000000000 --- a/src/toil/batchSystems/tes.py +++ /dev/null @@ -1,462 +0,0 @@ -# Copyright (C) 2015-2021 Regents of the University of California -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -Batch system for running Toil workflows on GA4GH TES. - -Useful with network-based job stores when the TES server provides tasks with -credentials, and filesystem-based job stores when the TES server lets tasks -mount the job store. - -Additional containers should be launched with Singularity, not Docker. 
-""" -import datetime -import logging -import math -import os -import pickle -import time -from argparse import ArgumentParser, _ArgumentGroup -from typing import Any, Callable, Dict, List, Optional, Union - -import tes -from requests.exceptions import HTTPError - -from toil import applianceSelf -from toil.batchSystems.abstractBatchSystem import (EXIT_STATUS_UNAVAILABLE_VALUE, - BatchJobExitReason, - UpdatedBatchJobInfo) -from toil.batchSystems.cleanup_support import BatchSystemCleanupSupport -from toil.batchSystems.contained_executor import pack_job -from toil.batchSystems.options import OptionSetter -from toil.common import Config, Toil -from toil.job import JobDescription -from toil.lib.misc import get_public_ip, slow_down, utc_now -from toil.resource import Resource - -logger = logging.getLogger(__name__) - - -# Map from TES terminal states to Toil batch job exit reasons -STATE_TO_EXIT_REASON: Dict[str, BatchJobExitReason] = { - 'COMPLETE': BatchJobExitReason.FINISHED, - 'CANCELED': BatchJobExitReason.KILLED, - 'EXECUTOR_ERROR': BatchJobExitReason.FAILED, - 'SYSTEM_ERROR': BatchJobExitReason.ERROR, - 'UNKNOWN': BatchJobExitReason.ERROR -} - - -class TESBatchSystem(BatchSystemCleanupSupport): - @classmethod - def supportsAutoDeployment(cls) -> bool: - return True - - @classmethod - def get_default_tes_endpoint(cls) -> str: - """ - Get the default TES endpoint URL to use. - - (unless overridden by an option or environment variable) - """ - return f'http://{get_public_ip()}:8000' - - def __init__(self, config: Config, maxCores: float, maxMemory: int, maxDisk: int) -> None: - super().__init__(config, maxCores, maxMemory, maxDisk) - # Connect to TES, using Funnel-compatible environment variables to fill in credentials if not specified. - tes_endpoint = config.tes_endpoint or self.get_default_tes_endpoint() - self.tes = tes.HTTPClient(tes_endpoint, - user=config.tes_user, - password=config.tes_password, - token=config.tes_bearer_token) - - # Get service info from the TES server and pull out supported storages. - # We need this so we can tell if the server is likely to be able to - # mount any of our local files. These are URL bases that the server - # supports. - server_info = self.tes.get_service_info() - logger.debug("Detected TES server info: %s", server_info) - self.server_storages = server_info.storage or [] - - # Define directories to mount for each task, as py-tes Input objects - self.mounts: List[tes.Input] = [] - - if config.jobStore: - job_store_type, job_store_path = Toil.parseLocator(config.jobStore) - if job_store_type == 'file': - # If we have a file job store, we want to mount it at the same path, if we can - self._mount_local_path_if_possible(job_store_path, job_store_path) - - # If we have AWS credentials, we want to mount them in our home directory if we can. - aws_credentials_path = os.path.join(os.path.expanduser("~"), '.aws') - if os.path.isdir(aws_credentials_path): - self._mount_local_path_if_possible(aws_credentials_path, '/root/.aws') - - # We assign job names based on a numerical job ID. This functionality - # is managed by the BatchSystemLocalSupport. - - # Here is where we will store the user script resource object if we get one. - self.user_script: Optional[Resource] = None - - # Ge the image to deploy from Toil's configuration - self.docker_image = applianceSelf() - - # We need a way to map between our batch system ID numbers, and TES task IDs from the server. 
- self.bs_id_to_tes_id: Dict[int, str] = {} - self.tes_id_to_bs_id: Dict[str, int] = {} - - def _server_can_mount(self, url: str) -> bool: - """ - Internal function. Should not be called outside this class. - - Return true if the given URL is under a supported storage location for - the TES server, and false otherwise. - """ - # TODO: build some kind of fast matcher in case there are a lot of - # storages supported. - - for base_url in self.server_storages: - if url.startswith(base_url): - return True - return False - - def _mount_local_path_if_possible(self, local_path: str, container_path: str) -> None: - """ - Internal function. Should not be called outside this class. - - If a local path is somewhere the server thinks it can access, mount it - into all the tasks. - """ - # TODO: We aren't going to work well with linked imports if we're mounting the job store into the container... - - path_url = 'file://' + os.path.abspath(local_path) - if os.path.exists(local_path) and self._server_can_mount(path_url): - # We can access this file from the server. Probably. - self.mounts.append(tes.Input(url=path_url, - path=container_path, - type="DIRECTORY" if os.path.isdir(local_path) else "FILE")) - - def setUserScript(self, user_script: Resource) -> None: - logger.debug(f'Setting user script for deployment: {user_script}') - self.user_script = user_script - - # setEnv is provided by BatchSystemSupport, updates self.environment - - def issueBatchJob(self, job_desc: JobDescription, job_environment: Optional[Dict[str, str]] = None) -> int: - # TODO: get a sensible self.maxCores, etc. so we can check_resource_request. - # How do we know if the cluster will autoscale? - - # Try the job as local - local_id = self.handleLocalJob(job_desc) - if local_id is not None: - # It is a local job - return local_id - else: - # We actually want to send to the cluster - - # Check resource requirements (managed by BatchSystemSupport) - self.check_resource_request(job_desc) - - # Make a batch system scope job ID - bs_id = self.getNextJobID() - # Make a vaguely human-readable name. - # TES does not require it to be unique. - # We could add a per-workflow prefix to use with ListTasks, but - # ListTasks doesn't let us filter for newly done tasks, so it's not - # actually useful for us over polling each task. - job_name = str(job_desc) - - # Launch the job on TES - - # Determine job environment - environment = self.environment.copy() - if job_environment: - environment.update(job_environment) - if 'TOIL_WORKDIR' not in environment: - # The appliance container defaults TOIL_WORKDIR to - # /var/lib/toil, but TES doesn't (always?) give us a writable - # /, so we need to use the writable space in /tmp by default - # instead when running on TES. - environment['TOIL_WORKDIR'] = '/tmp' - - # Make a command to run it in the executor - command_list = pack_job(job_desc, self.user_script) - - # Make the sequence of TES containers ("executors") to run. - # We just run one which is the Toil executor to grab the user - # script and do the job. - task_executors = [tes.Executor(image=self.docker_image, - command=command_list, - env=environment - )] - - # Prepare inputs. - task_inputs = list(self.mounts) - # If we had any per-job input files they would come in here. 
- - # Prepare resource requirements - task_resources = tes.Resources(cpu_cores=math.ceil(job_desc.cores), - ram_gb=job_desc.memory / (1024**3), - disk_gb=job_desc.disk / (1024**3), - # TODO: py-tes spells this differently than Toil - preemptible=job_desc.preemptible) - - # Package into a TES Task - task = tes.Task(name=job_name, - executors=task_executors, - inputs=task_inputs, - resources=task_resources) - - # Launch it and get back the TES ID that we can use to poll the task - tes_id = self.tes.create_task(task) - - # Tie it to the numeric ID - self.bs_id_to_tes_id[bs_id] = tes_id - self.tes_id_to_bs_id[tes_id] = bs_id - - logger.debug('Launched job: %s', job_name) - - return bs_id - - def _get_runtime(self, task: tes.Task) -> Optional[float]: - """ - Internal function. Should not be called outside this class. - - Get the time that the given job ran/has been running for, in seconds, - or None if that time is not available. Never returns 0. - """ - start_time = None - end_time = utc_now() - for log in (task.logs or []): - if log.start_time: - # Find the first start time that is set - start_time = log.start_time - break - - if not start_time: - # It hasn't been running for a measurable amount of time. - return None - - for log in reversed(task.logs or []): - if log.end_time: - # Find the last end time that is set, and override now - end_time = log.end_time - break - # We have a set start time, so it is/was running. Return the time - # it has been running for. - return slow_down((end_time - start_time).total_seconds()) - - def _get_exit_code(self, task: tes.Task) -> int: - """ - Internal function. Should not be called outside this class. - - Get the exit code of the last executor with a log in the task, or - EXIT_STATUS_UNAVAILABLE_VALUE if no executor has a log. - """ - for task_log in reversed(task.logs or []): - for executor_log in reversed(task_log.logs or []): - if isinstance(executor_log.exit_code, int): - # Find the last executor exit code that is a number and return it - return executor_log.exit_code - - if task.state == 'COMPLETE': - # If the task completes without error but has no code logged, the - # code must be 0. - return 0 - - # If we get here we couldn't find an exit code. - return EXIT_STATUS_UNAVAILABLE_VALUE - - def __get_log_text(self, task: tes.Task) -> Optional[str]: - """ - Get the log text (standard error) of the last executor with a log in - the task, or None. - """ - - for task_log in reversed(task.logs or []): - for executor_log in reversed(task_log.logs or []): - if isinstance(executor_log.stderr, str): - # Find the last executor log code that is a string and return it - return executor_log.stderr - - # If we get here we couldn't find a log. - return None - - def getUpdatedBatchJob(self, maxWait: int) -> Optional[UpdatedBatchJobInfo]: - # Remember when we started, for respecting the timeout - entry = datetime.datetime.now() - # This is the updated job we have found, if any - result = None - while result is None and ((datetime.datetime.now() - entry).total_seconds() < maxWait or not maxWait): - result = self.getUpdatedLocalJob(0) - - if result: - return result - - # Collect together the list of TES and batch system IDs for tasks we - # are acknowledging and don't care about anymore. - acknowledged = [] - - for tes_id, bs_id in self.tes_id_to_bs_id.items(): - # Immediately poll all the jobs we issued. 
- # TODO: There's no way to acknowledge a finished job, so there's no - # faster way to find the newly finished jobs than polling - task = self.tes.get_task(tes_id, view="MINIMAL") - if task.state in ["COMPLETE", "CANCELED", "EXECUTOR_ERROR", "SYSTEM_ERROR"]: - # This task is done! - logger.debug("Found stopped task: %s", task) - - # Acknowledge it - acknowledged.append((tes_id, bs_id)) - - if task.state == "CANCELED": - # Killed jobs aren't allowed to appear as updated. - continue - - # Otherwise, it stopped running and it wasn't our fault. - - # Fetch the task's full info, including logs. - task = self.tes.get_task(tes_id, view="FULL") - - # Record runtime - runtime = self._get_runtime(task) - - # Determine if it succeeded - exit_reason = STATE_TO_EXIT_REASON[task.state] - - # Get its exit code - exit_code = self._get_exit_code(task) - - if task.state == "EXECUTOR_ERROR": - # The task failed, so report executor logs. - logger.warning('Log from failed executor: %s', self.__get_log_text(task)) - - # Compose a result - result = UpdatedBatchJobInfo(jobID=bs_id, exitStatus=exit_code, wallTime=runtime, exitReason=exit_reason) - - # No more iteration needed, we found a result. - break - - # After the iteration, drop all the records for tasks we acknowledged - for (tes_id, bs_id) in acknowledged: - del self.tes_id_to_bs_id[tes_id] - del self.bs_id_to_tes_id[bs_id] - - if not maxWait: - # Don't wait at all - break - elif result is None: - # Wait a bit and poll again - time.sleep(min(maxWait/2, 1.0)) - - # When we get here we have all the result we can get - return result - - def shutdown(self) -> None: - - # Shutdown local processes first - self.shutdownLocal() - - for tes_id in self.tes_id_to_bs_id.keys(): - # Shut down all the TES jobs we issued. - self._try_cancel(tes_id) - - def _try_cancel(self, tes_id: str) -> None: - """ - Internal function. Should not be called outside this class. - - Try to cancel a TES job. - - Succeed if it can't be canceled because it has stopped, - but fail if it can't be canceled for some other reason. - """ - try: - # Kill each of our tasks in TES - self.tes.cancel_task(tes_id) - except HTTPError as e: - if e.response is not None and e.response.status_code in [409, 500]: - # TODO: This is what we probably get when trying to cancel - # something that is actually done. But can we rely on that? - pass - elif '500' in str(e) or '409' in str(e): - # TODO: drop this after merges. - # py-tes might be hiding the actual code and just putting it in a string - pass - else: - raise - - def getIssuedBatchJobIDs(self) -> List[int]: - return self.getIssuedLocalJobIDs() + list(self.bs_id_to_tes_id.keys()) - - def getRunningBatchJobIDs(self) -> Dict[int, float]: - # We need a dict from job_id (integer) to seconds it has been running - bs_id_to_runtime = {} - - for tes_id, bs_id in self.tes_id_to_bs_id.items(): - # Poll every issued task, and get the runtime info right away in - # the default BASIC view. - # TODO: use list_tasks filtering by name prefix and running state! - task = self.tes.get_task(tes_id) - logger.debug("Observed task: %s", task) - if task.state in ["INITIALIZING", "RUNNING"]: - # We count INITIALIZING tasks because they may be e.g. pulling - # Docker containers, and we don't want to time out on them in - # the tests. But they may not have any runtimes, so it might - # not really help. 
- runtime = self._get_runtime(task) - if runtime: - # We can measure a runtime - bs_id_to_runtime[bs_id] = runtime - # If we can't find a runtime, we can't say it's running - # because we can't say how long it has been running for. - - # Give back the times all our running jobs have been running for. - return bs_id_to_runtime - - def killBatchJobs(self, job_ids: List[int]) -> None: - # Kill all the ones that are local - self.killLocalJobs(job_ids) - - for bs_id in job_ids: - if bs_id in self.bs_id_to_tes_id: - # We sent this to TES. So try to cancel it. - self._try_cancel(self.bs_id_to_tes_id[bs_id]) - # But don't forget the mapping until we actually get the finish - # notification for the job. - - # TODO: If the kill races the collection of a finished update, do we - # have to censor the finished update even if the kill never took - # effect??? That's not implemented. - - @classmethod - def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None: - parser.add_argument("--tesEndpoint", dest="tes_endpoint", default=None, env_var="TOIL_TES_ENDPOINT", - help=f"The http(s) URL of the TES server. If the provided value is None, the value will be " - f"generated at runtime. " - f"(Generated default: {cls.get_default_tes_endpoint()})") - parser.add_argument("--tesUser", dest="tes_user", default=None, env_var="TOIL_TES_USER", - help="User name to use for basic authentication to TES server.") - parser.add_argument("--tesPassword", dest="tes_password", default=None, env_var="TOIL_TES_PASSWORD", - help="Password to use for basic authentication to TES server.") - parser.add_argument("--tesBearerToken", dest="tes_bearer_token", default=None, env_var="TOIL_TES_BEARER_TOKEN", - help="Bearer token to use for authentication to TES server.") - - @classmethod - def setOptions(cls, setOption: OptionSetter) -> None: - # Because we use the keyword arguments, we can't specify a type for setOption without using Protocols. - # TODO: start using Protocols, or just start returning objects to represent the options. - # When actually parsing options, remember to check the environment variables - setOption("tes_endpoint") - setOption("tes_user") - setOption("tes_password") - setOption("tes_bearer_token") diff --git a/src/toil/common.py b/src/toil/common.py index 6a196b57b2..6b718b0a74 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -711,7 +711,7 @@ def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any logger.warning(f'Length of workDir path "{workDir}" is {len(workDir)} characters. 
' f'Consider setting a shorter path with --workPath or setting TMPDIR to something ' f'like "/tmp" to avoid overly long paths.') - setattr(namespace, self.dest, values) + setattr(namespace, self.dest, workDir) class CoordinationDirAction(Action): """ @@ -725,7 +725,7 @@ def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any if not os.path.exists(coordination_dir): raise RuntimeError( f"The path provided to --coordinationDir ({coordination_dir}) does not exist.") - setattr(namespace, self.dest, values) + setattr(namespace, self.dest, coordination_dir) def make_closed_interval_action(min: Union[int, float], max: Optional[Union[int, float]] = None) -> Type[ Action]: @@ -1519,13 +1519,13 @@ def createBatchSystem(config: Config) -> "AbstractBatchSystem": maxMemory=config.maxMemory, maxDisk=config.maxDisk) - from toil.batchSystems.registry import BATCH_SYSTEM_FACTORY_REGISTRY + from toil.batchSystems.registry import get_batch_system, get_batch_systems try: - batch_system = BATCH_SYSTEM_FACTORY_REGISTRY[config.batchSystem]() + batch_system = get_batch_system(config.batchSystem) except KeyError: raise RuntimeError(f'Unrecognized batch system: {config.batchSystem} ' - f'(choose from: {BATCH_SYSTEM_FACTORY_REGISTRY.keys()})') + f'(choose from: {", ".join(get_batch_systems())})') if config.caching and not batch_system.supportsWorkerCleanup(): raise RuntimeError(f'{config.batchSystem} currently does not support shared caching, because it ' diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py index b61894b042..e74640a3b2 100644 --- a/src/toil/cwl/cwltoil.py +++ b/src/toil/cwl/cwltoil.py @@ -26,11 +26,11 @@ import json import logging import os +import pprint import shutil import socket import stat import sys -import tempfile import textwrap import uuid from threading import Thread @@ -100,6 +100,7 @@ from schema_salad.exceptions import ValidationException from schema_salad.ref_resolver import file_uri, uri_file_path from schema_salad.sourceline import SourceLine +from tempfile import NamedTemporaryFile, gettempdir from typing_extensions import Literal from toil.batchSystems.registry import DEFAULT_BATCH_SYSTEM @@ -119,6 +120,7 @@ from toil.jobStores.abstractJobStore import AbstractJobStore, NoSuchFileException from toil.jobStores.fileJobStore import FileJobStore from toil.jobStores.utils import JobStoreUnavailableException, generate_locator +from toil.lib.io import mkdtemp from toil.lib.threading import ExceptionalThread from toil.statsAndLogging import DEFAULT_LOGLEVEL from toil.version import baseVersion @@ -126,7 +128,7 @@ logger = logging.getLogger(__name__) # Find the default temporary directory -DEFAULT_TMPDIR = tempfile.gettempdir() +DEFAULT_TMPDIR = gettempdir() # And compose a CWL-style default prefix inside it. # We used to not put this inside anything and we would drop loads of temp # directories in the current directory and leave them there. 
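The two `Action` fixes above (storing the validated `workDir` and `coordination_dir` instead of the raw `values`) follow the standard argparse pattern: validate and normalize inside `__call__`, then store the normalized result so every consumer of the namespace sees the same canonical value. A minimal self-contained sketch of that pattern (`AbsPathAction` is illustrative, not part of this change):

```python
import os
from argparse import Action, ArgumentParser


class AbsPathAction(Action):
    """Validate a path argument and store its normalized, absolute form."""

    def __call__(self, parser, namespace, values, option_string=None):
        path = os.path.abspath(values)
        if not os.path.exists(path):
            raise RuntimeError(f"The path provided ({path}) does not exist.")
        # Store the normalized path, not the raw `values`, so later code
        # never sees the un-normalized user input.
        setattr(namespace, self.dest, path)


parser = ArgumentParser()
parser.add_argument("--workDir", action=AbsPathAction)
print(parser.parse_args(["--workDir", "."]).workDir)  # prints the absolute path
```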
@@ -351,16 +353,24 @@ def __init__( def __repr__(self) -> str: """Allow for debug printing.""" - try: - return "ResolveSource(" + repr(self.resolve()) + ")" - except Exception: - return ( - f"ResolveSource({self.name}, {self.input}, {self.source_key}, " - f"{self.promise_tuples})" - ) + + parts = [f"source key {self.source_key}"] + + if "pickValue" in self.input: + parts.append(f"pick value {self.input['pickValue']} from") + + if isinstance(self.promise_tuples, list): + names = [n for n, _ in self.promise_tuples] + parts.append(f"names {names} in promises") + else: + name, _ = self.promise_tuples + parts.append(f"name {name} in promise") + + return f"ResolveSource({', '.join(parts)})" def resolve(self) -> Any: """First apply linkMerge then pickValue if either present.""" + result: Optional[Any] = None if isinstance(self.promise_tuples, list): result = self.link_merge( @@ -384,6 +394,7 @@ def link_merge( :param values: result of step """ + link_merge_type = self.input.get("linkMerge", "merge_nested") if link_merge_type == "merge_nested": @@ -411,6 +422,7 @@ def pick_value(self, values: Union[List[Union[str, SkipNull]], Any]) -> Any: without modification. :return: """ + pick_value_type = cast(str, self.input.get("pickValue")) if pick_value_type is None: @@ -427,6 +439,7 @@ def pick_value(self, values: Union[List[Union[str, SkipNull]], Any]) -> Any: if pick_value_type == "first_non_null": if len(result) < 1: + logger.error("Could not find non-null entry for %s:\n%s", self.name, pprint.pformat(self.promise_tuples)) raise cwl_utils.errors.WorkflowException( "%s: first_non_null operator found no non-null values" % self.name ) @@ -481,6 +494,11 @@ def __init__( self.req = req self.container_engine = container_engine + def __repr__(self) -> str: + """Allow for debug printing.""" + + return f"StepValueFrom({self.expr}, {self.source}, {self.req}, {self.container_engine})" + def eval_prep( self, step_inputs: CWLObjectType, file_store: AbstractFileStore ) -> None: @@ -553,6 +571,11 @@ def __init__(self, default: Any, source: Any): self.default = default self.source = source + def __repr__(self) -> str: + """Allow for debug printing.""" + + return f"DefaultWithSource({self.default}, {self.source})" + def resolve(self) -> Any: """ Determine the final input value when the time is right. @@ -575,6 +598,11 @@ def __init__(self, val: Any): """Store the value.""" self.val = val + def __repr__(self) -> str: + """Allow for debug printing.""" + + return f"JustAValue({self.val})" + def resolve(self) -> Any: """Return the value.""" return self.val @@ -1216,7 +1244,8 @@ def _abs(self, path: str) -> str: logger.debug("ToilFsAccess downloading %s to %s", cache_key, temp_dir) - # Save it all into this new temp directory + # Save it all into this new temp directory. + # Guaranteed to fill it with real files and not symlinks. download_structure(self.file_store, {}, {}, contents, temp_dir) # Make sure we use the same temp directory if we go traversing @@ -1246,7 +1275,7 @@ def _abs(self, path: str) -> str: logger.debug( "ToilFsAccess fetching directory %s from a JobStore", path ) - dest_dir = tempfile.mkdtemp() + dest_dir = mkdtemp() # Recursively fetch all the files in the directory. def download_to(url: str, dest: str) -> None: @@ -1269,7 +1298,7 @@ def download_to(url: str, dest: str) -> None: logger.debug("ToilFsAccess fetching file %s from a JobStore", path) # Try to grab it with a jobstore implementation, and save it # somewhere arbitrary. 
- dest_file = tempfile.NamedTemporaryFile(delete=False) + dest_file = NamedTemporaryFile(delete=False) AbstractJobStore.read_from_url(path, dest_file) dest_file.close() self.dir_to_download[path] = dest_file.name @@ -2024,7 +2053,7 @@ def _realpath( "CreateFile", "CreateWritableFile", ]: # TODO: CreateFile for buckets is not under testing - with tempfile.NamedTemporaryFile() as f: + with NamedTemporaryFile() as f: # Make a file with the right contents f.write(file_id_or_contents.encode("utf-8")) f.close() @@ -2318,7 +2347,7 @@ def run(self, file_store: AbstractFileStore) -> Any: cwllogger.removeHandler(defaultStreamHandler) cwllogger.setLevel(logger.getEffectiveLevel()) - logger.debug("Loaded order: %s", self.cwljob) + logger.debug("Loaded order:\n%s", self.cwljob) cwljob = resolve_dict_w_promises(self.cwljob, file_store) @@ -2835,6 +2864,10 @@ def run( if self.conditional.is_false(cwljob): return self.conditional.skipped_outputs() + # Apply default values set in the workflow + fs_access = ToilFsAccess(self.runtime_context.basedir, file_store=file_store) + fill_in_defaults(self.cwlwf.tool["inputs"], cwljob, fs_access) + # `promises` dict # from: each parameter (workflow input or step output) # that may be used as a "source" for a step input workflow output @@ -2897,6 +2930,8 @@ def run( get_container_engine(self.runtime_context), ) + logger.debug("Value will come from %s", jobobj.get(key, None)) + conditional = Conditional( expression=step.tool.get("when"), outputs=step.tool["out"], @@ -3645,7 +3680,7 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int: workdir = cwltool.utils.create_tmp_dir(options.tmpdir_prefix) else: # Use a directory in the default tmpdir - workdir = tempfile.mkdtemp() + workdir = mkdtemp() # Make sure workdir doesn't exist so it can be a job store os.rmdir(workdir) @@ -3849,10 +3884,8 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int: ) raise - # We make a ToilFSAccess to access URLs with, but it has no - # FileStore so it can't do toildir: and toilfile: - fs_access = ToilFsAccess(options.basedir) - fill_in_defaults(tool.tool["inputs"], initialized_job_order, fs_access) + # Leave the defaults un-filled in the top-level order. The tool or + # workflow will fill them when it runs for inp in tool.tool["inputs"]: if ( @@ -3910,6 +3943,7 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int: # Import all the input files, some of which may be missing optional # files. + fs_access = ToilFsAccess(options.basedir) import_files( file_import_function, fs_access, diff --git a/src/toil/cwl/utils.py b/src/toil/cwl/utils.py index 60159fb564..d45cddae1d 100644 --- a/src/toil/cwl/utils.py +++ b/src/toil/cwl/utils.py @@ -140,6 +140,9 @@ def download_structure( """ Download nested dictionary from the Toil file store to a local path. + Guaranteed to fill the structure with real files, and not symlinks out of + it to elsewhere. + :param file_store: The Toil file store to download from. :param index: Maps from downloaded file path back to input Toil URI. @@ -173,9 +176,11 @@ def download_structure( raise RuntimeError(f"Did not find a filestore file at {value}") logger.debug("Downloading contained file '%s'", name) dest_path = os.path.join(into_dir, name) - # So download the file into place + # So download the file into place. + # Make sure to get a real copy of the file because we may need to + # mount the directory into a container as a whole. 
file_store.readGlobalFile( - FileID.unpack(value[len("toilfile:") :]), dest_path, symlink=True + FileID.unpack(value[len("toilfile:") :]), dest_path, symlink=False ) # Update the index dicts # TODO: why? diff --git a/src/toil/fileStores/abstractFileStore.py b/src/toil/fileStores/abstractFileStore.py index fc714681ca..600a7d3f36 100644 --- a/src/toil/fileStores/abstractFileStore.py +++ b/src/toil/fileStores/abstractFileStore.py @@ -13,9 +13,9 @@ # limitations under the License. import logging import os -import tempfile from abc import ABC, abstractmethod from contextlib import contextmanager +from tempfile import mkstemp from threading import Event, Semaphore from typing import (IO, TYPE_CHECKING, @@ -40,7 +40,7 @@ from toil.job import Job, JobDescription from toil.jobStores.abstractJobStore import AbstractJobStore from toil.lib.compatibility import deprecated -from toil.lib.io import WriteWatchingStream +from toil.lib.io import WriteWatchingStream, mkdtemp logger = logging.getLogger(__name__) @@ -207,7 +207,7 @@ def getLocalTempDir(self) -> str: to be deleted once the job terminates, removing all files it contains recursively. """ - return os.path.abspath(tempfile.mkdtemp(dir=self.localTempDir)) + return os.path.abspath(mkdtemp(dir=self.localTempDir)) def getLocalTempFile(self, suffix: Optional[str] = None, prefix: Optional[str] = None) -> str: """ @@ -223,7 +223,7 @@ def getLocalTempFile(self, suffix: Optional[str] = None, prefix: Optional[str] = for the duration of the job only, and is guaranteed to be deleted once the job terminates. """ - handle, tmpFile = tempfile.mkstemp( + handle, tmpFile = mkstemp( suffix=".tmp" if suffix is None else suffix, prefix="tmp" if prefix is None else prefix, dir=self.localTempDir diff --git a/src/toil/fileStores/cachingFileStore.py b/src/toil/fileStores/cachingFileStore.py index e216d1a57a..feb57346d3 100644 --- a/src/toil/fileStores/cachingFileStore.py +++ b/src/toil/fileStores/cachingFileStore.py @@ -20,10 +20,10 @@ import shutil import sqlite3 import stat -import tempfile import threading import time from contextlib import contextmanager +from tempfile import mkstemp from typing import Any, Callable, Generator, Iterator, Optional, Sequence, Tuple from toil.common import cacheDirName, getDirSizeRecursively, getFileSystemSize @@ -36,6 +36,7 @@ from toil.lib.io import (atomic_copy, atomic_copyobj, make_public_dir, + mkdtemp, robust_rmtree) from toil.lib.retry import ErrorCondition, retry from toil.lib.threading import get_process_name, process_name_exists @@ -600,7 +601,7 @@ def cachingIsFree(self): emptyID = self.jobStore.getEmptyFileStoreID() # Read it out to a generated name. - destDir = tempfile.mkdtemp(dir=self.localCacheDir) + destDir = mkdtemp(dir=self.localCacheDir) cachedFile = os.path.join(destDir, 'sniffLinkCount') self.jobStore.read_file(emptyID, cachedFile, symlink=False) @@ -644,7 +645,7 @@ def _getNewCachingPath(self, fileStoreID): # sure we can never collide even though we are going to remove the # file. # TODO: use a de-slashed version of the ID instead? 
-        handle, path = tempfile.mkstemp(dir=self.localCacheDir, suffix=hasher.hexdigest())
+        handle, path = mkstemp(dir=self.localCacheDir, suffix=hasher.hexdigest())
         os.close(handle)
         os.unlink(path)
 
diff --git a/src/toil/jobStores/fileJobStore.py b/src/toil/jobStores/fileJobStore.py
index 142c9b0f36..4298e2ed7f 100644
--- a/src/toil/jobStores/fileJobStore.py
+++ b/src/toil/jobStores/fileJobStore.py
@@ -20,7 +20,6 @@
 import shutil
 import stat
 import sys
-import tempfile
 import time
 import uuid
 from contextlib import contextmanager
@@ -42,6 +41,7 @@
 from toil.lib.io import (AtomicFileCreate,
                          atomic_copy,
                          atomic_copyobj,
+                         mkdtemp,
                          robust_rmtree)
 
 logger = logging.getLogger(__name__)
@@ -147,8 +147,8 @@ def assign_job_id(self, job_description):
 
         # Make a unique temp directory under a directory for this job name,
         # possibly sprayed across multiple levels of subdirectories.
-        absJobDir = tempfile.mkdtemp(prefix=self.JOB_DIR_PREFIX,
-                                     dir=self._get_arbitrary_jobs_dir_for_name(usefulFilename))
+        absJobDir = mkdtemp(prefix=self.JOB_DIR_PREFIX,
+                            dir=self._get_arbitrary_jobs_dir_for_name(usefulFilename))
 
         job_description.jobStoreID = self._get_job_id_from_dir(absJobDir)
 
diff --git a/src/toil/lib/io.py b/src/toil/lib/io.py
index 8f9389c205..69fd60b68d 100644
--- a/src/toil/lib/io.py
+++ b/src/toil/lib/io.py
@@ -2,6 +2,7 @@
 import os
 import shutil
 import stat
+import tempfile
 import uuid
 from contextlib import contextmanager
 from io import BytesIO
@@ -9,6 +10,26 @@
 
 logger = logging.getLogger(__name__)
 
+def mkdtemp(suffix: Optional[str] = None, prefix: Optional[str] = None, dir: Optional[str] = None) -> str:
+    """
+    Make a temporary directory like tempfile.mkdtemp, but with relaxed permissions.
+
+    The permissions on the directory will be 711 instead of 700, allowing the
+    group and all other users to traverse the directory. This is necessary if
+    the directory is on NFS and the Docker daemon would like to mount it or a
+    file inside it into a container, because on NFS even the Docker daemon
+    appears bound by the file permissions.
+
+    See , and
+    which talks about a similar problem
+    but in the context of user namespaces.
+    """
+    # Make the directory
+    result = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=dir)
+    # Grant the relaxed permissions: full control for the user, and execute (traverse) for group and other
+    os.chmod(result, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
+    # Return the path created
+    return result
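A quick way to see the relaxed permissions in action; this is a sketch, and the asserted mode follows from the explicit `os.chmod` call above (so it is not subject to the process umask):

```python
import os
import stat

from toil.lib.io import mkdtemp

d = mkdtemp()
try:
    # S_IRUSR | S_IWUSR | S_IXUSR plus S_IXGRP and S_IXOTH is mode 0o711.
    assert stat.S_IMODE(os.stat(d).st_mode) == 0o711
finally:
    os.rmdir(d)
```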
+ """ + # Make the directory + result = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=dir) + # Grant all the permissions: full control for user, and execute for group and other + os.chmod(result, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + # Return the path created + return result def robust_rmtree(path: Union[str, bytes]) -> None: """ diff --git a/src/toil/resource.py b/src/toil/resource.py index 705fbe1253..739326e30a 100644 --- a/src/toil/resource.py +++ b/src/toil/resource.py @@ -23,7 +23,6 @@ from contextlib import closing from io import BytesIO from pydoc import locate -from tempfile import mkdtemp from urllib.error import HTTPError from urllib.request import urlopen from zipfile import ZipFile @@ -37,6 +36,7 @@ BinaryIO) from toil import inVirtualEnv +from toil.lib.io import mkdtemp from toil.lib.iterables import concat from toil.lib.memoize import strict_bool from toil.lib.retry import ErrorCondition, retry diff --git a/src/toil/server/wes/abstract_backend.py b/src/toil/server/wes/abstract_backend.py index 3eef604067..32abff6e3b 100644 --- a/src/toil/server/wes/abstract_backend.py +++ b/src/toil/server/wes/abstract_backend.py @@ -3,7 +3,6 @@ import json import logging import os -import tempfile from abc import abstractmethod from typing import Any, Callable, Dict, List, Optional, Tuple, Union from urllib.parse import urldefrag @@ -11,6 +10,8 @@ import connexion # type: ignore from werkzeug.utils import secure_filename +from toil.lib.io import mkdtemp + logger = logging.getLogger(__name__) # Define a type for WES task log entries in responses @@ -210,7 +211,7 @@ def collect_attachments(self, run_id: Optional[str], temp_dir: Optional[str]) -> If None, a temporary directory is created. """ if not temp_dir: - temp_dir = tempfile.mkdtemp() + temp_dir = mkdtemp() body: Dict[str, Any] = {} has_attachments = False for key, ls in connexion.request.files.lists(): diff --git a/src/toil/test/__init__.py b/src/toil/test/__init__.py index 7a10e68e84..b6cff6f4fa 100644 --- a/src/toil/test/__init__.py +++ b/src/toil/test/__init__.py @@ -21,7 +21,6 @@ import signal import subprocess import sys -import tempfile import threading import time import unittest @@ -30,6 +29,7 @@ from contextlib import contextmanager from inspect import getsource from shutil import which +from tempfile import mkstemp from textwrap import dedent from typing import (Any, Callable, @@ -57,6 +57,7 @@ from toil.lib.accelerators import (have_working_nvidia_docker_runtime, have_working_nvidia_smi) from toil.lib.aws import running_on_ec2 +from toil.lib.io import mkdtemp from toil.lib.iterables import concat from toil.lib.memoize import memoize from toil.lib.threading import ExceptionalThread, cpu_count @@ -188,7 +189,7 @@ def _createTempDirEx(cls, *names: Optional[str]) -> str: prefix.extend([_f for _f in names if _f]) prefix.append('') temp_dir_path = os.path.realpath( - tempfile.mkdtemp(dir=cls._tempBaseDir, prefix="-".join(prefix)) + mkdtemp(dir=cls._tempBaseDir, prefix="-".join(prefix)) ) cls._tempDirs.append(temp_dir_path) return temp_dir_path @@ -314,7 +315,7 @@ def _mark_test(name: str, test_item: MT) -> MT: def get_temp_file(suffix: str = "", rootDir: Optional[str] = None) -> str: """Return a string representing a temporary file, that must be manually deleted.""" if rootDir is None: - handle, tmp_file = tempfile.mkstemp(suffix) + handle, tmp_file = mkstemp(suffix) os.close(handle) return tmp_file else: @@ -447,32 +448,6 @@ def needs_torque(test_item: MT) -> MT: return test_item return 
unittest.skip("Install PBS/Torque to include this test.")(test_item) - -def needs_tes(test_item: MT) -> MT: - """Use as a decorator before test classes or methods to run only if TES is available.""" - test_item = _mark_test('tes', test_item) - - try: - from toil.batchSystems.tes import TESBatchSystem - except ImportError: - return unittest.skip("Install py-tes to include this test")(test_item) - - tes_url = os.environ.get('TOIL_TES_ENDPOINT', TESBatchSystem.get_default_tes_endpoint()) - try: - urlopen(tes_url) - except HTTPError: - # Funnel happens to 404 if TES is working. But any HTTPError means we - # dialed somebody who picked up. - pass - except URLError: - # Will give connection refused if we can't connect because the server's - # not there. We can also get a "cannot assign requested address" if - # we're on Kubernetes dialing localhost and !!creative things!! have - # been done to the network stack. - return unittest.skip(f"Run a TES server on {tes_url} to include this test")(test_item) - return test_item - - def needs_kubernetes_installed(test_item: MT) -> MT: """Use as a decorator before test classes or methods to run only if Kubernetes is installed.""" test_item = _mark_test('kubernetes', test_item) diff --git a/src/toil/test/batchSystems/batchSystemTest.py b/src/toil/test/batchSystems/batchSystemTest.py index deae0bc4f8..6f8f095881 100644 --- a/src/toil/test/batchSystems/batchSystemTest.py +++ b/src/toil/test/batchSystems/batchSystemTest.py @@ -32,9 +32,9 @@ # protected by annotations. from toil.batchSystems.mesos.test import MesosTestSupport from toil.batchSystems.parasol import ParasolBatchSystem -from toil.batchSystems.registry import (BATCH_SYSTEM_FACTORY_REGISTRY, - BATCH_SYSTEMS, - addBatchSystemFactory, +from toil.batchSystems.registry import (get_batch_system, + get_batch_systems, + add_batch_system_factory, restore_batch_system_plugin_state, save_batch_system_plugin_state) from toil.batchSystems.singleMachine import SingleMachineBatchSystem @@ -54,7 +54,6 @@ needs_mesos, needs_parasol, needs_slurm, - needs_tes, needs_torque, slow) from toil.test.batchSystems.parasolTestSupport import ParasolTestSupport @@ -88,16 +87,16 @@ def tearDown(self): restore_batch_system_plugin_state(self.__state) super().tearDown() - def testAddBatchSystemFactory(self): + def test_add_batch_system_factory(self): def test_batch_system_factory(): # TODO: Adding the same batch system under multiple names means we # can't actually create Toil options, because each version tries to # add its arguments. return SingleMachineBatchSystem - addBatchSystemFactory('testBatchSystem', test_batch_system_factory) - assert ('testBatchSystem', test_batch_system_factory) in BATCH_SYSTEM_FACTORY_REGISTRY.items() - assert 'testBatchSystem' in BATCH_SYSTEMS + add_batch_system_factory('testBatchSystem', test_batch_system_factory) + assert 'testBatchSystem' in get_batch_systems() + assert get_batch_system('testBatchSystem') == SingleMachineBatchSystem class hidden: """ @@ -575,23 +574,6 @@ def test_label_constraints(self): self.assertEqual(str(spec.tolerations), "None") -@needs_tes -@needs_fetchable_appliance -class TESBatchSystemTest(hidden.AbstractBatchSystemTest): - """ - Tests against the TES batch system - """ - - def supportsWallTime(self): - return True - - def createBatchSystem(self): - # Import the batch system when we know we have it. - # Doesn't really matter for TES right now, but someday it might. 
- from toil.batchSystems.tes import TESBatchSystem - return TESBatchSystem(config=self.config, - maxCores=numCores, maxMemory=1e9, maxDisk=2001) - @needs_aws_batch @needs_fetchable_appliance class AWSBatchBatchSystemTest(hidden.AbstractBatchSystemTest): diff --git a/src/toil/test/cwl/cwlTest.py b/src/toil/test/cwl/cwlTest.py index b64725e3af..0a06240953 100644 --- a/src/toil/test/cwl/cwlTest.py +++ b/src/toil/test/cwl/cwlTest.py @@ -98,11 +98,11 @@ def run_conformance_tests( :param batchSystem: If set, use this batch system instead of the default single_machine. - :param selected_tests: If set, use this list of tests to run (comma-separated test IDs) + :param selected_tests: If set, use this description of test numbers to run (comma-separated numbers or ranges) :param selected_tags: As an alternative to selected_tests, run tests with the given tags. - :param skipped_tests: Comma-separated string IDs of tests to skip. + :param skipped_tests: Comma-separated string labels of tests to skip. :param extra_args: Provide these extra arguments to runner for each test. @@ -121,7 +121,7 @@ def run_conformance_tests( f"--basedir={workDir}", ] if selected_tests: - cmd.append(f"-s={selected_tests}") + cmd.append(f"-n={selected_tests}") if selected_tags: cmd.append(f"--tags={selected_tags}") if skipped_tests: @@ -139,9 +139,10 @@ def run_conformance_tests( "--logDebug", "--statusWait=10", "--retryCount=2", - f"--caching={caching}" ] + args_passed_directly_to_runner.append(f"--caching={caching}") + if extra_args: args_passed_directly_to_runner += extra_args @@ -425,6 +426,8 @@ def test_load_contents_file(self): self.load_contents("download_file.json", self._tester) @slow + @pytest.mark.integrative + @unittest.skip def test_bioconda(self): self._tester( "src/toil/test/cwl/seqtk_seq.cwl", @@ -435,6 +438,8 @@ def test_bioconda(self): ) @needs_docker + @pytest.mark.integrative + @unittest.skip def test_biocontainers(self): self._tester( "src/toil/test/cwl/seqtk_seq.cwl", @@ -704,7 +709,7 @@ def setUp(self): self.workDir = os.path.join(self.cwlSpec, "v1.0") # The latest cwl git commit hash from https://github.com/common-workflow-language/common-workflow-language. # Update it to get the latest tests. - testhash = "06c0cba1a178e20af2634b33dee648faff144bf8" # Date: Thu Mar 23 19:07:05 2023 +0900 (move label to id) + testhash = "6a955874ade22080b8ef962b4e0d6e408112c1ef" # Date: Tue Dec 16 2020 8:43pm PST url = ( "https://github.com/common-workflow-language/common-workflow-language/archive/%s.zip" % testhash @@ -782,8 +787,9 @@ def test_kubernetes_cwl_conformance(self, **kwargs): return self.test_run_conformance( batchSystem="kubernetes", extra_args=["--retryCount=3"], - # This CWL v1.0 test doesn't work with Singularity; see - # https://github.com/common-workflow-language/cwltool/blob/9398f3253558b6c972033b5f4ac397a61f355556/conformance-test.sh#L97-L99 + # This test doesn't work with + # Singularity; see + # https://github.com/common-workflow-language/cwltool/blob/7094ede917c2d5b16d11f9231fe0c05260b51be6/conformance-test.sh#L99-L117 skipped_tests="docker_entrypoint", **kwargs, ) @@ -844,7 +850,7 @@ def setUpClass(cls): cls.test_yaml = os.path.join(cls.cwlSpec, "conformance_tests.yaml") # TODO: Use a commit zip in case someone decides to rewrite master's history? 
url = "https://github.com/common-workflow-language/cwl-v1.1.git" - commit = "b1d4a69df86350059bd49aa127c02be0c349f7de" + commit = "664835e83eb5e57eee18a04ce7b05fb9d70d77b7" p = subprocess.Popen( f"git clone {url} {cls.cwlSpec} && cd {cls.cwlSpec} && git checkout {commit}", shell=True, @@ -871,8 +877,9 @@ def test_kubernetes_cwl_conformance(self, **kwargs): return self.test_run_conformance( batchSystem="kubernetes", extra_args=["--retryCount=3"], - # These CWL v1.1 tests don't work with Singularity; see - # https://github.com/common-workflow-language/cwltool/blob/9398f3253558b6c972033b5f4ac397a61f355556/conformance-test.sh#L97-L105 + # These tests don't work with + # Singularity; see + # https://github.com/common-workflow-language/cwltool/blob/7094ede917c2d5b16d11f9231fe0c05260b51be6/conformance-test.sh#L99-L117 skipped_tests="docker_entrypoint,stdin_shorcut", **kwargs, ) @@ -911,12 +918,21 @@ def tearDown(self): @slow @pytest.mark.timeout(CONFORMANCE_TEST_TIMEOUT) def test_run_conformance(self, **kwargs): + if "junit_file" not in kwargs: + kwargs["junit_file"] = os.path.join( + self.rootDir, "conformance-1.2.junit.xml" + ) run_conformance_tests(workDir=self.cwlSpec, yml=self.test_yaml, **kwargs) @slow @pytest.mark.timeout(CONFORMANCE_TEST_TIMEOUT) def test_run_conformance_with_caching(self): - self.test_run_conformance(caching=True) + self.test_run_conformance( + caching=True, + junit_file = os.path.join( + self.rootDir, "caching-conformance-1.2.junit.xml" + ) + ) @slow @pytest.mark.timeout(CONFORMANCE_TEST_TIMEOUT) @@ -927,7 +943,10 @@ def test_run_conformance_with_in_place_update(self): features. """ self.test_run_conformance( - extra_args=["--bypass-file-store"], must_support_all_features=True + extra_args=["--bypass-file-store"], must_support_all_features=True, + junit_file = os.path.join( + self.rootDir, "in-place-update-conformance-1.2.junit.xml" + ) ) @slow @@ -935,13 +954,14 @@ def test_run_conformance_with_in_place_update(self): def test_kubernetes_cwl_conformance(self, **kwargs): if "junit_file" not in kwargs: kwargs["junit_file"] = os.path.join( - self.rootDir, "kubernetes-conformance.junit.xml" + self.rootDir, "kubernetes-conformance-1.2.junit.xml" ) return self.test_run_conformance( batchSystem="kubernetes", extra_args=["--retryCount=3"], - # This CWL v1.2 test doesn't work with Singularity; see - # https://github.com/common-workflow-language/cwltool/blob/9398f3253558b6c972033b5f4ac397a61f355556/conformance-test.sh#L97-L99 + # This test doesn't work with + # Singularity; see + # https://github.com/common-workflow-language/cwltool/blob/7094ede917c2d5b16d11f9231fe0c05260b51be6/conformance-test.sh#L99-L117 # and # https://github.com/common-workflow-language/cwltool/issues/1441#issuecomment-826747975 skipped_tests="docker_entrypoint", @@ -954,7 +974,7 @@ def test_kubernetes_cwl_conformance_with_caching(self): return self.test_kubernetes_cwl_conformance( caching=True, junit_file=os.path.join( - self.rootDir, "kubernetes-caching-conformance.junit.xml" + self.rootDir, "kubernetes-caching-conformance-1.2.junit.xml" ), ) @@ -976,13 +996,18 @@ def test_wes_server_cwl_conformance(self): endpoint = os.environ.get("TOIL_WES_ENDPOINT") extra_args = [f"--wes_endpoint={endpoint}"] + # These are the ones that currently fail: + # - 310: mixed_version_v10_wf + # - 311: mixed_version_v11_wf + # - 312: mixed_version_v12_wf + # Main issues: # 1. 
@@ -1057,6 +1082,15 @@ def test_cwl_on_arm(self):
             ]
         )
 
+        # We know if it succeeds it should save a junit XML for us to read.
+        # Bring it back to be an artifact.
+        self.rsync_util(
+            f":{self.cwl_test_dir}/toil/conformance-1.2.junit.xml",
+            os.path.join(
+                self._projectRootPath(),
+                "arm-conformance-1.2.junit.xml"
+            )
+        )
 
 @needs_cwl
 @pytest.mark.cwl_small_log_dir
@@ -1492,9 +1526,9 @@ def test_download_structure(tmp_path) -> None:
     # The file store should have been asked to do the download
     file_store.readGlobalFile.assert_has_calls(
         [
-            call(fid1, os.path.join(to_dir, "dir1/dir2/f1"), symlink=True),
-            call(fid1, os.path.join(to_dir, "dir1/dir2/f1again"), symlink=True),
-            call(fid2, os.path.join(to_dir, "anotherfile"), symlink=True),
+            call(fid1, os.path.join(to_dir, "dir1/dir2/f1"), symlink=False),
+            call(fid1, os.path.join(to_dir, "dir1/dir2/f1again"), symlink=False),
+            call(fid2, os.path.join(to_dir, "anotherfile"), symlink=False),
         ],
         any_order=True,
     )
diff --git a/src/toil/test/docs/scripts/example_alwaysfail.py b/src/toil/test/docs/scripts/example_alwaysfail.py
index 2b4c3a55ba..9c47ddca1f 100644
--- a/src/toil/test/docs/scripts/example_alwaysfail.py
+++ b/src/toil/test/docs/scripts/example_alwaysfail.py
@@ -1,6 +1,7 @@
-from configargparse import ArgumentParser
 import sys
 
+from configargparse import ArgumentParser
+
 from toil.common import Toil
 from toil.job import Job
diff --git a/src/toil/test/docs/scripts/example_cachingbenchmark.py b/src/toil/test/docs/scripts/example_cachingbenchmark.py
index 04eb70d68d..463a3f46be 100755
--- a/src/toil/test/docs/scripts/example_cachingbenchmark.py
+++ b/src/toil/test/docs/scripts/example_cachingbenchmark.py
@@ -17,8 +17,6 @@
 """
 
 import argparse
-from configargparse import ArgumentParser
-
 import collections
 import os
 import random
@@ -26,6 +24,8 @@
 import sys
 import time
 
+from configargparse import ArgumentParser
+
 from toil.common import Toil
 from toil.job import Job
 from toil.realtimeLogger import RealtimeLogger
diff --git a/src/toil/test/docs/scripts/tutorial_cwlexample.py b/src/toil/test/docs/scripts/tutorial_cwlexample.py
index ed9db885af..7bf63bb2c1 100644
--- a/src/toil/test/docs/scripts/tutorial_cwlexample.py
+++ b/src/toil/test/docs/scripts/tutorial_cwlexample.py
@@ -1,9 +1,9 @@
 import os
 import subprocess
-import tempfile
 
 from toil.common import Toil
 from toil.job import Job
+from toil.lib.io import mkdtemp
 
 
 def initialize_jobs(job):
@@ -26,7 +26,7 @@ def runQC(job, cwl_file, cwl_filename, yml_file, yml_filename, outputs_dir, outp
 
 
 if __name__ == "__main__":
-    jobstore: str = tempfile.mkdtemp("tutorial_cwlexample")
+    jobstore: str = mkdtemp("tutorial_cwlexample")
     os.rmdir(jobstore)
     options = Job.Runner.getDefaultOptions(jobstore)
     options.logLevel = "INFO"
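This script, and the run of tutorial scripts that follows, replaces `tempfile.mkdtemp` with a `mkdtemp` from `toil.lib.io`; the call sites are otherwise unchanged, so the wrapper is evidently argument-compatible with the standard library function. A minimal sketch of such a drop-in wrapper, assuming its only extra job is canonicalizing the returned path; the real `toil.lib.io.mkdtemp` may differ:

    import os
    import tempfile
    from typing import Optional

    def mkdtemp(suffix: Optional[str] = None,
                prefix: Optional[str] = None,
                dir: Optional[str] = None) -> str:
        # Mirror tempfile.mkdtemp's signature so call sites only need to
        # change the import. Resolving symlinks keeps later path
        # comparisons stable (e.g. /tmp vs. /private/tmp on macOS).
        return os.path.realpath(tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=dir))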
diff --git a/src/toil/test/docs/scripts/tutorial_docker.py b/src/toil/test/docs/scripts/tutorial_docker.py
index 955720d50c..c002881fde 100644
--- a/src/toil/test/docs/scripts/tutorial_docker.py
+++ b/src/toil/test/docs/scripts/tutorial_docker.py
@@ -1,9 +1,9 @@
 import os
-import tempfile
 
 from toil.common import Toil
 from toil.job import Job
 from toil.lib.docker import apiDockerCall
+from toil.lib.io import mkdtemp
 
 align = Job.wrapJobFn(apiDockerCall,
                       image='ubuntu',
@@ -11,7 +11,7 @@
                       parameters=['ls', '-lha'])
 
 if __name__ == "__main__":
-    jobstore: str = tempfile.mkdtemp("tutorial_docker")
+    jobstore: str = mkdtemp("tutorial_docker")
     os.rmdir(jobstore)
     options = Job.Runner.getDefaultOptions(jobstore)
     options.logLevel = "INFO"
diff --git a/src/toil/test/docs/scripts/tutorial_dynamic.py b/src/toil/test/docs/scripts/tutorial_dynamic.py
index 6324e344bf..76f7e82d49 100644
--- a/src/toil/test/docs/scripts/tutorial_dynamic.py
+++ b/src/toil/test/docs/scripts/tutorial_dynamic.py
@@ -1,8 +1,8 @@
 import os
-import tempfile
 
 from toil.common import Toil
 from toil.job import Job
+from toil.lib.io import mkdtemp
 
 
 def binaryStringFn(job, depth, message=""):
@@ -14,7 +14,7 @@ def binaryStringFn(job, depth, message=""):
 
 
 if __name__ == "__main__":
-    jobstore: str = tempfile.mkdtemp("tutorial_dynamic")
+    jobstore: str = mkdtemp("tutorial_dynamic")
     os.rmdir(jobstore)
     options = Job.Runner.getDefaultOptions(jobstore)
     options.logLevel = "INFO"
diff --git a/src/toil/test/docs/scripts/tutorial_encapsulation.py b/src/toil/test/docs/scripts/tutorial_encapsulation.py
index a8c91ce663..ce8447c45b 100644
--- a/src/toil/test/docs/scripts/tutorial_encapsulation.py
+++ b/src/toil/test/docs/scripts/tutorial_encapsulation.py
@@ -1,8 +1,8 @@
 import os
-import tempfile
 
 from toil.common import Toil
 from toil.job import Job
+from toil.lib.io import mkdtemp
 
 if __name__ == "__main__":
     # A is a job with children and follow-ons, for example:
@@ -18,7 +18,7 @@
     Ap.addChild(A)
     Ap.addFollowOn(B)
 
-    jobstore: str = tempfile.mkdtemp("tutorial_encapsulations")
+    jobstore: str = mkdtemp("tutorial_encapsulations")
     os.rmdir(jobstore)
     options = Job.Runner.getDefaultOptions(jobstore)
     options.logLevel = "INFO"
diff --git a/src/toil/test/docs/scripts/tutorial_encapsulation2.py b/src/toil/test/docs/scripts/tutorial_encapsulation2.py
index 546a1a15e8..5f4fa1f2ab 100644
--- a/src/toil/test/docs/scripts/tutorial_encapsulation2.py
+++ b/src/toil/test/docs/scripts/tutorial_encapsulation2.py
@@ -1,8 +1,8 @@
 import os
-import tempfile
 
 from toil.common import Toil
 from toil.job import Job
+from toil.lib.io import mkdtemp
 
 if __name__ == "__main__":
     # A
@@ -19,7 +19,7 @@
     # With encapsulation A and its successor subgraph appear to be a single job, hence:
     A.addChild(B)
 
-    jobstore: str = tempfile.mkdtemp("tutorial_encapsulations2")
+    jobstore: str = mkdtemp("tutorial_encapsulations2")
     os.rmdir(jobstore)
     options = Job.Runner.getDefaultOptions(jobstore)
     options.logLevel = "INFO"
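Each tutorial uses the same two-step job store idiom visible in the context lines: `mkdtemp` reserves a unique path, and the directory is immediately removed because Toil expects to create the job store directory itself. In isolation, with the suffix as a placeholder:

    # Reserve a unique path, then delete the directory so Toil can create
    # the job store there itself (a file job store refuses to be created
    # over an existing directory).
    jobstore: str = mkdtemp("tutorial_example")  # placeholder suffix
    os.rmdir(jobstore)
    options = Job.Runner.getDefaultOptions(jobstore)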
diff --git a/src/toil/test/docs/scripts/tutorial_invokeworkflow.py b/src/toil/test/docs/scripts/tutorial_invokeworkflow.py
index c6866ac824..fabac31fc7 100644
--- a/src/toil/test/docs/scripts/tutorial_invokeworkflow.py
+++ b/src/toil/test/docs/scripts/tutorial_invokeworkflow.py
@@ -1,8 +1,8 @@
 import os
-import tempfile
 
 from toil.common import Toil
 from toil.job import Job
+from toil.lib.io import mkdtemp
 
 
 class HelloWorld(Job):
@@ -15,7 +15,7 @@ def run(self, fileStore):
 
 
 if __name__ == "__main__":
-    jobstore: str = tempfile.mkdtemp("tutorial_invokeworkflow")
+    jobstore: str = mkdtemp("tutorial_invokeworkflow")
     os.rmdir(jobstore)
     options = Job.Runner.getDefaultOptions(jobstore)
     options.logLevel = "OFF"
diff --git a/src/toil/test/docs/scripts/tutorial_invokeworkflow2.py b/src/toil/test/docs/scripts/tutorial_invokeworkflow2.py
index 639cc83f2f..728dd5c497 100644
--- a/src/toil/test/docs/scripts/tutorial_invokeworkflow2.py
+++ b/src/toil/test/docs/scripts/tutorial_invokeworkflow2.py
@@ -1,8 +1,9 @@
 import os
-import tempfile
 
 from toil.common import Toil
 from toil.job import Job
+from toil.lib.io import mkdtemp
+
 
 class HelloWorld(Job):
     def __init__(self, message):
@@ -14,7 +15,7 @@ def run(self, fileStore):
 
 
 if __name__ == "__main__":
-    jobstore: str = tempfile.mkdtemp("tutorial_invokeworkflow2")
+    jobstore: str = mkdtemp("tutorial_invokeworkflow2")
     os.rmdir(jobstore)
     options = Job.Runner.getDefaultOptions(jobstore)
     options.logLevel = "INFO"
diff --git a/src/toil/test/docs/scripts/tutorial_jobfunctions.py b/src/toil/test/docs/scripts/tutorial_jobfunctions.py
index 37d189a760..35c328d327 100644
--- a/src/toil/test/docs/scripts/tutorial_jobfunctions.py
+++ b/src/toil/test/docs/scripts/tutorial_jobfunctions.py
@@ -1,8 +1,8 @@
 import os
-import tempfile
 
 from toil.common import Toil
 from toil.job import Job
+from toil.lib.io import mkdtemp
 
 
 def helloWorld(job, message):
@@ -10,7 +10,7 @@ def helloWorld(job, message):
 
 
 if __name__ == "__main__":
-    jobstore: str = tempfile.mkdtemp("tutorial_jobfunctions")
+    jobstore: str = mkdtemp("tutorial_jobfunctions")
     os.rmdir(jobstore)
     options = Job.Runner.getDefaultOptions(jobstore)
     options.logLevel = "INFO"
diff --git a/src/toil/test/docs/scripts/tutorial_managing.py b/src/toil/test/docs/scripts/tutorial_managing.py
index 78542c8efc..48e7a1ee43 100644
--- a/src/toil/test/docs/scripts/tutorial_managing.py
+++ b/src/toil/test/docs/scripts/tutorial_managing.py
@@ -1,8 +1,8 @@
 import os
-import tempfile
 
 from toil.common import Toil
 from toil.job import Job
+from toil.lib.io import mkdtemp
 
 
 class LocalFileStoreJob(Job):
@@ -15,7 +15,7 @@ def run(self, fileStore):
 
 
 if __name__ == "__main__":
-    jobstore: str = tempfile.mkdtemp("tutorial_managing")
+    jobstore: str = mkdtemp("tutorial_managing")
     os.rmdir(jobstore)
     options = Job.Runner.getDefaultOptions(jobstore)
     options.logLevel = "INFO"
diff --git a/src/toil/test/docs/scripts/tutorial_managing2.py b/src/toil/test/docs/scripts/tutorial_managing2.py
index 80f594e7f1..f1eb748dc1 100644
--- a/src/toil/test/docs/scripts/tutorial_managing2.py
+++ b/src/toil/test/docs/scripts/tutorial_managing2.py
@@ -1,8 +1,8 @@
 import os
-import tempfile
 
 from toil.common import Toil
 from toil.job import Job
+from toil.lib.io import mkdtemp
 
 
 def globalFileStoreJobFn(job):
@@ -44,7 +44,7 @@ def globalFileStoreJobFn(job):
 
 
 if __name__ == "__main__":
-    jobstore: str = tempfile.mkdtemp("tutorial_managing2")
+    jobstore: str = mkdtemp("tutorial_managing2")
     os.rmdir(jobstore)
     options = Job.Runner.getDefaultOptions(jobstore)
     options.logLevel = "INFO"
diff --git a/src/toil/test/docs/scripts/tutorial_promises.py b/src/toil/test/docs/scripts/tutorial_promises.py
index a9a850eadc..99c88b2305 100644
--- a/src/toil/test/docs/scripts/tutorial_promises.py
+++ b/src/toil/test/docs/scripts/tutorial_promises.py
@@ -1,8 +1,8 @@
 import os
-import tempfile
 
 from toil.common import Toil
 from toil.job import Job
+from toil.lib.io import mkdtemp
 
 
 def fn(job, i):
@@ -11,7 +11,7 @@ def fn(job, i):
 
 
 if __name__ == "__main__":
-    jobstore: str = tempfile.mkdtemp("tutorial_promises")
+    jobstore: str = mkdtemp("tutorial_promises")
     os.rmdir(jobstore)
     options = Job.Runner.getDefaultOptions(jobstore)
     options.logLevel = "INFO"
diff --git a/src/toil/test/docs/scripts/tutorial_promises2.py b/src/toil/test/docs/scripts/tutorial_promises2.py
index ea689f12ac..99bd28341d 100644
--- a/src/toil/test/docs/scripts/tutorial_promises2.py
+++ b/src/toil/test/docs/scripts/tutorial_promises2.py
@@ -1,8 +1,8 @@
 import os
-import tempfile
 
 from toil.common import Toil
 from toil.job import Job
+from toil.lib.io import mkdtemp
 
 
 def binaryStrings(job, depth, message=""):
@@ -18,7 +18,7 @@ def merge(strings):
 
 
 if __name__ == "__main__":
-    jobstore: str = tempfile.mkdtemp("tutorial_promises2")
+    jobstore: str = mkdtemp("tutorial_promises2")
     os.rmdir(jobstore)
     options = Job.Runner.getDefaultOptions(jobstore)
     options.loglevel = "OFF"
diff --git a/src/toil/test/docs/scripts/tutorial_quickstart.py b/src/toil/test/docs/scripts/tutorial_quickstart.py
index a06f2e017f..4bea3d9e07 100644
--- a/src/toil/test/docs/scripts/tutorial_quickstart.py
+++ b/src/toil/test/docs/scripts/tutorial_quickstart.py
@@ -1,8 +1,8 @@
 import os
-import tempfile
 
 from toil.common import Toil
 from toil.job import Job
+from toil.lib.io import mkdtemp
 
 
 def helloWorld(message):
@@ -10,7 +10,7 @@ def helloWorld(message):
 
 
 if __name__ == "__main__":
-    jobstore: str = tempfile.mkdtemp("tutorial_quickstart")
+    jobstore: str = mkdtemp("tutorial_quickstart")
     os.rmdir(jobstore)
     options = Job.Runner.getDefaultOptions(jobstore)
     options.logLevel = "OFF"
diff --git a/src/toil/test/docs/scripts/tutorial_requirements.py b/src/toil/test/docs/scripts/tutorial_requirements.py
index 0e0ba9409a..8cfaa5ab28 100644
--- a/src/toil/test/docs/scripts/tutorial_requirements.py
+++ b/src/toil/test/docs/scripts/tutorial_requirements.py
@@ -1,8 +1,8 @@
 import os
-import tempfile
 
 from toil.common import Toil
 from toil.job import Job, PromisedRequirement
+from toil.lib.io import mkdtemp
 
 
 def parentJob(job):
@@ -26,7 +26,7 @@ def analysisJob(job, fileStoreID):
 
 
 if __name__ == "__main__":
-    jobstore: str = tempfile.mkdtemp("tutorial_requirements")
+    jobstore: str = mkdtemp("tutorial_requirements")
     os.rmdir(jobstore)
     options = Job.Runner.getDefaultOptions(jobstore)
     options.logLevel = "INFO"
diff --git a/src/toil/test/docs/scripts/tutorial_services.py b/src/toil/test/docs/scripts/tutorial_services.py
index 5bcf9faa47..94bc7218f7 100644
--- a/src/toil/test/docs/scripts/tutorial_services.py
+++ b/src/toil/test/docs/scripts/tutorial_services.py
@@ -1,8 +1,8 @@
 import os
-import tempfile
 
 from toil.common import Toil
 from toil.job import Job
+from toil.lib.io import mkdtemp
 
 
 class DemoService(Job.Service):
@@ -35,7 +35,7 @@ def dbFn(loginCredentials):
 
 
 if __name__ == "__main__":
-    jobstore: str = tempfile.mkdtemp("tutorial_services")
+    jobstore: str = mkdtemp("tutorial_services")
     os.rmdir(jobstore)
     options = Job.Runner.getDefaultOptions(jobstore)
     options.logLevel = "INFO"
diff --git a/src/toil/test/docs/scripts/tutorial_staging.py b/src/toil/test/docs/scripts/tutorial_staging.py
index e5f6e27fc9..57a1431ed9 100644
--- a/src/toil/test/docs/scripts/tutorial_staging.py
+++ b/src/toil/test/docs/scripts/tutorial_staging.py
@@ -1,8 +1,8 @@
 import os
-import tempfile
 
 from toil.common import Toil
 from toil.job import Job
+from toil.lib.io import mkdtemp
 
 
 class HelloWorld(Job):
@@ -18,7 +18,7 @@ def run(self, fileStore):
 
 
 if __name__ == "__main__":
-    jobstore: str = tempfile.mkdtemp("tutorial_staging")
+    jobstore: str = mkdtemp("tutorial_staging")
     os.rmdir(jobstore)
     options = Job.Runner.getDefaultOptions(jobstore)
     options.logLevel = "INFO"
diff --git a/src/toil/test/jobStores/jobStoreTest.py b/src/toil/test/jobStores/jobStoreTest.py
index 3e91822992..461778a5a4 100644
--- a/src/toil/test/jobStores/jobStoreTest.py
+++ b/src/toil/test/jobStores/jobStoreTest.py
@@ -18,7 +18,6 @@
 import os
 import shutil
 import socketserver
-import tempfile
 import threading
 import time
 import urllib.parse as urlparse
@@ -27,6 +26,7 @@
 from io import BytesIO
 from itertools import chain, islice
 from queue import Queue
+from tempfile import mkstemp
 from threading import Thread
 from typing import Any, Tuple
 from urllib.request import Request, urlopen
@@ -41,6 +41,7 @@
                                          NoSuchJobException)
 from toil.jobStores.fileJobStore import FileJobStore
 from toil.lib.aws.utils import create_s3_bucket, get_object_for_url
+from toil.lib.io import mkdtemp
 from toil.lib.memoize import memoize
 from toil.lib.retry import retry
 from toil.statsAndLogging import StatsAndLogging
@@ -443,7 +444,7 @@ def testPerJobFiles(self):
             self.assertEqual(f.read(), one)
 
         # ... and copy it to a temporary physical file on the jobstore1.
-        fh, path = tempfile.mkstemp()
+        fh, path = mkstemp()
         try:
             os.close(fh)
             tmpPath = path + '.read-only'
@@ -858,7 +859,7 @@ def checksumThreadFn():
 
             # Multi-part upload from file
             checksum = hashlib.md5()
-            fh, path = tempfile.mkstemp()
+            fh, path = mkstemp()
             try:
                 with os.fdopen(fh, 'wb+') as writable:
                     with open('/dev/urandom', 'rb') as readable:
@@ -1046,7 +1047,7 @@ def testDestructionIdempotence(self):
     def testEmptyFileStoreIDIsReadable(self):
         """Simply creates an empty fileStoreID and attempts to read from it."""
         id = self.jobstore_initialized.get_empty_file_store_id()
-        fh, path = tempfile.mkstemp()
+        fh, path = mkstemp()
         try:
             self.jobstore_initialized.read_file(id, path)
             self.assertTrue(os.path.isfile(path))
@@ -1075,7 +1076,7 @@ class Test(AbstractJobStoreTest.Test, metaclass=ABCMeta):
 
     def setUp(self):
         # noinspection PyAttributeOutsideInit
-        self.sseKeyDir = tempfile.mkdtemp()
+        self.sseKeyDir = mkdtemp()
         super().setUp()
 
     def tearDown(self):
@@ -1143,14 +1144,14 @@ def _hashTestFile(self, url):
             return hashlib.md5(f.read()).hexdigest()
 
     def _createExternalStore(self):
-        return tempfile.mkdtemp()
+        return mkdtemp()
 
     def _cleanUpExternalStore(self, dirPath):
         shutil.rmtree(dirPath)
 
     def testPreserveFileName(self):
         """Check that the fileID ends with the given file name."""
-        fh, path = tempfile.mkstemp()
+        fh, path = mkstemp()
         try:
             os.close(fh)
             job = self.arbitraryJob()
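Note the split convention in this file: `mkdtemp` now comes from `toil.lib.io`, while `mkstemp` stays the standard library function. The descriptor handling in the hunks above is worth spelling out, since `mkstemp` returns a raw file descriptor that the caller owns; names here are illustrative:

    import os
    from tempfile import mkstemp

    fh, path = mkstemp()
    try:
        # os.fdopen wraps the descriptor in a file object that takes
        # ownership of it, so leaving the with-block closes both.
        with os.fdopen(fh, 'wb+') as writable:
            writable.write(b"payload")
    finally:
        os.remove(path)  # mkstemp never cleans up its file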
diff --git a/src/toil/test/provisioners/clusterTest.py b/src/toil/test/provisioners/clusterTest.py
index 41f201b126..aef2b6224a 100644
--- a/src/toil/test/provisioners/clusterTest.py
+++ b/src/toil/test/provisioners/clusterTest.py
@@ -143,6 +143,17 @@ def sshUtil(self, command):
             log.error("Failed to run %s.", str(cmd))
             raise subprocess.CalledProcessError(p.returncode, ' '.join(cmd))
 
+    @retry(errors=[subprocess.CalledProcessError], intervals=[1, 1])
+    def rsync_util(self, from_file: str, to_file: str) -> None:
+        """
+        Transfer a file to/from the cluster.
+
+        The cluster-side path should have a ':' in front of it.
+        """
+        cmd = ['toil', 'rsync-cluster', '--insecure', '-p=aws', '-z', self.zone, self.clusterName, from_file, to_file]
+        log.info("Running %s.", str(cmd))
+        subprocess.check_call(cmd)
+
     @retry(errors=[subprocess.CalledProcessError], intervals=[1, 1])
     def createClusterUtil(self, args=None):
         args = [] if args is None else args
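The new `rsync_util` wraps `toil rsync-cluster`, which borrows rsync's convention: the cluster-side path is the one with the leading ':'. Hypothetical usage (paths illustrative), matching how the ARM conformance test above retrieves its junit file:

    # Pull a result off the cluster (':' marks the cluster side)...
    self.rsync_util(":/root/conformance-1.2.junit.xml",
                    "/tmp/arm-conformance-1.2.junit.xml")
    # ...or push a local file up to it.
    self.rsync_util("/tmp/settings.yaml", ":/root/settings.yaml")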
+ """ + cmd = ['toil', 'rsync-cluster', '--insecure', '-p=aws', '-z', self.zone, self.clusterName, from_file, to_file] + log.info("Running %s.", str(cmd)) + subprocess.check_call(cmd) + @retry(errors=[subprocess.CalledProcessError], intervals=[1, 1]) def createClusterUtil(self, args=None): args = [] if args is None else args diff --git a/src/toil/test/src/miscTests.py b/src/toil/test/src/miscTests.py index 24705aef29..e03aa27cbd 100644 --- a/src/toil/test/src/miscTests.py +++ b/src/toil/test/src/miscTests.py @@ -16,12 +16,11 @@ import os import random import sys -import tempfile from uuid import uuid4 from toil.common import getNodeID from toil.lib.exceptions import panic, raise_ -from toil.lib.io import AtomicFileCreate, atomic_install, atomic_tmp_file +from toil.lib.io import AtomicFileCreate, atomic_install, atomic_tmp_file, mkdtemp from toil.lib.misc import CalledProcessErrorStderr, call_command from toil.test import ToilTest, slow @@ -63,7 +62,7 @@ def testGetSizeOfDirectoryWorks(self): files = {} # Create a random directory structure for i in range(0,10): - directories.append(tempfile.mkdtemp(dir=random.choice(directories), prefix='test')) + directories.append(mkdtemp(dir=random.choice(directories), prefix='test')) # Create 50 random file entries in different locations in the directories. 75% of the time # these are fresh files of size [1, 10] MB and 25% of the time they are hard links to old # files. diff --git a/src/toil/test/src/systemTest.py b/src/toil/test/src/systemTest.py index 560d885a7e..3576165700 100644 --- a/src/toil/test/src/systemTest.py +++ b/src/toil/test/src/systemTest.py @@ -1,9 +1,9 @@ import errno import multiprocessing import os -import tempfile from functools import partial +from toil.lib.io import mkdtemp from toil.lib.threading import cpu_count from toil.test import ToilTest @@ -37,7 +37,7 @@ def testAtomicityOfNonEmptyDirectoryRenames(self): def _testAtomicityOfNonEmptyDirectoryRenamesTask(parent, child, _): - tmpChildDir = tempfile.mkdtemp(dir=parent, prefix='child', suffix='.tmp') + tmpChildDir = mkdtemp(dir=parent, prefix='child', suffix='.tmp') grandChild = os.path.join(tmpChildDir, 'grandChild') open(grandChild, 'w').close() grandChildId = os.stat(grandChild).st_ino diff --git a/src/toil/test/utils/ABCWorkflowDebug/debugWorkflow.py b/src/toil/test/utils/ABCWorkflowDebug/debugWorkflow.py index 6278a2417d..ba953bfb2a 100644 --- a/src/toil/test/utils/ABCWorkflowDebug/debugWorkflow.py +++ b/src/toil/test/utils/ABCWorkflowDebug/debugWorkflow.py @@ -2,9 +2,9 @@ import os import sys import subprocess -import tempfile from toil.common import Toil +from toil.lib.io import mkdtemp from toil.job import Job from toil.version import python @@ -137,7 +137,7 @@ def broken_job(job, num): file = toil.importFile(None) if __name__=="__main__": - jobStorePath = sys.argv[1] if len(sys.argv) > 1 else tempfile.mkdtemp("debugWorkflow") + jobStorePath = sys.argv[1] if len(sys.argv) > 1 else mkdtemp("debugWorkflow") options = Job.Runner.getDefaultOptions(jobStorePath) options.clean = "never" options.stats = True diff --git a/src/toil/wdl/wdl_synthesis.py b/src/toil/wdl/wdl_synthesis.py index 395c5cc458..1ea8efec70 100644 --- a/src/toil/wdl/wdl_synthesis.py +++ b/src/toil/wdl/wdl_synthesis.py @@ -13,9 +13,9 @@ # limitations under the License. 
diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py
index a29d78677a..cd1fd7d47b 100755
--- a/src/toil/wdl/wdltoil.py
+++ b/src/toil/wdl/wdltoil.py
@@ -28,11 +28,11 @@
 import shutil
 import subprocess
 import sys
-import tempfile
 import uuid
 
 from contextlib import ExitStack, contextmanager
 from graphlib import TopologicalSorter
+from tempfile import mkstemp
 from typing import cast, Any, Callable, Union, Dict, List, Optional, Set, Sequence, Tuple, Type, TypeVar, Iterator, \
     Iterable, Generator
 from urllib.parse import urlsplit, urljoin, quote, unquote
@@ -51,6 +51,7 @@
 from toil.fileStores import FileID
 from toil.fileStores.abstractFileStore import AbstractFileStore
 from toil.jobStores.abstractJobStore import AbstractJobStore, UnimplementedURLException
+from toil.lib.io import mkdtemp
 from toil.lib.memoize import memoize
 from toil.lib.conversions import convert_units, human2bytes
 from toil.lib.misc import get_user_name
@@ -2550,12 +2551,12 @@ def main() -> None:
     # Make sure we have a jobStore
     if options.jobStore is None:
         # TODO: Move cwltoil's generate_default_job_store where we can use it
-        options.jobStore = os.path.join(tempfile.mkdtemp(), 'tree')
+        options.jobStore = os.path.join(mkdtemp(), 'tree')
 
     # Make sure we have an output directory (or URL prefix) and we don't need
     # to ever worry about a None, and MyPy knows it.
     # If we don't have a directory assigned, make one in the current directory.
-    output_directory: str = options.output_directory if options.output_directory else tempfile.mkdtemp(prefix='wdl-out-', dir=os.getcwd())
+    output_directory: str = options.output_directory if options.output_directory else mkdtemp(prefix='wdl-out-', dir=os.getcwd())
 
     with Toil(options) as toil:
         if options.restart:
@@ -2672,7 +2673,7 @@ def devirtualize_output(filename: str) -> str:
             else:
                 # Export output to path or URL.
                 # So we need to import and then export.
-                fd, filename = mkstemp()
                 with open(fd, 'w') as handle:
                     # Populate the file
                     handle.write(json.dumps(outputs))
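The default-job-store hunk above uses a different idiom from the tutorial scripts: rather than deleting the temporary directory, it keeps it and points Toil at a child path ('tree') that does not exist yet. A side-by-side sketch, with names illustrative and the stdlib `mkdtemp` standing in for `toil.lib.io.mkdtemp`:

    import os
    from tempfile import mkdtemp  # stand-in for toil.lib.io.mkdtemp

    # Tutorial idiom: reserve a unique name, then remove the directory so
    # Toil can create the job store itself.
    jobstore = mkdtemp("example")
    os.rmdir(jobstore)

    # wdltoil idiom: keep the unique parent and name a not-yet-existing
    # child as the job store; no rmdir needed.
    jobstore = os.path.join(mkdtemp(), 'tree')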