Skip to content

Commit

Permalink
Enhanced types (DataBiosphere#3975)
Browse files Browse the repository at this point in the history
* replace asserts outside of tests with AssertionErrors


Co-authored-by: DailyDreaming <[email protected]>
Co-authored-by: Adam Novak <[email protected]>
  • Loading branch information
3 people authored Jan 29, 2022
1 parent 5c431e5 commit 9598202
Show file tree
Hide file tree
Showing 22 changed files with 718 additions and 651 deletions.
2 changes: 0 additions & 2 deletions contrib/admin/mypy-with-ignore.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ def main():
'docs/vendor/sphinxcontrib/__init__.py',
'src/toil/job.py',
'src/toil/leader.py',
'src/toil/common.py',
'src/toil/__init__.py',
'src/toil/resource.py',
'src/toil/deferred.py',
Expand All @@ -41,7 +40,6 @@ def main():
'src/toil/wdl/versions/draft2.py',
'src/toil/wdl/versions/v1.py',
'src/toil/wdl/versions/dev.py',
'src/toil/provisioners/clusterScaler.py',
'src/toil/provisioners/abstractProvisioner.py',
'src/toil/provisioners/gceProvisioner.py',
'src/toil/provisioners/__init__.py',
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pretty = true

warn_unused_configs = True
disallow_any_generics = True
disallow_subclassing_any = True
disallow_subclassing_any = False
disallow_untyped_defs = True
disallow_incomplete_defs = True
check_untyped_defs = True
Expand Down
25 changes: 11 additions & 14 deletions src/toil/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,16 +243,14 @@ def _check_custom_bash_cmd(cmd_str):
assert not re.search(r'[\n\r\t]', cmd_str), f'"{cmd_str}" contains invalid characters (newline and/or tab).'


def lookupEnvVar(name, envName, defaultValue):
def lookupEnvVar(name: str, envName: str, defaultValue: str) -> str:
"""
Use this for looking up environment variables that control Toil and are important enough to
log the result of that lookup.
Look up environment variables that control Toil and log the result.
:param str name: the human readable name of the variable
:param str envName: the name of the environment variable to lookup
:param str defaultValue: the fall-back value
:param name: the human readable name of the variable
:param envName: the name of the environment variable to lookup
:param defaultValue: the fall-back value
:return: the value of the environment variable or the default value the variable is not set
:rtype: str
"""
try:
value = os.environ[envName]
Expand All @@ -264,17 +262,16 @@ def lookupEnvVar(name, envName, defaultValue):
return value


def checkDockerImageExists(appliance):
def checkDockerImageExists(appliance: str) -> str:
"""
Attempts to check a url registryName for the existence of a docker image with a given tag.
Attempt to check a url registryName for the existence of a docker image with a given tag.
:param str appliance: The url of a docker image's registry (with a tag) of the form:
'quay.io/<repo_path>:<tag>' or '<repo_path>:<tag>'.
Examples: 'quay.io/ucsc_cgl/toil:latest', 'ubuntu:latest', or
'broadinstitute/genomes-in-the-cloud:2.0.0'.
:param appliance: The url of a docker image's registry (with a tag) of the form:
'quay.io/<repo_path>:<tag>' or '<repo_path>:<tag>'.
Examples: 'quay.io/ucsc_cgl/toil:latest', 'ubuntu:latest', or
'broadinstitute/genomes-in-the-cloud:2.0.0'.
:return: Raises an exception if the docker image cannot be found or is invalid. Otherwise, it
will return the appliance string.
:rtype: str
"""
if currentCommit in appliance:
return appliance
Expand Down
37 changes: 8 additions & 29 deletions src/toil/batchSystems/abstractBatchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,27 +64,25 @@ class UpdatedBatchJobInfo(NamedTuple):

# Information required for worker cleanup on shutdown of the batch system.
class WorkerCleanupInfo(NamedTuple):
workDir: str
workDir: Optional[str]
"""workdir path (where the cache would go)"""

workflowID: str
"""used to identify files specific to this workflow"""

cleanWorkDir: str

class AbstractBatchSystem(ABC):
"""
An abstract (as far as Python currently allows) base class to represent the interface the batch
system must provide to Toil.
"""

class AbstractBatchSystem(ABC):
"""An abstract base class to represent the interface the batch system must provide to Toil."""
@classmethod
@abstractmethod
def supportsAutoDeployment(cls) -> bool:
"""
Whether this batch system supports auto-deployment of the user script itself. If it does,
the :meth:`.setUserScript` can be invoked to set the resource object representing the user
script.
Whether this batch system supports auto-deployment of the user script itself.
If it does, the :meth:`.setUserScript` can be invoked to set the resource
object representing the user script.
Note to implementors: If your implementation returns True here, it should also override
"""
Expand Down Expand Up @@ -438,7 +436,7 @@ class AbstractScalableBatchSystem(AbstractBatchSystem):
"""

@abstractmethod
def getNodes(self, preemptable: Optional[bool] = None) -> Dict[str, NodeInfo]:
def getNodes(self, preemptable: Optional[bool] = None, timeout: int = 600) -> Dict[str, NodeInfo]:
"""
Returns a dictionary mapping node identifiers of preemptable or non-preemptable nodes to
NodeInfo objects, one for each node.
Expand All @@ -460,25 +458,6 @@ def nodeInUse(self, nodeIP: str) -> bool:
"""
raise NotImplementedError()

# TODO: May be unused!
@abstractmethod
@contextmanager
def nodeFiltering(self, filter: Optional[Callable[[NodeInfo], bool]]) -> Iterator[None]:
"""
Used to prevent races in autoscaling where
1) nodes have reported to the autoscaler as having no jobs
2) scaler decides to terminate these nodes. In parallel the batch system assigns jobs to the same nodes
3) scaler terminates nodes, resulting in job failures for all jobs on that node.
Call this method prior to node termination to ensure that nodes being considered for termination are not
assigned new jobs. Call the method again passing None as the filter to disable the filtering
after node termination is done.
:param method: This will be used as a filter on nodes considered when assigning new jobs.
After this context manager exits the filter should be removed
"""
raise NotImplementedError()

@abstractmethod
def ignoreNode(self, nodeAddress: str) -> None:
"""
Expand Down
29 changes: 15 additions & 14 deletions src/toil/batchSystems/mesos/batchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,15 +292,9 @@ def getUpdatedBatchJob(self, maxWait):
else:
log.debug('Job %s ended naturally before it could be killed.', item.jobID)

