From d7102135477650f3e9c0d55675ae59b3daecf4c7 Mon Sep 17 00:00:00 2001 From: Lon Blauvelt Date: Wed, 15 Nov 2023 21:29:41 -0800 Subject: [PATCH] Remove the parasol batch system. (#4678) Co-authored-by: Adam Novak --- attic/README.md | 15 +- contrib/admin/mypy-with-ignore.py | 1 - docs/contributing/contributing.rst | 6 +- docs/index.rst | 1 - docs/running/cliOptions.rst | 12 +- docs/running/hpcEnvironments.rst | 2 +- docs/running/introduction.rst | 4 +- setup.cfg | 1 - src/toil/batchSystems/parasol.py | 378 ------------------ src/toil/batchSystems/registry.py | 6 - src/toil/common.py | 2 - src/toil/test/__init__.py | 8 - src/toil/test/batchSystems/batchSystemTest.py | 83 ---- .../test/batchSystems/parasolTestSupport.py | 117 ------ src/toil/test/cwl/cwlTest.py | 13 - src/toil/test/sort/sortTest.py | 13 +- 16 files changed, 15 insertions(+), 647 deletions(-) delete mode 100644 src/toil/batchSystems/parasol.py delete mode 100644 src/toil/test/batchSystems/parasolTestSupport.py diff --git a/attic/README.md b/attic/README.md index 9db7984d1e..18e51af984 100644 --- a/attic/README.md +++ b/attic/README.md @@ -1,5 +1,5 @@ #Toil -Python based pipeline management software for clusters that makes running recursive and dynamically scheduled computations straightforward. So far works with gridEngine, lsf, parasol and on multi-core machines. +Python based pipeline management software for clusters that makes running recursive and dynamically scheduled computations straightforward. So far works with gridEngine, lsf, and on multi-core machines. ##Authors [Benedict Paten](https://github.com/benedictpaten/), [Dent Earl](https://github.com/dentearl/), [Daniel Zerbino](https://github.com/dzserbino/), [Glenn Hickey](https://github.com/glennhickey/), other UCSC people. @@ -28,9 +28,9 @@ The following walks through running a toil script and using the command-line too Once toil is installed, running a toil script is performed by executing the script from the command-line, e.g. (using the file sorting toy example in **tests/sort/scriptTreeTest_Sort.py**): -[]$ scriptTreeTest_Sort.py --fileToSort foo --toil bar/toil --batchSystem parasol --logLevel INFO --stats +[]$ scriptTreeTest_Sort.py --fileToSort foo --toil bar/toil --batchSystem slurm --logLevel INFO --stats -Which in this case uses the parasol batch system, and INFO level logging and where foo is the file to sort and bar/toil is the location of a directory (which should not already exist) from which the batch will be managed. Details of the toil options are described below; the stats option is used to gather statistics about the jobs in a run. +Which in this case uses the slurm batch system, and INFO level logging and where foo is the file to sort and bar/toil is the location of a directory (which should not already exist) from which the batch will be managed. Details of the toil options are described below; the stats option is used to gather statistics about the jobs in a run. The script will return a zero exit value if the toil system is successfully able to run to completion, else it will create an exception. If the script fails because a job failed then the log file information of the job will be reported to std error. The toil directory (here 'bar/toil') is not automatically deleted regardless of success or failure, and contains a record of the jobs run, which can be enquired about using the **toilStatus** command. e.g. @@ -150,8 +150,7 @@ The important arguments to **toilStats** are: --batchSystem=BATCHSYSTEM The type of batch system to run the job(s) with, - currently can be - 'singleMachine'/'parasol'/'acidTest'/'gridEngine'/'lsf'. + currently can be 'singleMachine'/'gridEngine'/'lsf'. default=singleMachine --maxThreads=MAXTHREADS The maximum number of threads (technically processes @@ -159,8 +158,6 @@ The important arguments to **toilStats** are: mode. Increasing this will allow more jobs to run concurrently when running on a single machine. default=4 - --parasolCommand=PARASOLCOMMAND - The command to run the parasol program default=parasol Options to specify default cpu/memory requirements (if not specified by the jobs themselves), and to limit the total amount of @@ -202,7 +199,7 @@ The important arguments to **toilStats** are: --bigBatchSystem=BIGBATCHSYSTEM The batch system to run for jobs with larger memory/cpus requests, currently can be - 'singleMachine'/'parasol'/'acidTest'/'gridEngine'. + 'singleMachine'/'gridEngine'. default=none --bigMemoryThreshold=BIGMEMORYTHRESHOLD The memory threshold above which to submit to the big @@ -240,7 +237,7 @@ The important arguments to **toilStats** are: The following sections are for people creating toil scripts and as general information. The presentation **[docs/toilSlides.pdf](https://github.com/benedictpaten/toil/blob/master/doc/toilSlides.pdf)** is also a quite useful, albeit slightly out of date, guide to using toil. - -Most batch systems (such as LSF, Parasol, etc.) do not allow jobs to spawn +Most batch systems (such as LSF) do not allow jobs to spawn other jobs in a simple way. The basic pattern provided by toil is as follows: diff --git a/contrib/admin/mypy-with-ignore.py b/contrib/admin/mypy-with-ignore.py index cab39a6e6f..01f57150e0 100755 --- a/contrib/admin/mypy-with-ignore.py +++ b/contrib/admin/mypy-with-ignore.py @@ -51,7 +51,6 @@ def main(): 'src/toil/batchSystems/slurm.py', 'src/toil/batchSystems/gridengine.py', 'src/toil/batchSystems/singleMachine.py', - 'src/toil/batchSystems/parasol.py', 'src/toil/batchSystems/torque.py', 'src/toil/batchSystems/options.py', 'src/toil/batchSystems/registry.py', diff --git a/docs/contributing/contributing.rst b/docs/contributing/contributing.rst index 8868c68a1f..6240d16f5a 100644 --- a/docs/contributing/contributing.rst +++ b/docs/contributing/contributing.rst @@ -52,10 +52,10 @@ depend on a currently installed *feature*, use This will run only the tests that don't depend on the ``aws`` extra, even if that extra is currently installed. Note the distinction between the terms *feature* and *extra*. Every extra is a feature but there are features that are -not extras, such as the ``gridengine`` and ``parasol`` features. To skip tests -involving both the ``parasol`` feature and the ``aws`` extra, use the following:: +not extras, such as the ``gridengine`` feature. To skip tests +involving both the ``gridengine`` feature and the ``aws`` extra, use the following:: - $ make test tests="-m 'not aws and not parasol' src" + $ make test tests="-m 'not aws and not gridengine' src" diff --git a/docs/index.rst b/docs/index.rst index 70134d36a0..071055a5e0 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -15,7 +15,6 @@ If using Toil for your research, please cite .. _website: http://toil.ucsc-cgl.org/ .. _announce: https://groups.google.com/forum/#!forum/toil-announce .. _GridEngine: http://gridscheduler.sourceforge.net/ -.. _Parasol: http://genecats.soe.ucsc.edu/eng/parasol.html .. _Apache Mesos: http://mesos.apache.org/ .. _spot market: https://aws.amazon.com/ec2/spot/ .. _Amazon Web Services: https://aws.amazon.com/ diff --git a/docs/running/cliOptions.rst b/docs/running/cliOptions.rst index 348947c2be..2e7324ae24 100644 --- a/docs/running/cliOptions.rst +++ b/docs/running/cliOptions.rst @@ -154,7 +154,7 @@ levels in toil are based on priority from the logging module: --batchSystem BATCHSYSTEM The type of batch system to run the job(s) with, - currently can be one of aws_batch, parasol, single_machine, + currently can be one of aws_batch, single_machine, grid_engine, lsf, mesos, slurm, tes, torque, htcondor, kubernetes. (default: single_machine) --disableAutoDeployment @@ -194,14 +194,6 @@ levels in toil are based on priority from the logging module: unset, the Toil work directory will be used. Only works for grid engine batch systems such as gridengine, htcondor, torque, slurm, and lsf. - --parasolCommand PARASOLCOMMAND - The name or path of the parasol program. Will be - looked up on PATH unless it starts with a - slash. (default: parasol) - --parasolMaxBatches PARASOLMAXBATCHES - Maximum number of job batches the Parasol batch is - allowed to create. One batch is created for jobs with - a unique set of resource requirements. (default: 1000) --mesosEndpoint MESOSENDPOINT The host and port of the Mesos server separated by a colon. (default: :5050) @@ -254,7 +246,7 @@ Allows configuring Toil's data storage. filesystem-based job stores only. (Default=False) --caching BOOL Set caching options. This must be set to "false" to use a batch system that does not support - cleanup, such as Parasol. Set to "true" if caching + cleanup. Set to "true" if caching is desired. **Autoscaling Options** diff --git a/docs/running/hpcEnvironments.rst b/docs/running/hpcEnvironments.rst index 351318ef70..0166d448c2 100644 --- a/docs/running/hpcEnvironments.rst +++ b/docs/running/hpcEnvironments.rst @@ -51,7 +51,7 @@ Then make sure to log out and back in again for the setting to take effect. Standard Output/Error from Batch System Jobs -------------------------------------------- -Standard output and error from batch system jobs (except for the Parasol and Mesos batch systems) are redirected to files in the ``toil-`` directory created within the temporary directory specified by the ``--workDir`` option; see :ref:`optionsRef`. +Standard output and error from batch system jobs (except for the Mesos batch system) are redirected to files in the ``toil-`` directory created within the temporary directory specified by the ``--workDir`` option; see :ref:`optionsRef`. Each file is named as follows: ``toil_job__batch___.log``, where ```` is ``std_output`` for standard output, and ``std_error`` for standard error. HTCondor will also write job event log files with `` = job_events``. diff --git a/docs/running/introduction.rst b/docs/running/introduction.rst index fa781a59e3..cca88a7016 100644 --- a/docs/running/introduction.rst +++ b/docs/running/introduction.rst @@ -12,7 +12,7 @@ Toil is built in a modular way so that it can be used on lots of different syste The three configurable pieces are the - :ref:`jobStoreInterface`: A filepath or url that can host and centralize all files for a workflow (e.g. a local folder, or an AWS s3 bucket url). - - :ref:`batchSystemInterface`: Specifies either a local single-machine or a currently supported HPC environment (lsf, parasol, mesos, slurm, torque, htcondor, kubernetes, or grid_engine). + - :ref:`batchSystemInterface`: Specifies either a local single-machine or a currently supported HPC environment (lsf, mesos, slurm, torque, htcondor, kubernetes, or grid_engine). - :ref:`provisionerOverview`: For running in the cloud only. This specifies which cloud provider provides instances to do the "work" of your workflow. .. _jobStoreOverview: @@ -53,7 +53,7 @@ Batch System ------------ A Toil batch system is either a local single-machine (one computer) or a -currently supported cluster of computers (lsf, parasol, mesos, slurm, torque, +currently supported cluster of computers (lsf, mesos, slurm, torque, htcondor, or grid_engine) These environments manage individual worker nodes under a leader node to process the work required in a workflow. The leader and its workers all coordinate their tasks and files through a centralized job diff --git a/setup.cfg b/setup.cfg index f603a662d0..3f23ac2791 100644 --- a/setup.cfg +++ b/setup.cfg @@ -28,7 +28,6 @@ markers = local_cuda lsf mesos - parasol rsync server_mode slow diff --git a/src/toil/batchSystems/parasol.py b/src/toil/batchSystems/parasol.py deleted file mode 100644 index c329b85bba..0000000000 --- a/src/toil/batchSystems/parasol.py +++ /dev/null @@ -1,378 +0,0 @@ -# Copyright (C) 2015-2021 Regents of the University of California -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import os -import re -import subprocess -import tempfile -import time -from argparse import ArgumentParser, _ArgumentGroup -from queue import Empty, Queue -from shutil import which -from threading import Thread -from typing import Dict, Optional, Union - -from toil.batchSystems.abstractBatchSystem import (BatchSystemSupport, - UpdatedBatchJobInfo) -from toil.batchSystems.options import OptionSetter -from toil.common import SYS_MAX_SIZE, Toil, make_open_interval_action -from toil.lib.iterables import concat -from toil.test import get_temp_file - -logger = logging.getLogger(__name__) - - -class ParasolBatchSystem(BatchSystemSupport): - """ - The interface for Parasol. - """ - - @classmethod - def supportsWorkerCleanup(cls): - return False - - @classmethod - def supportsAutoDeployment(cls): - return False - - def __init__(self, config, maxCores, maxMemory, maxDisk): - super().__init__(config, maxCores, maxMemory, maxDisk) - if maxMemory != SYS_MAX_SIZE: - logger.warning('The Parasol batch system does not support maxMemory.') - # Keep the name of the results file for the pstat2 command.. - command = config.parasolCommand - if os.path.sep not in command: - try: - command = which(command) - except StopIteration: - raise RuntimeError("Can't find %s on PATH." % command) - logger.debug('Using Parasol at %s', command) - self.parasolCommand = command - jobStoreType, path = Toil.parseLocator(config.jobStore) - if jobStoreType != 'file': - raise RuntimeError("The parasol batch system doesn't currently work with any " - "jobStore type except file jobStores.") - self.parasolResultsDir = tempfile.mkdtemp(dir=os.path.abspath(path)) - logger.debug("Using parasol results dir: %s", self.parasolResultsDir) - - # In Parasol, each results file corresponds to a separate batch, and all jobs in a batch - # have the same cpu and memory requirements. The keys to this dictionary are the (cpu, - # memory) tuples for each batch. A new batch is created whenever a job has a new unique - # combination of cpu and memory requirements. - self.resultsFiles = dict() - self.maxBatches = config.parasolMaxBatches - - # Allows the worker process to send back the IDs of jobs that have finished, so the batch - # system can decrease its used cpus counter - self.cpuUsageQueue = Queue() - - # Also stores finished job IDs, but is read by getUpdatedJobIDs(). - self.updatedJobsQueue = Queue() - - # Use this to stop the worker when shutting down - self.running = True - - self.worker = Thread(target=self.updatedJobWorker, args=()) - self.worker.start() - self.usedCpus = 0 - self.jobIDsToCpu = {} - - # Set of jobs that have been issued but aren't known to have finished or been killed yet. - # Jobs that end by themselves are removed in getUpdatedJob, and jobs that are killed are - # removed in killBatchJobs. - self.runningJobs = set() - - def _runParasol(self, command, autoRetry=True): - """ - Issue a parasol command using popen to capture the output. - - If the command fails then it will try pinging parasol until it gets a response. - When it gets a response it will recursively call the issue parasol command, - repeating this pattern for a maximum of N times. The final exit value will reflect this. - """ - command = list(concat(self.parasolCommand, command)) - while True: - logger.debug('Running %r', command) - process = subprocess.Popen(command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - bufsize=-1) - stdout, stderr = process.communicate() - status = process.wait() - for line in stderr.decode('utf-8').split('\n'): - if line: logger.warning(line) - if status == 0: - return 0, stdout.decode('utf-8').split('\n') - message = 'Command %r failed with exit status %i' % (command, status) - if autoRetry: - logger.warning(message) - else: - logger.error(message) - return status, None - logger.warning('Waiting for a 10s, before trying again') - time.sleep(10) - - parasolOutputPattern = re.compile("your job ([0-9]+).*") - - def issueBatchJob(self, jobDesc, job_environment: Optional[Dict[str, str]] = None): - """Issue parasol with job commands.""" - self.check_resource_request(jobDesc) - - MiB = 1 << 20 - truncatedMemory = jobDesc.memory // MiB * MiB - # Look for a batch for jobs with these resource requirements, with - # the memory rounded down to the nearest megabyte. Rounding down - # meams the new job can't ever decrease the memory requirements - # of jobs already in the batch. - if len(self.resultsFiles) >= self.maxBatches: - raise RuntimeError('Number of batches reached limit of %i' % self.maxBatches) - try: - results = self.resultsFiles[(truncatedMemory, jobDesc.cores)] - except KeyError: - results = get_temp_file(rootDir=self.parasolResultsDir) - self.resultsFiles[(truncatedMemory, jobDesc.cores)] = results - - # Prefix the command with environment overrides, optionally looking them up from the - # current environment if the value is None - command = ' '.join(concat('env', self.__environment(job_environment), jobDesc.command)) - parasolCommand = ['-verbose', - '-ram=%i' % jobDesc.memory, - '-cpu=%i' % jobDesc.cores, - '-results=' + results, - 'add', 'job', command] - # Deal with the cpus - self.usedCpus += jobDesc.cores - while True: # Process finished results with no wait - try: - jobID = self.cpuUsageQueue.get_nowait() - except Empty: - break - if jobID in list(self.jobIDsToCpu.keys()): - self.usedCpus -= self.jobIDsToCpu.pop(jobID) - assert self.usedCpus >= 0 - while self.usedCpus > self.maxCores: # If we are still waiting - jobID = self.cpuUsageQueue.get() - if jobID in list(self.jobIDsToCpu.keys()): - self.usedCpus -= self.jobIDsToCpu.pop(jobID) - assert self.usedCpus >= 0 - # Now keep going - while True: - line = self._runParasol(parasolCommand)[1][0] - match = self.parasolOutputPattern.match(line) - if match is None: - # This is because parasol add job will return success, even if the job was not - # properly issued! - logger.debug('We failed to properly add the job, we will try again after a 5s.') - time.sleep(5) - else: - jobID = int(match.group(1)) - self.jobIDsToCpu[jobID] = jobDesc.cores - self.runningJobs.add(jobID) - logger.debug(f"Got the parasol job id: {jobID} from line: {line}") - return jobID - - def setEnv(self, name, value=None): - if value and ' ' in value: - raise ValueError('Parasol does not support spaces in environment variable values.') - return super().setEnv(name, value) - - def __environment(self, job_environment: Optional[Dict[str, str]] = None): - environment = self.environment.copy() - if job_environment: - environment.update(job_environment) - - return (k + '=' + (os.environ[k] if v is None else v) for k, v in list(environment.items())) - - def killBatchJobs(self, jobIDs): - """Kills the given jobs, represented as Job ids, then checks they are dead by checking - they are not in the list of issued jobs. - """ - while True: - for jobID in jobIDs: - if jobID in self.runningJobs: - self.runningJobs.remove(jobID) - exitValue = self._runParasol(['remove', 'job', str(jobID)], - autoRetry=False)[0] - logger.debug("Tried to remove jobID: %i, with exit value: %i" % (jobID, exitValue)) - runningJobs = self.getIssuedBatchJobIDs() - if set(jobIDs).difference(set(runningJobs)) == set(jobIDs): - break - logger.warning('Tried to kill some jobs, but something happened and they are still ' - 'going, will try again in 5s.') - time.sleep(5) - # Update the CPU usage, because killed jobs aren't written to the results file. - for jobID in jobIDs: - if jobID in list(self.jobIDsToCpu.keys()): - self.usedCpus -= self.jobIDsToCpu.pop(jobID) - - runningPattern = re.compile(r'r\s+([0-9]+)\s+[\S]+\s+[\S]+\s+([0-9]+)\s+[\S]+') - - def getJobIDsForResultsFile(self, resultsFile): - """Get all queued and running jobs for a results file.""" - jobIDs = [] - for line in self._runParasol(['-extended', 'list', 'jobs'])[1]: - fields = line.strip().split() - if len(fields) == 0 or fields[-1] != resultsFile: - continue - jobID = fields[0] - jobIDs.append(int(jobID)) - return set(jobIDs) - - def getIssuedBatchJobIDs(self): - """ - Gets the list of jobs issued to parasol in all results files, but not including jobs - created by other users. - """ - issuedJobs = set() - for resultsFile in self.resultsFiles.values(): - issuedJobs.update(self.getJobIDsForResultsFile(resultsFile)) - - return list(issuedJobs) - - def getRunningBatchJobIDs(self): - """Returns map of running jobIDs and the time they have been running.""" - # Example lines.. - # r 5410186 benedictpaten worker 1247029663 localhost - # r 5410324 benedictpaten worker 1247030076 localhost - runningJobs = {} - issuedJobs = self.getIssuedBatchJobIDs() - for line in self._runParasol(['pstat2'])[1]: - if line != '': - match = self.runningPattern.match(line) - if match is not None: - jobID = int(match.group(1)) - startTime = int(match.group(2)) - if jobID in issuedJobs: # It's one of our jobs - runningJobs[jobID] = time.time() - startTime - return runningJobs - - def getUpdatedBatchJob(self, maxWait): - while True: - try: - item = self.updatedJobsQueue.get(timeout=maxWait) - except Empty: - return None - try: - self.runningJobs.remove(item.jobID) - except KeyError: - # We tried to kill this job, but it ended by itself instead, so skip it. - pass - else: - return item - - def updatedJobWorker(self): - """ - We use the parasol results to update the status of jobs, adding them - to the list of updated jobs. - - Results have the following structure.. (thanks Mark D!) - - ==================== ============================================= - int status; Job status - wait() return format. 0 is good. - char host; Machine job ran on. - char jobId; Job queuing system job ID - char exe; Job executable file (no path) - int usrTicks; 'User' CPU time in ticks. - int sysTicks; 'System' CPU time in ticks. - unsigned submitTime; Job submission time in seconds since 1/1/1970 - unsigned startTime; Job start time in seconds since 1/1/1970 - unsigned endTime; Job end time in seconds since 1/1/1970 - char user; User who ran job - char errFile; Location of stderr file on host - ==================== ============================================= - - Plus you finally have the command name. - """ - resultsFiles = set() - resultsFileHandles = [] - try: - while self.running: - # Look for any new results files that have been created, and open them - newResultsFiles = set(os.listdir(self.parasolResultsDir)).difference(resultsFiles) - for newFile in newResultsFiles: - newFilePath = os.path.join(self.parasolResultsDir, newFile) - resultsFileHandles.append(open(newFilePath)) - resultsFiles.add(newFile) - for fileHandle in resultsFileHandles: - while self.running: - line = fileHandle.readline() - if not line: - break - assert line[-1] == '\n' - (status, host, jobId, exe, usrTicks, sysTicks, submitTime, startTime, - endTime, user, errFile, command) = line[:-1].split(None, 11) - status = int(status) - jobId = int(jobId) - if os.WIFEXITED(status): - status = os.WEXITSTATUS(status) - else: - status = -status - self.cpuUsageQueue.put(jobId) - startTime = int(startTime) - endTime = int(endTime) - if endTime == startTime: - # Both, start and end time is an integer so to get sub-second - # accuracy we use the ticks reported by Parasol as an approximation. - # This isn't documented but what Parasol calls "ticks" is actually a - # hundredth of a second. Parasol does the unit conversion early on - # after a job finished. Search paraNode.c for ticksToHundreths. We - # also cheat a little by always reporting at least one hundredth of a - # second. - usrTicks = int(usrTicks) - sysTicks = int(sysTicks) - wallTime = float(max(1, usrTicks + sysTicks)) * 0.01 - else: - wallTime = float(endTime - startTime) - self.updatedJobsQueue.put(UpdatedBatchJobInfo(jobID=jobId, exitStatus=status, wallTime=wallTime, exitReason=None)) - time.sleep(1) - except: - logger.warning("Error occurred while parsing parasol results files.") - raise - finally: - for fileHandle in resultsFileHandles: - fileHandle.close() - - def shutdown(self) -> None: - self.killBatchJobs(self.getIssuedBatchJobIDs()) # cleanup jobs - for results in self.resultsFiles.values(): - exitValue = self._runParasol(['-results=' + results, 'clear', 'sick'], - autoRetry=False)[0] - if exitValue is not None: - logger.warning("Could not clear sick status of the parasol batch %s" % results) - exitValue = self._runParasol(['-results=' + results, 'flushResults'], - autoRetry=False)[0] - if exitValue is not None: - logger.warning("Could not flush the parasol batch %s" % results) - self.running = False - logger.debug('Joining worker thread...') - self.worker.join() - logger.debug('... joined worker thread.') - for results in list(self.resultsFiles.values()): - os.remove(results) - os.rmdir(self.parasolResultsDir) - - @classmethod - def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None: - parser.add_argument("--parasol_command", "--parasolCommand", dest="parasolCommand", default='parasol', - help="The name or path of the parasol program. Will be looked up on PATH " - "unless it starts with a slash. (default: %(default)s).") - parser.add_argument("--parasol_max_batches", "--parasolMaxBatches", dest="parasolMaxBatches", default=1000, type=int, action=make_open_interval_action(1), - help="Maximum number of job batches the Parasol batch is allowed to create. One batch is " - "created for jobs with a a unique set of resource requirements. (default: %(default)s).") - - @classmethod - def setOptions(cls, setOption: OptionSetter): - setOption("parasolCommand") - setOption("parasolMaxBatches") diff --git a/src/toil/batchSystems/registry.py b/src/toil/batchSystems/registry.py index ca794d155d..93ffda64b1 100644 --- a/src/toil/batchSystems/registry.py +++ b/src/toil/batchSystems/registry.py @@ -73,11 +73,6 @@ def gridengine_batch_system_factory(): return GridEngineBatchSystem -def parasol_batch_system_factory(): - from toil.batchSystems.parasol import ParasolBatchSystem - return ParasolBatchSystem - - def lsf_batch_system_factory(): from toil.batchSystems.lsf import LSFBatchSystem return LSFBatchSystem @@ -117,7 +112,6 @@ def kubernetes_batch_system_factory(): _registry: Dict[str, Callable[[], Type["AbstractBatchSystem"]]] = { 'aws_batch' : aws_batch_batch_system_factory, - 'parasol' : parasol_batch_system_factory, 'single_machine' : single_machine_batch_system_factory, 'grid_engine' : gridengine_batch_system_factory, 'lsf' : lsf_batch_system_factory, diff --git a/src/toil/common.py b/src/toil/common.py index c091636558..db6b8fb2b6 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -150,8 +150,6 @@ class Config: manualMemArgs: bool run_local_jobs_on_workers: bool coalesceStatusCalls: bool - parasolCommand: str - parasolMaxBatches: int mesos_endpoint: Optional[str] mesos_framework_id: Optional[str] mesos_role: Optional[str] diff --git a/src/toil/test/__init__.py b/src/toil/test/__init__.py index 4629d24efb..d46c4910d3 100644 --- a/src/toil/test/__init__.py +++ b/src/toil/test/__init__.py @@ -490,14 +490,6 @@ def needs_mesos(test_item: MT) -> MT: return test_item -def needs_parasol(test_item: MT) -> MT: - """Use as decorator so tests are only run if Parasol is installed.""" - test_item = _mark_test('parasol', test_item) - if which('parasol'): - return test_item - return unittest.skip("Install Parasol to include this test.")(test_item) - - def needs_slurm(test_item: MT) -> MT: """Use as a decorator before test classes or methods to run only if Slurm is installed.""" test_item = _mark_test('slurm', test_item) diff --git a/src/toil/test/batchSystems/batchSystemTest.py b/src/toil/test/batchSystems/batchSystemTest.py index 1bdcff51b4..47891762f5 100644 --- a/src/toil/test/batchSystems/batchSystemTest.py +++ b/src/toil/test/batchSystems/batchSystemTest.py @@ -31,7 +31,6 @@ # in order to import properly. Import them later, in tests # protected by annotations. from toil.batchSystems.mesos.test import MesosTestSupport -from toil.batchSystems.parasol import ParasolBatchSystem from toil.batchSystems.registry import (add_batch_system_factory, get_batch_system, get_batch_systems, @@ -52,11 +51,9 @@ needs_kubernetes_installed, needs_lsf, needs_mesos, - needs_parasol, needs_slurm, needs_torque, slow) -from toil.test.batchSystems.parasolTestSupport import ParasolTestSupport logger = logging.getLogger(__name__) @@ -247,10 +244,6 @@ def test_run_jobs(self): self.batchSystem.killBatchJobs([10]) def test_set_env(self): - # Parasol disobeys shell rules and splits the command at the space - # character into arguments before exec'ing it, whether the space is - # quoted, escaped or not. - # Start with a relatively safe script script_shell = 'if [ "x${FOO}" == "xbar" ] ; then exit 23 ; else exit 42 ; fi' @@ -948,82 +941,6 @@ def stop(self, fileStore): subprocess.check_call(self.cmd + ' -1', shell=True) -@slow -@needs_parasol -class ParasolBatchSystemTest(hidden.AbstractBatchSystemTest, ParasolTestSupport): - """ - Tests the Parasol batch system - """ - - def supportsWallTime(self): - return True - - def _createConfig(self): - config = super()._createConfig() - # can't use _getTestJobStorePath since that method removes the directory - config.jobStore = self._createTempDir('jobStore') - return config - - def createBatchSystem(self) -> AbstractBatchSystem: - memory = int(3e9) - self._startParasol(numCores=numCores, memory=memory) - - return ParasolBatchSystem(config=self.config, - maxCores=numCores, - maxMemory=memory, - maxDisk=1001) - - def tearDown(self): - super().tearDown() - self._stopParasol() - - def testBatchResourceLimits(self): - jobDesc1 = JobDescription(command="sleep 1000", - requirements=dict(memory=1 << 30, cores=1, - disk=1000, accelerators=[], - preemptible=preemptible), - jobName='testResourceLimits') - job1 = self.batchSystem.issueBatchJob(jobDesc1) - self.assertIsNotNone(job1) - jobDesc2 = JobDescription(command="sleep 1000", - requirements=dict(memory=2 << 30, cores=1, - disk=1000, accelerators=[], - preemptible=preemptible), - jobName='testResourceLimits') - job2 = self.batchSystem.issueBatchJob(jobDesc2) - self.assertIsNotNone(job2) - batches = self._getBatchList() - self.assertEqual(len(batches), 2) - # It would be better to directly check that the batches have the correct memory and cpu - # values, but Parasol seems to slightly change the values sometimes. - self.assertNotEqual(batches[0]['ram'], batches[1]['ram']) - # Need to kill one of the jobs because there are only two cores available - self.batchSystem.killBatchJobs([job2]) - job3 = self.batchSystem.issueBatchJob(jobDesc1) - self.assertIsNotNone(job3) - batches = self._getBatchList() - self.assertEqual(len(batches), 1) - - def _parseBatchString(self, batchString): - import re - batchInfo = dict() - memPattern = re.compile(r"(\d+\.\d+)([kgmbt])") - items = batchString.split() - batchInfo["cores"] = int(items[7]) - memMatch = memPattern.match(items[8]) - ramValue = float(memMatch.group(1)) - ramUnits = memMatch.group(2) - ramConversion = {'b': 1e0, 'k': 1e3, 'm': 1e6, 'g': 1e9, 't': 1e12} - batchInfo["ram"] = ramValue * ramConversion[ramUnits] - return batchInfo - - def _getBatchList(self): - # noinspection PyUnresolvedReferences - exitStatus, batchLines = self.batchSystem._runParasol(['list', 'batches']) - self.assertEqual(exitStatus, 0) - return [self._parseBatchString(line) for line in batchLines[1:] if line] - - @slow @needs_gridengine class GridEngineBatchSystemTest(hidden.AbstractGridEngineBatchSystemTest): diff --git a/src/toil/test/batchSystems/parasolTestSupport.py b/src/toil/test/batchSystems/parasolTestSupport.py deleted file mode 100644 index 2f0d0e0ba2..0000000000 --- a/src/toil/test/batchSystems/parasolTestSupport.py +++ /dev/null @@ -1,117 +0,0 @@ -# Copyright (C) 2015-2021 Regents of the University of California -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import logging -import os -import signal -import subprocess -import tempfile -import threading -import time - -from toil import physicalMemory -from toil.lib.objects import InnerClass -from toil.lib.threading import cpu_count - -log = logging.getLogger(__name__) - - -class ParasolTestSupport: - """ - For test cases that need a running Parasol leader and worker on the local host - """ - - def _startParasol(self, numCores=None, memory=None): - if numCores is None: - numCores = cpu_count() - if memory is None: - memory = physicalMemory() - self.numCores = numCores - self.memory = memory - self.leader = self.ParasolLeaderThread() - self.leader.start() - self.worker = self.ParasolWorkerThread() - self.worker.start() - while self.leader.popen is None or self.worker.popen is None: - log.info('Waiting for leader and worker processes') - time.sleep(.1) - - def _stopParasol(self): - self.worker.popen.kill() - self.worker.join() - self.leader.popen.kill() - self.leader.join() - for path in ('para.results', 'parasol.jid'): - if os.path.exists(path): - os.remove(path) - - class ParasolThread(threading.Thread): - - # Lock is used because subprocess is NOT thread safe: http://tinyurl.com/pkp5pgq - lock = threading.Lock() - - def __init__(self): - threading.Thread.__init__(self) - self.popen = None - - def parasolCommand(self): - raise NotImplementedError - - def run(self): - command = self.parasolCommand() - with self.lock: - self.popen = subprocess.Popen(command) - status = self.popen.wait() - if status != 0 and status != -signal.SIGKILL: - log.error("Command '%s' failed with %i.", command, status) - raise subprocess.CalledProcessError(status, command) - log.info('Exiting %s', self.__class__.__name__) - - @InnerClass - class ParasolLeaderThread(ParasolThread): - - def __init__(self): - super().__init__() - self.machineList = None - - def run(self): - with tempfile.NamedTemporaryFile(prefix='machineList.txt', mode='w') as f: - self.machineList = f.name - # name - Network name - # cpus - Number of CPUs we can use - # ramSize - Megabytes of memory - # tempDir - Location of (local) temp dir - # localDir - Location of local data dir - # localSize - Megabytes of local disk - # switchName - Name of switch this is on - f.write('localhost {numCores} {ramSize} {tempDir} {tempDir} 1024 foo'.format( - numCores=self.outer.numCores, - tempDir=tempfile.gettempdir(), - ramSize=self.outer.memory / 1024 / 1024)) - f.flush() - super().run() - - def parasolCommand(self): - return ['paraHub', - '-spokes=1', - '-debug', - self.machineList] - - @InnerClass - class ParasolWorkerThread(ParasolThread): - def parasolCommand(self): - return ['paraNode', - '-cpu=%i' % self.outer.numCores, - '-randomDelay=0', - '-debug', - 'start'] diff --git a/src/toil/test/cwl/cwlTest.py b/src/toil/test/cwl/cwlTest.py index 6444b78ed4..ca65967a5e 100644 --- a/src/toil/test/cwl/cwlTest.py +++ b/src/toil/test/cwl/cwlTest.py @@ -57,7 +57,6 @@ needs_local_cuda, needs_lsf, needs_mesos, - needs_parasol, needs_slurm, needs_torque, needs_wes_server, @@ -761,12 +760,6 @@ def test_gridengine_cwl_conformance(self, **kwargs): def test_mesos_cwl_conformance(self, **kwargs): return self.test_run_conformance(batchSystem="mesos", **kwargs) - @slow - @needs_parasol - @unittest.skip - def test_parasol_cwl_conformance(self, **kwargs): - return self.test_run_conformance(batchSystem="parasol", **kwargs) - @slow @needs_kubernetes def test_kubernetes_cwl_conformance(self, **kwargs): @@ -810,12 +803,6 @@ def test_gridengine_cwl_conformance_with_caching(self): def test_mesos_cwl_conformance_with_caching(self): return self.test_mesos_cwl_conformance(caching=True) - @slow - @needs_parasol - @unittest.skip - def test_parasol_cwl_conformance_with_caching(self): - return self.test_parasol_cwl_conformance(caching=True) - @slow @needs_kubernetes def test_kubernetes_cwl_conformance_with_caching(self): diff --git a/src/toil/test/sort/sortTest.py b/src/toil/test/sort/sortTest.py index 0a6756d0c1..7b93ffb141 100755 --- a/src/toil/test/sort/sortTest.py +++ b/src/toil/test/sort/sortTest.py @@ -34,10 +34,8 @@ needs_google_storage, needs_gridengine, needs_mesos, - needs_parasol, needs_torque, slow) -from toil.test.batchSystems.parasolTestSupport import ParasolTestSupport from toil.test.sort.sort import (copySubRangeOfFile, getMidPoint, main, @@ -64,7 +62,7 @@ def runMain(options): @slow -class SortTest(ToilTest, MesosTestSupport, ParasolTestSupport): +class SortTest(ToilTest, MesosTestSupport): """ Tests Toil by sorting a file in parallel on various combinations of job stores and batch systems. @@ -238,15 +236,6 @@ def testFileGridEngine(self): def testFileTorqueEngine(self): self._toilSort(jobStoreLocator=self._getTestJobStorePath(), batchSystem='torque') - @needs_parasol - @unittest.skip("skipping until parasol support is less flaky (see github issue #449") - def testFileParasol(self): - self._startParasol() - try: - self._toilSort(jobStoreLocator=self._getTestJobStorePath(), batchSystem='parasol') - finally: - self._stopParasol() - testNo = 5 def testSort(self):