Skip to content

Commit

Permalink
Megamerge (#4464)
Browse files Browse the repository at this point in the history
* Update version_template.py

* Update version_template.py

* Catch 'Device or resource busy'

* Catch FileNotFoundError

* Added @retry to _readJobState

* Catch FileNotFoundError

* Allow use of Mesos roles (resolves #4455)

* Don't let --user-space-docker-cmd and --custom-net be mutually exclusive. Otherwise we get this error: toil-cwl-runner: error: argument --user-space-docker-cmd: not allowed with argument --custom-net.

* Update src/toil/cwl/cwltoil.py

Co-authored-by: Michael R. Crusoe <[email protected]>

* Update src/toil/cwl/cwltoil.py

Co-authored-by: Michael R. Crusoe <[email protected]>

* Undo unwanted version change

* Document new Mesos options and don't apply sort and high always

---------

Co-authored-by: Lon Blauvelt <[email protected]>
Co-authored-by: Jake Fennick <[email protected]>
Co-authored-by: saime <[email protected]>
Co-authored-by: Brandon Walker <[email protected]>
Co-authored-by: Brandon Walker <[email protected]>
Co-authored-by: Michael R. Crusoe <[email protected]>
  • Loading branch information
7 people authored May 5, 2023
1 parent 86665a2 commit f529973
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 20 deletions.
6 changes: 6 additions & 0 deletions docs/running/cliOptions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@ levels in toil are based on priority from the logging module:
--mesosEndpoint MESOSENDPOINT
The host and port of the Mesos server separated by a
colon. (default: <leader IP>:5050)
--mesosFrameworkId MESOSFRAMEWORKID
Use a specific Mesos framework ID.
--mesosRole MESOSROLE
Use a Mesos role.
--mesosName MESOSNAME
The Mesos name to use. (default: toil)
--kubernetesHostPath KUBERNETES_HOST_PATH
Path on Kubernetes hosts to use as shared inter-pod temp
directory.
Expand Down
24 changes: 21 additions & 3 deletions src/toil/batchSystems/mesos/batchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ def __init__(self, config, maxCores, maxMemory, maxDisk):

# Address of the Mesos master in the form host:port where host can be an IP or a hostname
self.mesos_endpoint = config.mesos_endpoint
if config.mesos_role is not None:
self.mesos_role = config.mesos_role
self.mesos_name = config.mesos_name
if config.mesos_framework_id is not None:
self.mesos_framework_id = config.mesos_framework_id

# Written to when Mesos kills tasks, as directed by Toil.
# Jobs must not enter this set until they are removed from runningJobMap.
Expand Down Expand Up @@ -160,7 +165,7 @@ def __init__(self, config, maxCores, maxMemory, maxDisk):

self.ignoredNodes = set()

self._startDriver()
self._startDriver(config)

def setUserScript(self, userScript):
self.userScript = userScript
Expand Down Expand Up @@ -310,14 +315,18 @@ def _buildExecutor(self):
info.source = pwd.getpwuid(os.getuid()).pw_name
return info

def _startDriver(self):
def _startDriver(self, config):
"""
The Mesos driver thread which handles the scheduler's communication with the Mesos master
"""
framework = addict.Dict()
framework.user = get_user_name() # We must determine the user name ourselves with pymesos
framework.name = "toil"
framework.name = config.mesos_name
framework.principal = framework.name
if config.mesos_role is not None:
framework.roles = config.mesos_role
framework.capabilities = [dict(type='MULTI_ROLE')]

# Make the driver which implements most of the scheduler logic and calls back to us for the user-defined parts.
# Make sure it will call us with nice namespace-y addicts
self.driver = MesosSchedulerDriver(self, framework,
Expand Down Expand Up @@ -839,8 +848,17 @@ def get_default_mesos_endpoint(cls) -> str:
def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None:
parser.add_argument("--mesosEndpoint", "--mesosMaster", dest="mesos_endpoint", default=cls.get_default_mesos_endpoint(),
help="The host and port of the Mesos master separated by colon. (default: %(default)s)")
parser.add_argument("--mesosFrameworkId", dest="mesos_framework_id",
help="Use a specific Mesos framework ID.")
parser.add_argument("--mesosRole", dest="mesos_role",
help="Use a Mesos role.")
parser.add_argument("--mesosName", dest="mesos_name", default="toil",
help="The Mesos name to use. (default: %(default)s)")

@classmethod
def setOptions(cls, setOption: OptionSetter):
setOption("mesos_endpoint", None, None, cls.get_default_mesos_endpoint(), old_names=["mesosMasterAddress"])
setOption("mesos_name", None, None, "toil")
setOption("mesos_role")
setOption("mesos_framework_id")

4 changes: 3 additions & 1 deletion src/toil/cwl/cwltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
from toil.version import baseVersion
from toil.exceptions import FailedJobsException


logger = logging.getLogger(__name__)

# Find the default temporary directory
Expand Down Expand Up @@ -3269,7 +3270,8 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
help="Do not delete Docker container used by jobs after they exit",
dest="rm_container",
)
dockergroup.add_argument(
extra_dockergroup = parser.add_argument_group()
extra_dockergroup.add_argument(
"--custom-net",
help="Specify docker network name to pass to docker run command",
)
Expand Down
2 changes: 2 additions & 0 deletions src/toil/fileStores/nonCachingFileStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from toil.lib.compatibility import deprecated
from toil.lib.conversions import bytes2human
from toil.lib.io import make_public_dir, robust_rmtree
from toil.lib.retry import retry
from toil.lib.threading import get_process_name, process_name_exists

logger: logging.Logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -301,6 +302,7 @@ def _getAllJobStates(cls, coordination_dir: str) -> Iterator[Dict[str, str]]:
raise

@staticmethod
@retry(errors=[OSError])
def _readJobState(jobStateFileName: str) -> Dict[str, str]:
with open(jobStateFileName, 'rb') as fH:
state = dill.load(fH)
Expand Down
15 changes: 15 additions & 0 deletions src/toil/lib/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def robust_rmtree(path: Union[str, bytes]) -> None:
except FileNotFoundError:
# Directory went away
return
except OSError as exc:
if exc.errno == 16:
# 'Device or resource busy'
return
raise

# We assume the directory going away while we have it open won't upset
# the listdir iterator.
Expand All @@ -54,6 +59,11 @@ def robust_rmtree(path: Union[str, bytes]) -> None:
except FileNotFoundError:
# Directory went away
return
except OSError as exc:
if exc.errno == 16:
# 'Device or resource busy'
return
raise

else:
# It is not or was not a directory.
Expand All @@ -63,6 +73,11 @@ def robust_rmtree(path: Union[str, bytes]) -> None:
except FileNotFoundError:
# File went away
return
except OSError as exc:
if exc.errno == 16:
# 'Device or resource busy'
return
raise


def atomic_tmp_file(final_path: str) -> str:
Expand Down
37 changes: 21 additions & 16 deletions src/toil/lib/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,15 +368,18 @@ def global_mutex(base_dir: str, mutex: str) -> Iterator[None]:
# get a lock on the deleted file.

while True:
# Try to create the file, ignoring if it exists or not.
fd = os.open(lock_filename, os.O_CREAT | os.O_WRONLY)
fd = -1

# Wait until we can exclusively lock it.
fcntl.lockf(fd, fcntl.LOCK_EX)

# Holding the lock, make sure we are looking at the same file on disk still.
fd_stats = os.fstat(fd)
try:
# Try to create the file, ignoring if it exists or not.
fd = os.open(lock_filename, os.O_CREAT | os.O_WRONLY)

# Wait until we can exclusively lock it.
fcntl.lockf(fd, fcntl.LOCK_EX)

# Holding the lock, make sure we are looking at the same file on disk still.
fd_stats = os.fstat(fd)

path_stats: Optional[os.stat_result] = os.stat(lock_filename)
except FileNotFoundError:
path_stats = None
Expand All @@ -386,9 +389,10 @@ def global_mutex(base_dir: str, mutex: str) -> Iterator[None]:
# any). This usually happens, because before someone releases a
# lock, they delete the file. Go back and contend again. TODO: This
# allows a lot of queue jumping on our mutex.
fcntl.lockf(fd, fcntl.LOCK_UN)
os.close(fd)
continue
if fd != -1:
fcntl.lockf(fd, fcntl.LOCK_UN)
os.close(fd)
continue
else:
# We have a lock on the file that the name points to. Since we
# hold the lock, nobody will be deleting it or can be in the
Expand All @@ -404,12 +408,13 @@ def global_mutex(base_dir: str, mutex: str) -> Iterator[None]:
# under someone else who thinks they are holding it.
logger.debug('PID %d releasing mutex %s', os.getpid(), lock_filename)
os.unlink(lock_filename)
fcntl.lockf(fd, fcntl.LOCK_UN)
# Note that we are unlinking it and then unlocking it; a lot of people
# might have opened it before we unlinked it and will wake up when they
# get the worthless lock on the now-unlinked file. We have to do some
# stat gymnastics above to work around this.
os.close(fd)
if fd != -1:
fcntl.lockf(fd, fcntl.LOCK_UN)
# Note that we are unlinking it and then unlocking it; a lot of people
# might have opened it before we unlinked it and will wake up when they
# get the worthless lock on the now-unlinked file. We have to do some
# stat gymnastics above to work around this.
os.close(fd)


class LastProcessStandingArena:
Expand Down

0 comments on commit f529973

Please sign in to comment.