def nodeInUse(self, nodeIP):
def nodeInUse(self, nodeIP: str) -> bool:
return nodeIP in self.hostToJobIDs

@contextmanager
def nodeFiltering(self, filter):
self.nodeFilter = [filter]
yield
self.nodeFilter = []

def getWaitDuration(self):
"""
Gets the period of time to wait (floating point, in seconds) between checking for
Expand Down Expand Up @@ -715,13 +709,20 @@ def _registerNode(self, nodeAddress, agentId, nodePort=5051):

return executor

def getNodes(self, preemptable=None, timeout=600):
timeout = timeout or sys.maxsize
return {nodeAddress: executor.nodeInfo
for nodeAddress, executor in self.executors.items()
if time.time() - executor.lastSeen < timeout
and (preemptable is None
or preemptable == (executor.agentId not in self.nonPreemptableNodes))}
def getNodes(self,
preemptable: Optional[bool] = None,
timeout: Optional[int] = None) -> Dict[str, NodeInfo]:
"""
Return all nodes that match:
- preemptable status (None includes all)
- timeout period (seen within the last # seconds, or None for all)
"""
nodes = dict()
for node_ip, executor in self.executors.items():
if preemptable is None or (preemptable == (executor.agentId not in self.nonPreemptableNodes)):
if timeout is None or (time.time() - executor.lastSeen < timeout):
nodes[node_ip] = executor.nodeInfo
return nodes

def reregistered(self, driver, masterInfo):
"""
Expand Down
2 changes: 1 addition & 1 deletion src/toil/batchSystems/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def kubernetes_batch_system_factory():
return KubernetesBatchSystem


BATCH_SYSTEM_FACTORY_REGISTRY = {
BATCH_SYSTEM_FACTORY_REGISTRY: Dict[str, Callable[[], Type["AbstractBatchSystem"]]] = {
'aws_batch' : aws_batch_batch_system_factory,
'parasol' : parasol_batch_system_factory,
'single_machine' : single_machine_batch_system_factory,
Expand Down
Loading

0 comments on commit 9598202

Please sign in to comment.