diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index ed0bc2f3c7..6fdb588440 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -19,7 +19,7 @@ To be copied to the [draft changelog](https://github.com/DataBiosphere/toil/wiki * [ ] New functions without [type hints](https://docs.python.org/3/library/typing.html). * [ ] New functions or classes without informative docstrings. * [ ] Changes to semantics not reflected in the relevant docstrings. - * [ ] New or changed command line options for Toil workflows that are not reflected in `docs/running/cliOptions.rst` + * [ ] New or changed command line options for Toil workflows that are not reflected in `docs/running/{cliOptions,cwl,wdl}.rst` * [ ] New features without tests. * [ ] Comment on the lines of code where problems exist with a review comment. You can shift-click the line numbers in the diff to select multiple lines. * [ ] Finish the review with an overall description of your opinion. diff --git a/docs/appendices/environment_vars.rst b/docs/appendices/environment_vars.rst index 2a2a18b0bd..da1046b40a 100644 --- a/docs/appendices/environment_vars.rst +++ b/docs/appendices/environment_vars.rst @@ -174,5 +174,9 @@ There are several environment variables that affect the way Toil runs. | | in the Toil Docker container to use as a mirror | | | for Docker Hub. | +----------------------------------+----------------------------------------------------+ +| OMP_NUM_THREADS | The number of cores set for OpenMP applications in | +| | the workers. If not set, Toil will use the number | +| | of job threads. | ++----------------------------------+----------------------------------------------------+ .. _standard temporary directory: https://docs.python.org/3/library/tempfile.html#tempfile.gettempdir diff --git a/docs/running/cwl.rst b/docs/running/cwl.rst index bb0cb54614..46e08fa2c3 100644 --- a/docs/running/cwl.rst +++ b/docs/running/cwl.rst @@ -95,6 +95,10 @@ printed to the stdout stream after workflow execution. ``--stats``: Save resources usages in json files that can be collected with the ``toil stats`` command after the workflow is done. +``--disable-streaming``: Does not allow streaming of input files. This is enabled +by default for files marked with ``streamable`` flag True and only for remote files +when the jobStore is not on local machine. + Running CWL in the Cloud ------------------------ diff --git a/setup.py b/setup.py old mode 100644 new mode 100755 index e9818de4d5..c6b87014c9 --- a/setup.py +++ b/setup.py @@ -18,6 +18,9 @@ from setuptools import find_packages, setup +cwltool_version = '3.1.20210816212154' + + def run_setup(): """ Calls setup(). 
This function exists so the setup() invocation preceded more internal @@ -33,7 +36,7 @@ def run_setup(): gcs = 'google-cloud-storage==1.6.0' gcs_oauth2_boto_plugin = 'gcs_oauth2_boto_plugin==1.14' apacheLibcloud = 'apache-libcloud==2.2.1' - cwltool = 'cwltool==3.1.20210616134059' + cwltool = f'cwltool=={cwltool_version}' galaxyToolUtil = 'galaxy-tool-util' htcondor = 'htcondor>=8.6.0' kubernetes = 'kubernetes>=12.0.1, <13' @@ -168,7 +171,10 @@ def import_version(): # Use the template to generate src/toil/version.py import version_template with NamedTemporaryFile(mode='w', dir='src/toil', prefix='version.py.', delete=False) as f: - f.write(version_template.expand_()) + f.write(version_template.expand_(others={ + # expose the dependency versions that we may need to access in Toil + 'cwltool_version': cwltool_version, + })) os.rename(f.name, 'src/toil/version.py') # Unfortunately, we can't use a straight import here because that would also load the stuff diff --git a/src/toil/batchSystems/abstractBatchSystem.py b/src/toil/batchSystems/abstractBatchSystem.py index a97f4f8220..478adbede9 100644 --- a/src/toil/batchSystems/abstractBatchSystem.py +++ b/src/toil/batchSystems/abstractBatchSystem.py @@ -110,11 +110,13 @@ def setUserScript(self, userScript): raise NotImplementedError() @abstractmethod - def issueBatchJob(self, jobDesc): + def issueBatchJob(self, jobDesc, job_environment: Optional[Dict[str, str]] = None): """ Issues a job with the specified command to the batch system and returns a unique jobID. :param jobDesc a toil.job.JobDescription + :param job_environment: a collection of job-specific environment variables + to be set on the worker. :return: a unique jobID that can be used to reference the newly issued job :rtype: int @@ -443,6 +445,7 @@ def shutdownLocal(self): # type: () -> None """To be called from shutdown()""" self.localBatch.shutdown() + class BatchSystemCleanupSupport(BatchSystemLocalSupport): """ Adds cleanup support when the last running job leaves a node, for batch diff --git a/src/toil/batchSystems/abstractGridEngineBatchSystem.py b/src/toil/batchSystems/abstractGridEngineBatchSystem.py index 0e7a218396..99d27de55e 100644 --- a/src/toil/batchSystems/abstractGridEngineBatchSystem.py +++ b/src/toil/batchSystems/abstractGridEngineBatchSystem.py @@ -17,7 +17,7 @@ from datetime import datetime from queue import Empty, Queue from threading import Lock, Thread -from typing import Any, List, Union +from typing import Any, List, Dict, Union, Optional from toil.batchSystems.abstractBatchSystem import (BatchJobExitReason, BatchSystemCleanupSupport, @@ -107,10 +107,10 @@ def createJobs(self, newJob: Any) -> bool: while len(self.waitingJobs) > 0 and \ len(self.runningJobs) < int(self.boss.config.maxLocalJobs): activity = True - jobID, cpu, memory, command, jobName = self.waitingJobs.pop(0) + jobID, cpu, memory, command, jobName, environment = self.waitingJobs.pop(0) # prepare job submission command - subLine = self.prepareSubmission(cpu, memory, jobID, command, jobName) + subLine = self.prepareSubmission(cpu, memory, jobID, command, jobName, environment) logger.debug("Running %r", subLine) batchJobID = self.boss.with_retries(self.submitJob, subLine) logger.debug("Submitted job %s", str(batchJobID)) @@ -255,29 +255,35 @@ def run(self): logger.error("GridEngine like batch system failure", exc_info=ex) raise - @abstractmethod def coalesce_job_exit_codes(self, batch_job_id_list: list) -> list: """ Returns exit codes for a list of jobs. 
Implementation-specific; called by AbstractGridEngineWorker.checkOnJobs() - :param string batchjobIDList: List of batch system job ID + :param string batch_job_id_list: List of batch system job ID """ raise NotImplementedError() @abstractmethod - def prepareSubmission(self, cpu, memory, jobID, command, jobName): + def prepareSubmission(self, + cpu: int, + memory: int, + jobID: int, + command: str, + jobName: str, + job_environment: Optional[Dict[str, str]] = None) -> List[str]: """ Preparation in putting together a command-line string for submitting to batch system (via submitJob().) - :param: string cpu - :param: string memory - :param: string jobID : Toil job ID + :param: int cpu + :param: int memory + :param: int jobID: Toil job ID :param: string subLine: the command line string to be called :param: string jobName: the name of the Toil job, to provide metadata to batch systems if desired + :param: dict job_environment: the environment variables to be set on the worker - :rtype: string + :rtype: List[str] """ raise NotImplementedError() @@ -354,7 +360,7 @@ def supportsWorkerCleanup(cls): def supportsAutoDeployment(cls): return False - def issueBatchJob(self, jobDesc): + def issueBatchJob(self, jobDesc, job_environment: Optional[Dict[str, str]] = None): # Avoid submitting internal jobs to the batch queue, handle locally localID = self.handleLocalJob(jobDesc) if localID: @@ -363,7 +369,8 @@ def issueBatchJob(self, jobDesc): self.checkResourceRequest(jobDesc.memory, jobDesc.cores, jobDesc.disk) jobID = self.getNextJobID() self.currentJobs.add(jobID) - self.newJobsQueue.put((jobID, jobDesc.cores, jobDesc.memory, jobDesc.command, jobDesc.jobName)) + self.newJobsQueue.put((jobID, jobDesc.cores, jobDesc.memory, jobDesc.command, jobDesc.jobName, + job_environment)) logger.debug("Issued the job command: %s with job id: %s and job name %s", jobDesc.command, str(jobID), jobDesc.jobName) return jobID diff --git a/src/toil/batchSystems/gridengine.py b/src/toil/batchSystems/gridengine.py index 2140ea10b3..ee081ceafd 100644 --- a/src/toil/batchSystems/gridengine.py +++ b/src/toil/batchSystems/gridengine.py @@ -16,6 +16,7 @@ import os import time from pipes import quote +from typing import Optional, List, Dict from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem from toil.lib.misc import CalledProcessErrorStderr, call_command @@ -48,8 +49,14 @@ def getRunningJobIDs(self): def killJob(self, jobID): call_command(['qdel', self.getBatchSystemID(jobID)]) - def prepareSubmission(self, cpu, memory, jobID, command, jobName): - return self.prepareQsub(cpu, memory, jobID) + [command] + def prepareSubmission(self, + cpu: int, + memory: int, + jobID: int, + command: str, + jobName: str, + job_environment: Optional[Dict[str, str]] = None): + return self.prepareQsub(cpu, memory, jobID, job_environment) + [command] def submitJob(self, subLine): stdout = call_command(subLine) @@ -94,14 +101,22 @@ def getJobExitCode(self, sgeJobID): """ Implementation-specific helper methods """ - def prepareQsub(self, cpu: int, mem: int, jobID: int) -> List[str]: + def prepareQsub(self, + cpu: int, + mem: int, + jobID: int, + job_environment: Optional[Dict[str, str]] = None) -> List[str]: qsubline = ['qsub', '-V', '-b', 'y', '-terse', '-j', 'y', '-cwd', '-N', 'toil_job_' + str(jobID)] - if self.boss.environment: + environment = self.boss.environment.copy() + if job_environment: + environment.update(job_environment) + + if environment: qsubline.append('-v') qsubline.append(','.join(k + '=' + 
quote(os.environ[k] if v is None else v) - for k, v in self.boss.environment.items())) + for k, v in environment.items())) reqline = list() sgeArgs = os.getenv('TOIL_GRIDENGINE_ARGS') diff --git a/src/toil/batchSystems/htcondor.py b/src/toil/batchSystems/htcondor.py index a2d2fc8532..30b1de8aec 100644 --- a/src/toil/batchSystems/htcondor.py +++ b/src/toil/batchSystems/htcondor.py @@ -16,7 +16,7 @@ import math import os import time -from typing import Any +from typing import Any, Optional, Dict import htcondor @@ -277,12 +277,12 @@ def connectSchedd(self): return schedd def getEnvString(self): - '''Build an environment string that a HTCondor Submit object can use. + """ + Build an environment string that a HTCondor Submit object can use. For examples of valid strings, see: http://research.cs.wisc.edu/htcondor/manual/current/condor_submit.html#man-condor-submit-environment - - ''' + """ env_items = [] if self.boss.environment: @@ -302,7 +302,7 @@ def getEnvString(self): return '"' + ' '.join(env_items) + '"' # Override the issueBatchJob method so HTCondor can be given the disk request - def issueBatchJob(self, jobNode): + def issueBatchJob(self, jobNode, job_environment: Optional[Dict[str, str]] = None): # Avoid submitting internal jobs to the batch queue, handle locally localID = self.handleLocalJob(jobNode) if localID: @@ -313,6 +313,7 @@ def issueBatchJob(self, jobNode): self.currentJobs.add(jobID) # Add the jobNode.disk and jobNode.jobName to the job tuple - self.newJobsQueue.put((jobID, jobNode.cores, jobNode.memory, jobNode.disk, jobNode.jobName, jobNode.command)) + self.newJobsQueue.put((jobID, jobNode.cores, jobNode.memory, jobNode.disk, jobNode.jobName, jobNode.command, + job_environment)) logger.debug("Issued the job command: %s with job id: %s ", jobNode.command, str(jobID)) return jobID diff --git a/src/toil/batchSystems/kubernetes.py b/src/toil/batchSystems/kubernetes.py index b5b8acbdb2..88bdb25494 100644 --- a/src/toil/batchSystems/kubernetes.py +++ b/src/toil/batchSystems/kubernetes.py @@ -32,6 +32,7 @@ import tempfile import time import uuid +from typing import Optional, Dict import kubernetes import pytz @@ -362,15 +363,23 @@ def _create_affinity(self, preemptable: bool) -> kubernetes.client.V1Affinity: # Make the node affinity into an overall affinity return kubernetes.client.V1Affinity(node_affinity=node_affinity) - def _create_pod_spec(self, jobDesc: JobDescription) -> kubernetes.client.V1PodSpec: + def _create_pod_spec( + self, + jobDesc: JobDescription, + job_environment: Optional[Dict[str, str]] = None + ) -> kubernetes.client.V1PodSpec: """ Make the specification for a pod that can execute the given job. """ + environment = self.environment.copy() + if job_environment: + environment.update(job_environment) + # Make a job dict to send to the executor. # First just wrap the command and the environment to run it in job = {'command': jobDesc.command, - 'environment': self.environment.copy()} + 'environment': environment} # TODO: query customDockerInitCmd to respect TOIL_CUSTOM_DOCKER_INIT_COMMAND if self.userScript is not None: @@ -456,8 +465,7 @@ def _create_pod_spec(self, jobDesc: JobDescription) -> kubernetes.client.V1PodSp return pod_spec - - def issueBatchJob(self, jobDesc): + def issueBatchJob(self, jobDesc, job_environment: Optional[Dict[str, str]] = None): # TODO: get a sensible self.maxCores, etc. so we can checkResourceRequest. # How do we know if the cluster will autoscale? 
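# Editor's note (not part of the patch): the per-job environment support added
# across the batch systems above follows one pattern — copy the batch system's
# base environment, then overlay the job-specific variables so they take
# precedence. A minimal standalone sketch of that pattern; the helper name is
# hypothetical and only illustrates the merge order used in this diff.
from typing import Dict, Optional

def merged_environment(base_env: Dict[str, str],
                       job_environment: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    environment = base_env.copy()            # never mutate the shared base mapping
    if job_environment:
        environment.update(job_environment)  # job-specific values win over the base
    return environment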
@@ -473,7 +481,7 @@ def issueBatchJob(self, jobDesc): self.checkResourceRequest(jobDesc.memory, jobDesc.cores, jobDesc.disk) # Make a pod that describes running the job - pod_spec = self._create_pod_spec(jobDesc) + pod_spec = self._create_pod_spec(jobDesc, job_environment=job_environment) # Make a batch system scope job ID jobID = self.getNextJobID() diff --git a/src/toil/batchSystems/lsf.py b/src/toil/batchSystems/lsf.py index a3e687b49c..6fdf1fa4aa 100644 --- a/src/toil/batchSystems/lsf.py +++ b/src/toil/batchSystems/lsf.py @@ -25,7 +25,7 @@ import subprocess from datetime import datetime from random import randint -from typing import List, Union +from typing import Union, Optional, List, Dict from dateutil.parser import parse from dateutil.tz import tzlocal @@ -85,12 +85,23 @@ def fallbackRunningJobIDs(self, currentjobs): def killJob(self, jobID): call_command(['bkill', self.getBatchSystemID(jobID)]) - def prepareSubmission(self, cpu, memory, jobID, command, jobName): - return self.prepareBsub(cpu, memory, jobID) + [command] + def prepareSubmission(self, + cpu: int, + memory: int, + jobID: int, + command: str, + jobName: str, + job_environment: Optional[Dict[str, str]] = None): + return (self.prepareBsub(cpu, memory, jobID) + [command], + job_environment) # pass job_environment to .submitJob() def submitJob(self, subLine): + subLine, job_environment = subLine combinedEnv = self.boss.environment combinedEnv.update(os.environ) + if job_environment: + combinedEnv.update(job_environment) + stdout = call_command(subLine, env=combinedEnv) # Example success: Job <39605914> is submitted to default queue . # Example fail: Service class does not exist. Job not submitted. @@ -102,7 +113,7 @@ def submitJob(self, subLine): else: logger.error("Could not submit job\nReason: {}".format(stdout)) temp_id = randint(10000000, 99999999) - #Flag this job to be handled by getJobExitCode + # Flag this job to be handled by getJobExitCode result = "NOT_SUBMITTED_{}".format(temp_id) return result @@ -229,7 +240,6 @@ def parse_bjobs_record(self, bjobs_record: dict, job: int) -> Union[int, None]: return self.getJobExitCodeBACCT(job) - def getJobExitCodeBACCT(self,job): # if not found in bjobs, then try bacct (slower than bjobs) logger.debug("bjobs failed to detect job - trying bacct: " @@ -318,7 +328,7 @@ def prepareBsub(self, cpu: int, mem: int, jobID: int) -> List[str]: bsubline.extend(lsfArgs.split()) return bsubline - def parseBjobs(self,bjobs_output_str): + def parseBjobs(self, bjobs_output_str): """ Parse records from bjobs json type output params: diff --git a/src/toil/batchSystems/lsfHelper.py b/src/toil/batchSystems/lsfHelper.py index 352c04c2cd..9ec547a2fd 100755 --- a/src/toil/batchSystems/lsfHelper.py +++ b/src/toil/batchSystems/lsfHelper.py @@ -22,6 +22,7 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. 
import fnmatch +import logging import os import re import subprocess @@ -37,6 +38,8 @@ DEFAULT_RESOURCE_UNITS = "MB" LSF_JSON_OUTPUT_MIN_VERSION = "10.1.0.2" +logger = logging.getLogger(__name__) + def find(basedir, string): """ @@ -109,8 +112,9 @@ def apply_bparams(fn): """ cmd = ["bparams", "-a"] try: - output = subprocess.check_output(cmd).decode('utf-8') - except: + output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode('utf-8') + except subprocess.CalledProcessError as exc: + logger.debug(exc.output.decode('utf-8')) return None return fn(output.split("\n")) @@ -121,8 +125,9 @@ def apply_lsadmin(fn): """ cmd = ["lsadmin", "showconf", "lim"] try: - output = subprocess.check_output(cmd).decode('utf-8') - except: + output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode('utf-8') + except subprocess.CalledProcessError as exc: + logger.debug(exc.output.decode('utf-8')) return None return fn(output.split("\n")) diff --git a/src/toil/batchSystems/mesos/batchSystem.py b/src/toil/batchSystems/mesos/batchSystem.py index 58824dfd66..1ed5280576 100644 --- a/src/toil/batchSystems/mesos/batchSystem.py +++ b/src/toil/batchSystems/mesos/batchSystem.py @@ -24,6 +24,7 @@ import traceback from contextlib import contextmanager from queue import Empty, Queue +from typing import Optional, Dict from urllib.parse import quote_plus from urllib.request import urlopen @@ -167,7 +168,7 @@ def ignoreNode(self, nodeAddress): def unignoreNode(self, nodeAddress): self.ignoredNodes.remove(nodeAddress) - def issueBatchJob(self, jobNode: JobDescription): + def issueBatchJob(self, jobNode: JobDescription, job_environment: Optional[Dict[str, str]] = None): """ Issues the following command returning a unique jobID. Command is the string to run, memory is an int giving the number of bytes the job needs to run in and cores is the number of cpus @@ -190,12 +191,16 @@ def issueBatchJob(self, jobNode: JobDescription): ) jobID = self.getNextJobID() + environment = self.environment.copy() + if job_environment: + environment.update(job_environment) + job = ToilJob(jobID=jobID, name=str(jobNode), resources=MesosShape(wallTime=0, **mesos_resources), command=jobNode.command, userScript=self.userScript, - environment=self.environment.copy(), + environment=environment, workerCleanupInfo=self.workerCleanupInfo) jobType = job.resources log.debug("Queueing the job command: %s with job id: %s ...", jobNode.command, str(jobID)) diff --git a/src/toil/batchSystems/parasol.py b/src/toil/batchSystems/parasol.py index 38a18e7362..6317c5fa1e 100644 --- a/src/toil/batchSystems/parasol.py +++ b/src/toil/batchSystems/parasol.py @@ -22,6 +22,7 @@ from queue import Empty, Queue from shutil import which from threading import Thread +from typing import Optional, Dict from toil.batchSystems.abstractBatchSystem import (BatchSystemSupport, UpdatedBatchJobInfo) @@ -123,7 +124,7 @@ def _runParasol(self, command, autoRetry=True): parasolOutputPattern = re.compile("your job ([0-9]+).*") - def issueBatchJob(self, jobDesc): + def issueBatchJob(self, jobDesc, job_environment: Optional[Dict[str, str]] = None): """ Issues parasol with job commands. """ @@ -136,7 +137,7 @@ def issueBatchJob(self, jobDesc): # meams the new job can't ever decrease the memory requirements # of jobs already in the batch. 
if len(self.resultsFiles) >= self.maxBatches: - raise RuntimeError( 'Number of batches reached limit of %i' % self.maxBatches) + raise RuntimeError('Number of batches reached limit of %i' % self.maxBatches) try: results = self.resultsFiles[(truncatedMemory, jobDesc.cores)] except KeyError: @@ -145,7 +146,7 @@ def issueBatchJob(self, jobDesc): # Prefix the command with environment overrides, optionally looking them up from the # current environment if the value is None - command = ' '.join(concat('env', self.__environment(), jobDesc.command)) + command = ' '.join(concat('env', self.__environment(job_environment), jobDesc.command)) parasolCommand = ['-verbose', '-ram=%i' % jobDesc.memory, '-cpu=%i' % jobDesc.cores, @@ -187,8 +188,12 @@ def setEnv(self, name, value=None): raise ValueError('Parasol does not support spaces in environment variable values.') return super(ParasolBatchSystem, self).setEnv(name, value) - def __environment(self): - return (k + '=' + (os.environ[k] if v is None else v) for k, v in list(self.environment.items())) + def __environment(self, job_environment: Optional[Dict[str, str]] = None): + environment = self.environment.copy() + if job_environment: + environment.update(job_environment) + + return (k + '=' + (os.environ[k] if v is None else v) for k, v in list(environment.items())) def killBatchJobs(self, jobIDs): """Kills the given jobs, represented as Job ids, then checks they are dead by checking @@ -204,8 +209,8 @@ def killBatchJobs(self, jobIDs): runningJobs = self.getIssuedBatchJobIDs() if set(jobIDs).difference(set(runningJobs)) == set(jobIDs): break - logger.warning( 'Tried to kill some jobs, but something happened and they are still ' - 'going, will try againin 5s.') + logger.warning('Tried to kill some jobs, but something happened and they are still ' + 'going, will try again in 5s.') time.sleep(5) # Update the CPU usage, because killed jobs aren't written to the results file. for jobID in jobIDs: @@ -329,7 +334,7 @@ def updatedJobWorker(self): # second. usrTicks = int(usrTicks) sysTicks = int(sysTicks) - wallTime = float( max( 1, usrTicks + sysTicks) ) * 0.01 + wallTime = float(max(1, usrTicks + sysTicks)) * 0.01 else: wallTime = float(endTime - startTime) self.updatedJobsQueue.put(UpdatedBatchJobInfo(jobID=jobId, exitStatus=status, wallTime=wallTime, exitReason=None)) @@ -360,7 +365,6 @@ def shutdown(self): os.remove(results) os.rmdir(self.parasolResultsDir) - @classmethod def setOptions(cls, setOption): from toil.common import iC diff --git a/src/toil/batchSystems/registry.py b/src/toil/batchSystems/registry.py index ab256459d3..c5d98d4a2d 100644 --- a/src/toil/batchSystems/registry.py +++ b/src/toil/batchSystems/registry.py @@ -13,6 +13,12 @@ # limitations under the License. +from typing import Callable, Type, TYPE_CHECKING + +if TYPE_CHECKING: + from toil.batchSystems.abstractBatchSystem import AbstractBatchSystem + + def gridengine_batch_system_factory(): from toil.batchSystems.gridengine import GridEngineBatchSystem return GridEngineBatchSystem @@ -71,3 +77,10 @@ def kubernetes_batch_system_factory(): } BATCH_SYSTEMS = list(BATCH_SYSTEM_FACTORY_REGISTRY.keys()) DEFAULT_BATCH_SYSTEM = 'single_machine' + +def addBatchSystemFactory(key: str, batchSystemFactory: Callable[[], Type['AbstractBatchSystem']]): + """ + Adds a batch system to the registry for workflow-supplied batch systems. 
+ """ + BATCH_SYSTEMS.append(key) + BATCH_SYSTEM_FACTORY_REGISTRY[key] = batchSystemFactory \ No newline at end of file diff --git a/src/toil/batchSystems/singleMachine.py b/src/toil/batchSystems/singleMachine.py index 08845cae0a..082f57d716 100644 --- a/src/toil/batchSystems/singleMachine.py +++ b/src/toil/batchSystems/singleMachine.py @@ -600,8 +600,7 @@ def _handleChild(self, pid: int) -> None: log.debug('Child %d for job %s succeeded', pid, jobID) - - def issueBatchJob(self, jobDesc): + def issueBatchJob(self, jobDesc, job_environment: Optional[Dict[str, str]] = None): """Adds the command and resources to a queue to be run.""" self._checkOnDaddy() @@ -621,15 +620,18 @@ def issueBatchJob(self, jobDesc): self.jobIndex += 1 self.jobs[jobID] = jobDesc.command + environment = self.environment.copy() + if job_environment: + environment.update(job_environment) + if self.debugWorker: # Run immediately, blocking for return. # Ignore resource requirements; we run one job at a time - self._runDebugJob(jobDesc.command, jobID, self.environment.copy()) + self._runDebugJob(jobDesc.command, jobID, environment) else: # Queue the job for later self.inputQueue.put((jobDesc.command, jobID, cores, jobDesc.memory, - jobDesc.disk, self.environment.copy())) - + jobDesc.disk, environment)) return jobID diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 1af9e6ea81..dd6141e6c9 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -15,7 +15,7 @@ import math import os from pipes import quote -from typing import List +from typing import List, Dict, Optional from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem from toil.lib.misc import CalledProcessErrorStderr, call_command @@ -52,8 +52,14 @@ def getRunningJobIDs(self): def killJob(self, jobID): call_command(['scancel', self.getBatchSystemID(jobID)]) - def prepareSubmission(self, cpu, memory, jobID, command, jobName): - return self.prepareSbatch(cpu, memory, jobID, jobName) + ['--wrap={}'.format(command)] + def prepareSubmission(self, + cpu: int, + memory: int, + jobID: int, + command: str, + jobName: str, + job_environment: Optional[Dict[str, str]] = None) -> List[str]: + return self.prepareSbatch(cpu, memory, jobID, jobName, job_environment) + ['--wrap={}'.format(command)] def submitJob(self, subLine): try: @@ -159,14 +165,25 @@ def _getJobDetailsFromScontrol(self, slurmJobID): Implementation-specific helper methods """ - def prepareSbatch(self, cpu: int, mem: int, jobID: int, jobName: str) -> List[str]: + def prepareSbatch(self, + cpu: int, + mem: int, + jobID: int, + jobName: str, + job_environment: Optional[Dict[str, str]]) -> List[str]: + # Returns the sbatch command line before the script to run sbatch_line = ['sbatch', '-J', 'toil_job_{}_{}'.format(jobID, jobName)] - if self.boss.environment: + environment = {} + environment.update(self.boss.environment) + if job_environment: + environment.update(job_environment) + + if environment: argList = [] - for k, v in self.boss.environment.items(): + for k, v in environment.items(): quoted_value = quote(os.environ[k] if v is None else v) argList.append('{}={}'.format(k, quoted_value)) diff --git a/src/toil/batchSystems/torque.py b/src/toil/batchSystems/torque.py index fc3ae1f9ba..0f452e67fd 100644 --- a/src/toil/batchSystems/torque.py +++ b/src/toil/batchSystems/torque.py @@ -19,6 +19,7 @@ import time from pipes import quote from queue import Empty +from typing import Optional, List, Dict from 
toil.batchSystems.abstractGridEngineBatchSystem import (AbstractGridEngineBatchSystem, UpdatedBatchJobInfo) @@ -110,8 +111,14 @@ def getUpdatedBatchJob(self, maxWait): def killJob(self, jobID): call_command(['qdel', self.getBatchSystemID(jobID)]) - def prepareSubmission(self, cpu, memory, jobID, command, jobName): - return self.prepareQsub(cpu, memory, jobID) + [self.generateTorqueWrapper(command, jobID)] + def prepareSubmission(self, + cpu: int, + memory: int, + jobID: int, + command: str, + jobName: str, + job_environment: Optional[Dict[str, str]] = None) -> List[str]: + return self.prepareQsub(cpu, memory, jobID, job_environment) + [self.generateTorqueWrapper(command, jobID)] def submitJob(self, subLine): return call_command(subLine) @@ -143,14 +150,22 @@ def getJobExitCode(self, torqueJobID): """ Implementation-specific helper methods """ - def prepareQsub(self, cpu, mem, jobID): + def prepareQsub(self, + cpu: int, + mem: int, + jobID: int, + job_environment: Optional[Dict[str, str]]) -> List[str]: # TODO: passing $PWD on command line not working for -d, resorting to # $PBS_O_WORKDIR but maybe should fix this here instead of in script? qsubline = ['qsub', '-S', '/bin/sh', '-V', '-N', 'toil_job_{}'.format(jobID)] - if self.boss.environment: + environment = self.boss.environment.copy() + if job_environment: + environment.update(job_environment) + + if environment: qsubline.append('-v') qsubline.append(','.join(k + '=' + quote(os.environ[k] if v is None else v) for k, v in self.boss.environment.items())) @@ -165,23 +180,23 @@ def prepareQsub(self, cpu, mem, jobID): # Other resource requirements can be passed through the environment (see man qsub) reqlineEnv = os.getenv('TOIL_TORQUE_REQS') if reqlineEnv is not None: - logger.debug("Additional Torque resource requirements appended to qsub from "\ - "TOIL_TORQUE_REQS env. variable: {}".format(reqlineEnv)) + logger.debug("Additional Torque resource requirements appended to qsub from " + "TOIL_TORQUE_REQS env. variable: {}".format(reqlineEnv)) if ("mem=" in reqlineEnv) or ("nodes=" in reqlineEnv) or ("ppn=" in reqlineEnv): raise ValueError("Incompatible resource arguments ('mem=', 'nodes=', 'ppn='): {}".format(reqlineEnv)) reqline.append(reqlineEnv) if reqline: - qsubline += ['-l',','.join(reqline)] + qsubline += ['-l', ','.join(reqline)] # All other qsub parameters can be passed through the environment (see man qsub). # No attempt is made to parse them out here and check that they do not conflict # with those that we already constructed above arglineEnv = os.getenv('TOIL_TORQUE_ARGS') if arglineEnv is not None: - logger.debug("Native Torque options appended to qsub from TOIL_TORQUE_ARGS env. variable: {}".\ - format(arglineEnv)) + logger.debug("Native Torque options appended to qsub from TOIL_TORQUE_ARGS env. variable: {}" + .format(arglineEnv)) if ("mem=" in arglineEnv) or ("nodes=" in arglineEnv) or ("ppn=" in arglineEnv): raise ValueError("Incompatible resource arguments ('mem=', 'nodes=', 'ppn='): {}".format(arglineEnv)) qsubline += shlex.split(arglineEnv) diff --git a/src/toil/cwl/__init__.py b/src/toil/cwl/__init__.py index e69de29bb2..15b9720414 100644 --- a/src/toil/cwl/__init__.py +++ b/src/toil/cwl/__init__.py @@ -0,0 +1,37 @@ +# Copyright (C) 2015-2021 Regents of the University of California +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +from pkg_resources import get_distribution, DistributionNotFound +from toil.version import cwltool_version + +logger = logging.getLogger(__name__) + + +def check_cwltool_version() -> None: + """ + Check if the installed cwltool version matches Toil's expected version. A + warning is printed if the versions differ. + """ + try: + installed_version = get_distribution("cwltool").version + + if installed_version != cwltool_version: + logger.warning(f"You are using cwltool version {installed_version}, which might not be compatible with " + f"version {cwltool_version} used by Toil. You should consider running 'pip install cwltool==" + f"{cwltool_version}' to match Toil's cwltool version.") + except DistributionNotFound: + logger.warning("cwltool is not installed.") + + +check_cwltool_version() diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py index be7828c5b7..cec8d933d2 100755 --- a/src/toil/cwl/cwltoil.py +++ b/src/toil/cwl/cwltoil.py @@ -21,6 +21,7 @@ import base64 import copy import datetime +import errno import functools import json import logging @@ -96,6 +97,7 @@ from schema_salad import validate from schema_salad.schema import Names from schema_salad.sourceline import SourceLine +from threading import Thread from toil.batchSystems.registry import DEFAULT_BATCH_SYSTEM from toil.common import Config, Toil, addOptions @@ -104,6 +106,8 @@ from toil.fileStores.abstractFileStore import AbstractFileStore from toil.job import Job from toil.jobStores.abstractJobStore import NoSuchFileException +from toil.jobStores.fileJobStore import FileJobStore +from toil.lib.threading import ExceptionalThread from toil.version import baseVersion logger = logging.getLogger(__name__) @@ -578,6 +582,7 @@ def __init__( separateDirs: bool = True, get_file: Union[Any, None] = None, stage_listing: bool = False, + streaming_allowed: bool = True, ): """ Initialize this ToilPathMapper. @@ -587,6 +592,8 @@ def __init__( """ self.get_file = get_file self.stage_listing = stage_listing + self.streaming_allowed = streaming_allowed + super(ToilPathMapper, self).__init__( referenced_files, basedir, stagedir, separateDirs=separateDirs ) @@ -786,7 +793,12 @@ def visit( # If we have access to the Toil file store, we will have a # get_file set, and it will convert this path to a file: # URI for a local file it downloaded. 
- deref = self.get_file(path) if self.get_file else ab + if self.get_file: + deref = self.get_file( + path, obj.get("streamable", False), self.streaming_allowed + ) + else: + deref = ab if deref.startswith("file:"): deref = schema_salad.ref_resolver.uri_file_path(deref) @@ -876,7 +888,8 @@ def make_path_mapper( runtimeContext.basedir, stagedir, separateDirs, - getattr(runtimeContext, 'toil_get_file', None) # type: ignore + getattr(runtimeContext, 'toil_get_file', None), # type: ignore + streaming_allowed=runtimeContext.streaming_allowed ) @@ -1089,11 +1102,15 @@ def realpath(self, path: str) -> str: path = self._abs(path) return os.path.realpath(path) + def toil_get_file( file_store: AbstractFileStore, index: Dict[str, str], existing: Dict[str, str], - file_store_id: str + file_store_id: str, + streamable: bool = False, + streaming_allowed: bool = True, + pipe_threads: list = None, ) -> str: """ Set up the given file or directory from the Toil jobstore at a file URI @@ -1108,6 +1125,14 @@ def toil_get_file( :param existing: Maps from file_store_id URI to downloaded file path. :param file_store_id: The URI for the file to download. + + :param streamable: If the file is has 'streamable' flag set + + :param streaming_allowed: If streaming is allowed + + :param pipe_threads: List of threads responsible for streaming the data + and open file descriptors corresponding to those files. Caller is resposible + to close the file descriptors (to break the pipes) and join the threads """ if file_store_id.startswith("toildir:"): @@ -1136,7 +1161,41 @@ def toil_get_file( return schema_salad.ref_resolver.file_uri(dest_path) elif file_store_id.startswith("toilfile:"): # This is a plain file with no context. - src_path = file_store.readGlobalFile(FileID.unpack(file_store_id[len("toilfile:"):]), symlink=True) + def write_to_pipe( + file_store: AbstractFileStore, pipe_name: str, file_store_id: FileID + ) -> None: + try: + with open(pipe_name, "wb") as pipe: + with file_store.jobStore.readFileStream(file_store_id) as fi: + file_store.logAccess(file_store_id) + chunk_sz = 1024 + while True: + data = fi.read(chunk_sz) + if not data: + break + pipe.write(data) + except IOError as e: + # The other side of the pipe may have been closed by the + # reading thread, which is OK. + if e.errno != errno.EPIPE: + raise + + if ( + streaming_allowed + and streamable + and not isinstance(file_store.jobStore, FileJobStore) + ): + logger.debug("Streaming file %s", FileID.unpack(file_store_id[len("toilfile:"):])) + src_path = file_store.getLocalTempFileName() + os.mkfifo(src_path) + th = ExceptionalThread( + target=write_to_pipe, args=(file_store, src_path, FileID.unpack(file_store_id[len("toilfile:"):])) + ) + th.start() + pipe_threads.append((th, os.open(src_path, os.O_RDONLY))) + else: + src_path = file_store.readGlobalFile(FileID.unpack(file_store_id[len("toilfile:"):]), symlink=True) + # TODO: shouldn't we be using these as a cache? 
index[src_path] = file_store_id existing[file_store_id] = src_path @@ -1850,9 +1909,10 @@ def run(self, file_store: AbstractFileStore) -> Any: ToilFsAccess, file_store=file_store ) - runtime_context.toil_get_file = functools.partial( # type: ignore - toil_get_file, file_store, index, existing - ) + pipe_threads = [] + runtime_context.toil_get_file = functools.partial( # type: ignore + toil_get_file, file_store, index, existing, pipe_threads=pipe_threads + ) process_uuid = uuid.uuid4() # noqa F841 started_at = datetime.datetime.now() # noqa F841 @@ -1871,6 +1931,10 @@ def run(self, file_store: AbstractFileStore) -> Any: if status != "success": raise cwltool.errors.WorkflowException(status) + for t, fd in pipe_threads: + os.close(fd) + t.join() + # Get ahold of the filesystem fs_access = runtime_context.make_fs_access(runtime_context.basedir) @@ -2769,6 +2833,13 @@ def main(args: Union[List[str]] = None, stdout: TextIO = sys.stdout) -> int: "paths are accessible in place from all nodes.", dest="bypass_file_store" ) + parser.add_argument( + "--disable-streaming", + action="store_true", + default=False, + help="Disable file streaming for files that have 'streamable' flag True", + dest="disable_streaming", + ) provgroup = parser.add_argument_group( "Options for recording provenance " "information of the execution" @@ -2910,6 +2981,7 @@ def main(args: Union[List[str]] = None, stdout: TextIO = sys.stdout) -> int: runtime_context.workdir = workdir # type: ignore runtime_context.move_outputs = "leave" runtime_context.rm_tmpdir = False + runtime_context.streaming_allowed = not options.disable_streaming if options.mpi_config_file is not None: runtime_context.mpi_config = MpiConfig.load(options.mpi_config_file) runtime_context.bypass_file_store = options.bypass_file_store @@ -3063,6 +3135,14 @@ def set_secondary(fileobj): if shortname(inp["id"]) in initialized_job_order and inp.get("secondaryFiles"): set_secondary(initialized_job_order[shortname(inp["id"])]) + if ( + shortname(inp["id"]) in initialized_job_order + and inp["type"] == "File" + ): + initialized_job_order[shortname(inp["id"])]["streamable"] = inp.get( + "streamable", False + ) + runtime_context.use_container = not options.no_container runtime_context.tmp_outdir_prefix = os.path.realpath(tmp_outdir_prefix) runtime_context.job_script_provider = job_script_provider diff --git a/src/toil/jobStores/googleJobStore.py b/src/toil/jobStores/googleJobStore.py index e30e7145c9..776916d4c2 100644 --- a/src/toil/jobStores/googleJobStore.py +++ b/src/toil/jobStores/googleJobStore.py @@ -41,6 +41,7 @@ GOOGLE_STORAGE = 'gs' +MAX_BATCH_SIZE = 1000 # TODO # - needed to run 'gsutil config' to get 'gs_oauth2_refresh_token' in the boto file @@ -158,17 +159,20 @@ def destroy(self): except exceptions.NotFound: # just return if not connect to physical storage. Needed for idempotency return + try: self.bucket.delete(force=True) # throws ValueError if bucket has more than 256 objects. Then we must delete manually except ValueError: - self.bucket.delete_blobs(self.bucket.list_blobs()) + # use google batching to delete. Improved efficiency compared to deleting sequentially + blobs_to_delete = self.buckets.list_blobs() + count = 0 + while count < len(blobs_to_delete): + with self.storageClient.batch(): + for blob in blobs_to_delete[count:count + MAX_BATCH_SIZE]: + blob.delete() + count = count + MAX_BATCH_SIZE self.bucket.delete() - # if ^ throws a google.cloud.exceptions.Conflict, then we should have a deletion retry mechanism. 
- - # google freaks out if we call delete multiple times on the bucket obj, so after success - # just set to None. - self.bucket = None def _newJobID(self): return "job"+str(uuid.uuid4()) @@ -179,8 +183,13 @@ def assignID(self, jobDescription): jobStoreID, '' if jobDescription.command is None else jobDescription.command) jobDescription.jobStoreID = jobStoreID + @contextmanager + def batch(self): + # not implemented, google could storage does not support batching for uploading or downloading (2021) + yield + + def create(self, jobDescription): - # TODO: we don't implement batching, but we probably should. jobDescription.pre_update_hook() self._writeBytes(jobDescription.jobStoreID, pickle.dumps(jobDescription, protocol=pickle.HIGHEST_PROTOCOL)) return jobDescription diff --git a/src/toil/leader.py b/src/toil/leader.py index caa3b362b5..e236e49233 100644 --- a/src/toil/leader.py +++ b/src/toil/leader.py @@ -757,8 +757,17 @@ def issueJob(self, jobNode): workerCommand.append(base64.b64encode(pickle.dumps(self.toilState)).decode('utf-8')) jobNode.command = ' '.join(workerCommand) + + omp_threads = os.environ.get('OMP_NUM_THREADS') \ + or str(max(1, int(jobNode.cores))) # make sure OMP_NUM_THREADS is a positive integer + + job_environment = { + # Set the number of cores used by OpenMP applications + 'OMP_NUM_THREADS': omp_threads, + } + # jobBatchSystemID is an int that is an incremented counter for each job - jobBatchSystemID = self.batchSystem.issueBatchJob(jobNode) + jobBatchSystemID = self.batchSystem.issueBatchJob(jobNode, job_environment=job_environment) self.jobBatchSystemIDToIssuedJob[jobBatchSystemID] = jobNode if jobNode.preemptable: # len(jobBatchSystemIDToIssuedJob) should always be greater than or equal to preemptableJobsIssued, diff --git a/src/toil/provisioners/__init__.py b/src/toil/provisioners/__init__.py index 6862cd63c8..5835bc3d69 100644 --- a/src/toil/provisioners/__init__.py +++ b/src/toil/provisioners/__init__.py @@ -123,6 +123,7 @@ def parse_node_types(node_type_specs: Optional[str]) -> List[Tuple[Set[str], Opt return parsed + def check_valid_node_types(provisioner, node_types: List[Tuple[Set[str], Optional[float]]]): """ Raises if an invalid nodeType is specified for aws or gce. @@ -164,6 +165,7 @@ class NoSuchClusterException(Exception): def __init__(self, cluster_name): super(NoSuchClusterException, self).__init__(f"The cluster '{cluster_name}' could not be found") + class ClusterTypeNotSupportedException(Exception): """Indicates that a provisioner does not support a given cluster type.""" def __init__(self, provisioner_class, cluster_type): diff --git a/src/toil/provisioners/abstractProvisioner.py b/src/toil/provisioners/abstractProvisioner.py index 12fac50e67..fbd9df12f2 100644 --- a/src/toil/provisioners/abstractProvisioner.py +++ b/src/toil/provisioners/abstractProvisioner.py @@ -29,6 +29,7 @@ a_short_time = 5 logger = logging.getLogger(__name__) + class ManagedNodesNotSupportedException(RuntimeError): """ Raised when attempting to add managed nodes (which autoscale up and down by @@ -40,6 +41,7 @@ class ManagedNodesNotSupportedException(RuntimeError): """ pass + @total_ordering class Shape(object): """ @@ -52,6 +54,7 @@ class Shape(object): The memory and disk attributes store the number of bytes required by a job (or provided by a node) in RAM or on disk (SSD or HDD), respectively. 
""" + def __init__(self, wallTime, memory, cores, disk, preemptable): self.wallTime = wallTime self.memory = memory @@ -138,7 +141,7 @@ def __init__(self, clusterName=None, clusterType='mesos', zone=None, nodeStorage if self.clusterType not in self.supportedClusterTypes(): # This isn't actually a cluster type we can do - ClusterTypeNotSupportedException(type(self), clusterType) + raise ClusterTypeNotSupportedException(type(self), clusterType) self._zone = zone self._nodeStorage = nodeStorage @@ -184,6 +187,29 @@ def readClusterSettings(self): """ raise NotImplementedError + def _write_file_to_cloud(self, key: str, contents: bytes) -> str: + """ + Write a file to a physical storage system that is accessible to the + leader and all nodes during the life of the cluster. Additional + resources should be cleaned up in `self.destroyCluster()`. + + :return: A public URL that can be used to retrieve the file. + """ + raise NotImplementedError + + def _read_file_from_cloud(self, key: str) -> bytes: + """ + Return the contents of the file written by `self._write_file_to_cloud()`. + """ + raise NotImplementedError + + def _get_user_data_limit(self) -> int: + """ + Get the maximum number of bytes that can be passed as the user data + during node creation. + """ + raise NotImplementedError + def _setLeaderWorkerAuthentication(self, leader: Node = None): """ Configure authentication between the leader and the workers. @@ -235,8 +261,8 @@ def _setSSH(self, leader: Node = None) -> str: # To work locally or remotely we need to do all our setup work as one # big bash -c command = ['bash', '-c', ('set -e; if [ ! -e /root/.sshSuccess ] ; ' - 'then ssh-keygen -f /root/.ssh/id_rsa -t rsa -N ""; ' - 'touch /root/.sshSuccess; fi; chmod 700 /root/.ssh;')] + 'then ssh-keygen -f /root/.ssh/id_rsa -t rsa -N ""; ' + 'touch /root/.sshSuccess; fi; chmod 700 /root/.ssh;')] if leader is None: # Run locally @@ -258,7 +284,7 @@ def _setSSH(self, leader: Node = None) -> str: leaderPublicKey = f.read() # Drop the key type and keep just the key data - leaderPublicKey=leaderPublicKey.split(' ')[1] + leaderPublicKey = leaderPublicKey.split(' ')[1] # confirm it really is an RSA public key assert leaderPublicKey.startswith('AAAAB3NzaC1yc2E'), leaderPublicKey @@ -384,7 +410,6 @@ def addNodes(self, nodeTypes: Set[str], numNodes, preemptable, spotBid=None): """ raise NotImplementedError - def addManagedNodes(self, nodeTypes: Set[str], minNodes, maxNodes, preemptable, spotBid=None) -> None: """ Add a group of managed nodes of the given type, up to the given maximum. @@ -453,7 +478,6 @@ def destroyCluster(self): """ raise NotImplementedError - class InstanceConfiguration: """ Allows defining the initial setup for an instance and then turning it @@ -521,7 +545,6 @@ def toCloudConfig(self) -> str: # Mark as CloudConfig and serialize as YAML return '#cloud-config\n\n' + yaml.dump(config) - def getBaseInstanceConfiguration(self) -> InstanceConfiguration: """ Get the base configuration for both leader and worker instances for all cluster types. 
@@ -671,7 +694,7 @@ def addToilService(self, config: InstanceConfiguration, role: str, keyPath: str elif role == 'worker': entryPoint = 'mesos-agent' entryPointArgs = MESOS_LOG_DIR + WORKER_DOCKER_ARGS.format(ip=self._leaderPrivateIP, - preemptable=preemptable) + preemptable=preemptable) else: raise RuntimeError("Unknown role %s" % role) elif self.clusterType == 'kubernetes': @@ -682,7 +705,7 @@ def addToilService(self, config: InstanceConfiguration, role: str, keyPath: str entryPointArgs = 'infinity' else: raise RuntimeError('Toil service not needed for %s nodes in a %s cluster', - role, self.clusterType) + role, self.clusterType) else: raise RuntimeError('Toil service not needed in a %s cluster', self.clusterType) @@ -831,7 +854,6 @@ def addKubernetesServices(self, config: InstanceConfiguration): # Now we should have the kubeadm command, and the bootlooping kubelet # waiting for kubeadm to configure it. - def getKubernetesAutoscalerSetupCommands(self, values: Dict[str, str]) -> str: """ Return Bash commands that set up the Kubernetes cluster autoscaler for @@ -1113,7 +1135,7 @@ def _getCloudConfigUserData(self, role, keyPath=None, preemptable=False): # This involves an SSH public key form the leader config.addSSHRSAKey(self._leaderWorkerAuthentication) elif self.clusterType == 'kubernetes': - # We can install the Kubernetes wotker and make it phone home + # We can install the Kubernetes worker and make it phone home # to the leader. # TODO: this puts sufficient info to fake a malicious worker # into the worker config, which probably is accessible by diff --git a/src/toil/provisioners/aws/awsProvisioner.py b/src/toil/provisioners/aws/awsProvisioner.py index cdb6d85062..47496ed5a0 100644 --- a/src/toil/provisioners/aws/awsProvisioner.py +++ b/src/toil/provisioners/aws/awsProvisioner.py @@ -19,17 +19,18 @@ import time import uuid -import boto3 from botocore.exceptions import ClientError import boto.ec2 -from typing import List, Dict, Any, Optional, Set, Collection +from typing import List, Dict, Optional, Set, Collection from functools import wraps from boto.ec2.blockdevicemapping import BlockDeviceMapping as Boto2BlockDeviceMapping, BlockDeviceType as Boto2BlockDeviceType from boto.exception import BotoServerError, EC2ResponseError from boto.utils import get_instance_metadata from boto.ec2.instance import Instance as Boto2Instance +from toil.lib.aws.utils import create_s3_bucket +from toil.lib.conversions import human2bytes from toil.lib.ec2 import (a_short_time, create_auto_scaling_group, create_instances, @@ -49,7 +50,9 @@ get_error_code, get_error_message, get_error_status, - old_retry) + old_retry, + retry, + ErrorCondition) from toil.provisioners import NoSuchClusterException from toil.provisioners.abstractProvisioner import (AbstractProvisioner, Shape, @@ -71,6 +74,11 @@ # unavailable to jobs when the node comes up? 
# TODO: measure _STORAGE_ROOT_OVERHEAD_GIGS = 4 +# The maximum length of a S3 bucket +_S3_BUCKET_MAX_NAME_LEN = 63 +# The suffix of the S3 bucket associated with the cluster +_S3_BUCKET_INTERNAL_SUFFIX = '--internal' + def awsRetryPredicate(e): if isinstance(e, socket.gaierror): @@ -89,9 +97,11 @@ def awsRetryPredicate(e): return True return False + def expectedShutdownErrors(e): return get_error_status(e) == 400 and 'dependent object' in get_error_body(e) + def awsRetry(f): """ This decorator retries the wrapped function if aws throws unexpected errors @@ -123,6 +133,7 @@ def awsFilterImpairedNodes(nodes, ec2): 'will not be terminated.', ' '.join(impairedNodes)) return healthyNodes + class InvalidClusterStateException(Exception): pass @@ -140,7 +151,6 @@ def __init__(self, clusterName, clusterType, zone, nodeStorage, nodeStorageOverr 'configuration file, TOIL_AWS_ZONE environment variable, or ' 'on the command line.') - # establish boto3 clients self.session = establish_boto3_session(region_name=zone_to_region(zone)) # Boto3 splits functionality between a "resource" and a "client" for the same AWS aspect. @@ -148,12 +158,16 @@ def __init__(self, clusterName, clusterType, zone, nodeStorage, nodeStorageOverr self.ec2_client = self.session.client('ec2') self.autoscaling_client = self.session.client('autoscaling') self.iam_client = self.session.client('iam') + self.s3_resource = self.session.resource('s3') + self.s3_client = self.session.client('s3') # Call base class constructor, which will call createClusterSettings() # or readClusterSettings() super(AWSProvisioner, self).__init__(clusterName, clusterType, zone, nodeStorage, nodeStorageOverrides) - + # After self.clusterName is set, generate a valid name for the S3 bucket associated with this cluster + suffix = _S3_BUCKET_INTERNAL_SUFFIX + self.s3_bucket_name = self.clusterName[:_S3_BUCKET_MAX_NAME_LEN - len(suffix)] + suffix def supportedClusterTypes(self): return {'mesos', 'kubernetes'} @@ -194,6 +208,55 @@ def readClusterSettings(self): # workers for this leader. 
self._setLeaderWorkerAuthentication() + @retry(errors=[ErrorCondition( + error=ClientError, + error_codes=[404, 500, 502, 503, 504] + )]) + def _write_file_to_cloud(self, key: str, contents: bytes) -> str: + bucket_name = self.s3_bucket_name + region = zone_to_region(self._zone) + + # create bucket if needed, then write file to S3 + try: + # the head_bucket() call makes sure that the bucket exists and the user can access it + self.s3_client.head_bucket(Bucket=bucket_name) + bucket = self.s3_resource.Bucket(bucket_name) + except ClientError as err: + if err.response.get('ResponseMetadata', {}).get('HTTPStatusCode') == 404: + bucket = create_s3_bucket(self.s3_resource, bucket_name=bucket_name, region=region) + bucket.wait_until_exists() + bucket.Versioning().enable() + + owner_tag = os.environ.get('TOIL_OWNER_TAG') + if owner_tag: + bucket_tagging = self.s3_resource.BucketTagging(bucket_name) + bucket_tagging.put(Tagging={'TagSet': [{'Key': 'Owner', 'Value': owner_tag}]}) + else: + raise + + # write file to bucket + logger.debug(f'Writing "{key}" to bucket "{bucket_name}"...') + obj = bucket.Object(key=key) + obj.put(Body=contents) + + obj.wait_until_exists() + return f's3://{bucket_name}/{key}' + + def _read_file_from_cloud(self, key: str) -> bytes: + bucket_name = self.s3_bucket_name + obj = self.s3_resource.Object(bucket_name, key) + + try: + return obj.get().get('Body').read() + except ClientError as e: + if e.response.get('ResponseMetadata', {}).get('HTTPStatusCode') == 404: + logger.warning(f'Trying to read non-existent file "{key}" from {bucket_name}.') + raise + + def _get_user_data_limit(self) -> int: + # See: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-add-user-data.html + return human2bytes('16KB') + def launchCluster(self, leaderNodeType: str, leaderStorage: int, @@ -227,7 +290,6 @@ def launchCluster(self, raise RuntimeError('Kubernetes requires 2 or more cores, and %s is too small' % leaderNodeType) - self._keyName = keyName # This is where we put the leader self._vpcSubnet = vpcSubnet @@ -335,9 +397,9 @@ def getNodeShape(self, instance_type: str, preemptable=False) -> Shape: # the root volume disk = self._nodeStorageOverrides.get(instance_type, self._nodeStorage) * 2 ** 30 - #Underestimate memory by 100M to prevent autoscaler from disagreeing with - #mesos about whether a job can run on a particular node type - memory = (type_info.memory - 0.1) * 2** 30 + # Underestimate memory by 100M to prevent autoscaler from disagreeing with + # mesos about whether a job can run on a particular node type + memory = (type_info.memory - 0.1) * 2 ** 30 return Shape(wallTime=60 * 60, memory=memory, cores=type_info.cores, @@ -376,7 +438,7 @@ def destroyCluster(self): self.autoscaling_client.delete_auto_scaling_group(AutoScalingGroupName=asgName, ForceDelete=True) removed = True if removed: - logger.debug('... Succesfully deleted autoscaling groups') + logger.debug('... Successfully deleted autoscaling groups') # Do the workers after the ASGs because some may belong to ASGs logger.info('Terminating any remaining workers ...') @@ -392,7 +454,7 @@ def destroyCluster(self): self._terminateInstances(instancesToTerminate) removed = True if removed: - logger.debug('... Succesfully terminated workers') + logger.debug('... Successfully terminated workers') logger.info('Deleting launch templates ...') removed = False @@ -411,8 +473,7 @@ def destroyCluster(self): # We missed something removed = False if removed: - logger.debug('... 
Succesfully deleted launch templates') - + logger.debug('... Successfully deleted launch templates') if len(instances) == len(instancesToTerminate): # All nodes are gone now. @@ -436,7 +497,7 @@ def destroyCluster(self): else: raise if removed: - logger.debug('... Succesfully deleted security group') + logger.debug('... Successfully deleted security group') else: assert len(instances) > len(instancesToTerminate) # the security group can't be deleted until all nodes are terminated @@ -444,6 +505,28 @@ def destroyCluster(self): 'have failed health checks. As a result, the security group & IAM ' 'roles will not be deleted.') + # delete S3 buckets that might have been created by `self._write_file_to_cloud()` + logger.info('Deleting S3 buckets ...') + removed = False + for attempt in old_retry(timeout=300, predicate=awsRetryPredicate): + with attempt: + try: + bucket = self.s3_resource.Bucket(self.s3_bucket_name) + + bucket.objects.all().delete() + bucket.object_versions.delete() + bucket.delete() + removed = True + except self.s3_client.exceptions.NoSuchBucket: + pass + except ClientError as e: + if e.response.get('ResponseMetadata', {}).get('HTTPStatusCode') == 404: + pass + else: + raise # retry this + if removed: + print('... Successfully deleted S3 buckets') + def terminateNodes(self, nodes : List[Node]): self._terminateIDs([x.name for x in nodes]) @@ -638,7 +721,6 @@ def _getLeaderInstance(self) -> Boto2Instance: ) return leader - def getLeader(self, wait=False) -> Node: """ Get the leader for the cluster as a Toil Node object. @@ -725,7 +807,7 @@ def _deleteRoles(self, names: List[str]): for attempt in old_retry(timeout=300, predicate=expectedShutdownErrors): with attempt: self.iam_client.delete_role(RoleName=role_name) - logger.debug('... Succesfully deleted IAM role %s', role_name) + logger.debug('... Successfully deleted IAM role %s', role_name) @awsRetry @@ -785,7 +867,7 @@ def _getBoto3BlockDeviceMappings(cls, type_info: InstanceType, rootVolSize: int # virtual block device in the VM bdms.append({ 'DeviceName': bdtKeys[disk], - 'VirtualName': 'ephemeral{}'.format(disk - 1) # ephemeral counts start at 0 + 'VirtualName': 'ephemeral{}'.format(disk - 1) # ephemeral counts start at 0 }) logger.debug('Device mapping: %s', bdms) return bdms @@ -837,7 +919,7 @@ def groupNotFound(e): # security group create/get. ssh + all ports open within the group try: web = self._boto2.ec2.create_security_group(self.clusterName, - 'Toil appliance security group', vpc_id=vpcId) + 'Toil appliance security group', vpc_id=vpcId) except EC2ResponseError as e: if e.status == 400 and 'already exists' in e.body: pass # group exists- nothing to do @@ -885,7 +967,6 @@ def _get_launch_template_ids(self, filters: Optional[List[Dict[str, List[str]]]] Returns a list of launch template IDs. """ - # How do we match the right templates? combined_filters = [{'Name': 'tag:' + _TAG_KEY_TOIL_CLUSTER_NAME, 'Values': [self.clusterName]}] @@ -943,7 +1024,7 @@ def _get_worker_launch_template(self, instance_type: str, preemptable: bool = Fa if len(templates) > 1: # There shouldn't ever be multiple templates with our reserved name raise RuntimeError(f"Multiple launch templates already exist named {lt_name}; " - "something else is operating in our cluster namespace.") + "something else is operating in our cluster namespace.") elif len(templates) == 0: # Template doesn't exist so we can create it. 
try: @@ -961,7 +1042,6 @@ def _get_worker_launch_template(self, instance_type: str, preemptable: bool = Fa # There must be exactly one template return templates[0] - def _name_worker_launch_template(self, instance_type: str, preemptable: bool = False) -> str: """ Get the name we should use for the launch template with the given parameters. @@ -1229,7 +1309,6 @@ def _getRoleInlinePolicyNames(self, role_name: str) -> List[str]: return allPolicies - def full_policy(self, resource: str) -> dict: """ Produce a dict describing the JSON form of a full-access-granting AWS @@ -1329,14 +1408,14 @@ def _createProfileArn(self) -> str: profile_arn = profile.arn if len(profile.roles) > 1: - raise RuntimeError('Did not expect profile to contain more than one role') + raise RuntimeError('Did not expect profile to contain more than one role') elif len(profile.roles) == 1: # this should be profile.roles[0].role_name if profile.roles.member.role_name == iamRoleName: return profile_arn else: self._boto2.iam.remove_role_from_instance_profile(iamRoleName, - profile.roles.member.role_name) + profile.roles.member.role_name) for attempt in old_retry(predicate=lambda err: err.status == 404): with attempt: self._boto2.iam.add_role_to_instance_profile(iamRoleName, iamRoleName) diff --git a/src/toil/provisioners/gceProvisioner.py b/src/toil/provisioners/gceProvisioner.py index dc8a58529c..5f241c3b65 100644 --- a/src/toil/provisioners/gceProvisioner.py +++ b/src/toil/provisioners/gceProvisioner.py @@ -17,6 +17,7 @@ import threading import time import uuid +from typing import Set, Optional import requests from libcloud.compute.drivers.gce import GCEFailedNode @@ -24,6 +25,7 @@ from libcloud.compute.types import Provider from toil.jobStores.googleJobStore import GoogleJobStore +from toil.lib.conversions import human2bytes from toil.provisioners import NoSuchClusterException from toil.provisioners.abstractProvisioner import AbstractProvisioner, Shape from toil.provisioners.node import Node @@ -37,8 +39,8 @@ class GCEProvisioner(AbstractProvisioner): Implements a Google Compute Engine Provisioner using libcloud. """ - NODE_BOTO_PATH = "/root/.boto" # boto file path on instances - SOURCE_IMAGE = (b'projects/flatcar-cloud/global/images/family/flatcar-stable') + NODE_BOTO_PATH = "/root/.boto" # boto file path on instances + SOURCE_IMAGE = b'projects/flatcar-cloud/global/images/family/flatcar-stable' def __init__(self, clusterName, clusterType, zone, nodeStorage, nodeStorageOverrides, sseKey): self.cloud = 'gce' @@ -109,9 +111,15 @@ def _readCredentials(self): self._projectId = self.googleConnectionParams['project_id'] self._clientEmail = self.googleConnectionParams['client_email'] self._credentialsPath = self._googleJson - self._clearLeaderWorkerAuthentication() # TODO: Why are we doing this? + self._clearLeaderWorkerAuthentication() # TODO: Why are we doing this? 
self._gceDriver = self._getDriver() + def _write_file_to_cloud(self, key: str, contents: bytes) -> str: + raise NotImplementedError("The gceProvisioner doesn't support _write_file_to_cloud().") + + def _get_user_data_limit(self) -> int: + # See: https://cloud.google.com/compute/docs/metadata/setting-custom-metadata#limitations + return human2bytes('256KB') def launchCluster(self, leaderNodeType, leaderStorage, owner, **kwargs): """ @@ -145,10 +153,13 @@ def launchCluster(self, leaderNodeType, leaderStorage, owner, **kwargs): disk = {} disk['initializeParams'] = { 'sourceImage': self.SOURCE_IMAGE, - 'diskSizeGb' : leaderStorage } - disk.update({'boot': True, - 'autoDelete': True }) - name= 'l' + str(uuid.uuid4()) + 'diskSizeGb': leaderStorage + } + disk.update({ + 'boot': True, + 'autoDelete': True + }) + name = 'l' + str(uuid.uuid4()) leader = self._gceDriver.create_node( name, @@ -164,8 +175,8 @@ def launchCluster(self, leaderNodeType, leaderStorage, owner, **kwargs): ) self._instanceGroup.add_instances([leader]) - self._leaderPrivateIP = leader.private_ips[0] # needed if adding workers - #self.subnetID = leader.subnet_id #TODO: get subnetID + self._leaderPrivateIP = leader.private_ips[0] # needed if adding workers + # self.subnetID = leader.subnet_id # TODO: get subnetID # Wait for the appliance to start and inject credentials. leaderNode = Node(publicIP=leader.public_ips[0], privateIP=leader.private_ips[0], @@ -188,16 +199,16 @@ def getNodeShape(self, instance_type: str, preemptable=False): assert len(sizes) == 1 instanceType = sizes[0] - disk = 0 #instanceType.disks * instanceType.disk_capacity * 2 ** 30 + disk = 0 # instanceType.disks * instanceType.disk_capacity * 2 ** 30 if disk == 0: # This is an EBS-backed instance. We will use the root # volume, so add the amount of EBS storage requested forhe root volume disk = self._nodeStorageOverrides.get(instance_type, self._nodeStorage) * 2 ** 30 # Ram is in M. 
- #Underestimate memory by 100M to prevent autoscaler from disagreeing with - #mesos about whether a job can run on a particular node type - memory = (instanceType.ram/1000 - 0.1) * 2** 30 + # Underestimate memory by 100M to prevent autoscaler from disagreeing with + # mesos about whether a job can run on a particular node type + memory = (instanceType.ram/1000 - 0.1) * 2 ** 30 return Shape(wallTime=60 * 60, memory=memory, cores=instanceType.extra['guestCpus'], @@ -233,7 +244,7 @@ def terminateNodes(self, nodes): def addNodes(self, nodeTypes: Set[str], numNodes, preemptable, spotBid=None): assert self._leaderPrivateIP - + # We don't support any balancing here so just pick one of the # equivalent node types node_type = next(iter(nodeTypes)) @@ -255,17 +266,19 @@ def addNodes(self, nodeTypes: Set[str], numNodes, preemptable, spotBid=None): else: logger.debug('Launching %s preemptable nodes', numNodes) - #kwargs["subnet_id"] = self.subnetID if self.subnetID else self._getClusterInstance(self.instanceMetaData).subnet_id - userData = self._getCloudConfigUserData('worker', keyPath, preemptable) + # kwargs["subnet_id"] = self.subnetID if self.subnetID else self._getClusterInstance(self.instanceMetaData).subnet_id + userData = self._getCloudConfigUserData('worker', keyPath, preemptable) metadata = {'items': [{'key': 'user-data', 'value': userData}]} imageType = 'flatcar-stable' sa_scopes = [{'scopes': ['compute', 'storage-full']}] disk = {} disk['initializeParams'] = { 'sourceImage': self.SOURCE_IMAGE, - 'diskSizeGb' : self._nodeStorageOverrides.get(node_type, self._nodeStorage) } - disk.update({'boot': True, - 'autoDelete': True }) + 'diskSizeGb': self._nodeStorageOverrides.get(node_type, self._nodeStorage) } + disk.update({ + 'boot': True, + 'autoDelete': True + }) # TODO: # - bug in gce.py for ex_create_multiple_nodes (erroneously, doesn't allow image and disk to specified) @@ -281,9 +294,9 @@ def addNodes(self, nodeTypes: Set[str], numNodes, preemptable, spotBid=None): location=self._zone, ex_service_accounts=sa_scopes, ex_metadata=metadata, - ex_disks_gce_struct = [disk], + ex_disks_gce_struct=[disk], description=self._tags, - ex_preemptible = preemptable + ex_preemptible=preemptable ) failedWorkers = [] for instance in instancesLaunched: @@ -294,7 +307,7 @@ def addNodes(self, nodeTypes: Set[str], numNodes, preemptable, spotBid=None): node = Node(publicIP=instance.public_ips[0], privateIP=instance.private_ips[0], name=instance.name, launchTime=instance.created_at, nodeType=instance.size, - preemptable=False, tags=self._tags) #FIXME: what should tags be set to? + preemptable=False, tags=self._tags) # FIXME: what should tags be set to? 
try: self._injectWorkerFiles(node, botoExists) logger.debug("Created worker %s" % node.publicIP) @@ -328,7 +341,7 @@ def getProvisionedWorkers(self, instance_type: Optional[str] = None, preemptable for ip in instance.private_ips: if ip == self._leaderPrivateIP: isWorker = False - break # don't include the leader + break # don't include the leader if isWorker and instance.state == 'running': workerInstances.append(instance) @@ -346,8 +359,8 @@ def getLeader(self): except IndexError: raise NoSuchClusterException(self.clusterName) return Node(publicIP=leader.public_ips[0], privateIP=leader.private_ips[0], - name=leader.name, launchTime=leader.created_at, nodeType=leader.size, - preemptable=False, tags=None) + name=leader.name, launchTime=leader.created_at, nodeType=leader.size, + preemptable=False, tags=None) def _injectWorkerFiles(self, node, botoExists): """ @@ -393,6 +406,7 @@ def worker(driver, instance): # MONKEY PATCH - This function was copied form libcloud to fix a bug. DEFAULT_TASK_COMPLETION_TIMEOUT = 180 + def ex_create_multiple_nodes( self, base_name, size, image, number, location=None, ex_network='default', ex_subnetwork=None, ex_tags=None, @@ -428,8 +442,7 @@ def ex_create_multiple_nodes( if ex_subnetwork and not hasattr(ex_subnetwork, 'name'): ex_subnetwork = \ driver.ex_get_subnetwork(ex_subnetwork, - region=driver._get_region_from_zone( - location)) + region=driver._get_region_from_zone(location)) if ex_image_family: image = driver.ex_get_image_from_family(ex_image_family) if image and not hasattr(image, 'name'): @@ -462,14 +475,14 @@ def ex_create_multiple_nodes( for i in range(number): name = 'wp' if ex_preemptible else 'wn' - name += str(uuid.uuid4()) #'%s-%03d' % (base_name, i) + name += str(uuid.uuid4()) # '%s-%03d' % (base_name, i) status = {'name': name, 'node_response': None, 'node': None} status_list.append(status) start_time = time.time() complete = False while not complete: - if (time.time() - start_time >= timeout): + if time.time() - start_time >= timeout: raise Exception("Timeout (%s sec) while waiting for multiple " "instances") complete = True diff --git a/src/toil/test/batchSystems/batchSystemTest.py b/src/toil/test/batchSystems/batchSystemTest.py index d66940da5f..ba4b2a7d28 100644 --- a/src/toil/test/batchSystems/batchSystemTest.py +++ b/src/toil/test/batchSystems/batchSystemTest.py @@ -24,7 +24,6 @@ from fractions import Fraction from inspect import getsource from textwrap import dedent -from typing import Callable from unittest import skipIf from toil.batchSystems.abstractBatchSystem import (AbstractBatchSystem, @@ -35,9 +34,13 @@ # protected by annotations. 
from toil.batchSystems.mesos.test import MesosTestSupport from toil.batchSystems.parasol import ParasolBatchSystem +from toil.batchSystems.registry import (BATCH_SYSTEM_FACTORY_REGISTRY, + BATCH_SYSTEMS, + single_machine_batch_system_factory, + addBatchSystemFactory) from toil.test.batchSystems.parasolTestSupport import ParasolTestSupport from toil.batchSystems.singleMachine import SingleMachineBatchSystem -from toil.common import Config +from toil.common import Config, Toil from toil.job import Job, JobDescription from toil.lib.threading import cpu_count from toil.lib.retry import retry_flaky_test @@ -145,11 +148,11 @@ def tearDown(self): self.batchSystem.shutdown() super(hidden.AbstractBatchSystemTest, self).tearDown() - def testAvailableCores(self): + def test_available_cores(self): self.assertTrue(cpu_count() >= numCores) @retry_flaky_test() - def testRunJobs(self): + def test_run_jobs(self): jobDesc1 = self._mockJobDescription(command='sleep 1000', jobName='test1', unitName=None, jobStoreID='1', requirements=defaultRequirements) jobDesc2 = self._mockJobDescription(command='sleep 1000', jobName='test2', unitName=None, @@ -208,7 +211,7 @@ def testRunJobs(self): # Make sure killBatchJobs can handle jobs that don't exist self.batchSystem.killBatchJobs([10]) - def testSetEnv(self): + def test_set_env(self): # Parasol disobeys shell rules and splits the command at the space # character into arguments before exec'ing it, whether the space is # quoted, escaped or not. @@ -237,6 +240,28 @@ def testSetEnv(self): self.assertEqual(jobUpdateInfo.exitStatus, 23) self.assertEqual(jobUpdateInfo.jobID, job5) + def test_set_job_env(self): + """ Test the mechanism for setting per-job environment variables to batch system jobs.""" + script = 'if [ "x${FOO}" == "xbar" ] ; then exit 23 ; else exit 42 ; fi' + command = "bash -c \"\\${@}\" bash eval " + script.replace(';', r'\;') + + # Issue a job with a job environment variable + job_desc_6 = self._mockJobDescription(command=command, jobName='test6', unitName=None, + jobStoreID='6', requirements=defaultRequirements) + job6 = self.batchSystem.issueBatchJob(job_desc_6, job_environment={ + 'FOO': 'bar' + }) + job_update_info = self.batchSystem.getUpdatedBatchJob(maxWait=1000) + self.assertEqual(job_update_info.exitStatus, 23) # this should succeed + self.assertEqual(job_update_info.jobID, job6) + # Now check that the environment variable doesn't exist for other jobs + job_desc_7 = self._mockJobDescription(command=command, jobName='test7', unitName=None, + jobStoreID='7', requirements=defaultRequirements) + job7 = self.batchSystem.issueBatchJob(job_desc_7) + job_update_info = self.batchSystem.getUpdatedBatchJob(maxWait=1000) + self.assertEqual(job_update_info.exitStatus, 42) + self.assertEqual(job_update_info.jobID, job7) + def testCheckResourceRequest(self): if isinstance(self.batchSystem, BatchSystemSupport): checkResourceRequest = self.batchSystem.checkResourceRequest @@ -286,6 +311,14 @@ def _waitForJobsToStart(self, numJobs, tries=20): time.sleep(1) return runningIDs + def testAddBatchSystemFactory(self): + def test_batch_system_factory(): + return SingleMachineBatchSystem + + addBatchSystemFactory('testBatchSystem', test_batch_system_factory) + assert ('testBatchSystem', test_batch_system_factory) in BATCH_SYSTEM_FACTORY_REGISTRY.items() + assert 'testBatchSystem' in BATCH_SYSTEMS + class AbstractBatchSystemJobTest(ToilTest, metaclass=ABCMeta): """ An abstract base class for batch system tests that use a full Toil workflow rather @@ -341,10 +374,33 @@ def 
testJobConcurrency(self): for _ in range(self.cpuCount): root.addFollowOn(Job.wrapFn(measureConcurrency, counterPath, self.sleepTime, cores=coresPerJob, memory='1M', disk='1Mi')) - Job.Runner.startToil(root, options) + with Toil(options) as toil: + toil.start(root) _, maxValue = getCounters(counterPath) self.assertEqual(maxValue, self.cpuCount // coresPerJob) + def test_omp_threads(self): + """ + Test if the OMP_NUM_THREADS env var is set correctly based on jobs.cores. + """ + test_cases = { + # mapping of the number of cores to the OMP_NUM_THREADS value + 0.1: "1", + 1: "1", + 2: "2" + } + + temp_dir = self._createTempDir() + options = self.getOptions(temp_dir) + + for cores, expected_omp_threads in test_cases.items(): + if os.environ.get('OMP_NUM_THREADS'): + expected_omp_threads = os.environ.get('OMP_NUM_THREADS') + logger.info(f"OMP_NUM_THREADS is set. Using OMP_NUM_THREADS={expected_omp_threads} instead.") + with Toil(options) as toil: + output = toil.start(Job.wrapFn(get_omp_threads, memory='1Mi', cores=cores, disk='1Mi')) + self.assertEqual(output, expected_omp_threads) + class AbstractGridEngineBatchSystemTest(AbstractBatchSystemTest): """ An abstract class to reduce redundancy between Grid Engine, Slurm, and other similar batch @@ -374,6 +430,7 @@ def createBatchSystem(self): return KubernetesBatchSystem(config=self.config, maxCores=numCores, maxMemory=1e9, maxDisk=2001) + @slow @needs_mesos class MesosBatchSystemTest(hidden.AbstractBatchSystemTest, MesosTestSupport): @@ -381,6 +438,7 @@ class MesosBatchSystemTest(hidden.AbstractBatchSystemTest, MesosTestSupport): Tests against the Mesos batch system """ + @classmethod def createConfig(cls): """ needs to set mesosMasterAddress to localhost for testing since the default is now the @@ -418,6 +476,7 @@ def testIgnoreNode(self): # Make sure job is NOT running self.assertEqual(set(runningJobIDs), set({})) + def write_temp_file(s: str, temp_dir: str) -> str: """ Dump a string into a temp file and return its path. @@ -434,13 +493,14 @@ def write_temp_file(s: str, temp_dir: str) -> str: finally: os.close(fd) + @travis_test class SingleMachineBatchSystemTest(hidden.AbstractBatchSystemTest): """ Tests against the single-machine batch system """ - def supportsWallTime(self) -> None: + def supportsWallTime(self) -> bool: return True def createBatchSystem(self) -> AbstractBatchSystem: @@ -876,6 +936,7 @@ def tearDown(self): for f in glob('toil_job_*.[oe]*'): os.unlink(f) + @slow @needs_htcondor class HTCondorBatchSystemTest(hidden.AbstractGridEngineBatchSystemTest): @@ -886,11 +947,12 @@ class HTCondorBatchSystemTest(hidden.AbstractGridEngineBatchSystemTest): def createBatchSystem(self) -> AbstractBatchSystem: from toil.batchSystems.htcondor import HTCondorBatchSystem return HTCondorBatchSystem(config=self.config, maxCores=numCores, maxMemory=1000e9, - maxDisk=1e9) + maxDisk=1e9) def tearDown(self): super(HTCondorBatchSystemTest, self).tearDown() + @travis_test class SingleMachineBatchSystemJobTest(hidden.AbstractBatchSystemJobTest): """ @@ -930,7 +992,7 @@ def testConcurrencyWithDisk(self): # Physically, we're asking for 50% of disk and 50% of disk + 500bytes in the two jobs. The # batchsystem should not allow the 2 child jobs to run concurrently. 
root.addChild(Job.wrapFn(measureConcurrency, counterPath, self.sleepTime, cores=1, - memory='1M', disk=half_disk)) + memory='1M', disk=half_disk)) root.addChild(Job.wrapFn(measureConcurrency, counterPath, self.sleepTime, cores=1, memory='1M', disk=more_than_half_disk)) Job.Runner.startToil(root, options) @@ -1100,3 +1162,7 @@ def resetCounters(path): with open(path, "w") as f: f.write("0,0") f.close() + + +def get_omp_threads() -> str: + return os.environ['OMP_NUM_THREADS'] diff --git a/src/toil/test/cwl/cwlTest.py b/src/toil/test/cwl/cwlTest.py index 219f0c7013..5be716bdf5 100644 --- a/src/toil/test/cwl/cwlTest.py +++ b/src/toil/test/cwl/cwlTest.py @@ -559,6 +559,49 @@ def test_kubernetes_cwl_conformance(self, **kwargs): def test_kubernetes_cwl_conformance_with_caching(self): return self.test_kubernetes_cwl_conformance(caching=True) + def _expected_streaming_output(self, outDir): + # Having unicode string literals isn't necessary for the assertion but + # makes for a less noisy diff in case the assertion fails. + loc = "file://" + os.path.join(outDir, "output.txt") + return { + "output": { + "location": loc, + "basename": "output.txt", + "size": 24, + "class": "File", + "checksum": "sha1$d14dd02e354918b4776b941d154c18ebc15b9b38", + } + } + + @needs_aws_s3 + def test_streamable(self): + """ + Test that a file with 'streamable'=True is a named pipe + """ + cwlfile = "src/toil/test/cwl/stream.cwl" + jobfile = "src/toil/test/cwl/stream.json" + out_name = "output" + jobstore = f'--jobStore=aws:us-west-1:toil-stream-{uuid.uuid4()}' + from toil.cwl import cwltoil + + st = StringIO() + args = [ + "--outdir", + self.outDir, + jobstore, + os.path.join(self.rootDir, cwlfile), + os.path.join(self.rootDir, jobfile), + ] + cwltoil.main(args, stdout=st) + out = json.loads(st.getvalue()) + out[out_name].pop("http://commonwl.org/cwltool#generation", None) + out[out_name].pop("nameext", None) + out[out_name].pop("nameroot", None) + self.assertEqual(out, self._expected_streaming_output(self.outDir)) + with open(out[out_name]["location"][len("file://") :], "r") as f: + self.assertEqual(f.read().strip(), "When is s4 coming out?") + + @needs_cwl class CWLSmallTests(ToilTest): def test_usage_message(self): diff --git a/src/toil/test/cwl/stream.cwl b/src/toil/test/cwl/stream.cwl new file mode 100644 index 0000000000..a5efd7e278 --- /dev/null +++ b/src/toil/test/cwl/stream.cwl @@ -0,0 +1,43 @@ +# Example demonstrating files with flag 'streamable'=True are named pipes +class: Workflow +doc: "Stream a file" +cwlVersion: v1.2 + +inputs: + input: + type: File + streamable: true + +outputs: + output: + type: File + outputSource: cat/output + +steps: + test: + run: + class: CommandLineTool + baseCommand: ["test","-p"] + arguments: [$(inputs.f)] + outputs: [] + inputs: + f: File + out: [] + in: + f: + source: input + cat: + run: + class: CommandLineTool + baseCommand: ["cat"] + arguments: [$(inputs.f)] + outputs: + output: + type: stdout + inputs: + f: File + stdout: output.txt + out: [output] + in: + f: + source: input diff --git a/src/toil/test/cwl/stream.json b/src/toil/test/cwl/stream.json new file mode 100644 index 0000000000..18157fc51f --- /dev/null +++ b/src/toil/test/cwl/stream.json @@ -0,0 +1,6 @@ +{ + "input": { + "class": "File", + "location": "s3://toil-cwl-infra-test-bucket-dont-delete/whale.txt.changedOnPorpoise" + } +} diff --git a/src/toil/test/provisioners/aws/awsProvisionerTest.py b/src/toil/test/provisioners/aws/awsProvisionerTest.py index e1d1608b66..47cbc7612f 100644 --- 
a/src/toil/test/provisioners/aws/awsProvisionerTest.py +++ b/src/toil/test/provisioners/aws/awsProvisionerTest.py @@ -36,6 +36,7 @@ log = logging.getLogger(__name__) + class AWSProvisionerBenchTest(ToilTest): """ Tests for the AWS provisioner that don't actually provision anything. @@ -43,13 +44,33 @@ class AWSProvisionerBenchTest(ToilTest): # Needs to talk to EC2 for image discovery @needs_aws_ec2 - def testAMIFinding(self): + def test_AMI_finding(self): for zone in ['us-west-2a', 'eu-central-1a', 'sa-east-1b']: provisioner = AWSProvisioner('fakename', 'mesos', zone, 10000, None, None) ami = provisioner._discoverAMI() # Make sure we got an AMI and it looks plausible assert(ami.startswith('ami-')) + @needs_aws_ec2 + def test_read_write_global_files(self): + """ + Make sure the `_write_file_to_cloud()` and `_read_file_from_cloud()` + functions of the AWS provisioner work as intended. + """ + provisioner = AWSProvisioner(f'aws-provisioner-test-{uuid4()}', 'mesos', 'us-west-2a', 50, None, None) + key = 'config/test.txt' + contents = "Hello, this is a test.".encode('utf-8') + + try: + url = provisioner._write_file_to_cloud(key, contents=contents) + self.assertTrue(url.startswith("s3://")) + + self.assertEqual(contents, provisioner._read_file_from_cloud(key)) + finally: + # the cluster was never launched, but we need to clean up the s3 bucket + provisioner.destroyCluster() + + @needs_aws_ec2 @needs_fetchable_appliance @slow @@ -148,7 +169,7 @@ def sshUtil(self, command): running = True while running: # While the process is running, see if it stopped - running = (p.poll() == None) + running = (p.poll() is None) # Also collect its output out_data = p.stdout.read() @@ -184,7 +205,7 @@ def sshUtil(self, command): if out_buffer: log.info('STDOUT: %s', out_buffer.decode('utf-8', errors='ignore')) if err_buffer: - log.info('STDOUT: %s', err_buffer.decode('utf-8', errors='ignore')) + log.info('STDERR: %s', err_buffer.decode('utf-8', errors='ignore')) if p.returncode != 0: # It failed @@ -422,6 +443,7 @@ def _runScript(self, toilOptions): runCommand.extend(toilOptions) self.sshUtil(runCommand) + @integrative @pytest.mark.timeout(1200) class AWSManagedAutoscaleTest(AWSAutoscaleTest): @@ -450,7 +472,6 @@ def _runScript(self, toilOptions): self.sshUtil(runCommand) - @integrative @pytest.mark.timeout(1200) class AWSAutoscaleTestMultipleNodeTypes(AbstractAWSAutoscaleTest): @@ -471,7 +492,7 @@ def _getScript(self): os.unlink(sseKeyFile) def _runScript(self, toilOptions): - #Set memory requirements so that sort jobs can be run + # Set memory requirements so that sort jobs can be run # on small instances, but merge jobs must be run on large # instances toilOptions.extend(['--provisioner=aws', '--batchSystem=mesos', diff --git a/version_template.py b/version_template.py index 967d9cf874..1af562c94a 100644 --- a/version_template.py +++ b/version_template.py @@ -88,7 +88,7 @@ def dockerTag(): """The primary tag of the Docker image for the appliance. This uniquely identifies the appliance image.""" return version() + _pythonVersionSuffix() - + def currentCommit(): import os from subprocess import check_output @@ -127,12 +127,23 @@ def dirty(): return False # In case the git call fails. -def expand_(name=None): +def expand_(name=None, others=None): + """ + Returns a string of all the globals and additional variables passed as the + others keyword argument. + + :param str name: If set, only the value of the given symbol is returned. 
+ :param dict others: A dictionary of additional variables to be included in + the return value. + """ variables = {k: v for k, v in globals().items() if not k.startswith('_') and not k.endswith('_')} + if others is not None: + variables.update(others) + def resolve(k): - v = variables[k] + v = variables.get(k, None) if callable(v): v = v() return v
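
The job_environment parameter added to issueBatchJob() carries per-job variables down to the worker, as exercised by test_set_job_env above. A minimal sketch of how a caller might use it; the helper name and the batch-system handle below are illustrative and not part of the patch:

    from typing import Dict, Optional

    def issue_with_env(batch_system, job_desc, env: Optional[Dict[str, str]] = None) -> int:
        # Layer job-specific variables over the worker's environment; with no
        # env argument the call behaves exactly as it did before this change.
        return batch_system.issueBatchJob(job_desc, job_environment=env)

    # e.g. issue_with_env(bs, job_desc, {'FOO': 'bar'}) mirrors test_set_job_env.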
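
The OMP_NUM_THREADS behaviour that test_omp_threads checks keeps any value already set in the environment and otherwise derives one from the job's core request. A sketch of that defaulting rule, assuming fractional core requests round up to at least one thread; the helper is illustrative, not the worker's actual code path:

    import math
    import os

    def default_omp_num_threads(job_cores: float) -> str:
        # Respect an OMP_NUM_THREADS value set by the user; otherwise derive it
        # from the job's core request, rounding fractional requests up to one.
        explicit = os.environ.get('OMP_NUM_THREADS')
        if explicit:
            return explicit
        return str(max(1, int(math.ceil(job_cores))))

    # Matches the expectations in test_omp_threads when the variable is unset:
    # 0.1 -> '1', 1 -> '1', 2 -> '2'.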
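
The stream.cwl test workflow asserts that an input marked streamable: true is staged as a named pipe: its first step runs `test -p` on the file. For readers less familiar with that shell idiom, the equivalent check in Python (a standalone illustration, not code from the patch) is:

    import os
    import stat

    def is_named_pipe(path: str) -> bool:
        # True when the path refers to a FIFO (named pipe), which is what
        # `test -p` verifies for streamable inputs.
        return stat.S_ISFIFO(os.stat(path).st_mode)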