Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into jobstack
Browse files Browse the repository at this point in the history
  • Loading branch information
mr-c committed Aug 25, 2021
2 parents a4ac602 + 2cc02d3 commit c932df0
Show file tree
Hide file tree
Showing 31 changed files with 729 additions and 169 deletions.
2 changes: 1 addition & 1 deletion .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ To be copied to the [draft changelog](https://github.com/DataBiosphere/toil/wiki
* [ ] New functions without [type hints](https://docs.python.org/3/library/typing.html).
* [ ] New functions or classes without informative docstrings.
* [ ] Changes to semantics not reflected in the relevant docstrings.
* [ ] New or changed command line options for Toil workflows that are not reflected in `docs/running/cliOptions.rst`
* [ ] New or changed command line options for Toil workflows that are not reflected in `docs/running/{cliOptions,cwl,wdl}.rst`
* [ ] New features without tests.
* [ ] Comment on the lines of code where problems exist with a review comment. You can shift-click the line numbers in the diff to select multiple lines.
* [ ] Finish the review with an overall description of your opinion.
Expand Down
4 changes: 4 additions & 0 deletions docs/appendices/environment_vars.rst
Original file line number Diff line number Diff line change
Expand Up @@ -174,5 +174,9 @@ There are several environment variables that affect the way Toil runs.
| | in the Toil Docker container to use as a mirror |
| | for Docker Hub. |
+----------------------------------+----------------------------------------------------+
| OMP_NUM_THREADS | The number of cores set for OpenMP applications in |
| | the workers. If not set, Toil will use the number |
| | of job threads. |
+----------------------------------+----------------------------------------------------+

.. _standard temporary directory: https://docs.python.org/3/library/tempfile.html#tempfile.gettempdir
4 changes: 4 additions & 0 deletions docs/running/cwl.rst
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ printed to the stdout stream after workflow execution.
``--stats``: Save resource usage in JSON files that can be collected with the
``toil stats`` command after the workflow is done.

``--disable-streaming``: Disables streaming of input files. Streaming is enabled
by default for files marked with the ``streamable`` flag set to True, and only for
remote files when the jobStore is not on the local machine.

Running CWL in the Cloud
------------------------

Expand Down
10 changes: 8 additions & 2 deletions setup.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
from setuptools import find_packages, setup


cwltool_version = '3.1.20210816212154'


def run_setup():
"""
Calls setup(). This function exists so the setup() invocation preceded more internal
Expand All @@ -33,7 +36,7 @@ def run_setup():
gcs = 'google-cloud-storage==1.6.0'
gcs_oauth2_boto_plugin = 'gcs_oauth2_boto_plugin==1.14'
apacheLibcloud = 'apache-libcloud==2.2.1'
cwltool = 'cwltool==3.1.20210616134059'
cwltool = f'cwltool=={cwltool_version}'
galaxyToolUtil = 'galaxy-tool-util'
htcondor = 'htcondor>=8.6.0'
kubernetes = 'kubernetes>=12.0.1, <13'
Expand Down Expand Up @@ -168,7 +171,10 @@ def import_version():
# Use the template to generate src/toil/version.py
import version_template
with NamedTemporaryFile(mode='w', dir='src/toil', prefix='version.py.', delete=False) as f:
f.write(version_template.expand_())
f.write(version_template.expand_(others={
# expose the dependency versions that we may need to access in Toil
'cwltool_version': cwltool_version,
}))
os.rename(f.name, 'src/toil/version.py')

# Unfortunately, we can't use a straight import here because that would also load the stuff
Expand Down
5 changes: 4 additions & 1 deletion src/toil/batchSystems/abstractBatchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,13 @@ def setUserScript(self, userScript):
raise NotImplementedError()

@abstractmethod
def issueBatchJob(self, jobDesc):
def issueBatchJob(self, jobDesc, job_environment: Optional[Dict[str, str]] = None):
"""
Issues a job with the specified command to the batch system and returns a unique jobID.
:param jobDesc a toil.job.JobDescription
:param job_environment: a collection of job-specific environment variables
to be set on the worker.
:return: a unique jobID that can be used to reference the newly issued job
:rtype: int
Expand Down Expand Up @@ -443,6 +445,7 @@ def shutdownLocal(self): # type: () -> None
"""To be called from shutdown()"""
self.localBatch.shutdown()


class BatchSystemCleanupSupport(BatchSystemLocalSupport):
"""
Adds cleanup support when the last running job leaves a node, for batch
Expand Down
31 changes: 19 additions & 12 deletions src/toil/batchSystems/abstractGridEngineBatchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from datetime import datetime
from queue import Empty, Queue
from threading import Lock, Thread
from typing import Any, List, Union
from typing import Any, List, Dict, Union, Optional

from toil.batchSystems.abstractBatchSystem import (BatchJobExitReason,
BatchSystemCleanupSupport,
Expand Down Expand Up @@ -107,10 +107,10 @@ def createJobs(self, newJob: Any) -> bool:
while len(self.waitingJobs) > 0 and \
len(self.runningJobs) < int(self.boss.config.maxLocalJobs):
activity = True
jobID, cpu, memory, command, jobName = self.waitingJobs.pop(0)
jobID, cpu, memory, command, jobName, environment = self.waitingJobs.pop(0)

# prepare job submission command
subLine = self.prepareSubmission(cpu, memory, jobID, command, jobName)
subLine = self.prepareSubmission(cpu, memory, jobID, command, jobName, environment)
logger.debug("Running %r", subLine)
batchJobID = self.boss.with_retries(self.submitJob, subLine)
logger.debug("Submitted job %s", str(batchJobID))
Expand Down Expand Up @@ -255,29 +255,35 @@ def run(self):
logger.error("GridEngine like batch system failure", exc_info=ex)
raise

@abstractmethod
def coalesce_job_exit_codes(self, batch_job_id_list: list) -> list:
"""
Returns exit codes for a list of jobs.
Implementation-specific; called by
AbstractGridEngineWorker.checkOnJobs()
:param string batchjobIDList: List of batch system job ID
:param string batch_job_id_list: List of batch system job ID
"""
raise NotImplementedError()

@abstractmethod
def prepareSubmission(self, cpu, memory, jobID, command, jobName):
def prepareSubmission(self,
cpu: int,
memory: int,
jobID: int,
command: str,
jobName: str,
job_environment: Optional[Dict[str, str]] = None) -> List[str]:
"""
Preparation in putting together a command-line string
for submitting to batch system (via submitJob().)
:param: string cpu
:param: string memory
:param: string jobID : Toil job ID
:param: int cpu
:param: int memory
:param: int jobID: Toil job ID
:param: string subLine: the command line string to be called
:param: string jobName: the name of the Toil job, to provide metadata to batch systems if desired
:param: dict job_environment: the environment variables to be set on the worker
:rtype: string
:rtype: List[str]
"""
raise NotImplementedError()

Expand Down Expand Up @@ -354,7 +360,7 @@ def supportsWorkerCleanup(cls):
def supportsAutoDeployment(cls):
return False

def issueBatchJob(self, jobDesc):
def issueBatchJob(self, jobDesc, job_environment: Optional[Dict[str, str]] = None):
# Avoid submitting internal jobs to the batch queue, handle locally
localID = self.handleLocalJob(jobDesc)
if localID:
Expand All @@ -363,7 +369,8 @@ def issueBatchJob(self, jobDesc):
self.checkResourceRequest(jobDesc.memory, jobDesc.cores, jobDesc.disk)
jobID = self.getNextJobID()
self.currentJobs.add(jobID)
self.newJobsQueue.put((jobID, jobDesc.cores, jobDesc.memory, jobDesc.command, jobDesc.jobName))
self.newJobsQueue.put((jobID, jobDesc.cores, jobDesc.memory, jobDesc.command, jobDesc.jobName,
job_environment))
logger.debug("Issued the job command: %s with job id: %s and job name %s", jobDesc.command, str(jobID),
jobDesc.jobName)
return jobID
Expand Down
25 changes: 20 additions & 5 deletions src/toil/batchSystems/gridengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import os
import time
from pipes import quote
from typing import Optional, List, Dict

from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem
from toil.lib.misc import CalledProcessErrorStderr, call_command
Expand Down Expand Up @@ -48,8 +49,14 @@ def getRunningJobIDs(self):
def killJob(self, jobID):
call_command(['qdel', self.getBatchSystemID(jobID)])

def prepareSubmission(self, cpu, memory, jobID, command, jobName):
return self.prepareQsub(cpu, memory, jobID) + [command]
def prepareSubmission(self,
cpu: int,
memory: int,
jobID: int,
command: str,
jobName: str,
job_environment: Optional[Dict[str, str]] = None):
return self.prepareQsub(cpu, memory, jobID, job_environment) + [command]

def submitJob(self, subLine):
stdout = call_command(subLine)
Expand Down Expand Up @@ -94,14 +101,22 @@ def getJobExitCode(self, sgeJobID):
"""
Implementation-specific helper methods
"""
def prepareQsub(self, cpu: int, mem: int, jobID: int) -> List[str]:
def prepareQsub(self,
cpu: int,
mem: int,
jobID: int,
job_environment: Optional[Dict[str, str]] = None) -> List[str]:
qsubline = ['qsub', '-V', '-b', 'y', '-terse', '-j', 'y', '-cwd',
'-N', 'toil_job_' + str(jobID)]

if self.boss.environment:
environment = self.boss.environment.copy()
if job_environment:
environment.update(job_environment)

if environment:
qsubline.append('-v')
qsubline.append(','.join(k + '=' + quote(os.environ[k] if v is None else v)
for k, v in self.boss.environment.items()))
for k, v in environment.items()))

reqline = list()
sgeArgs = os.getenv('TOIL_GRIDENGINE_ARGS')
Expand Down
13 changes: 7 additions & 6 deletions src/toil/batchSystems/htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import math
import os
import time
from typing import Any
from typing import Any, Optional, Dict

import htcondor

Expand Down Expand Up @@ -277,12 +277,12 @@ def connectSchedd(self):
return schedd

def getEnvString(self):
'''Build an environment string that a HTCondor Submit object can use.
"""
Build an environment string that a HTCondor Submit object can use.
For examples of valid strings, see:
http://research.cs.wisc.edu/htcondor/manual/current/condor_submit.html#man-condor-submit-environment
'''
"""

env_items = []
if self.boss.environment:
Expand All @@ -302,7 +302,7 @@ def getEnvString(self):
return '"' + ' '.join(env_items) + '"'

# Override the issueBatchJob method so HTCondor can be given the disk request
def issueBatchJob(self, jobNode):
def issueBatchJob(self, jobNode, job_environment: Optional[Dict[str, str]] = None):
# Avoid submitting internal jobs to the batch queue, handle locally
localID = self.handleLocalJob(jobNode)
if localID:
Expand All @@ -313,6 +313,7 @@ def issueBatchJob(self, jobNode):
self.currentJobs.add(jobID)

# Add the jobNode.disk and jobNode.jobName to the job tuple
self.newJobsQueue.put((jobID, jobNode.cores, jobNode.memory, jobNode.disk, jobNode.jobName, jobNode.command))
self.newJobsQueue.put((jobID, jobNode.cores, jobNode.memory, jobNode.disk, jobNode.jobName, jobNode.command,
job_environment))
logger.debug("Issued the job command: %s with job id: %s ", jobNode.command, str(jobID))
return jobID
18 changes: 13 additions & 5 deletions src/toil/batchSystems/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import tempfile
import time
import uuid
from typing import Optional, Dict

import kubernetes
import pytz
Expand Down Expand Up @@ -362,15 +363,23 @@ def _create_affinity(self, preemptable: bool) -> kubernetes.client.V1Affinity:
# Make the node affinity into an overall affinity
return kubernetes.client.V1Affinity(node_affinity=node_affinity)

def _create_pod_spec(self, jobDesc: JobDescription) -> kubernetes.client.V1PodSpec:
def _create_pod_spec(
self,
jobDesc: JobDescription,
job_environment: Optional[Dict[str, str]] = None
) -> kubernetes.client.V1PodSpec:
"""
Make the specification for a pod that can execute the given job.
"""

environment = self.environment.copy()
if job_environment:
environment.update(job_environment)

# Make a job dict to send to the executor.
# First just wrap the command and the environment to run it in
job = {'command': jobDesc.command,
'environment': self.environment.copy()}
'environment': environment}
# TODO: query customDockerInitCmd to respect TOIL_CUSTOM_DOCKER_INIT_COMMAND

if self.userScript is not None:
Expand Down Expand Up @@ -456,8 +465,7 @@ def _create_pod_spec(self, jobDesc: JobDescription) -> kubernetes.client.V1PodSp

return pod_spec


def issueBatchJob(self, jobDesc):
def issueBatchJob(self, jobDesc, job_environment: Optional[Dict[str, str]] = None):
# TODO: get a sensible self.maxCores, etc. so we can checkResourceRequest.
# How do we know if the cluster will autoscale?

Expand All @@ -473,7 +481,7 @@ def issueBatchJob(self, jobDesc):
self.checkResourceRequest(jobDesc.memory, jobDesc.cores, jobDesc.disk)

# Make a pod that describes running the job
pod_spec = self._create_pod_spec(jobDesc)
pod_spec = self._create_pod_spec(jobDesc, job_environment=job_environment)

# Make a batch system scope job ID
jobID = self.getNextJobID()
Expand Down
22 changes: 16 additions & 6 deletions src/toil/batchSystems/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import subprocess
from datetime import datetime
from random import randint
from typing import List, Union
from typing import Union, Optional, List, Dict

from dateutil.parser import parse
from dateutil.tz import tzlocal
Expand Down Expand Up @@ -85,12 +85,23 @@ def fallbackRunningJobIDs(self, currentjobs):
def killJob(self, jobID):
call_command(['bkill', self.getBatchSystemID(jobID)])

def prepareSubmission(self, cpu, memory, jobID, command, jobName):
return self.prepareBsub(cpu, memory, jobID) + [command]
def prepareSubmission(self,
cpu: int,
memory: int,
jobID: int,
command: str,
jobName: str,
job_environment: Optional[Dict[str, str]] = None):
return (self.prepareBsub(cpu, memory, jobID) + [command],
job_environment) # pass job_environment to .submitJob()

def submitJob(self, subLine):
subLine, job_environment = subLine
combinedEnv = self.boss.environment
combinedEnv.update(os.environ)
if job_environment:
combinedEnv.update(job_environment)

stdout = call_command(subLine, env=combinedEnv)
# Example success: Job <39605914> is submitted to default queue <general>.
# Example fail: Service class does not exist. Job not submitted.
Expand All @@ -102,7 +113,7 @@ def submitJob(self, subLine):
else:
logger.error("Could not submit job\nReason: {}".format(stdout))
temp_id = randint(10000000, 99999999)
#Flag this job to be handled by getJobExitCode
# Flag this job to be handled by getJobExitCode
result = "NOT_SUBMITTED_{}".format(temp_id)
return result

Expand Down Expand Up @@ -229,7 +240,6 @@ def parse_bjobs_record(self, bjobs_record: dict, job: int) -> Union[int, None]:

return self.getJobExitCodeBACCT(job)


def getJobExitCodeBACCT(self,job):
# if not found in bjobs, then try bacct (slower than bjobs)
logger.debug("bjobs failed to detect job - trying bacct: "
Expand Down Expand Up @@ -318,7 +328,7 @@ def prepareBsub(self, cpu: int, mem: int, jobID: int) -> List[str]:
bsubline.extend(lsfArgs.split())
return bsubline

def parseBjobs(self,bjobs_output_str):
def parseBjobs(self, bjobs_output_str):
"""
Parse records from bjobs json type output
params:
Expand Down
Loading

0 comments on commit c932df0

Please sign in to comment.