From 1ae5547fe93c73f68e3eb7f5d3dd05aeceb413ab Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Wed, 29 Apr 2020 16:47:27 -0700 Subject: [PATCH] Detect insufficient resources and deadlocks more usefully (#3043) * Refactor MemoryString to be comparable the Python 3 way * Move the DeadlockException * Remove 2/3 stuff from SingleMachine batch system * Make the AbstractBatchSystem actually be responsible for SingleMachineBatchSystem size checking * Get out-of-resource formatting right * Don't warn with the job directory * Give types to Toil service/deadlock args so Cactus can compare them with numbers * Add --deadlockCheckInterval and superfluous logging * Settle on when we want to log * Implement a way to get hints from the batch system to the user for debugging deadlocks * Revert changes I don't want * Revise deadlock option documentation * Replace hints with messages and details --- docs/running/cliOptions.rst | 15 +++- src/toil/batchSystems/__init__.py | 39 +++++++---- src/toil/batchSystems/abstractBatchSystem.py | 68 +++++++++++++++--- src/toil/batchSystems/singleMachine.py | 53 ++++++++------ src/toil/common.py | 24 +++++-- src/toil/jobStores/fileJobStore.py | 1 - src/toil/leader.py | 74 +++++++++++++------- src/toil/lib/threading.py | 1 + 8 files changed, 198 insertions(+), 77 deletions(-) diff --git a/docs/running/cliOptions.rst b/docs/running/cliOptions.rst index 9b9f911479..b752ce3d29 100644 --- a/docs/running/cliOptions.rst +++ b/docs/running/cliOptions.rst @@ -236,9 +236,18 @@ the logging module: concurrently on preemptable nodes. default=9223372036854775807 --deadlockWait DEADLOCKWAIT - The minimum number of seconds to observe the cluster - stuck running only the same service jobs before - throwing a deadlock exception. default=60 + Time, in seconds, to tolerate the workflow running only + the same service jobs, with no jobs to use them, before + declaring the workflow to be deadlocked and stopping. + default=60 + --deadlockCheckInterval DEADLOCKCHECKINTERVAL + Time, in seconds, to wait between checks to see if the + workflow is stuck running only service jobs, with no + jobs to use them. Should be shorter than + --deadlockWait. May need to be increased if the batch + system cannot enumerate running jobs quickly enough, or + if polling for running jobs is placing an unacceptable + load on a shared cluster. default=30 --statePollingWait STATEPOLLINGWAIT Time, in seconds, to wait before doing a scheduler query for job state. Return cached results if within diff --git a/src/toil/batchSystems/__init__.py b/src/toil/batchSystems/__init__.py index b6ef91dbe4..77af3e4cbd 100644 --- a/src/toil/batchSystems/__init__.py +++ b/src/toil/batchSystems/__init__.py @@ -12,19 +12,32 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import absolute_import -from past.builtins import cmp -from builtins import str -from builtins import object import sys -if sys.version_info >= (3, 0): +from functools import total_ordering - # https://docs.python.org/3.0/whatsnew/3.0.html#ordering-comparisons - def cmp(a, b): - return (a > b) - (a < b) +class DeadlockException(Exception): + """ + Exception thrown by the Leader or BatchSystem when a deadlock is encountered due to insufficient + resources to run the workflow + """ + def __init__(self, msg): + self.msg = "Deadlock encountered: " + msg + super().__init__() -class MemoryString(object): + def __str__(self): + """ + Stringify the exception, including the message. 
+ """ + return self.msg + +@total_ordering +class MemoryString: + """ + Represents an amount of bytes, as a string, using suffixes for the unit. + + Comparable based on the actual number of bytes instead of string value. + """ def __init__(self, string): if string[-1] == 'K' or string[-1] == 'M' or string[-1] == 'G' or string[-1] == 'T': #10K self.unit = string[-1] @@ -55,8 +68,8 @@ def byteVal(self): elif self.unit == 'T': return self.val * 1099511627776 - def __cmp__(self, other): - return cmp(self.bytes, other.bytes) + def __eq__(self, other): + return self.bytes == other.bytes - def __gt__(self, other): - return self.bytes > other.bytes + def __lt__(self, other): + return self.bytes < other.bytes diff --git a/src/toil/batchSystems/abstractBatchSystem.py b/src/toil/batchSystems/abstractBatchSystem.py index bbba691a7b..44b3df114c 100644 --- a/src/toil/batchSystems/abstractBatchSystem.py +++ b/src/toil/batchSystems/abstractBatchSystem.py @@ -183,6 +183,26 @@ def getUpdatedBatchJob(self, maxWait): batch system does not support tracking wall time. """ raise NotImplementedError() + + def getSchedulingStatusMessage(self): + """ + Get a log message fragment for the user about anything that might be + going wrong in the batch system, if available. + + If no useful message is available, return None. + + This can be used to report what resource is the limiting factor when + scheduling jobs, for example. If the leader thinks the workflow is + stuck, the message can be displayed to the user to help them diagnose + why it might be stuck. + + :rtype: str or None + :return: User-directed message about scheduling state. + """ + + # Default implementation returns None. + # Override to provide scheduling status information. + return None @abstractmethod def shutdown(self): @@ -253,7 +273,7 @@ def __init__(self, config, maxCores, maxMemory, maxDisk): workflowID=self.config.workflowID, cleanWorkDir=self.config.cleanWorkDir) - def checkResourceRequest(self, memory, cores, disk): + def checkResourceRequest(self, memory, cores, disk, name=None, detail=None): """ Check resource request is not greater than that available or allowed. @@ -262,6 +282,10 @@ def checkResourceRequest(self, memory, cores, disk): :param float cores: number of cores being requested :param int disk: amount of disk space being requested, in bytes + + :param str name: Name of the job being checked, for generating a useful error report. + + :param str detail: Batch-system-specific message to include in the error. 
:raise InsufficientSystemResources: raised when a resource is requested in an amount greater than allowed @@ -270,11 +294,14 @@ def checkResourceRequest(self, memory, cores, disk): assert disk is not None assert cores is not None if cores > self.maxCores: - raise InsufficientSystemResources('cores', cores, self.maxCores) + raise InsufficientSystemResources('cores', cores, self.maxCores, + batchSystem=self.__class__.__name__, name=name, detail=detail) if memory > self.maxMemory: - raise InsufficientSystemResources('memory', memory, self.maxMemory) + raise InsufficientSystemResources('memory', memory, self.maxMemory, + batchSystem=self.__class__.__name__, name=name, detail=detail) if disk > self.maxDisk: - raise InsufficientSystemResources('disk', disk, self.maxDisk) + raise InsufficientSystemResources('disk', disk, self.maxDisk, + batchSystem=self.__class__.__name__, name=name, detail=detail) def setEnv(self, name, value=None): """ @@ -519,7 +546,7 @@ class InsufficientSystemResources(Exception): To be raised when a job requests more of a particular resource than is either currently allowed or avaliable """ - def __init__(self, resource, requested, available): + def __init__(self, resource, requested, available, batchSystem=None, name=None, detail=None): """ Creates an instance of this exception that indicates which resource is insufficient for current demands, as well as the amount requested and amount actually available. @@ -530,12 +557,37 @@ def __init__(self, resource, requested, available): in this exception :param int|float available: amount of the particular resource actually available + + :param str batchSystem: Name of the batch system class complaining, for + generating a useful error report. If you are using a single machine + batch system for local jobs in another batch system, it is important to + know which one has run out of resources. + + :param str name: Name of the job being checked, for generating a useful error report. + + :param str detail: Batch-system-specific message to include in the error. """ self.requested = requested self.available = available self.resource = resource + self.batchSystem = batchSystem if batchSystem is not None else 'this batch system' + self.unit = 'bytes of ' if resource == 'disk' or resource == 'memory' else '' + self.name = name + self.detail = detail def __str__(self): - return 'Requesting more {} than either physically available, or enforced by --max{}. ' \ - 'Requested: {}, Available: {}'.format(self.resource, self.resource.capitalize(), - self.requested, self.available) + if self.name is not None: + phrases = [('The job {} is requesting {} {}{}, more than ' + 'the maximum of {} {}{} that {} was configured ' + 'with.'.format(self.name, self.requested, self.unit, self.resource, + self.available, self.unit, self.resource, self.batchSystem))] + else: + phrases = [('Requesting more {} than either physically available to {}, or enforced by --max{}. ' + 'Requested: {}, Available: {}'.format(self.resource, self.batchSystem, + self.resource.capitalize(), + self.requested, self.available))] + + if self.detail is not None: + phrases.append(self.detail) + + return ' '.join(phrases) diff --git a/src/toil/batchSystems/singleMachine.py b/src/toil/batchSystems/singleMachine.py index 2e3178b280..f69a670b16 100644 --- a/src/toil/batchSystems/singleMachine.py +++ b/src/toil/batchSystems/singleMachine.py @@ -12,13 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from __future__ import absolute_import -from __future__ import division -from future import standard_library -standard_library.install_aliases() -from builtins import str -from builtins import range -from builtins import object from past.utils import old_div from contextlib import contextmanager import logging @@ -160,6 +153,9 @@ def __init__(self, config, maxCores, maxMemory, maxDisk): self.memory = ResourcePool(self.maxMemory, 'memory') # A pool representing the available space in bytes self.disk = ResourcePool(self.maxDisk, 'disk') + + # If we can't schedule something, we fill this in with a reason why + self.schedulingStatusMessage = None # We use this event to signal shutdown self.shuttingDown = Event() @@ -176,7 +172,7 @@ def __init__(self, config, maxCores, maxMemory, maxDisk): self.daddyThread = Thread(target=self.daddy, daemon=True) self.daddyThread.start() log.debug('Started in normal mode.') - + def daddy(self): """ Be the "daddy" thread. @@ -352,6 +348,22 @@ def _runDebugJob(self, jobCommand, jobID, environment): self.runningJobs.pop(jobID) if not info.killIntended: self.outputQueue.put(UpdatedBatchJobInfo(jobID=jobID, exitStatus=0, wallTime=time.time() - info.time, exitReason=None)) + + def getSchedulingStatusMessage(self): + # Implement the abstractBatchSystem's scheduling status message API + return self.schedulingStatusMessage + + def _setSchedulingStatusMessage(self, message): + """ + If we can't run a job, we record a short message about why not. If the + leader wants to know what is up with us (for example, to diagnose a + deadlock), it can ask us for the message. + """ + + self.schedulingStatusMessage = message + + # Report the message in the debug log too. + log.debug(message) def _startChild(self, jobCommand, jobID, coreFractions, jobMemory, jobDisk, environment): """ @@ -423,13 +435,13 @@ def _startChild(self, jobCommand, jobID, coreFractions, jobMemory, jobDisk, envi # We can't get disk, so free cores and memory self.coreFractions.release(coreFractions) self.memory.release(jobMemory) - log.debug('Not enough disk to run job %s', jobID) + self._setSchedulingStatusMessage('Not enough disk to run job %s' % jobID) else: # Free cores, since we can't get memory self.coreFractions.release(coreFractions) - log.debug('Not enough memory to run job %s', jobID) + self._setSchedulingStatusMessage('Not enough memory to run job %s' % jobID) else: - log.debug('Not enough cores to run job %s', jobID) + self._setSchedulingStatusMessage('Not enough cores to run job %s' % jobID) # If we get here, we didn't succeed or fail starting the job. # We didn't manage to get the resources. @@ -481,16 +493,15 @@ def issueBatchJob(self, jobNode): self._checkOnDaddy() - # Round cores to minCores and apply scale - cores = math.ceil(jobNode.cores * self.scale / self.minCores) * self.minCores - assert cores <= self.maxCores, ('The job {} is requesting {} cores, more than the maximum of ' - '{} cores this batch system was configured with. Scale is ' - 'set to {}.'.format(jobNode.jobName, cores, self.maxCores, self.scale)) - assert cores >= self.minCores - assert jobNode.memory <= self.maxMemory, ('The job {} is requesting {} bytes of memory, more than ' - 'the maximum of {} this batch system was configured ' - 'with.'.format(jobNode.jobName, jobNode.memory, self.maxMemory)) - + # Round cores to minCores and apply scale. + # Make sure to give minCores even if asked for 0 cores, or negative or something. 
+ cores = max(math.ceil(jobNode.cores * self.scale / self.minCores) * self.minCores, self.minCores) + + # Don't do our own assertions about job size vs. our configured size. + # The abstract batch system can handle it. + self.checkResourceRequest(jobNode.memory, cores, jobNode.disk, name=jobNode.jobName, + detail='Scale is set to {}.'.format(self.scale)) + self.checkResourceRequest(jobNode.memory, cores, jobNode.disk) log.debug("Issuing the command: %s with memory: %i, cores: %i, disk: %i" % ( jobNode.command, jobNode.memory, cores, jobNode.disk)) diff --git a/src/toil/common.py b/src/toil/common.py index 88914544cc..599bf2737e 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -99,7 +99,8 @@ def __init__(self): # Parameters to limit service jobs, so preventing deadlock scheduling scenarios self.maxPreemptableServiceJobs = sys.maxsize self.maxServiceJobs = sys.maxsize - self.deadlockWait = 60 # Number of seconds to wait before declaring a deadlock + self.deadlockWait = 60 # Number of seconds we must be stuck with all services before declaring a deadlock + self.deadlockCheckInterval = 30 # Minimum polling delay for deadlocks self.statePollingWait = 1 # Number of seconds to wait before querying job state # Resource requirements @@ -253,6 +254,7 @@ def parseIntList(s): setOption("maxServiceJobs", int) setOption("maxPreemptableServiceJobs", int) setOption("deadlockWait", int) + setOption("deadlockCheckInterval", int) setOption("statePollingWait", int) # Resource requirements @@ -471,16 +473,26 @@ def _addOptions(addGroupFn, config): "Allows the specification of the maximum number of service jobs " "in a cluster. By keeping this limited " " we can avoid all the nodes being occupied with services, so causing a deadlock") - addOptionFn("--maxServiceJobs", dest="maxServiceJobs", default=None, + addOptionFn("--maxServiceJobs", dest="maxServiceJobs", default=None, type=int, help=( "The maximum number of service jobs that can be run concurrently, excluding service jobs running on preemptable nodes. default=%s" % config.maxServiceJobs)) - addOptionFn("--maxPreemptableServiceJobs", dest="maxPreemptableServiceJobs", default=None, + addOptionFn("--maxPreemptableServiceJobs", dest="maxPreemptableServiceJobs", default=None, type=int, help=( "The maximum number of service jobs that can run concurrently on preemptable nodes. default=%s" % config.maxPreemptableServiceJobs)) - addOptionFn("--deadlockWait", dest="deadlockWait", default=None, + addOptionFn("--deadlockWait", dest="deadlockWait", default=None, type=int, help=( - "The minimum number of seconds to observe the cluster stuck running only the same service jobs before throwing a deadlock exception. default=%s" % config.deadlockWait)) - addOptionFn("--statePollingWait", dest="statePollingWait", default=1, + "Time, in seconds, to tolerate the workflow running only the same service " + "jobs, with no jobs to use them, before declaring the workflow to be " + "deadlocked and stopping. default=%s" % config.deadlockWait)) + addOptionFn("--deadlockCheckInterval", dest="deadlockCheckInterval", default=None, type=int, + help=( + "Time, in seconds, to wait between checks to see if the workflow is stuck " + "running only service jobs, with no jobs to use them. Should be shorter than " + "--deadlockWait. May need to be increased if the batch system cannot " + "enumerate running jobs quickly enough, or if polling for running jobs is " + "placing an unacceptable load on a shared cluster. 
default=%s" % + config.deadlockCheckInterval)) + addOptionFn("--statePollingWait", dest="statePollingWait", default=1, type=int, help=("Time, in seconds, to wait before doing a scheduler query for job state. " "Return cached results if within the waiting period.")) diff --git a/src/toil/jobStores/fileJobStore.py b/src/toil/jobStores/fileJobStore.py index 12762d2d55..5ba014f476 100644 --- a/src/toil/jobStores/fileJobStore.py +++ b/src/toil/jobStores/fileJobStore.py @@ -243,7 +243,6 @@ def jobs(self): # is in progress. for tempDir in self._jobDirectories(): for i in os.listdir(tempDir): - logger.warning('Job Dir: %s' % i) if i.startswith(self.JOB_DIR_PREFIX): # This is a job instance directory jobId = self._getJobIdFromDir(os.path.join(tempDir, i)) diff --git a/src/toil/leader.py b/src/toil/leader.py index 8154485311..1d5e6dfd6c 100644 --- a/src/toil/leader.py +++ b/src/toil/leader.py @@ -38,6 +38,7 @@ CWL_INTERNAL_JOBS = () from toil.batchSystems.abstractBatchSystem import BatchJobExitReason from toil.jobStores.abstractJobStore import NoSuchJobException +from toil.batchSystems import DeadlockException from toil.lib.throttle import LocalThrottle from toil.provisioners.clusterScaler import ScalerThread from toil.serviceManager import ServiceManager @@ -96,22 +97,6 @@ def __str__(self): return self.msg -#################################################### -# Exception thrown by the Leader class when a deadlock is encountered due to insufficient -# resources to run the workflow -#################################################### - -class DeadlockException(Exception): - def __init__(self, msg): - self.msg = "Deadlock encountered: " + msg - super().__init__() - - def __str__(self): - """ - Stringify the exception, including the message. - """ - return self.msg - #################################################### ##Following class represents the leader #################################################### @@ -196,7 +181,7 @@ def __init__(self, config, batchSystem, provisioner, jobStore, rootJob, jobCache self.debugJobNames = ("CWLJob", "CWLWorkflow", "CWLScatter", "CWLGather", "ResolveIndirect") - self.deadlockThrottler = LocalThrottle(self.config.deadlockWait) + self.deadlockThrottler = LocalThrottle(self.config.deadlockCheckInterval) self.statusThrottler = LocalThrottle(self.config.statusWait) @@ -588,7 +573,7 @@ def innerLoop(self): # the cluster scaler object will only be instantiated if autoscaling is enabled if self.clusterScaler is not None: self.clusterScaler.check() - + if len(self.toilState.updatedJobs) == 0 and self.deadlockThrottler.throttle(wait=False): # Nothing happened this round and it's been long # enough since we last checked. Check for deadlocks. @@ -616,28 +601,66 @@ def checkForDeadlocks(self): """ Checks if the system is deadlocked running service jobs. 
""" + totalRunningJobs = len(self.batchSystem.getRunningBatchJobIDs()) totalServicesIssued = self.serviceJobsIssued + self.preemptableServiceJobsIssued + # If there are no updated jobs and at least some jobs running if totalServicesIssued >= totalRunningJobs and totalRunningJobs > 0: serviceJobs = [x for x in list(self.jobBatchSystemIDToIssuedJob.keys()) if isinstance(self.jobBatchSystemIDToIssuedJob[x], ServiceJobNode)] runningServiceJobs = set([x for x in serviceJobs if self.serviceManager.isRunning(self.jobBatchSystemIDToIssuedJob[x])]) assert len(runningServiceJobs) <= totalRunningJobs - + # If all the running jobs are active services then we have a potential deadlock if len(runningServiceJobs) == totalRunningJobs: - # We wait self.config.deadlockWait seconds before declaring the system deadlocked + # There could be trouble; we are 100% services. + # See if the batch system has anything to say for itself about its failure to run our jobs. + message = self.batchSystem.getSchedulingStatusMessage() + if message is not None: + # Prepend something explaining the message + message = "The batch system reports: {}".format(message) + else: + # Use a generic message if none is available + message = "Cluster may be too small." + + + # See if this is a new potential deadlock if self.potentialDeadlockedJobs != runningServiceJobs: + logger.warning(("Potential deadlock detected! All %s running jobs are service jobs, " + "with no normal jobs to use them! %s"), totalRunningJobs, message) self.potentialDeadlockedJobs = runningServiceJobs self.potentialDeadlockTime = time.time() - elif time.time() - self.potentialDeadlockTime >= self.config.deadlockWait: - raise DeadlockException("The system is service deadlocked - all %d running jobs are active services" % totalRunningJobs) + else: + # We wait self.config.deadlockWait seconds before declaring the system deadlocked + stuckFor = time.time() - self.potentialDeadlockTime + if stuckFor >= self.config.deadlockWait: + logger.error("We have been deadlocked since %s on these service jobs: %s", + self.potentialDeadlockTime, self.potentialDeadlockedJobs) + raise DeadlockException(("The workflow is service deadlocked - all %d running jobs " + "have been the same active services for at least %s seconds") % (totalRunningJobs, self.config.deadlockWait)) + else: + # Complain that we are still stuck. + waitingNormalJobs = self.getNumberOfJobsIssued() - totalServicesIssued + logger.warning(("Potentially deadlocked for %.0f seconds. Waiting at most %.0f more seconds " + "for any of %d issued non-service jobs to schedule and start. %s"), + stuckFor, self.config.deadlockWait - stuckFor, waitingNormalJobs, message) else: # We have observed non-service jobs running, so reset the potential deadlock + + if len(self.potentialDeadlockedJobs) > 0: + # We thought we had a deadlock. Tell the user it is fixed. + logger.warning("Potential deadlock has been resolved; non-service jobs are now running.") + self.potentialDeadlockedJobs = set() self.potentialDeadlockTime = 0 else: - # We have observed non-service jobs running, so reset the potential deadlock + # We have observed non-service jobs running, so reset the potential deadlock. + # TODO: deduplicate with above + + if len(self.potentialDeadlockedJobs) > 0: + # We thought we had a deadlock. Tell the user it is fixed. 
+ logger.warning("Potential deadlock has been resolved; non-service jobs are now running.") + self.potentialDeadlockedJobs = set() self.potentialDeadlockTime = 0 @@ -731,8 +754,9 @@ def _reportWorkflowStatus(self): Report the current status of the workflow to the user. """ - # For now just log our status hint to the log. - # TODO: fancier UI? + # For now just log our scheduling status message to the log. + # TODO: make this update fast enought to put it in the progress + # bar/status line. logger.info(self._getStatusHint()) def removeJob(self, jobBatchSystemID): diff --git a/src/toil/lib/threading.py b/src/toil/lib/threading.py index 3775bf5679..c42bf92d07 100644 --- a/src/toil/lib/threading.py +++ b/src/toil/lib/threading.py @@ -43,6 +43,7 @@ class BoundedEmptySemaphore( BoundedSemaphore ): def __init__( self, value=1, verbose=None ): super( BoundedEmptySemaphore, self ).__init__( value, verbose ) for i in range( value ): + # Empty out the semaphore assert self.acquire( blocking=False )
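
Illustration (not part of the patch): a minimal sketch of how the pieces added above fit together, assuming a Toil checkout that includes this change is importable. The job name, byte counts, and the 'Scale is set to 1.0.' detail below are made-up values used only to show the new comparison, error, and exception behavior.

    # Hypothetical demo of the APIs touched by this patch; not shipped with Toil.
    from toil.batchSystems import DeadlockException, MemoryString
    from toil.batchSystems.abstractBatchSystem import InsufficientSystemResources

    # MemoryString now compares by byte value (via total_ordering) instead of
    # by string value, so unit suffixes are taken into account.
    assert MemoryString('1024K') == MemoryString('1M')
    assert MemoryString('2G') < MemoryString('1T')

    # checkResourceRequest() can now pass the job name, the complaining batch
    # system, and a batch-system-specific detail into
    # InsufficientSystemResources, which renders a user-readable report.
    try:
        raise InsufficientSystemResources('memory', 2 ** 40, 2 ** 31,
                                          batchSystem='SingleMachineBatchSystem',
                                          name='exampleJob',
                                          detail='Scale is set to 1.0.')
    except InsufficientSystemResources as e:
        print(e)
        # The job exampleJob is requesting 1099511627776 bytes of memory, more
        # than the maximum of 2147483648 bytes of memory that
        # SingleMachineBatchSystem was configured with. Scale is set to 1.0.

    # The leader raises DeadlockException (now defined in toil.batchSystems)
    # once all running jobs have been the same active services for at least
    # --deadlockWait seconds, checking every --deadlockCheckInterval seconds
    # and including any getSchedulingStatusMessage() hint in its warnings.
    try:
        raise DeadlockException('The workflow is service deadlocked - all 3 running jobs '
                                'have been the same active services for at least 60 seconds')
    except DeadlockException as e:
        print(e)  # Deadlock encountered: The workflow is service deadlocked - ...

On the single machine batch system, the scheduling hints surfaced this way are short strings such as "Not enough cores to run job 7", recorded through _setSchedulingStatusMessage() whenever a job cannot be placed and handed back to the leader through getSchedulingStatusMessage().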