Skip to content

Commit 82fd093

Browse files
committed
Move the UnimplementedURLException exception out to exceptions.py to prevent a circular import, and add a new argument that lets the user specify the import worker disk size for when streaming is not available. Also fix a file-mutation bug.
1 parent 645fd25 commit 82fd093

File tree

6 files changed

+53
-57
lines changed

6 files changed

+53
-57
lines changed

Diff for: src/toil/cwl/cwltoil.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,8 @@
140140
NoSuchFileException,
141141
InvalidImportExportUrlException,
142142
LocatorException,
143-
UnimplementedURLException,
144143
)
144+
from toil.lib.exceptions import UnimplementedURLException
145145
from toil.jobStores.fileJobStore import FileJobStore
146146
from toil.jobStores.utils import JobStoreUnavailableException, generate_locator
147147
from toil.lib.io import mkdtemp
@@ -3622,7 +3622,7 @@ def run(self, file_store: AbstractFileStore) -> Any:
36223622
file_to_data = get_file_sizes(
36233623
filenames, file_store.jobStore, include_remote_files=self.options.reference_inputs
36243624
)
3625-
imports_job = ImportsJob(file_to_data, self.options.import_workers_threshold)
3625+
imports_job = ImportsJob(file_to_data, self.options.import_workers_threshold, self.options.import_workers_disk)
36263626
self.addChild(imports_job)
36273627
install_imports_job = CWLInstallImportsJob(
36283628
initialized_job_order=self.initialized_job_order,

Diff for: src/toil/job.py

+9-32
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,16 @@
7474
from toil.resource import ModuleDescriptor
7575
from toil.statsAndLogging import set_logging_from_options
7676

77+
from toil.lib.exceptions import UnimplementedURLException
78+
7779
if TYPE_CHECKING:
7880
from optparse import OptionParser
7981

8082
from toil.batchSystems.abstractBatchSystem import (
8183
BatchJobExitReason
8284
)
8385
from toil.fileStores.abstractFileStore import AbstractFileStore
84-
from toil.jobStores.abstractJobStore import (
85-
AbstractJobStore,
86-
UnimplementedURLException,
87-
)
86+
from toil.jobStores.abstractJobStore import AbstractJobStore
8887

8988
logger = logging.getLogger(__name__)
9089

@@ -3994,25 +3993,16 @@ def __init__(
39943993
self,
39953994
filenames: List[str],
39963995
disk_size: Optional[ParseableIndivisibleResource] = None,
3997-
stream: bool = True,
3998-
**kwargs: Any,
3996+
**kwargs: Any
39993997
):
40003998
"""
40013999
Setup importing files on a worker.
40024000
:param filenames: List of file URIs to import
40034001
:param disk_size: Designated disk space the worker can use when importing. Disregarded if stream is enabled.
4004-
:param stream: Whether to stream a file import or not. We don't have machinery to ensure
4005-
streaming, so if true, assume streaming works and don't give the worker a lot of disk space to work with.
4006-
If streaming fails, the worker will run out of resources and allocate a child job to handle the import with enough disk space.
40074002
:param kwargs: args for the superclass
40084003
"""
4009-
if stream:
4010-
super().__init__(local=False, **kwargs)
4011-
else:
4012-
super().__init__(local=False, disk=disk_size, **kwargs)
40134004
self.filenames = filenames
4014-
self.disk_size = disk_size
4015-
self.stream = stream
4005+
super().__init__(local=False, disk=disk_size, **kwargs)
40164006

40174007
@staticmethod
40184008
def import_files(
@@ -4045,21 +4035,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[Dict[str, FileID]]:
40454035
Import the workflow inputs and then create and run the workflow.
40464036
:return: Promise of workflow outputs
40474037
"""
4048-
try:
4049-
return self.import_files(self.filenames, file_store.jobStore)
4050-
except OSError as e:
4051-
# If the worker crashes due to running out of disk space and was not trying to
4052-
# stream the file import, then try a new import job without streaming by actually giving
4053-
# the worker enough disk space
4054-
# OSError 28 is no space left on device
4055-
if e.errno == 28 and self.stream is True:
4056-
non_streaming_import = WorkerImportJob(
4057-
self.filenames, self.disk_size, stream=False
4058-
)
4059-
self.addChild(non_streaming_import)
4060-
return non_streaming_import.rv()
4061-
else:
4062-
raise
4038+
return self.import_files(self.filenames, file_store.jobStore)
40634039

40644040

40654041
class ImportsJob(Job):
@@ -4073,6 +4049,7 @@ def __init__(
40734049
self,
40744050
file_to_data: Dict[str, FileMetadata],
40754051
max_batch_size: ParseableIndivisibleResource,
4052+
import_worker_disk: ParseableIndivisibleResource,
40764053
**kwargs: Any,
40774054
):
40784055
"""
@@ -4086,6 +4063,7 @@ def __init__(
40864063
super().__init__(local=True, **kwargs)
40874064
self._file_to_data = file_to_data
40884065
self._max_batch_size = max_batch_size
4066+
self._import_worker_disk = import_worker_disk
40894067

40904068
def run(
40914069
self, file_store: AbstractFileStore
@@ -4130,8 +4108,7 @@ def run(
41304108
# Create batch import jobs for each group of files
41314109
for batch in file_batches:
41324110
candidate_uris = [file_to_data[filename][0] for filename in batch]
4133-
batch_size = sum(file_to_data[filename][2] for filename in batch)
4134-
import_jobs.append(WorkerImportJob(candidate_uris, disk_size=batch_size))
4111+
import_jobs.append(WorkerImportJob(candidate_uris, disk_size=self._import_worker_disk))
41354112

41364113
for job in import_jobs:
41374114
self.addChild(job)

Diff for: src/toil/jobStores/abstractJobStore.py

+1-17
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
ServiceJobDescription,
4848
)
4949
from toil.lib.compatibility import deprecated
50+
from toil.lib.exceptions import UnimplementedURLException
5051
from toil.lib.io import WriteWatchingStream
5152
from toil.lib.memoize import memoize
5253
from toil.lib.retry import ErrorCondition, retry
@@ -83,23 +84,6 @@ def __init__(self, url: ParseResult) -> None:
8384
super().__init__("The URL '%s' is invalid." % url.geturl())
8485

8586

86-
class UnimplementedURLException(RuntimeError):
87-
def __init__(self, url: ParseResult, operation: str) -> None:
88-
"""
89-
Make a new exception to report that a URL scheme is not implemented, or
90-
that the implementation can't be loaded because its dependencies are
91-
not installed.
92-
93-
:param url: The given URL
94-
:param operation: Whether we are trying to 'import' or 'export'
95-
"""
96-
super().__init__(
97-
f"No available job store implementation can {operation} the URL "
98-
f"'{url.geturl()}'. Ensure Toil has been installed "
99-
f"with the appropriate extras."
100-
)
101-
102-
10387
class NoSuchJobException(Exception):
10488
"""Indicates that the specified job does not exist."""
10589

Diff for: src/toil/lib/exceptions.py

+18
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# 5.14.2018: copied into Toil from https://github.com/BD2KGenomics/bd2k-python-lib
1616

1717
import sys
18+
from urllib.parse import ParseResult
1819

1920

2021
# TODO: isn't this built in to Python 3 now?
@@ -61,3 +62,20 @@ def raise_(exc_type, exc_value, traceback) -> None:
6162
if exc.__traceback__ is not traceback:
6263
raise exc.with_traceback(traceback)
6364
raise exc
65+
66+
67+
class UnimplementedURLException(RuntimeError):
68+
def __init__(self, url: ParseResult, operation: str) -> None:
69+
"""
70+
Make a new exception to report that a URL scheme is not implemented, or
71+
that the implementation can't be loaded because its dependencies are
72+
not installed.
73+
74+
:param url: The given URL
75+
:param operation: Whether we are trying to 'import' or 'export'
76+
"""
77+
super().__init__(
78+
f"No available job store implementation can {operation} the URL "
79+
f"'{url.geturl()}'. Ensure Toil has been installed "
80+
f"with the appropriate extras."
81+
)

Diff for: src/toil/options/runner.py

+13-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,18 @@ def add_runner_options(
3333
dest="import_workers_threshold",
3434
type=lambda x: human2bytes(str(x)),
3535
default="1 GiB",
36-
help="Specify the file size threshold that determines how many files go into a batched import. As many files will go into a batch import job until this threshold"
36+
help="Specify the file size threshold that determines how many files go into a batched import. As many files will go into a batch import job until this threshold "
3737
"is reached. This should be set in conjunction with the argument --runImportsOnWorkers."
3838
)
39+
import_workers_disk_argument = ["--importWorkersDisk"]
40+
if cwl:
41+
import_workers_disk_argument.append("--import-workers-disk")
42+
parser.add_argument(
43+
*import_workers_disk_argument,
44+
dest="import_workers_disk",
45+
type=lambda x: human2bytes(str(x)),
46+
default="1 MiB",
47+
help="Specify the disk size each import worker will get. This may be necessary when file streaming is not possible. For example, downloading from AWS to a GCE "
48+
"job store. If specified, this should be set to the largest file size of all files to import. This should be set in conjunction with the arguments "
49+
"--runImportsOnWorkers and --importWorkersThreshold."
50+
)

Diff for: src/toil/wdl/wdltoil.py

+10-5
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,10 @@
9797
)
9898
from toil.jobStores.abstractJobStore import (
9999
AbstractJobStore,
100-
UnimplementedURLException,
101100
InvalidImportExportUrlException,
102101
LocatorException,
103102
)
103+
from toil.lib.exceptions import UnimplementedURLException
104104
from toil.lib.accelerators import get_individual_local_accelerators
105105
from toil.lib.conversions import VALID_PREFIXES, convert_units, human2bytes
106106
from toil.lib.io import mkdtemp, is_any_url, is_file_url, TOIL_URI_SCHEME, is_standard_url, is_toil_url, is_remote_url
@@ -1290,9 +1290,10 @@ def convert_file_to_uri(file: WDL.Value.File) -> WDL.Value.File:
12901290
file_id, task_path, dir_to_id[file_to_data[file.value][1]], file_basename
12911291
)
12921292

1293-
setattr(file, "virtualized_value", toil_uri)
1294-
file.value = candidate_uri
1295-
return file
1293+
# Don't mutate the original file object
1294+
new_file = WDL.Value.File(file.value)
1295+
setattr(new_file, "virtualized_value", toil_uri)
1296+
return new_file
12961297

12971298
return map_over_files_in_bindings(environment, convert_file_to_uri)
12981299

@@ -5326,6 +5327,7 @@ def __init__(
53265327
inputs_search_path: list[str],
53275328
import_remote_files: bool,
53285329
import_workers_threshold: ParseableIndivisibleResource,
5330+
import_workers_disk: ParseableIndivisibleResource,
53295331
**kwargs: Any,
53305332
):
53315333
"""
@@ -5339,6 +5341,7 @@ def __init__(
53395341
self._inputs_search_path = inputs_search_path
53405342
self._import_remote_files = import_remote_files
53415343
self._import_workers_threshold = import_workers_threshold
5344+
self._import_workers_disk = import_workers_disk
53425345

53435346
def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
53445347
filenames = extract_workflow_inputs(self._inputs)
@@ -5347,8 +5350,9 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
53475350
file_store.jobStore,
53485351
self._inputs_search_path,
53495352
include_remote_files=self._import_remote_files,
5353+
execution_dir=self._wdl_options.get("execution_dir")
53505354
)
5351-
imports_job = ImportsJob(file_to_data, self._import_workers_threshold)
5355+
imports_job = ImportsJob(file_to_data, self._import_workers_threshold, self._import_workers_disk)
53525356
self.addChild(imports_job)
53535357
install_imports_job = WDLInstallImportsJob(
53545358
self._target.name, self._inputs, imports_job.rv()
@@ -5381,6 +5385,7 @@ def make_root_job(
53815385
inputs_search_path=inputs_search_path,
53825386
import_remote_files=options.reference_inputs,
53835387
import_workers_threshold=options.import_workers_threshold,
5388+
import_workers_disk=options.import_workers_disk
53845389
)
53855390
else:
53865391
# Run WDL imports on leader

0 commit comments

Comments (0)