Detect insufficient resources and deadlocks more usefully (#3043)
* Refactor MemoryString to be comparable the Python 3 way

* Move the DeadlockException

* Remove 2/3 stuff from SingleMachine batch system

* Make the AbstractBatchSystem actually be responsible for SingleMachineBatchSystem size checking

* Get out-of-resource formatting right

* Don't warn with the job directory

* Give types to Toil service/deadlock args so Cactus can compare them with numbers

* Add --deadlockCheckInterval and superfluous logging

* Settle on when we want to log

* Implement a way to get hints from the batch system to the user for debugging deadlocks

* Revert changes I don't want

* Revise deadlock option documentation

* Replace hints with messages and details
adamnovak authored Apr 29, 2020
1 parent 8b45f1a commit 1ae5547
Showing 8 changed files with 198 additions and 77 deletions.
15 changes: 12 additions & 3 deletions docs/running/cliOptions.rst
@@ -236,9 +236,18 @@ the logging module:
concurrently on preemptable nodes.
default=9223372036854775807
--deadlockWait DEADLOCKWAIT
The minimum number of seconds to observe the cluster
stuck running only the same service jobs before
throwing a deadlock exception. default=60
Time, in seconds, to tolerate the workflow running only
the same service jobs, with no jobs to use them, before
declaring the workflow to be deadlocked and stopping.
default=60
--deadlockCheckInterval DEADLOCKCHECKINTERVAL
Time, in seconds, to wait between checks to see if the
workflow is stuck running only service jobs, with no
jobs to use them. Should be shorter than
--deadlockWait. May need to be increased if the batch
system cannot enumerate running jobs quickly enough, or
if polling for running jobs is placing an unacceptable
load on a shared cluster. default=30
--statePollingWait STATEPOLLINGWAIT
Time, in seconds, to wait before doing a scheduler
query for job state. Return cached results if within
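
As a usage note, the two deadlock options documented above can also be set from a Python workflow script. The following is a minimal sketch, not taken from the commit: the jobstore locator and the hello job are illustrative, while the attribute names deadlockWait and deadlockCheckInterval follow the option destinations added in src/toil/common.py below.

    from toil.common import Toil
    from toil.job import Job

    def hello(job, name):
        # Illustrative no-op job.
        return 'Hello, %s' % name

    if __name__ == '__main__':
        options = Job.Runner.getDefaultOptions('file:my-jobstore')
        # Tolerate two minutes of service-only execution before declaring a
        # deadlock, and check for that condition every 15 seconds.
        options.deadlockWait = 120
        options.deadlockCheckInterval = 15
        with Toil(options) as workflow:
            workflow.start(Job.wrapJobFn(hello, 'world'))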
39 changes: 26 additions & 13 deletions src/toil/batchSystems/__init__.py
@@ -12,19 +12,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
from past.builtins import cmp
from builtins import str
from builtins import object
import sys

if sys.version_info >= (3, 0):
from functools import total_ordering

# https://docs.python.org/3.0/whatsnew/3.0.html#ordering-comparisons
def cmp(a, b):
return (a > b) - (a < b)
class DeadlockException(Exception):
"""
Exception thrown by the Leader or BatchSystem when a deadlock is encountered due to insufficient
resources to run the workflow
"""
def __init__(self, msg):
self.msg = "Deadlock encountered: " + msg
super().__init__()

class MemoryString(object):
def __str__(self):
"""
Stringify the exception, including the message.
"""
return self.msg

@total_ordering
class MemoryString:
"""
Represents an amount of bytes, as a string, using suffixes for the unit.
Comparable based on the actual number of bytes instead of string value.
"""
def __init__(self, string):
if string[-1] == 'K' or string[-1] == 'M' or string[-1] == 'G' or string[-1] == 'T': #10K
self.unit = string[-1]
@@ -55,8 +68,8 @@ def byteVal(self):
elif self.unit == 'T':
return self.val * 1099511627776

def __cmp__(self, other):
return cmp(self.bytes, other.bytes)
def __eq__(self, other):
return self.bytes == other.bytes

def __gt__(self, other):
return self.bytes > other.bytes
def __lt__(self, other):
return self.bytes < other.bytes
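
The refactor above replaces the Python 2 __cmp__ protocol with explicit __eq__/__lt__/__gt__ methods plus functools.total_ordering, which derives the remaining rich comparisons. Below is a standalone sketch of the same pattern; ByteSize and its parsing are illustrative stand-ins, since MemoryString's real val/unit handling and byteVal are only partly visible in this diff.

    from functools import total_ordering

    @total_ordering
    class ByteSize:
        """Illustrative stand-in for MemoryString: compare on bytes, not on the string."""
        UNITS = {'K': 1 << 10, 'M': 1 << 20, 'G': 1 << 30, 'T': 1 << 40}

        def __init__(self, text):
            if text[-1] in self.UNITS:
                self.bytes = float(text[:-1]) * self.UNITS[text[-1]]
            else:
                self.bytes = float(text)

        def __eq__(self, other):
            return self.bytes == other.bytes

        def __lt__(self, other):
            # total_ordering fills in <=, > and >= from __eq__ and __lt__.
            return self.bytes < other.bytes

    assert ByteSize('2G') > ByteSize('512M')
    assert ByteSize('1024K') == ByteSize('1M')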
68 changes: 60 additions & 8 deletions src/toil/batchSystems/abstractBatchSystem.py
@@ -183,6 +183,26 @@ def getUpdatedBatchJob(self, maxWait):
batch system does not support tracking wall time.
"""
raise NotImplementedError()

def getSchedulingStatusMessage(self):
"""
Get a log message fragment for the user about anything that might be
going wrong in the batch system, if available.
If no useful message is available, return None.
This can be used to report what resource is the limiting factor when
scheduling jobs, for example. If the leader thinks the workflow is
stuck, the message can be displayed to the user to help them diagnose
why it might be stuck.
:rtype: str or None
:return: User-directed message about scheduling state.
"""

# Default implementation returns None.
# Override to provide scheduling status information.
return None

@abstractmethod
def shutdown(self):
@@ -253,7 +273,7 @@ def __init__(self, config, maxCores, maxMemory, maxDisk):
workflowID=self.config.workflowID,
cleanWorkDir=self.config.cleanWorkDir)

def checkResourceRequest(self, memory, cores, disk):
def checkResourceRequest(self, memory, cores, disk, name=None, detail=None):
"""
Check resource request is not greater than that available or allowed.
@@ -262,6 +282,10 @@ def checkResourceRequest(self, memory, cores, disk):
:param float cores: number of cores being requested
:param int disk: amount of disk space being requested, in bytes
:param str name: Name of the job being checked, for generating a useful error report.
:param str detail: Batch-system-specific message to include in the error.
:raise InsufficientSystemResources: raised when a resource is requested in an amount
greater than allowed
@@ -270,11 +294,14 @@ def checkResourceRequest(self, memory, cores, disk):
assert disk is not None
assert cores is not None
if cores > self.maxCores:
raise InsufficientSystemResources('cores', cores, self.maxCores)
raise InsufficientSystemResources('cores', cores, self.maxCores,
batchSystem=self.__class__.__name__, name=name, detail=detail)
if memory > self.maxMemory:
raise InsufficientSystemResources('memory', memory, self.maxMemory)
raise InsufficientSystemResources('memory', memory, self.maxMemory,
batchSystem=self.__class__.__name__, name=name, detail=detail)
if disk > self.maxDisk:
raise InsufficientSystemResources('disk', disk, self.maxDisk)
raise InsufficientSystemResources('disk', disk, self.maxDisk,
batchSystem=self.__class__.__name__, name=name, detail=detail)

def setEnv(self, name, value=None):
"""
@@ -519,7 +546,7 @@ class InsufficientSystemResources(Exception):
To be raised when a job requests more of a particular resource than is either currently allowed
or available
"""
def __init__(self, resource, requested, available):
def __init__(self, resource, requested, available, batchSystem=None, name=None, detail=None):
"""
Creates an instance of this exception that indicates which resource is insufficient for current
demands, as well as the amount requested and amount actually available.
@@ -530,12 +557,37 @@ def __init__(self, resource, requested, available):
in this exception
:param int|float available: amount of the particular resource actually available
:param str batchSystem: Name of the batch system class complaining, for
generating a useful error report. If you are using a single machine
batch system for local jobs in another batch system, it is important to
know which one has run out of resources.
:param str name: Name of the job being checked, for generating a useful error report.
:param str detail: Batch-system-specific message to include in the error.
"""
self.requested = requested
self.available = available
self.resource = resource
self.batchSystem = batchSystem if batchSystem is not None else 'this batch system'
self.unit = 'bytes of ' if resource == 'disk' or resource == 'memory' else ''
self.name = name
self.detail = detail

def __str__(self):
return 'Requesting more {} than either physically available, or enforced by --max{}. ' \
'Requested: {}, Available: {}'.format(self.resource, self.resource.capitalize(),
self.requested, self.available)
if self.name is not None:
phrases = [('The job {} is requesting {} {}{}, more than '
'the maximum of {} {}{} that {} was configured '
'with.'.format(self.name, self.requested, self.unit, self.resource,
self.available, self.unit, self.resource, self.batchSystem))]
else:
phrases = [('Requesting more {} than either physically available to {}, or enforced by --max{}. '
'Requested: {}, Available: {}'.format(self.resource, self.batchSystem,
self.resource.capitalize(),
self.requested, self.available))]

if self.detail is not None:
phrases.append(self.detail)

return ' '.join(phrases)
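
Constructed directly with hypothetical numbers (a 100 GiB memory request against a 16 GiB limit) and a made-up job name, the new error message reads roughly as shown in the comments below; the batch system name and the optional detail come from the arguments added above.

    from toil.batchSystems.abstractBatchSystem import InsufficientSystemResources

    err = InsufficientSystemResources('memory', 100 * 2**30, 16 * 2**30,
                                      batchSystem='SingleMachineBatchSystem',
                                      name='ExampleAlignmentJob',
                                      detail='Scale is set to 1.0.')
    print(err)
    # The job ExampleAlignmentJob is requesting 107374182400 bytes of memory,
    # more than the maximum of 17179869184 bytes of memory that
    # SingleMachineBatchSystem was configured with. Scale is set to 1.0.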
53 changes: 32 additions & 21 deletions src/toil/batchSystems/singleMachine.py
@@ -12,13 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import range
from builtins import object
from past.utils import old_div
from contextlib import contextmanager
import logging
@@ -160,6 +153,9 @@ def __init__(self, config, maxCores, maxMemory, maxDisk):
self.memory = ResourcePool(self.maxMemory, 'memory')
# A pool representing the available space in bytes
self.disk = ResourcePool(self.maxDisk, 'disk')

# If we can't schedule something, we fill this in with a reason why
self.schedulingStatusMessage = None

# We use this event to signal shutdown
self.shuttingDown = Event()
@@ -176,7 +172,7 @@ def __init__(self, config, maxCores, maxMemory, maxDisk):
self.daddyThread = Thread(target=self.daddy, daemon=True)
self.daddyThread.start()
log.debug('Started in normal mode.')

def daddy(self):
"""
Be the "daddy" thread.
@@ -352,6 +348,22 @@ def _runDebugJob(self, jobCommand, jobID, environment):
self.runningJobs.pop(jobID)
if not info.killIntended:
self.outputQueue.put(UpdatedBatchJobInfo(jobID=jobID, exitStatus=0, wallTime=time.time() - info.time, exitReason=None))

def getSchedulingStatusMessage(self):
# Implement the abstractBatchSystem's scheduling status message API
return self.schedulingStatusMessage

def _setSchedulingStatusMessage(self, message):
"""
If we can't run a job, we record a short message about why not. If the
leader wants to know what is up with us (for example, to diagnose a
deadlock), it can ask us for the message.
"""

self.schedulingStatusMessage = message

# Report the message in the debug log too.
log.debug(message)

def _startChild(self, jobCommand, jobID, coreFractions, jobMemory, jobDisk, environment):
"""
@@ -423,13 +435,13 @@ def _startChild(self, jobCommand, jobID, coreFractions, jobMemory, jobDisk, envi
# We can't get disk, so free cores and memory
self.coreFractions.release(coreFractions)
self.memory.release(jobMemory)
log.debug('Not enough disk to run job %s', jobID)
self._setSchedulingStatusMessage('Not enough disk to run job %s' % jobID)
else:
# Free cores, since we can't get memory
self.coreFractions.release(coreFractions)
log.debug('Not enough memory to run job %s', jobID)
self._setSchedulingStatusMessage('Not enough memory to run job %s' % jobID)
else:
log.debug('Not enough cores to run job %s', jobID)
self._setSchedulingStatusMessage('Not enough cores to run job %s' % jobID)

# If we get here, we didn't succeed or fail starting the job.
# We didn't manage to get the resources.
@@ -481,16 +493,15 @@ def issueBatchJob(self, jobNode):

self._checkOnDaddy()

# Round cores to minCores and apply scale
cores = math.ceil(jobNode.cores * self.scale / self.minCores) * self.minCores
assert cores <= self.maxCores, ('The job {} is requesting {} cores, more than the maximum of '
'{} cores this batch system was configured with. Scale is '
'set to {}.'.format(jobNode.jobName, cores, self.maxCores, self.scale))
assert cores >= self.minCores
assert jobNode.memory <= self.maxMemory, ('The job {} is requesting {} bytes of memory, more than '
'the maximum of {} this batch system was configured '
'with.'.format(jobNode.jobName, jobNode.memory, self.maxMemory))

# Round cores to minCores and apply scale.
# Make sure to give minCores even if asked for 0 cores, or negative or something.
cores = max(math.ceil(jobNode.cores * self.scale / self.minCores) * self.minCores, self.minCores)

# Don't do our own assertions about job size vs. our configured size.
# The abstract batch system can handle it.
self.checkResourceRequest(jobNode.memory, cores, jobNode.disk, name=jobNode.jobName,
detail='Scale is set to {}.'.format(self.scale))

self.checkResourceRequest(jobNode.memory, cores, jobNode.disk)
log.debug("Issuing the command: %s with memory: %i, cores: %i, disk: %i" % (
jobNode.command, jobNode.memory, cores, jobNode.disk))
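
The rounding in issueBatchJob above snaps each request up to the next multiple of minCores and never issues less than minCores, even for a request of zero. A small worked sketch of that arithmetic follows; the minCores value of 0.1 is an assumption about the single-machine default and is not taken from this diff.

    import math

    def round_cores(requested, scale=1.0, min_cores=0.1):
        # Same expression as issueBatchJob: apply the scale, round up to a
        # multiple of min_cores, and never go below min_cores.
        return max(math.ceil(requested * scale / min_cores) * min_cores, min_cores)

    print(round_cores(0.25))  # 0.30000000000000004 (next 0.1 step above 0.25)
    print(round_cores(0))     # 0.1 (floor at min_cores)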
24 changes: 18 additions & 6 deletions src/toil/common.py
@@ -99,7 +99,8 @@ def __init__(self):
# Parameters to limit service jobs, so preventing deadlock scheduling scenarios
self.maxPreemptableServiceJobs = sys.maxsize
self.maxServiceJobs = sys.maxsize
self.deadlockWait = 60 # Number of seconds to wait before declaring a deadlock
self.deadlockWait = 60 # Number of seconds we must be stuck with all services before declaring a deadlock
self.deadlockCheckInterval = 30 # Minimum polling delay for deadlocks
self.statePollingWait = 1 # Number of seconds to wait before querying job state

# Resource requirements
@@ -253,6 +254,7 @@ def parseIntList(s):
setOption("maxServiceJobs", int)
setOption("maxPreemptableServiceJobs", int)
setOption("deadlockWait", int)
setOption("deadlockCheckInterval", int)
setOption("statePollingWait", int)

# Resource requirements
@@ -471,16 +473,26 @@ def _addOptions(addGroupFn, config):
"Allows the specification of the maximum number of service jobs "
"in a cluster. By keeping this limited "
" we can avoid all the nodes being occupied with services, so causing a deadlock")
addOptionFn("--maxServiceJobs", dest="maxServiceJobs", default=None,
addOptionFn("--maxServiceJobs", dest="maxServiceJobs", default=None, type=int,
help=(
"The maximum number of service jobs that can be run concurrently, excluding service jobs running on preemptable nodes. default=%s" % config.maxServiceJobs))
addOptionFn("--maxPreemptableServiceJobs", dest="maxPreemptableServiceJobs", default=None,
addOptionFn("--maxPreemptableServiceJobs", dest="maxPreemptableServiceJobs", default=None, type=int,
help=(
"The maximum number of service jobs that can run concurrently on preemptable nodes. default=%s" % config.maxPreemptableServiceJobs))
addOptionFn("--deadlockWait", dest="deadlockWait", default=None,
addOptionFn("--deadlockWait", dest="deadlockWait", default=None, type=int,
help=(
"The minimum number of seconds to observe the cluster stuck running only the same service jobs before throwing a deadlock exception. default=%s" % config.deadlockWait))
addOptionFn("--statePollingWait", dest="statePollingWait", default=1,
"Time, in seconds, to tolerate the workflow running only the same service "
"jobs, with no jobs to use them, before declaring the workflow to be "
"deadlocked and stopping. default=%s" % config.deadlockWait))
addOptionFn("--deadlockCheckInterval", dest="deadlockCheckInterval", default=None, type=int,
help=(
"Time, in seconds, to wait between checks to see if the workflow is stuck "
"running only service jobs, with no jobs to use them. Should be shorter than "
"--deadlockWait. May need to be increased if the batch system cannot "
"enumerate running jobs quickly enough, or if polling for running jobs is "
"placing an unacceptable load on a shared cluster. default=%s" %
config.deadlockCheckInterval))
addOptionFn("--statePollingWait", dest="statePollingWait", default=1, type=int,
help=("Time, in seconds, to wait before doing a scheduler query for job state. "
"Return cached results if within the waiting period."))

1 change: 0 additions & 1 deletion src/toil/jobStores/fileJobStore.py
@@ -243,7 +243,6 @@ def jobs(self):
# is in progress.
for tempDir in self._jobDirectories():
for i in os.listdir(tempDir):
logger.warning('Job Dir: %s' % i)
if i.startswith(self.JOB_DIR_PREFIX):
# This is a job instance directory
jobId = self._getJobIdFromDir(os.path.join(tempDir, i))