diff --git a/Makefile b/Makefile index 2ace5221cd..59897505bd 100644 --- a/Makefile +++ b/Makefile @@ -135,6 +135,10 @@ test: check_venv check_build_reqs TOIL_OWNER_TAG="shared" \ python -m pytest --durations=0 --strict-markers --log-level DEBUG --log-cli-level INFO -r s $(cov) -n $(threads) --dist loadscope $(tests) -m "$(marker)" +test_debug: check_venv check_build_reqs + TOIL_OWNER_TAG="$(whoami)" \ + python -m pytest --durations=0 --strict-markers --log-level DEBUG -s -o log_cli=true --log-cli-level DEBUG -r s $(tests) -m "$(marker)" --tb=native --maxfail=1 + # This target will skip building docker and all docker based tests # these are our travis tests; rename? diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py index edb0857140..0ceb0e0315 100644 --- a/src/toil/cwl/cwltoil.py +++ b/src/toil/cwl/cwltoil.py @@ -23,6 +23,8 @@ import datetime import errno import functools +import glob +import io import json import logging import os @@ -113,6 +115,7 @@ CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION, CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE, download_structure, + get_from_structure, visit_cwl_class_and_reduce, ) from toil.exceptions import FailedJobsException @@ -1312,73 +1315,160 @@ def download_to(url: str, dest: str) -> None: return destination def glob(self, pattern: str) -> List[str]: - # We know this falls back on _abs - return super().glob(pattern) + parse = urlparse(pattern) + if parse.scheme == "file": + pattern = os.path.abspath(unquote(parse.path)) + elif parse.scheme == "": + pattern = os.path.abspath(pattern) + else: + raise RuntimeError(f"Cannot efficiently support globbing on {parse.scheme} URIs") + + # Actually do the glob + return [schema_salad.ref_resolver.file_uri(f) for f in glob.glob(pattern)] def open(self, fn: str, mode: str) -> IO[Any]: - # TODO: Also implement JobStore-supported URLs through JobStore methods. - # We know this falls back on _abs - return super().open(fn, mode) + if "w" in mode or "x" in mode or "+" in mode or "a" in mode: + raise RuntimeError(f"Mode {mode} for opening {fn} involves writing") + + parse = urlparse(fn) + if parse.scheme in ["", "file"]: + # Handle local files + return open(self._abs(fn), mode) + elif parse.scheme == "toildir": + contents, subpath, cache_key = decode_directory(fn) + if cache_key in self.dir_to_download: + # This is already available locally, so fall back on the local copy + return open(self._abs(fn), mode) + else: + # We need to get the URI out of the virtual directory + if subpath is None: + raise RuntimeError(f"{fn} is a toildir directory") + uri = get_from_structure(contents, subpath) + if not isinstance(uri, str): + raise RuntimeError(f"{fn} does not point to a file") + # Recurse on that URI + return self.open(uri, mode) + elif parse.scheme == "toilfile": + if self.file_store is None: + raise RuntimeError("URL requires a file store: " + fn) + # Streaming access to Toil file store files requires being inside a + # context manager, which we can't require. So we need to download + # the file. + return open(self._abs(fn), mode) + else: + # This should be supported by a job store. + byte_stream = AbstractJobStore.open_url(fn) + if 'b' in mode: + # Pass stream along in binary + return byte_stream + else: + # Wrap it in a text decoder + return io.TextIOWrapper(byte_stream, encoding='utf-8') def exists(self, path: str) -> bool: """Test for file existence.""" - # toil's _abs() throws errors when files are not found and cwltool's _abs() does not - try: - # TODO: Also implement JobStore-supported URLs through JobStore methods. 
- return os.path.exists(self._abs(path)) - except NoSuchFileException: - return False + parse = urlparse(path) + if parse.scheme in ["", "file"]: + # Handle local files + # toil's _abs() throws errors when files are not found and cwltool's _abs() does not + try: + return os.path.exists(self._abs(path)) + except NoSuchFileException: + return False + elif parse.scheme == "toildir": + contents, subpath, cache_key = decode_directory(path) + if subpath is None: + # The toildir directory itself exists + return True + uri = get_from_structure(contents, subpath) + if uri is None: + # It's not in the virtual directory, so it doesn't exist + return False + if isinstance(uri, dict): + # Actually it's a subdirectory, so it exists. + return True + # We recurse and poll the URI directly to make sure it really exists + return self.exists(uri) + elif parse.scheme == "toilfile": + # TODO: we assume CWL can't call deleteGlobalFile and so the file always exists + return True + else: + # This should be supported by a job store. + return AbstractJobStore.url_exists(path) def size(self, path: str) -> int: - # This should avoid _abs for things actually in the file store, to - # prevent multiple downloads as in - # https://github.com/DataBiosphere/toil/issues/3665 - if path.startswith("toilfile:"): - if self.file_store is None: - raise RuntimeError("URL requires a file store: " + path) - return self.file_store.getGlobalFileSize( - FileID.unpack(path[len("toilfile:") :]) - ) - elif path.startswith("toildir:"): + parse = urlparse(path) + if parse.scheme in ["", "file"]: + return os.stat(self._abs(path)).st_size + elif parse.scheme == "toildir": # Decode its contents, the path inside it to the file (if any), and # the key to use for caching the directory. - here, subpath, cache_key = decode_directory(path) + contents, subpath, cache_key = decode_directory(path) # We can't get the size of just a directory. if subpath is None: raise RuntimeError(f"Attempted to check size of directory {path}") - for part in subpath.split("/"): - # Follow the path inside the directory contents. - here = cast(DirectoryContents, here[part]) + uri = get_from_structure(contents, subpath) - # We ought to end up with a toilfile: URI. - if not isinstance(here, str): + # We ought to end up with a URI. + if not isinstance(uri, str): raise RuntimeError(f"Did not find a file at {path}") - if not here.startswith("toilfile:"): - raise RuntimeError(f"Did not find a filestore file at {path}") - - return self.size(here) + return self.size(uri) + elif parse.scheme == "toilfile": + if self.file_store is None: + raise RuntimeError("URL requires a file store: " + path) + return self.file_store.getGlobalFileSize( + FileID.unpack(path[len("toilfile:") :]) + ) else: - # TODO: Also implement JobStore-supported URLs through JobStore methods. - # We know this falls back on _abs - return super().size(path) + # This should be supported by a job store. 
+ size = AbstractJobStore.get_size(path) + if size is None: + # get_size can be unimplemented or unavailable + raise RuntimeError(f"Could not get size of {path}") + return size def isfile(self, fn: str) -> bool: parse = urlparse(fn) - if parse.scheme in ["toilfile", "toildir", "file", ""]: - # We know this falls back on _abs - return super().isfile(fn) + if parse.scheme in ["file", ""]: + return os.path.isfile(self._abs(fn)) + elif parse.scheme == "toilfile": + # TODO: we assume CWL can't call deleteGlobalFile and so the file always exists + return True + elif parse.scheme == "toildir": + contents, subpath, cache_key = decode_directory(fn) + if subpath is None: + # This is the toildir directory itself + return False + found = get_from_structure(contents, subpath) + # If we find a string, that's a file + # TODO: we assume CWL can't call deleteGlobalFile and so the file always exists + return isinstance(found, str) else: - return not AbstractJobStore.get_is_directory(fn) + return self.exists(fn) and not AbstractJobStore.get_is_directory(fn) def isdir(self, fn: str) -> bool: + logger.debug("ToilFsAccess checking type of %s", fn) parse = urlparse(fn) - if parse.scheme in ["toilfile", "toildir", "file", ""]: - # We know this falls back on _abs - return super().isdir(fn) + if parse.scheme in ["file", ""]: + return os.path.isdir(self._abs(fn)) + elif parse.scheme == "toilfile": + return False + elif parse.scheme == "toildir": + contents, subpath, cache_key = decode_directory(fn) + if subpath is None: + # This is the toildir directory itself. + # TODO: We assume directories can't be deleted. + return True + found = get_from_structure(contents, subpath) + # If we find a dict, that's a directory. + # TODO: We assume directories can't be deleted. + return isinstance(found, dict) else: - return AbstractJobStore.get_is_directory(fn) + status = AbstractJobStore.get_is_directory(fn) + logger.debug("AbstractJobStore said: %s", status) + return status def listdir(self, fn: str) -> List[str]: # This needs to return full URLs for everything in the directory. @@ -1386,12 +1476,25 @@ def listdir(self, fn: str) -> List[str]: logger.debug("ToilFsAccess listing %s", fn) parse = urlparse(fn) - if parse.scheme in ["toilfile", "toildir", "file", ""]: - # Download the file or directory to a local path + if parse.scheme in ["file", ""]: + # Find the local path directory = self._abs(fn) - # Now list it (it is probably a directory) return [abspath(quote(entry), fn) for entry in os.listdir(directory)] + elif parse.scheme == "toilfile": + raise RuntimeError(f"Cannot list a file: {fn}") + elif parse.scheme == "toildir": + contents, subpath, cache_key = decode_directory(fn) + here = contents + if subpath is not None: + got = get_from_structure(contents, subpath) + if got is None: + raise RuntimeError(f"Cannot list nonexistent directory: {fn}") + if isinstance(got, str): + raise RuntimeError(f"Cannot list file or dubdirectory of a file: {fn}") + here = got + # List all the things in here and make full URIs to them + return [os.path.join(fn, k) for k in here.keys()] else: return [ os.path.join(fn, entry.rstrip("/")) @@ -1413,7 +1516,7 @@ def toil_get_file( file_store: AbstractFileStore, index: Dict[str, str], existing: Dict[str, str], - file_store_id: str, + uri: str, streamable: bool = False, streaming_allowed: bool = True, pipe_threads: Optional[List[Tuple[Thread, int]]] = None, @@ -1430,9 +1533,9 @@ def toil_get_file( :param index: Maps from downloaded file path back to input Toil URI. 
- :param existing: Maps from file_store_id URI to downloaded file path. + :param existing: Maps from URI to downloaded file path. - :param file_store_id: The URI for the file to download. + :param uri: The URI for the file to download. :param streamable: If the file is has 'streamable' flag set @@ -1445,13 +1548,13 @@ def toil_get_file( pipe_threads_real = pipe_threads or [] # We can't use urlparse here because we need to handle the '_:' scheme and # urlparse sees that as a path and not a URI scheme. - if file_store_id.startswith("toildir:"): + if uri.startswith("toildir:"): # This is a file in a directory, or maybe a directory itself. # See ToilFsAccess and upload_directory. # We will go look for the actual file in the encoded directory # structure which will tell us where the toilfile: name for the file is. - parts = file_store_id[len("toildir:") :].split("/") + parts = uri[len("toildir:") :].split("/") contents = json.loads( base64.urlsafe_b64decode(parts[0].encode("utf-8")).decode("utf-8") ) @@ -1471,21 +1574,41 @@ def toil_get_file( download_structure(file_store, index, existing, contents, dest_path) # Return where we put it, but as a file:// URI return schema_salad.ref_resolver.file_uri(dest_path) - elif file_store_id.startswith("toilfile:"): - # This is a plain file with no context. + elif uri.startswith("_:"): + # Someone is asking us for an empty temp directory. + # We need to check this before the file path case because urlsplit() + # will call this a path with no scheme. + dest_path = file_store.getLocalTempDir() + return schema_salad.ref_resolver.file_uri(dest_path) + elif uri.startswith("file:") or urlsplit(uri).scheme == "": + # There's a file: scheme or no scheme, and we know this isn't a _: URL. + + # We need to support file: URIs and local paths, because we might be + # involved in moving files around on the local disk when uploading + # things after a job. We might want to catch cases where a leader + # filesystem file URI leaks in here, but we can't, so we just rely on + # the rest of the code to be correct. + return uri + else: + # This is a toilfile: uri or other remote URI def write_to_pipe( - file_store: AbstractFileStore, pipe_name: str, file_store_id: FileID + file_store: AbstractFileStore, pipe_name: str, uri: str ) -> None: try: with open(pipe_name, "wb") as pipe: - with file_store.jobStore.read_file_stream(file_store_id) as fi: - file_store.logAccess(file_store_id) - chunk_sz = 1024 - while True: - data = fi.read(chunk_sz) - if not data: - break - pipe.write(data) + if uri.startswith("toilfile:"): + # Stream from the file store + file_store_id = FileID.unpack(uri[len("toilfile:") :]) + with file_store.readGlobalFileStream(file_store_id) as fi: + chunk_sz = 1024 + while True: + data = fi.read(chunk_sz) + if not data: + break + pipe.write(data) + else: + # Stream from some other URI + AbstractJobStore.read_from_url(uri, pipe) except OSError as e: # The other side of the pipe may have been closed by the # reading thread, which is OK. 
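[Editor's note] The write_to_pipe helper in the hunk above streams toilfile: and other remote URIs through a named pipe so downstream consumers can read them without waiting for a full download. The following is a minimal, self-contained sketch of that FIFO technique, assuming nothing from Toil; stream_through_fifo and its names are illustrative, not the patch's actual code.

    # Sketch of the named-pipe streaming pattern: a writer thread fills a
    # FIFO while the consumer reads it like an ordinary file.
    import os
    import tempfile
    from threading import Thread

    def stream_through_fifo(source_path: str) -> None:
        fifo_dir = tempfile.mkdtemp()
        fifo_path = os.path.join(fifo_dir, "stream.fifo")
        os.mkfifo(fifo_path)

        def write_to_pipe() -> None:
            # Opening the FIFO for writing blocks until a reader opens it.
            with open(source_path, "rb") as src, open(fifo_path, "wb") as pipe:
                while True:
                    chunk = src.read(1024)
                    if not chunk:
                        break
                    pipe.write(chunk)

        writer = Thread(target=write_to_pipe)
        writer.start()
        # The consumer just sees a path it can open and read once, end to end.
        with open(fifo_path, "rb") as downstream:
            data = downstream.read()
        writer.join()
        os.unlink(fifo_path)
        os.rmdir(fifo_dir)
        print(f"streamed {len(data)} bytes through a FIFO")

In the patch itself the writer thread pulls from the Toil file store (readGlobalFileStream) or from AbstractJobStore.read_from_url rather than from a local file; the pipe mechanics are the same.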
@@ -1498,7 +1621,7 @@ def write_to_pipe( and not isinstance(file_store.jobStore, FileJobStore) ): logger.debug( - "Streaming file %s", FileID.unpack(file_store_id[len("toilfile:") :]) + "Streaming file %s", uri ) src_path = file_store.getLocalTempFileName() os.mkfifo(src_path) @@ -1507,43 +1630,40 @@ def write_to_pipe( args=( file_store, src_path, - FileID.unpack(file_store_id[len("toilfile:") :]), + uri, ), ) th.start() pipe_threads_real.append((th, os.open(src_path, os.O_RDONLY))) else: - src_path = file_store.readGlobalFile( - FileID.unpack(file_store_id[len("toilfile:") :]), symlink=True - ) - - # TODO: shouldn't we be using these as a cache? - index[src_path] = file_store_id - existing[file_store_id] = src_path + # We need to do a real file + if uri in existing: + # Already did it + src_path = existing[uri] + else: + if uri.startswith("toilfile:"): + # Download from the file store + file_store_id = FileID.unpack(uri[len("toilfile:") :]) + src_path = file_store.readGlobalFile( + file_store_id, symlink=True + ) + else: + # Download from the URI via the job store. + + # Figure out where it goes. + src_path = file_store.getLocalTempFileName() + # Open that path exclusively to make sure we created it + with open(src_path, 'xb') as fh: + # Download into the file + size, executable = AbstractJobStore.read_from_url(uri, fh) + if executable: + # Set the execute bit in the file's permissions + os.chmod(src_path, os.stat(src_path).st_mode | stat.S_IXUSR) + + index[src_path] = uri + existing[uri] = src_path return schema_salad.ref_resolver.file_uri(src_path) - elif file_store_id.startswith("_:"): - # Someone is asking us for an empty temp directory. - # We need to check this before the file path case because urlsplit() - # will call this a path with no scheme. - dest_path = file_store.getLocalTempDir() - return schema_salad.ref_resolver.file_uri(dest_path) - elif file_store_id.startswith("file:") or urlsplit(file_store_id).scheme == "": - # There's a file: scheme or no scheme, and we know this isn't a _: URL. - - # We need to support file: URIs and local paths, because we might be - # involved in moving files around on the local disk when uploading - # things after a job. We might want to catch cases where a leader - # filesystem file URI leaks in here, but we can't, so we just rely on - # the rest of the code to be correct. - return file_store_id - else: - raise RuntimeError( - f"Cannot obtain file {file_store_id} while on host " - f"{socket.gethostname()}; all imports must happen on the " - f"leader!" - ) - - + def write_file( writeFunc: Callable[[str], FileID], index: Dict[str, str], @@ -1599,6 +1719,7 @@ def import_files( existing: Dict[str, str], cwl_object: Optional[CWLObjectType], skip_broken: bool = False, + skip_remote: bool = False, bypass_file_store: bool = False, ) -> None: """ @@ -1637,12 +1758,16 @@ def import_files( :param skip_broken: If True, when files can't be imported because they e.g. don't exist, leave their locations alone rather than failing with an error. + :param skp_remote: If True, leave remote URIs in place instead of importing + files. + :param bypass_file_store: If True, leave file:// URIs in place instead of importing files and directories. """ tool_id = cwl_object.get("id", str(cwl_object)) if cwl_object else "" logger.debug("Importing files for %s", tool_id) + logger.debug("Importing files in %s", cwl_object) # We need to upload all files to the Toil filestore, and encode structure # recursively into all Directories' locations. 
But we cannot safely alter @@ -1742,7 +1867,7 @@ def visit_file_or_directory_up( # Upload the file itself, which will adjust its location. upload_file( - import_function, fileindex, existing, rec, skip_broken=skip_broken + import_function, fileindex, existing, rec, skip_broken=skip_broken, skip_remote=skip_remote ) # Make a record for this file under its name @@ -1847,11 +1972,16 @@ def upload_file( existing: Dict[str, str], file_metadata: CWLObjectType, skip_broken: bool = False, + skip_remote: bool = False ) -> None: """ - Update a file object so that the location is a reference to the toil file store. + Update a file object so that the file will be accessible from another machine. - Write the file object to the file store if necessary. + Uploads local files to the Toil file store, and sets their location to a + reference to the toil file store. + + Unless skip_remote is set, downloads remote files into the file store and + sets their locations to references into the file store as well. """ location = cast(str, file_metadata["location"]) if ( @@ -1874,7 +2004,10 @@ def upload_file( return else: raise cwl_utils.errors.WorkflowException("File is missing: %s" % location) - file_metadata["location"] = write_file(uploadfunc, fileindex, existing, location) + + if location.startswith("file://") or not skip_remote: + # This is a local file, or we also need to download and re-upload remote files + file_metadata["location"] = write_file(uploadfunc, fileindex, existing, location) logger.debug("Sending file at: %s", file_metadata["location"]) @@ -2071,34 +2204,50 @@ def _realpath( # At the end we should get a direct toilfile: URI file_id_or_contents = cast(str, here) + # This might be an e.g. S3 URI now + if not file_id_or_contents.startswith("toilfile:"): + # We need to import it so we can export it. + # TODO: Use direct S3 to S3 copy on exports as well + file_id_or_contents = ( + "toilfile:" + + toil.import_file(file_id_or_contents, symlink=False).pack() + ) + if file_id_or_contents.startswith("toilfile:"): # This is something we can export destUrl = "/".join(s.strip("/") for s in [destBucket, baseName]) - toil.exportFile( + toil.export_file( FileID.unpack(file_id_or_contents[len("toilfile:") :]), destUrl, ) # TODO: can a toildir: "file" get here? else: - # We are saving to the filesystem so we only really need exportFile for actual files. + # We are saving to the filesystem so we only really need export_file for actual files. if not os.path.exists(p.target) and p.type in [ "Directory", "WritableDirectory", ]: os.makedirs(p.target) if p.type in ["File", "WritableFile"]: - if p.resolved.startswith("toilfile:"): - # We can actually export this - os.makedirs(os.path.dirname(p.target), exist_ok=True) - toil.exportFile( - FileID.unpack(p.resolved[len("toilfile:") :]), - "file://" + p.target, - ) - elif p.resolved.startswith("/"): + if p.resolved.startswith("/"): # Probably staging and bypassing file store. Just copy. os.makedirs(os.path.dirname(p.target), exist_ok=True) shutil.copyfile(p.resolved, p.target) - # TODO: can a toildir: "file" get here? 
+ else: + uri = p.resolved + if not uri.startswith("toilfile:"): + # We need to import so we can export + uri = ( + "toilfile:" + + toil.import_file(uri, symlink=False).pack() + ) + + # Actually export from the file store + os.makedirs(os.path.dirname(p.target), exist_ok=True) + toil.export_file( + FileID.unpack(uri[len("toilfile:") :]), + "file://" + p.target, + ) if p.type in [ "CreateFile", "CreateWritableFile", @@ -3560,6 +3709,15 @@ def add_cwl_options(parser: argparse.ArgumentParser) -> None: "paths are accessible in place from all nodes.", dest="bypass_file_store", ) + parser.add_argument( + "--reference-inputs", + action="store_true", + default=False, + help="Do not copy remote inputs into Toil's file " + "store and assume they are accessible in place from " + "all nodes.", + dest="reference_inputs", + ) parser.add_argument( "--disable-streaming", action="store_true", @@ -3747,6 +3905,10 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int: # Otherwise, if it takes a File with loadContents from a URL, we won't # be able to load the contents when we need to. runtime_context.make_fs_access = ToilFsAccess + if options.reference_inputs and options.bypass_file_store: + # We can't do both of these at the same time. + logger.error("Cannot reference inputs when bypassing the file store") + return 1 loading_context = cwltool.main.setup_loadingContext(None, runtime_context, options) @@ -3942,6 +4104,7 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int: existing, initialized_job_order, skip_broken=True, + skip_remote=options.reference_inputs, bypass_file_store=options.bypass_file_store, ) # Import all the files associated with tools (binaries, etc.). @@ -3956,6 +4119,7 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int: fileindex, existing, skip_broken=True, + skip_remote=options.reference_inputs, bypass_file_store=options.bypass_file_store, ), ) diff --git a/src/toil/cwl/utils.py b/src/toil/cwl/utils.py index 86bb81c851..d043ebe9a2 100644 --- a/src/toil/cwl/utils.py +++ b/src/toil/cwl/utils.py @@ -16,6 +16,9 @@ import logging import os +from pathlib import PurePosixPath +import posixpath +import stat from typing import ( Any, Callable, @@ -31,6 +34,7 @@ from toil.fileStores import FileID from toil.fileStores.abstractFileStore import AbstractFileStore +from toil.jobStores.abstractJobStore import AbstractJobStore logger = logging.getLogger(__name__) @@ -128,6 +132,32 @@ def visit_cwl_class_and_reduce( DirectoryStructure = Dict[str, Union[str, "DirectoryStructure"]] +def get_from_structure(dir_dict: DirectoryStructure, path: str) -> Union[str, DirectoryStructure, None]: + """ + Given a relative path, follow it in the given directory structure. + + Return the string URI for files, the directory dict for + subdirectories, or None for nonexistent things. + """ + + # Resolve .. and split into path components + parts = PurePosixPath(posixpath.normpath(path)).parts + if len(parts) == 0: + return dir_dict + if parts[0] in ('..', '/'): + raise RuntimeError(f"Path {path} not resolvable in virtual directory") + found: Union[str, DirectoryStructure] = dir_dict + for part in parts: + # Go down by each path component in turn + if isinstance(found, str): + # Looking for a subdirectory of a file, which doesn't exist + return None + if part not in found: + return None + found = found[part] + # Now we're at the place we want to be. 
+ return found + def download_structure( file_store: AbstractFileStore, @@ -140,11 +170,12 @@ def download_structure( Download nested dictionary from the Toil file store to a local path. Guaranteed to fill the structure with real files, and not symlinks out of - it to elsewhere. + it to elsewhere. File URIs may be toilfile: URIs or any other URI that + Toil's job store system can read. :param file_store: The Toil file store to download from. - :param index: Maps from downloaded file path back to input Toil URI. + :param index: Maps from downloaded file path back to input URI. :param existing: Maps from file_store_id URI to downloaded file path. @@ -171,16 +202,24 @@ def download_structure( # This must be a file path uploaded to Toil. if not isinstance(value, str): raise RuntimeError(f"Did not find a file at {value}.") - if not value.startswith("toilfile:"): - raise RuntimeError(f"Did not find a filestore file at {value}") + logger.debug("Downloading contained file '%s'", name) dest_path = os.path.join(into_dir, name) - # So download the file into place. - # Make sure to get a real copy of the file because we may need to - # mount the directory into a container as a whole. - file_store.readGlobalFile( - FileID.unpack(value[len("toilfile:") :]), dest_path, symlink=False - ) + + if value.startswith("toilfile:"): + # So download the file into place. + # Make sure to get a real copy of the file because we may need to + # mount the directory into a container as a whole. + file_store.readGlobalFile( + FileID.unpack(value[len("toilfile:") :]), dest_path, symlink=False + ) + else: + # We need to download from some other kind of URL. + size, executable = AbstractJobStore.read_from_url(value, open(dest_path, 'wb')) + if executable: + # Make the written file executable + os.chmod(dest_path, os.stat(dest_path).st_mode | stat.S_IXUSR) + # Update the index dicts # TODO: why? index[dest_path] = value diff --git a/src/toil/fileStores/abstractFileStore.py b/src/toil/fileStores/abstractFileStore.py index 600a7d3f36..472d57e5e2 100644 --- a/src/toil/fileStores/abstractFileStore.py +++ b/src/toil/fileStores/abstractFileStore.py @@ -26,12 +26,14 @@ Generator, Iterator, List, + Literal, Optional, Set, Tuple, Type, Union, - cast) + cast, + overload) import dill @@ -413,6 +415,21 @@ def readGlobalFile( """ raise NotImplementedError() + @overload + def readGlobalFileStream( + self, + fileStoreID: str, + encoding: Literal[None] = None, + errors: Optional[str] = None, + ) -> ContextManager[IO[bytes]]: + ... + + @overload + def readGlobalFileStream( + self, fileStoreID: str, encoding: str, errors: Optional[str] = None + ) -> ContextManager[IO[str]]: + ... + @abstractmethod def readGlobalFileStream( self, diff --git a/src/toil/fileStores/cachingFileStore.py b/src/toil/fileStores/cachingFileStore.py index e521c69824..54f5253c16 100644 --- a/src/toil/fileStores/cachingFileStore.py +++ b/src/toil/fileStores/cachingFileStore.py @@ -232,9 +232,11 @@ def __init__( # be able to tell that from showing up on a machine where a cache has # already been created. self.dbPath = os.path.join(self.coordination_dir, f'cache-{self.workflowAttemptNumber}.db') - # We need to hold onto both a connection (to commit) and a cursor (to actually use the database) - self.con = sqlite3.connect(self.dbPath, timeout=SQLITE_TIMEOUT_SECS) - self.cur = self.con.cursor() + + # Database connections are provided by magic properties self.con and + # self.cur that always have the right object for the current thread to + # use. 
They store stuff in this thread-local storage. + self._thread_local = threading.local() # Note that sqlite3 automatically starts a transaction when we go to # modify the database. @@ -242,6 +244,12 @@ def __init__( # write themselves), we need to COMMIT after every coherent set of # writes. + # Because we support multi-threaded access to files, but we talk to the + # database as one process with one identity for owning file references, + # we need to make sure only one thread of our process is trying to e.g. + # free up space in the cache for a file at a time. + self.process_identity_lock = threading.RLock() + # Set up the tables self._ensureTables(self.con) @@ -261,6 +269,37 @@ def __init__( # time. self.commitThread = None + @contextmanager + def as_process(self) -> Generator[str, None, None]: + """ + Assume the process's identity to act on the caching database. + + Yields the process's name in the caching database, and holds onto a + lock while your thread has it. + """ + with self.process_identity_lock: + yield get_process_name(self.coordination_dir) + + @property + def con(self) -> sqlite3.Connection: + """ + Get the database connection to be used for the current thread. + """ + if not hasattr(self._thread_local, 'con'): + # Connect to the database for this thread. + # TODO: We assume the connection closes when the thread goes away and can no longer use it. + self._thread_local.con = sqlite3.connect(self.dbPath, timeout=SQLITE_TIMEOUT_SECS) + return self._thread_local.con + + @property + def cur(self) -> sqlite3.Cursor: + """ + Get the main cursor to be used for the current thread. + """ + if not hasattr(self._thread_local, 'cur'): + # If we don't already have a main cursor for the thread, make one. + self._thread_local.cur = self.con.cursor() + return self._thread_local.cur @staticmethod @retry(infinite_retries=True, @@ -664,153 +703,137 @@ def _stealWorkFromTheDead(self): We don't actually process them here. We take action based on the states of files we own later. """ - me = get_process_name(self.coordination_dir) + with self.as_process() as me: - # Get a list of all file owner processes on this node. - # Exclude NULL because it comes out as 0 and we can't look for PID 0. - owners = [] - for row in self._read('SELECT DISTINCT owner FROM files WHERE owner IS NOT NULL'): - owners.append(row[0]) + # Get a list of all file owner processes on this node. + # Exclude NULL because it comes out as 0 and we can't look for PID 0. + owners = [] + for row in self._read('SELECT DISTINCT owner FROM files WHERE owner IS NOT NULL'): + owners.append(row[0]) - # Work out which of them have died. - deadOwners = [] - for owner in owners: - if not process_name_exists(self.coordination_dir, owner): - logger.debug('Owner %s is dead', owner) - deadOwners.append(owner) - else: - logger.debug('Owner %s is alive', owner) - - for owner in deadOwners: - # Try and adopt all the files that any dead owner had - - # If they were deleting, we delete. - # If they were downloading, we delete. Any outstanding references - # can't be in use since they are from the dead downloader. - # If they were uploading or uploadable, we mark as cached even - # though it never made it to the job store (and leave it unowned). - # - # Once the dead job that it was being uploaded from is cleaned up, - # and there are no longer any immutable references, it will be - # evicted as normal. 
Since the dead job can't have been marked - # successfully completed (since the file is still not uploaded), - # nobody is allowed to actually try and use the file. - # - # TODO: if we ever let other PIDs be responsible for writing our - # files asynchronously, this will need to change. - self._write([('UPDATE files SET owner = ?, state = ? WHERE owner = ? AND state = ?', - (me, 'deleting', owner, 'deleting')), - ('UPDATE files SET owner = ?, state = ? WHERE owner = ? AND state = ?', - (me, 'deleting', owner, 'downloading')), - ('UPDATE files SET owner = NULL, state = ? WHERE owner = ? AND (state = ? OR state = ?)', - ('cached', owner, 'uploadable', 'uploading'))]) - - logger.debug('Tried to adopt file operations from dead worker %s to ourselves as %s', owner, me) - - @classmethod - def _executePendingDeletions(cls, coordination_dir, con, cur): + # Work out which of them have died. + deadOwners = [] + for owner in owners: + if not process_name_exists(self.coordination_dir, owner): + logger.debug('Owner %s is dead', owner) + deadOwners.append(owner) + else: + logger.debug('Owner %s is alive', owner) + + for owner in deadOwners: + # Try and adopt all the files that any dead owner had + + # If they were deleting, we delete. + # If they were downloading, we delete. Any outstanding references + # can't be in use since they are from the dead downloader. + # If they were uploading or uploadable, we mark as cached even + # though it never made it to the job store (and leave it unowned). + # + # Once the dead job that it was being uploaded from is cleaned up, + # and there are no longer any immutable references, it will be + # evicted as normal. Since the dead job can't have been marked + # successfully completed (since the file is still not uploaded), + # nobody is allowed to actually try and use the file. + # + # TODO: if we ever let other PIDs be responsible for writing our + # files asynchronously, this will need to change. + self._write([('UPDATE files SET owner = ?, state = ? WHERE owner = ? AND state = ?', + (me, 'deleting', owner, 'deleting')), + ('UPDATE files SET owner = ?, state = ? WHERE owner = ? AND state = ?', + (me, 'deleting', owner, 'downloading')), + ('UPDATE files SET owner = NULL, state = ? WHERE owner = ? AND (state = ? OR state = ?)', + ('cached', owner, 'uploadable', 'uploading'))]) + + logger.debug('Tried to adopt file operations from dead worker %s to ourselves as %s', owner, me) + + def _executePendingDeletions(self): """ Delete all the files that are registered in the database as in the process of being deleted from the cache by us. Returns the number of files that were deleted. - - Implemented as a class method so it can use the database connection - appropriate to its thread without any chance of getting at the main - thread's connection and cursor in self. - - :param str coordination_dir: The coordination directory. - :param sqlite3.Connection con: Connection to the cache database. - :param sqlite3.Cursor cur: Cursor in the cache database. """ - me = get_process_name(coordination_dir) + with self.as_process() as me: - # Remember the file IDs we are deleting - deletedFiles = [] - for row in cls._static_read(cur, 'SELECT id, path FROM files WHERE owner = ? 
AND state = ?', (me, 'deleting')): - # Grab everything we are supposed to delete and delete it - fileID = row[0] - filePath = row[1] - try: - os.unlink(filePath) - logger.debug('Successfully deleted: %s', filePath) - except OSError: - # Probably already deleted - logger.debug('File already gone: %s', filePath) - # Still need to mark it as deleted + # Remember the file IDs we are deleting + deletedFiles = [] + for row in self._read('SELECT id, path FROM files WHERE owner = ? AND state = ?', (me, 'deleting')): + # Grab everything we are supposed to delete and delete it + fileID = row[0] + filePath = row[1] + try: + os.unlink(filePath) + logger.debug('Successfully deleted: %s', filePath) + except OSError: + # Probably already deleted + logger.debug('File already gone: %s', filePath) + # Still need to mark it as deleted - # Whether we deleted the file or just found out that it is gone, we - # need to take credit for deleting it so that we remove it from the - # database. - deletedFiles.append(fileID) + # Whether we deleted the file or just found out that it is gone, we + # need to take credit for deleting it so that we remove it from the + # database. + deletedFiles.append(fileID) - for fileID in deletedFiles: - # Drop all the files. They should have stayed in deleting state. We move them from there to not present at all. - # Also drop their references, if they had any from dead downloaders. - cls._static_write(con, cur, [('DELETE FROM files WHERE id = ? AND state = ?', (fileID, 'deleting')), - ('DELETE FROM refs WHERE file_id = ?', (fileID,))]) + for fileID in deletedFiles: + # Drop all the files. They should have stayed in deleting state. We move them from there to not present at all. + # Also drop their references, if they had any from dead downloaders. + self._write([('DELETE FROM files WHERE id = ? AND state = ?', (fileID, 'deleting')), + ('DELETE FROM refs WHERE file_id = ?', (fileID,))]) - return len(deletedFiles) + return len(deletedFiles) - def _executePendingUploads(self, con, cur): + def _executePendingUploads(self): """ Uploads all files in uploadable state that we own. Returns the number of files that were uploaded. - - Needs access to self to get at the job store for uploading files, but - still needs to take con and cur so it can run in a thread with the - thread's database connection. - - :param sqlite3.Connection con: Connection to the cache database. - :param sqlite3.Cursor cur: Cursor in the cache database. """ # Work out who we are - me = get_process_name(self.coordination_dir) - - # Record how many files we upload - uploadedCount = 0 - while True: - # Try and find a file we might want to upload - fileID = None - filePath = None - for row in self._static_read(cur, 'SELECT id, path FROM files WHERE state = ? AND owner = ? LIMIT 1', ('uploadable', me)): - fileID = row[0] - filePath = row[1] - - if fileID is None: - # Nothing else exists to upload - break - - # We need to set it to uploading in a way that we can detect that *we* won the update race instead of anyone else. - rowCount = self._static_write(con, cur, [('UPDATE files SET state = ? WHERE id = ? AND state = ?', ('uploading', fileID, 'uploadable'))]) - if rowCount != 1: - # We didn't manage to update it. Someone else (a running job if - # we are a committing thread, or visa versa) must have grabbed - # it. - logger.debug('Lost race to upload %s', fileID) - # Try again to see if there is something else to grab. 
- continue + with self.as_process() as me: + + # Record how many files we upload + uploadedCount = 0 + while True: + # Try and find a file we might want to upload + fileID = None + filePath = None + for row in self._static_read(self.cur, 'SELECT id, path FROM files WHERE state = ? AND owner = ? LIMIT 1', ('uploadable', me)): + fileID = row[0] + filePath = row[1] + + if fileID is None: + # Nothing else exists to upload + break - # Upload the file - logger.debug('Actually executing upload for file %s', fileID) - try: - self.jobStore.update_file(fileID, filePath) - except: - # We need to set the state back to 'uploadable' in case of any failures to ensure - # we can retry properly. - self._static_write(con, cur, [('UPDATE files SET state = ? WHERE id = ? AND state = ?', ('uploadable', fileID, 'uploading'))]) - raise + # We need to set it to uploading in a way that we can detect that *we* won the update race instead of anyone else. + rowCount = self._static_write(self.con, self.cur, [('UPDATE files SET state = ? WHERE id = ? AND state = ?', ('uploading', fileID, 'uploadable'))]) + if rowCount != 1: + # We didn't manage to update it. Someone else (a running job if + # we are a committing thread, or visa versa) must have grabbed + # it. + logger.debug('Lost race to upload %s', fileID) + # Try again to see if there is something else to grab. + continue + + # Upload the file + logger.debug('Actually executing upload for file %s', fileID) + try: + self.jobStore.update_file(fileID, filePath) + except: + # We need to set the state back to 'uploadable' in case of any failures to ensure + # we can retry properly. + self._static_write(self.con, self.cur, [('UPDATE files SET state = ? WHERE id = ? AND state = ?', ('uploadable', fileID, 'uploading'))]) + raise - # Count it for the total uploaded files value we need to return - uploadedCount += 1 + # Count it for the total uploaded files value we need to return + uploadedCount += 1 - # Remember that we uploaded it in the database - self._static_write(con, cur, [('UPDATE files SET state = ?, owner = NULL WHERE id = ?', ('cached', fileID))]) + # Remember that we uploaded it in the database + self._static_write(self.con, self.cur, [('UPDATE files SET state = ?, owner = NULL WHERE id = ?', ('cached', fileID))]) - return uploadedCount + return uploadedCount def _allocateSpaceForJob(self, newJobReqs): """ @@ -831,23 +854,23 @@ def _allocateSpaceForJob(self, newJobReqs): # This will take up space for us and potentially make the cache over-full. # But we won't actually let the job run and use any of this space until # the cache has been successfully cleared out. - me = get_process_name(self.coordination_dir) - self._write([('INSERT INTO jobs VALUES (?, ?, ?, ?)', (self.jobID, self.localTempDir, newJobReqs, me))]) + with self.as_process() as me: + self._write([('INSERT INTO jobs VALUES (?, ?, ?, ?)', (self.jobID, self.localTempDir, newJobReqs, me))]) - # Now we need to make sure that we can fit all currently cached files, - # and the parts of the total job requirements not currently spent on - # cached files, in under the total disk space limit. + # Now we need to make sure that we can fit all currently cached files, + # and the parts of the total job requirements not currently spent on + # cached files, in under the total disk space limit. 
- available = self.getCacheAvailable() + available = self.getCacheAvailable() - logger.debug('Available space with job: %d bytes', available) + logger.debug('Available space with job: %d bytes', available) - if available >= 0: - # We're fine on disk space - return + if available >= 0: + # We're fine on disk space + return - # Otherwise we need to clear stuff. - self._freeUpSpace() + # Otherwise we need to clear stuff. + self._freeUpSpace() @classmethod def _removeJob(cls, con, cur, jobID): @@ -903,66 +926,65 @@ def _tryToFreeUpSpace(self): Return whether we manage to get any space freed or not. """ - # First we want to make sure that dead jobs aren't holding - # references to files and keeping them from looking unused. - self._removeDeadJobs(self.coordination_dir, self.con) - - # Adopt work from any dead workers - self._stealWorkFromTheDead() + with self.as_process() as me: - if self._executePendingDeletions(self.coordination_dir, self.con, self.cur) > 0: - # We actually had something to delete, which we deleted. - # Maybe there is space now - logger.debug('Successfully executed pending deletions to free space') - return True + # First we want to make sure that dead jobs aren't holding + # references to files and keeping them from looking unused. + self._removeDeadJobs(self.coordination_dir, self.con) - if self._executePendingUploads(self.con, self.cur) > 0: - # We had something to upload. Maybe it can be evicted now. - logger.debug('Successfully executed pending uploads to free space') - return True + # Adopt work from any dead workers + self._stealWorkFromTheDead() - # Otherwise, not enough files could be found in deleting state to solve our problem. - # We need to put something into the deleting state. - # TODO: give other people time to finish their in-progress - # evictions before starting more, or we might evict everything as - # soon as we hit the cache limit. - - # Find something that has no non-mutable references and is not already being deleted. - self._read(""" - SELECT files.id FROM files WHERE files.state = 'cached' AND NOT EXISTS ( - SELECT NULL FROM refs WHERE refs.file_id = files.id AND refs.state != 'mutable' - ) LIMIT 1 - """) - row = self.cur.fetchone() - if row is None: - # Nothing can be evicted by us. - # Someone else might be in the process of evicting something that will free up space for us too. - # Or someone mught be uploading something and we have to wait for them to finish before it can be deleted. - logger.debug('Could not find anything to evict! Cannot free up space!') - return False - - # Otherwise we found an eviction candidate. - fileID = row[0] + if self._executePendingDeletions() > 0: + # We actually had something to delete, which we deleted. + # Maybe there is space now + logger.debug('Successfully executed pending deletions to free space') + return True + + if self._executePendingUploads() > 0: + # We had something to upload. Maybe it can be evicted now. + logger.debug('Successfully executed pending uploads to free space') + return True + + # Otherwise, not enough files could be found in deleting state to solve our problem. + # We need to put something into the deleting state. + # TODO: give other people time to finish their in-progress + # evictions before starting more, or we might evict everything as + # soon as we hit the cache limit. + + # Find something that has no non-mutable references and is not already being deleted. 
+ self._read(""" + SELECT files.id FROM files WHERE files.state = 'cached' AND NOT EXISTS ( + SELECT NULL FROM refs WHERE refs.file_id = files.id AND refs.state != 'mutable' + ) LIMIT 1 + """) + row = self.cur.fetchone() + if row is None: + # Nothing can be evicted by us. + # Someone else might be in the process of evicting something that will free up space for us too. + # Or someone mught be uploading something and we have to wait for them to finish before it can be deleted. + logger.debug('Could not find anything to evict! Cannot free up space!') + return False - # Work out who we are - me = get_process_name(self.coordination_dir) + # Otherwise we found an eviction candidate. + fileID = row[0] - # Try and grab it for deletion, subject to the condition that nothing has started reading it - self._write([(""" - UPDATE files SET owner = ?, state = ? WHERE id = ? AND state = ? - AND owner IS NULL AND NOT EXISTS ( - SELECT NULL FROM refs WHERE refs.file_id = files.id AND refs.state != 'mutable' - ) - """, - (me, 'deleting', fileID, 'cached'))]) + # Try and grab it for deletion, subject to the condition that nothing has started reading it + self._write([(""" + UPDATE files SET owner = ?, state = ? WHERE id = ? AND state = ? + AND owner IS NULL AND NOT EXISTS ( + SELECT NULL FROM refs WHERE refs.file_id = files.id AND refs.state != 'mutable' + ) + """, + (me, 'deleting', fileID, 'cached'))]) - logger.debug('Evicting file %s', fileID) + logger.debug('Evicting file %s', fileID) - # Whether we actually got it or not, try deleting everything we have to delete - if self._executePendingDeletions(self.coordination_dir, self.con, self.cur) > 0: - # We deleted something - logger.debug('Successfully executed pending deletions to free space') - return True + # Whether we actually got it or not, try deleting everything we have to delete + if self._executePendingDeletions() > 0: + # We deleted something + logger.debug('Successfully executed pending deletions to free space') + return True def _freeUpSpace(self): """ @@ -1075,60 +1097,60 @@ def writeGlobalFile(self, localFileName, cleanup=False, executable=False): # TODO: this empty file could leak if we die now... fileID = self.jobStore.getEmptyFileStoreID(creatorID, cleanup, os.path.basename(localFileName)) # Work out who we are - me = get_process_name(self.coordination_dir) + with self.as_process() as me: - # Work out where the file ought to go in the cache - cachePath = self._getNewCachingPath(fileID) + # Work out where the file ought to go in the cache + cachePath = self._getNewCachingPath(fileID) - # Create a file in uploadable state and a reference, in the same transaction. - # Say the reference is an immutable reference - self._write([('INSERT INTO files VALUES (?, ?, ?, ?, ?)', (fileID, cachePath, fileSize, 'uploadable', me)), - ('INSERT INTO refs VALUES (?, ?, ?, ?)', (absLocalFileName, fileID, creatorID, 'immutable'))]) + # Create a file in uploadable state and a reference, in the same transaction. + # Say the reference is an immutable reference + self._write([('INSERT INTO files VALUES (?, ?, ?, ?, ?)', (fileID, cachePath, fileSize, 'uploadable', me)), + ('INSERT INTO refs VALUES (?, ?, ?, ?)', (absLocalFileName, fileID, creatorID, 'immutable'))]) - if absLocalFileName.startswith(self.localTempDir) and not os.path.islink(absLocalFileName): - # We should link into the cache, because the upload is coming from our local temp dir (and not via a symlink in there) - try: - # Try and hardlink the file into the cache. 
- # This can only fail if the system doesn't have hardlinks, or the - # file we're trying to link to has too many hardlinks to it - # already, or something. - os.link(absLocalFileName, cachePath) + if absLocalFileName.startswith(self.localTempDir) and not os.path.islink(absLocalFileName): + # We should link into the cache, because the upload is coming from our local temp dir (and not via a symlink in there) + try: + # Try and hardlink the file into the cache. + # This can only fail if the system doesn't have hardlinks, or the + # file we're trying to link to has too many hardlinks to it + # already, or something. + os.link(absLocalFileName, cachePath) - linkedToCache = True + linkedToCache = True - logger.debug('Hardlinked file %s into cache at %s; deferring write to job store', localFileName, cachePath) - assert not os.path.islink(cachePath), "Symlink %s has invaded cache!" % cachePath + logger.debug('Hardlinked file %s into cache at %s; deferring write to job store', localFileName, cachePath) + assert not os.path.islink(cachePath), "Symlink %s has invaded cache!" % cachePath - # Don't do the upload now. Let it be deferred until later (when the job is committing). - except OSError: - # We couldn't make the link for some reason + # Don't do the upload now. Let it be deferred until later (when the job is committing). + except OSError: + # We couldn't make the link for some reason + linkedToCache = False + else: + # If you are uploading a file that physically exists outside the + # local temp dir, it should not be linked into the cache. On + # systems that support it, we could end up with a + # hardlink-to-symlink in the cache if we break this rule, allowing + # files to vanish from our cache. linkedToCache = False - else: - # If you are uploading a file that physically exists outside the - # local temp dir, it should not be linked into the cache. On - # systems that support it, we could end up with a - # hardlink-to-symlink in the cache if we break this rule, allowing - # files to vanish from our cache. - linkedToCache = False - if not linkedToCache: - # If we can't do the link into the cache and upload from there, we - # have to just upload right away. We can't guarantee sufficient - # space to make a full copy in the cache, if we aren't allowed to - # take this copy away from the writer. + if not linkedToCache: + # If we can't do the link into the cache and upload from there, we + # have to just upload right away. We can't guarantee sufficient + # space to make a full copy in the cache, if we aren't allowed to + # take this copy away from the writer. - # Change the reference to 'mutable', which it will be. - # And drop the file altogether. - self._write([('UPDATE refs SET state = ? WHERE path = ? AND file_id = ?', ('mutable', absLocalFileName, fileID)), - ('DELETE FROM files WHERE id = ?', (fileID,))]) + # Change the reference to 'mutable', which it will be. + # And drop the file altogether. + self._write([('UPDATE refs SET state = ? WHERE path = ? AND file_id = ?', ('mutable', absLocalFileName, fileID)), + ('DELETE FROM files WHERE id = ?', (fileID,))]) - # Save the file to the job store right now - logger.debug('Actually executing upload immediately for file %s', fileID) - self.jobStore.update_file(fileID, absLocalFileName) + # Save the file to the job store right now + logger.debug('Actually executing upload immediately for file %s', fileID) + self.jobStore.update_file(fileID, absLocalFileName) - # Ship out the completed FileID object with its real size. 
- return FileID.forPath(fileID, absLocalFileName) + # Ship out the completed FileID object with its real size. + return FileID.forPath(fileID, absLocalFileName) def readGlobalFile(self, fileStoreID, userPath=None, cache=True, mutable=False, symlink=False): @@ -1276,130 +1298,130 @@ def _readGlobalFileMutablyWithCache(self, fileStoreID, localFilePath, readerID): """ # Work out who we are - me = get_process_name(self.coordination_dir) - - # Work out where to cache the file if it isn't cached already - cachedPath = self._getNewCachingPath(fileStoreID) - - # Start a loop until we can do one of these - while True: - # Try and create a downloading entry if no entry exists - logger.debug('Trying to make file record for id %s', fileStoreID) - self._write([('INSERT OR IGNORE INTO files VALUES (?, ?, ?, ?, ?)', - (fileStoreID, cachedPath, self.getGlobalFileSize(fileStoreID), 'downloading', me))]) - - # See if we won the race - self._read('SELECT COUNT(*) FROM files WHERE id = ? AND state = ? AND owner = ?', (fileStoreID, 'downloading', me)) - if self.cur.fetchone()[0] > 0: - # We are responsible for downloading the file - logger.debug('We are now responsible for downloading file %s', fileStoreID) - - # Make sure we have space for this download. - self._freeUpSpace() - - # Do the download into the cache. - self._downloadToCache(fileStoreID, cachedPath) - - # Now, we may have to immediately give away this file, because - # we don't have space for two copies. - # If so, we can't let it go to cached state, because someone - # else might make a reference to it, and we may get stuck with - # two readers, one cached copy, and space for two copies total. + with self.as_process() as me: - # Make the copying reference - self._write([('INSERT INTO refs VALUES (?, ?, ?, ?)', - (localFilePath, fileStoreID, readerID, 'copying'))]) - - # Fulfill it with a full copy or by giving away the cached copy - self._fulfillCopyingReference(fileStoreID, cachedPath, localFilePath) + # Work out where to cache the file if it isn't cached already + cachedPath = self._getNewCachingPath(fileStoreID) - # Now we're done - return localFilePath + # Start a loop until we can do one of these + while True: + # Try and create a downloading entry if no entry exists + logger.debug('Trying to make file record for id %s', fileStoreID) + self._write([('INSERT OR IGNORE INTO files VALUES (?, ?, ?, ?, ?)', + (fileStoreID, cachedPath, self.getGlobalFileSize(fileStoreID), 'downloading', me))]) - else: - logger.debug('Someone else is already responsible for file %s', fileStoreID) - - # A record already existed for this file. - # Try and create an immutable or copying reference to an entry that - # is in 'cached' or 'uploadable' or 'uploading' state. - # It might be uploading because *we* are supposed to be uploading it. - logger.debug('Trying to make reference to file %s', fileStoreID) - self._write([('INSERT INTO refs SELECT ?, id, ?, ? FROM files WHERE id = ? AND (state = ? OR state = ? OR state = ?)', - (localFilePath, readerID, 'copying', fileStoreID, 'cached', 'uploadable', 'uploading'))]) - - # See if we got it - self._read('SELECT COUNT(*) FROM refs WHERE path = ? and file_id = ?', (localFilePath, fileStoreID)) + # See if we won the race + self._read('SELECT COUNT(*) FROM files WHERE id = ? AND state = ? 
AND owner = ?', (fileStoreID, 'downloading', me)) if self.cur.fetchone()[0] > 0: - # The file is cached and we can copy or link it - logger.debug('Obtained reference to file %s', fileStoreID) - - # Get the path it is actually at in the cache, instead of where we wanted to put it - for row in self._read('SELECT path FROM files WHERE id = ?', (fileStoreID,)): - cachedPath = row[0] - - - while self.getCacheAvailable() < 0: - # Since we now have a copying reference, see if we have used too much space. - # If so, try to free up some space by deleting or uploading, but - # don't loop forever if we can't get enough. - self._tryToFreeUpSpace() - - if self.getCacheAvailable() >= 0: - # We made room - break - - # See if we have no other references and we can give away the file. - # Change it to downloading owned by us if we can grab it. - self._write([(""" - UPDATE files SET files.owner = ?, files.state = ? WHERE files.id = ? AND files.state = ? - AND files.owner IS NULL AND NOT EXISTS ( - SELECT NULL FROM refs WHERE refs.file_id = files.id AND refs.state != 'mutable' - ) - """, - (me, 'downloading', fileStoreID, 'cached'))]) - - if self._giveAwayDownloadingFile(fileStoreID, cachedPath, localFilePath): - # We got ownership of the file and managed to give it away. - return localFilePath + # We are responsible for downloading the file + logger.debug('We are now responsible for downloading file %s', fileStoreID) - # If we don't have space, and we couldn't make space, and we - # couldn't get exclusive control of the file to give it away, we - # need to wait for one of those people with references to the file - # to finish and give it up. - # TODO: work out if that will never happen somehow. - time.sleep(self.contentionBackoff) + # Make sure we have space for this download. + self._freeUpSpace() - # OK, now we have space to make a copy. + # Do the download into the cache. + self._downloadToCache(fileStoreID, cachedPath) - if self.forceDownloadDelay is not None: - # Wait around to simulate a big file for testing - time.sleep(self.forceDownloadDelay) + # Now, we may have to immediately give away this file, because + # we don't have space for two copies. + # If so, we can't let it go to cached state, because someone + # else might make a reference to it, and we may get stuck with + # two readers, one cached copy, and space for two copies total. - # Make the copy - atomic_copy(cachedPath, localFilePath) + # Make the copying reference + self._write([('INSERT INTO refs VALUES (?, ?, ?, ?)', + (localFilePath, fileStoreID, readerID, 'copying'))]) - # Change the reference to mutable - self._write([('UPDATE refs SET state = ? WHERE path = ? AND file_id = ?', ('mutable', localFilePath, fileStoreID))]) + # Fulfill it with a full copy or by giving away the cached copy + self._fulfillCopyingReference(fileStoreID, cachedPath, localFilePath) # Now we're done return localFilePath else: - # We didn't get a reference. Maybe it is still downloading. - logger.debug('Could not obtain reference to file %s', fileStoreID) + logger.debug('Someone else is already responsible for file %s', fileStoreID) + + # A record already existed for this file. + # Try and create an immutable or copying reference to an entry that + # is in 'cached' or 'uploadable' or 'uploading' state. + # It might be uploading because *we* are supposed to be uploading it. + logger.debug('Trying to make reference to file %s', fileStoreID) + self._write([('INSERT INTO refs SELECT ?, id, ?, ? FROM files WHERE id = ? AND (state = ? OR state = ? 
OR state = ?)', + (localFilePath, readerID, 'copying', fileStoreID, 'cached', 'uploadable', 'uploading'))]) + + # See if we got it + self._read('SELECT COUNT(*) FROM refs WHERE path = ? and file_id = ?', (localFilePath, fileStoreID)) + if self.cur.fetchone()[0] > 0: + # The file is cached and we can copy or link it + logger.debug('Obtained reference to file %s', fileStoreID) + + # Get the path it is actually at in the cache, instead of where we wanted to put it + for row in self._read('SELECT path FROM files WHERE id = ?', (fileStoreID,)): + cachedPath = row[0] + + + while self.getCacheAvailable() < 0: + # Since we now have a copying reference, see if we have used too much space. + # If so, try to free up some space by deleting or uploading, but + # don't loop forever if we can't get enough. + self._tryToFreeUpSpace() + + if self.getCacheAvailable() >= 0: + # We made room + break + + # See if we have no other references and we can give away the file. + # Change it to downloading owned by us if we can grab it. + self._write([(""" + UPDATE files SET files.owner = ?, files.state = ? WHERE files.id = ? AND files.state = ? + AND files.owner IS NULL AND NOT EXISTS ( + SELECT NULL FROM refs WHERE refs.file_id = files.id AND refs.state != 'mutable' + ) + """, + (me, 'downloading', fileStoreID, 'cached'))]) + + if self._giveAwayDownloadingFile(fileStoreID, cachedPath, localFilePath): + # We got ownership of the file and managed to give it away. + return localFilePath + + # If we don't have space, and we couldn't make space, and we + # couldn't get exclusive control of the file to give it away, we + # need to wait for one of those people with references to the file + # to finish and give it up. + # TODO: work out if that will never happen somehow. + time.sleep(self.contentionBackoff) + + # OK, now we have space to make a copy. + + if self.forceDownloadDelay is not None: + # Wait around to simulate a big file for testing + time.sleep(self.forceDownloadDelay) + + # Make the copy + atomic_copy(cachedPath, localFilePath) + + # Change the reference to mutable + self._write([('UPDATE refs SET state = ? WHERE path = ? AND file_id = ?', ('mutable', localFilePath, fileStoreID))]) + + # Now we're done + return localFilePath - # Loop around again and see if either we can download it or we can get a reference to it. + else: + # We didn't get a reference. Maybe it is still downloading. + logger.debug('Could not obtain reference to file %s', fileStoreID) - # If we didn't get a download or a reference, adopt and do work - # from dead workers and loop again. - # We may have to wait for someone else's download or delete to - # finish. If they die, we will notice. - self._removeDeadJobs(self.coordination_dir, self.con) - self._stealWorkFromTheDead() - self._executePendingDeletions(self.coordination_dir, self.con, self.cur) + # Loop around again and see if either we can download it or we can get a reference to it. - # Wait for other people's downloads to progress before re-polling. - time.sleep(self.contentionBackoff) + # If we didn't get a download or a reference, adopt and do work + # from dead workers and loop again. + # We may have to wait for someone else's download or delete to + # finish. If they die, we will notice. + self._removeDeadJobs(self.coordination_dir, self.con) + self._stealWorkFromTheDead() + self._executePendingDeletions() + + # Wait for other people's downloads to progress before re-polling. 
+ time.sleep(self.contentionBackoff) def _fulfillCopyingReference(self, fileStoreID, cachedPath, localFilePath): """ @@ -1459,27 +1481,27 @@ def _giveAwayDownloadingFile(self, fileStoreID, cachedPath, localFilePath): """ # Work out who we are - me = get_process_name(self.coordination_dir) + with self.as_process() as me: - # See if we actually own this file and can giove it away - self._read('SELECT COUNT(*) FROM files WHERE id = ? AND state = ? AND owner = ?', - (fileStoreID, 'downloading', me)) - if self.cur.fetchone()[0] > 0: - # Now we have exclusive control of the cached copy of the file, so we can give it away. + # See if we actually own this file and can giove it away + self._read('SELECT COUNT(*) FROM files WHERE id = ? AND state = ? AND owner = ?', + (fileStoreID, 'downloading', me)) + if self.cur.fetchone()[0] > 0: + # Now we have exclusive control of the cached copy of the file, so we can give it away. - # Don't fake a delay here; this should be a rename always. + # Don't fake a delay here; this should be a rename always. - # We are giving it away - shutil.move(cachedPath, localFilePath) - # Record that. - self._write([('UPDATE refs SET state = ? WHERE path = ? AND file_id = ?', ('mutable', localFilePath, fileStoreID)), - ('DELETE FROM files WHERE id = ?', (fileStoreID,))]) + # We are giving it away + shutil.move(cachedPath, localFilePath) + # Record that. + self._write([('UPDATE refs SET state = ? WHERE path = ? AND file_id = ?', ('mutable', localFilePath, fileStoreID)), + ('DELETE FROM files WHERE id = ?', (fileStoreID,))]) - # Now we're done - return True - else: - # We don't own this file in 'downloading' state - return False + # Now we're done + return True + else: + # We don't own this file in 'downloading' state + return False def _createLinkFromCache(self, cachedPath, localFilePath, symlink=True): """ @@ -1530,108 +1552,108 @@ def _readGlobalFileWithCache(self, fileStoreID, localFilePath, symlink, readerID # Now we know to use the cache, and that we don't require a mutable copy. # Work out who we are - me = get_process_name(self.coordination_dir) - - # Work out where to cache the file if it isn't cached already - cachedPath = self._getNewCachingPath(fileStoreID) - - # Start a loop until we can do one of these - while True: - # Try and create a downloading entry if no entry exists. - # Make sure to create a reference at the same time if it succeeds, to bill it against our job's space. - # Don't create the mutable reference yet because we might not necessarily be able to clear that space. - logger.debug('Trying to make file downloading file record and reference for id %s', fileStoreID) - self._write([('INSERT OR IGNORE INTO files VALUES (?, ?, ?, ?, ?)', - (fileStoreID, cachedPath, self.getGlobalFileSize(fileStoreID), 'downloading', me)), - ('INSERT INTO refs SELECT ?, id, ?, ? FROM files WHERE id = ? AND state = ? AND owner = ?', - (localFilePath, readerID, 'immutable', fileStoreID, 'downloading', me))]) - - # See if we won the race - self._read('SELECT COUNT(*) FROM files WHERE id = ? AND state = ? AND owner = ?', (fileStoreID, 'downloading', me)) - if self.cur.fetchone()[0] > 0: - # We are responsible for downloading the file (and we have the reference) - logger.debug('We are now responsible for downloading file %s', fileStoreID) - - # Make sure we have space for this download. - self._freeUpSpace() - - # Do the download into the cache. - self._downloadToCache(fileStoreID, cachedPath) - - # Try and make the link before we let the file go to cached state. 
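The claim step above reduces to an INSERT OR IGNORE followed by an ownership check, so exactly one process ends up responsible for each download. A minimal standalone sketch of that pattern, assuming a SQLite files table with roughly the columns the statements above use (id, path, size, state, owner); this is an illustration, not a drop-in Toil method:

    import sqlite3

    def try_claim_download(con: sqlite3.Connection, file_id: str, cached_path: str,
                           size: int, me: str) -> bool:
        # Create a 'downloading' record only if no record exists yet for this file.
        con.execute('INSERT OR IGNORE INTO files VALUES (?, ?, ?, ?, ?)',
                    (file_id, cached_path, size, 'downloading', me))
        con.commit()
        # We won the race only if the surviving record is ours and still 'downloading'.
        (count,) = con.execute('SELECT COUNT(*) FROM files WHERE id = ? AND state = ? AND owner = ?',
                               (file_id, 'downloading', me)).fetchone()
        return count > 0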
- # If we fail we may end up having to give away the file we just downloaded. - if self._createLinkFromCache(cachedPath, localFilePath, symlink): - # We made the link! - - # Change file state from downloading to cached so other people can use it - self._write([('UPDATE files SET state = ?, owner = NULL WHERE id = ?', - ('cached', fileStoreID))]) - - # Now we're done! - return localFilePath - else: - # We could not make a link. We need to make a copy. - - # Change the reference to copying. - self._write([('UPDATE refs SET state = ? WHERE path = ? AND file_id = ?', ('copying', localFilePath, fileStoreID))]) + with self.as_process() as me: + + # Work out where to cache the file if it isn't cached already + cachedPath = self._getNewCachingPath(fileStoreID) + + # Start a loop until we can do one of these + while True: + # Try and create a downloading entry if no entry exists. + # Make sure to create a reference at the same time if it succeeds, to bill it against our job's space. + # Don't create the mutable reference yet because we might not necessarily be able to clear that space. + logger.debug('Trying to make file downloading file record and reference for id %s', fileStoreID) + self._write([('INSERT OR IGNORE INTO files VALUES (?, ?, ?, ?, ?)', + (fileStoreID, cachedPath, self.getGlobalFileSize(fileStoreID), 'downloading', me)), + ('INSERT INTO refs SELECT ?, id, ?, ? FROM files WHERE id = ? AND state = ? AND owner = ?', + (localFilePath, readerID, 'immutable', fileStoreID, 'downloading', me))]) + + # See if we won the race + self._read('SELECT COUNT(*) FROM files WHERE id = ? AND state = ? AND owner = ?', (fileStoreID, 'downloading', me)) + if self.cur.fetchone()[0] > 0: + # We are responsible for downloading the file (and we have the reference) + logger.debug('We are now responsible for downloading file %s', fileStoreID) - # Fulfill it with a full copy or by giving away the cached copy - self._fulfillCopyingReference(fileStoreID, cachedPath, localFilePath) + # Make sure we have space for this download. + self._freeUpSpace() - # Now we're done - return localFilePath + # Do the download into the cache. + self._downloadToCache(fileStoreID, cachedPath) - else: - logger.debug('We already have an entry in the cache database for file %s', fileStoreID) - - # A record already existed for this file. - # Try and create an immutable reference to an entry that - # is in 'cached' or 'uploadable' or 'uploading' state. - # It might be uploading because *we* are supposed to be uploading it. - logger.debug('Trying to make reference to file %s', fileStoreID) - self._write([('INSERT INTO refs SELECT ?, id, ?, ? FROM files WHERE id = ? AND (state = ? OR state = ? OR state = ?)', - (localFilePath, readerID, 'immutable', fileStoreID, 'cached', 'uploadable', 'uploading'))]) - - # See if we got it - self._read('SELECT COUNT(*) FROM refs WHERE path = ? and file_id = ?', (localFilePath, fileStoreID)) - if self.cur.fetchone()[0] > 0: - # The file is cached and we can copy or link it - logger.debug('Obtained reference to file %s', fileStoreID) + # Try and make the link before we let the file go to cached state. + # If we fail we may end up having to give away the file we just downloaded. + if self._createLinkFromCache(cachedPath, localFilePath, symlink): + # We made the link! 
- # Get the path it is actually at in the cache, instead of where we wanted to put it - for row in self._read('SELECT path FROM files WHERE id = ?', (fileStoreID,)): - cachedPath = row[0] + # Change file state from downloading to cached so other people can use it + self._write([('UPDATE files SET state = ?, owner = NULL WHERE id = ?', + ('cached', fileStoreID))]) - if self._createLinkFromCache(cachedPath, localFilePath, symlink): - # We managed to make the link + # Now we're done! return localFilePath else: - # We can't make the link. We need a copy instead. + # We could not make a link. We need to make a copy. - # We could change the reference to copying, see if - # there's space, make the copy, try and get ahold of - # the file if there isn't space, and give it away, but - # we already have code for that for mutable downloads, - # so just clear the reference and download mutably. + # Change the reference to copying. + self._write([('UPDATE refs SET state = ? WHERE path = ? AND file_id = ?', ('copying', localFilePath, fileStoreID))]) - self._write([('DELETE FROM refs WHERE path = ? AND file_id = ?', (localFilePath, fileStoreID))]) + # Fulfill it with a full copy or by giving away the cached copy + self._fulfillCopyingReference(fileStoreID, cachedPath, localFilePath) + + # Now we're done + return localFilePath - return self._readGlobalFileMutablyWithCache(fileStoreID, localFilePath, readerID) else: - logger.debug('Could not obtain reference to file %s', fileStoreID) + logger.debug('We already have an entry in the cache database for file %s', fileStoreID) + + # A record already existed for this file. + # Try and create an immutable reference to an entry that + # is in 'cached' or 'uploadable' or 'uploading' state. + # It might be uploading because *we* are supposed to be uploading it. + logger.debug('Trying to make reference to file %s', fileStoreID) + self._write([('INSERT INTO refs SELECT ?, id, ?, ? FROM files WHERE id = ? AND (state = ? OR state = ? OR state = ?)', + (localFilePath, readerID, 'immutable', fileStoreID, 'cached', 'uploadable', 'uploading'))]) + + # See if we got it + self._read('SELECT COUNT(*) FROM refs WHERE path = ? and file_id = ?', (localFilePath, fileStoreID)) + if self.cur.fetchone()[0] > 0: + # The file is cached and we can copy or link it + logger.debug('Obtained reference to file %s', fileStoreID) + + # Get the path it is actually at in the cache, instead of where we wanted to put it + for row in self._read('SELECT path FROM files WHERE id = ?', (fileStoreID,)): + cachedPath = row[0] + + if self._createLinkFromCache(cachedPath, localFilePath, symlink): + # We managed to make the link + return localFilePath + else: + # We can't make the link. We need a copy instead. - # If we didn't get a download or a reference, adopt and do work from dead workers and loop again. - # We may have to wait for someone else's download or delete to - # finish. If they die, we will notice. - self._removeDeadJobs(self.coordination_dir, self.con) - self._stealWorkFromTheDead() - # We may have acquired ownership of partially-downloaded - # files, now in deleting state, that we need to delete - # before we can download them. - self._executePendingDeletions(self.coordination_dir, self.con, self.cur) + # We could change the reference to copying, see if + # there's space, make the copy, try and get ahold of + # the file if there isn't space, and give it away, but + # we already have code for that for mutable downloads, + # so just clear the reference and download mutably. 
- # Wait for other people's downloads to progress. - time.sleep(self.contentionBackoff) + self._write([('DELETE FROM refs WHERE path = ? AND file_id = ?', (localFilePath, fileStoreID))]) + + return self._readGlobalFileMutablyWithCache(fileStoreID, localFilePath, readerID) + else: + logger.debug('Could not obtain reference to file %s', fileStoreID) + + # If we didn't get a download or a reference, adopt and do work from dead workers and loop again. + # We may have to wait for someone else's download or delete to + # finish. If they die, we will notice. + self._removeDeadJobs(self.coordination_dir, self.con) + self._stealWorkFromTheDead() + # We may have acquired ownership of partially-downloaded + # files, now in deleting state, that we need to delete + # before we can download them. + self._executePendingDeletions() + + # Wait for other people's downloads to progress. + time.sleep(self.contentionBackoff) @contextmanager def _with_copying_reference_to_upload(self, file_store_id: FileID, reader_id: str, local_file_path: Optional[str] = None) -> Generator: @@ -1688,7 +1710,7 @@ def readGlobalFileStream(self, fileStoreID, encoding=None, errors=None): # Try and grab a reference to the file if it is being uploaded. if ref_path is not None: # We have an update in the cache that isn't written back yet. - # So we must stream from the ceche for consistency. + # So we must stream from the cache for consistency. # The ref file is not actually copied to; find the actual file # in the cache @@ -1703,7 +1725,7 @@ def readGlobalFileStream(self, fileStoreID, encoding=None, errors=None): # Pass along the results of the open context manager on the # file in the cache. yield result - # When we exit the with the copying reference will go away and + # When we exit the with, the copying reference will go away and # the file will be allowed to leave the cache again. else: # No local update, so we can stream from the job store @@ -1772,25 +1794,25 @@ def deleteGlobalFile(self, fileStoreID): raise # Work out who we are - me = get_process_name(self.coordination_dir) + with self.as_process() as me: - # Make sure nobody else has references to it - for row in self._read('SELECT job_id FROM refs WHERE file_id = ? AND state != ?', (fileStoreID, 'mutable')): - raise RuntimeError(f'Deleted file ID {fileStoreID} which is still in use by job {row[0]}') - # TODO: should we just let other jobs and the cache keep the file until - # it gets evicted, and only delete at the back end? + # Make sure nobody else has references to it + for row in self._read('SELECT job_id FROM refs WHERE file_id = ? AND state != ?', (fileStoreID, 'mutable')): + raise RuntimeError(f'Deleted file ID {fileStoreID} which is still in use by job {row[0]}') + # TODO: should we just let other jobs and the cache keep the file until + # it gets evicted, and only delete at the back end? - # Pop the file into deleting state owned by us if it exists - self._write([('UPDATE files SET state = ?, owner = ? WHERE id = ?', ('deleting', me, fileStoreID))]) + # Pop the file into deleting state owned by us if it exists + self._write([('UPDATE files SET state = ?, owner = ? WHERE id = ?', ('deleting', me, fileStoreID))]) - # Finish the delete if the file is present - self._executePendingDeletions(self.coordination_dir, self.con, self.cur) + # Finish the delete if the file is present + self._executePendingDeletions() - # Add the file to the list of files to be deleted from the job store - # once the run method completes. 
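The delete path above only proceeds when no other reference still depends on the cached copy; the guard is a single query over the refs table. Isolated here as a sketch against the same assumed schema, not Toil's actual method:

    import sqlite3

    def check_no_live_references(con: sqlite3.Connection, file_id: str) -> None:
        # 'mutable' references are private copies; any other state still needs the cached file.
        for (job_id,) in con.execute('SELECT job_id FROM refs WHERE file_id = ? AND state != ?',
                                     (file_id, 'mutable')):
            raise RuntimeError(f'Deleted file ID {file_id} which is still in use by job {job_id}')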
- self.filesToDelete.add(str(fileStoreID)) - self.logToMaster('Added file with ID \'%s\' to the list of files to be' % fileStoreID + - ' globally deleted.', level=logging.DEBUG) + # Add the file to the list of files to be deleted from the job store + # once the run method completes. + self.filesToDelete.add(str(fileStoreID)) + self.logToMaster('Added file with ID \'%s\' to the list of files to be' % fileStoreID + + ' globally deleted.', level=logging.DEBUG) @deprecated(new_function_name='export_file') def exportFile(self, jobStoreFileID: FileID, dstUrl: str) -> None: @@ -1805,7 +1827,7 @@ def export_file(self, file_id: FileID, dst_uri: str) -> None: # until they are done. # For safety and simplicity, we just execute all pending uploads now. - self._executePendingUploads(self.con, self.cur) + self._executePendingUploads() # Then we let the job store export. TODO: let the export come from the # cache? How would we write the URL? @@ -1873,18 +1895,12 @@ def startCommitThread(self, state_to_commit: Optional[JobDescription]): self.waitForPreviousCommit() try: - # Reconnect to the database from this thread. The main thread can - # keep using self.con and self.cur. We need to do this because - # SQLite objects are tied to a thread. - con = sqlite3.connect(self.dbPath, timeout=SQLITE_TIMEOUT_SECS) - cur = con.cursor() - logger.debug('Committing file uploads asynchronously') # Finish all uploads - self._executePendingUploads(con, cur) + self._executePendingUploads() # Finish all deletions out of the cache (not from the job store) - self._executePendingDeletions(self.coordination_dir, con, cur) + self._executePendingDeletions() if state_to_commit is not None: # Do all the things that make this job not redoable @@ -2010,7 +2026,9 @@ def _removeDeadJobs(cls, coordination_dir, con): # Get a cursor cur = con.cursor() - # Work out our process name for taking ownership of jobs + # We're allowed to assign jobs to us without acquiring the process + # identity lock; we know it won't interfere with any of the other logic + # happening under our process's identity in the database. me = get_process_name(coordination_dir) # Get all the dead worker PIDs diff --git a/src/toil/fileStores/nonCachingFileStore.py b/src/toil/fileStores/nonCachingFileStore.py index 65fcc3f663..4308e2b754 100644 --- a/src/toil/fileStores/nonCachingFileStore.py +++ b/src/toil/fileStores/nonCachingFileStore.py @@ -21,14 +21,17 @@ from typing import (IO, Any, Callable, + ContextManager, DefaultDict, Dict, Generator, Iterator, List, + Literal, Optional, Union, - cast) + cast, + overload) import dill @@ -156,7 +159,25 @@ def readGlobalFile(self, fileStoreID: str, userPath: Optional[str] = None, cache self.logAccess(fileStoreID, localFilePath) return localFilePath - @contextmanager + @overload + def readGlobalFileStream( + self, + fileStoreID: str, + encoding: Literal[None] = None, + errors: Optional[str] = None, + ) -> ContextManager[IO[bytes]]: + ... + + @overload + def readGlobalFileStream( + self, fileStoreID: str, encoding: str, errors: Optional[str] = None + ) -> ContextManager[IO[str]]: + ... + + # TODO: This seems to hit https://github.com/python/mypy/issues/11373 + # But that is supposedly fixed. 
+ + @contextmanager # type: ignore def readGlobalFileStream(self, fileStoreID: str, encoding: Optional[str] = None, errors: Optional[str] = None) -> Iterator[Union[IO[bytes], IO[str]]]: with self.jobStore.read_file_stream(fileStoreID, encoding=encoding, errors=errors) as f: self.logAccess(fileStoreID) diff --git a/src/toil/jobStores/abstractJobStore.py b/src/toil/jobStores/abstractJobStore.py index a05981367c..ca5bc6d1ab 100644 --- a/src/toil/jobStores/abstractJobStore.py +++ b/src/toil/jobStores/abstractJobStore.py @@ -34,6 +34,7 @@ Tuple, Union, ValuesView, + cast, overload) if sys.version_info >= (3, 8): @@ -536,29 +537,28 @@ def _default_export_file(self, otherCls: 'AbstractJobStore', jobStoreFileID: Fil otherCls._write_to_url(readable, url, executable) @classmethod - def list_url(cls, src_uri: str) -> List[str]: + def url_exists(cls, src_uri: str) -> bool: """ - List the directory at the given URL. Returned path components can be - joined with '/' onto the passed URL to form new URLs. Those that end in - '/' correspond to directories. The provided URL may or may not end with - '/'. + Return True if the file at the given URI exists, and False otherwise. - Currently supported schemes are: - - - 's3' for objects in Amazon S3 - e.g. s3://bucket/prefix/ - - - 'file' for local files - e.g. file:///local/dir/path/ + :param src_uri: URL that points to a file or object in the storage + mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket. + """ + parseResult = urlparse(src_uri) + otherCls = cls._findJobStoreForUrl(parseResult) + return otherCls._url_exists(parseResult) - :param str src_uri: URL that points to a directory or prefix in the storage mechanism of a - supported URL scheme e.g. a prefix in an AWS s3 bucket. + @classmethod + def get_size(cls, src_uri: str) -> Optional[int]: + """ + Get the size in bytes of the file at the given URL, or None if it cannot be obtained. - :return: A list of URL components in the given directory, already URL-encoded. + :param src_uri: URL that points to a file or object in the storage + mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket. """ parseResult = urlparse(src_uri) otherCls = cls._findJobStoreForUrl(parseResult) - return otherCls._list_url(parseResult) + return otherCls._get_size(parseResult) @classmethod def get_is_directory(cls, src_uri: str) -> bool: @@ -571,18 +571,29 @@ def get_is_directory(cls, src_uri: str) -> bool: return otherCls._get_is_directory(parseResult) @classmethod - @abstractmethod - def _get_is_directory(cls, url: ParseResult) -> bool: + def list_url(cls, src_uri: str) -> List[str]: """ - Return True if the thing at the given URL is a directory, and False if - it is a file or it is known not to exist. The URL may or may not end in + List the directory at the given URL. Returned path components can be + joined with '/' onto the passed URL to form new URLs. Those that end in + '/' correspond to directories. The provided URL may or may not end with '/'. - :param url: URL that points to a file or object, or directory or prefix, - in the storage mechanism of a supported URL scheme e.g. a blob - in an AWS s3 bucket. + Currently supported schemes are: + + - 's3' for objects in Amazon S3 + e.g. s3://bucket/prefix/ + + - 'file' for local files + e.g. file:///local/dir/path/ + + :param str src_uri: URL that points to a directory or prefix in the storage mechanism of a + supported URL scheme e.g. a prefix in an AWS s3 bucket. + + :return: A list of URL components in the given directory, already URL-encoded. 
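The typing overloads added to readGlobalFileStream above let callers see IO[bytes] when no encoding is passed and IO[str] otherwise. The same pattern in a self-contained form (an illustrative helper, not part of Toil's API):

    from typing import IO, Literal, Optional, Union, overload

    @overload
    def open_stream(path: str, encoding: Literal[None] = None) -> IO[bytes]: ...
    @overload
    def open_stream(path: str, encoding: str) -> IO[str]: ...

    def open_stream(path: str, encoding: Optional[str] = None) -> Union[IO[bytes], IO[str]]:
        # Binary stream when no encoding is given, text stream otherwise.
        return open(path, 'rb') if encoding is None else open(path, 'r', encoding=encoding)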
""" - raise NotImplementedError + parseResult = urlparse(src_uri) + otherCls = cls._findJobStoreForUrl(parseResult) + return otherCls._list_url(parseResult) @classmethod def read_from_url(cls, src_uri: str, writable: IO[bytes]) -> Tuple[int, bool]: @@ -592,28 +603,54 @@ def read_from_url(cls, src_uri: str, writable: IO[bytes]) -> Tuple[int, bool]: Raises FileNotFoundError if the URL doesn't exist. :return: The size of the file in bytes and whether the executable permission bit is set - :rtype: Tuple[int, bool] """ parseResult = urlparse(src_uri) otherCls = cls._findJobStoreForUrl(parseResult) return otherCls._read_from_url(parseResult, writable) + + @classmethod + def open_url(cls, src_uri: str) -> IO[bytes]: + """ + Read from the given URI. + + Raises FileNotFoundError if the URL doesn't exist. + Has a readable stream interface, unlike :meth:`read_from_url` which + takes a writable stream. + """ + parseResult = urlparse(src_uri) + otherCls = cls._findJobStoreForUrl(parseResult) + return otherCls._open_url(parseResult) + @classmethod - @deprecated(new_function_name='get_size') - def getSize(cls, url: ParseResult) -> None: - return cls.get_size(url) + @abstractmethod + def _url_exists(cls, url: ParseResult) -> bool: + """ + Return True if the item at the given URL exists, and Flase otherwise. + """ + raise NotImplementedError(f"No implementation for {url}") @classmethod @abstractmethod - def get_size(cls, src_uri: ParseResult) -> None: + def _get_size(cls, url: ParseResult) -> Optional[int]: """ - Get the size in bytes of the file at the given URL, or None if it cannot be obtained. + Get the size of the object at the given URL, or None if it cannot be obtained. + """ + raise NotImplementedError(f"No implementation for {url}") - :param src_uri: URL that points to a file or object in the storage - mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket. + @classmethod + @abstractmethod + def _get_is_directory(cls, url: ParseResult) -> bool: """ - raise NotImplementedError + Return True if the thing at the given URL is a directory, and False if + it is a file or it is known not to exist. The URL may or may not end in + '/'. + :param url: URL that points to a file or object, or directory or prefix, + in the storage mechanism of a supported URL scheme e.g. a blob + in an AWS s3 bucket. + """ + raise NotImplementedError(f"No implementation for {url}") @classmethod @abstractmethod @@ -622,8 +659,6 @@ def _read_from_url(cls, url: ParseResult, writable: IO[bytes]) -> Tuple[int, boo Reads the contents of the object at the specified location and writes it to the given writable stream. - Raises FileNotFoundError if the URL doesn't exist. - Refer to :func:`~AbstractJobStore.importFile` documentation for currently supported URL schemes. Raises FileNotFoundError if the thing at the URL is not found. @@ -634,46 +669,58 @@ def _read_from_url(cls, url: ParseResult, writable: IO[bytes]) -> Tuple[int, boo :param IO[bytes] writable: a writable stream :return: The size of the file in bytes and whether the executable permission bit is set - :rtype: Tuple[int, bool] """ - raise NotImplementedError() + raise NotImplementedError(f"No implementation for {url}") @classmethod @abstractmethod - def _write_to_url(cls, readable: Union[IO[bytes], IO[str]], url: ParseResult, executable: bool = False) -> None: + def _list_url(cls, url: ParseResult) -> List[str]: """ - Reads the contents of the given readable stream and writes it to the object at the - specified location. 
Raises FileNotFoundError if the URL doesn't exist.. + List the contents of the given URL, which may or may not end in '/' - Refer to AbstractJobStore.importFile documentation for currently supported URL schemes. + Returns a list of URL components. Those that end in '/' are meant to be + directories, while those that do not are meant to be files. - :param Union[IO[bytes], IO[str]] readable: a readable stream + Refer to :func:`~AbstractJobStore.importFile` documentation for currently supported URL schemes. - :param ParseResult url: URL that points to a file or object in the storage - mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket. + :param ParseResult url: URL that points to a directory or prefix in the + storage mechanism of a supported URL scheme e.g. a prefix in an AWS s3 + bucket. - :param bool executable: determines if the file has executable permissions + :return: The children of the given URL, already URL-encoded if + appropriate. (If the URL is a bare path, no encoding is done.) """ - raise NotImplementedError() + raise NotImplementedError(f"No implementation for {url}") @classmethod @abstractmethod - def _list_url(cls, url: ParseResult) -> List[str]: + def _open_url(cls, url: ParseResult) -> IO[bytes]: """ - List the contents of the given URL, which may or may not end in '/' - - Returns a list of URL components. Those that end in '/' are meant to be - directories, while those that do not are meant to be files. + Get a stream of the object at the specified location. Refer to :func:`~AbstractJobStore.importFile` documentation for currently supported URL schemes. - :param ParseResult url: URL that points to a directory or prefix in the - storage mechanism of a supported URL scheme e.g. a prefix in an AWS s3 - bucket. + Raises FileNotFoundError if the thing at the URL is not found. + """ + raise NotImplementedError(f"No implementation for {url}") - :return: The children of the given URL, already URL-encoded. + @classmethod + @abstractmethod + def _write_to_url(cls, readable: Union[IO[bytes], IO[str]], url: ParseResult, executable: bool = False) -> None: """ - raise NotImplementedError() + Reads the contents of the given readable stream and writes it to the object at the + specified location. Raises FileNotFoundError if the URL doesn't exist.. + + Refer to AbstractJobStore.importFile documentation for currently supported URL schemes. + + :param Union[IO[bytes], IO[str]] readable: a readable stream + + :param ParseResult url: URL that points to a file or object in the storage + mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket. + + :param bool executable: determines if the file has executable permissions + """ + raise NotImplementedError(f"No implementation for {url}") @classmethod @abstractmethod @@ -689,7 +736,7 @@ def _supports_url(cls, url: ParseResult, export: bool = False) -> bool: :return bool: returns true if the cls supports the URL """ - raise NotImplementedError() + raise NotImplementedError(f"No implementation for {url}") @abstractmethod def destroy(self) -> None: @@ -1695,6 +1742,16 @@ class JobStoreSupport(AbstractJobStore, metaclass=ABCMeta): def _supports_url(cls, url: ParseResult, export: bool = False) -> bool: return url.scheme.lower() in ('http', 'https', 'ftp') and not export + @classmethod + def _url_exists(cls, url: ParseResult) -> bool: + try: + # TODO: Figure out how to HEAD instead of this. 
+ with cls._open_url(url): + return True + except: + pass + return False + @classmethod @retry( errors=[ @@ -1702,7 +1759,7 @@ def _supports_url(cls, url: ParseResult, export: bool = False) -> bool: ErrorCondition(error=HTTPError, error_codes=[408, 500, 503]), ] ) - def get_size(cls, url: ParseResult) -> Optional[int]: + def _get_size(cls, url: ParseResult) -> Optional[int]: if url.scheme.lower() == 'ftp': return None with closing(urlopen(url.geturl())) as readable: @@ -1710,6 +1767,27 @@ def get_size(cls, url: ParseResult) -> Optional[int]: size = readable.info().get('content-length') return int(size) if size is not None else None + @classmethod + def _read_from_url( + cls, url: ParseResult, writable: Union[IO[bytes], IO[str]] + ) -> Tuple[int, bool]: + # We can't actually retry after we start writing. + # TODO: Implement retry with byte range requests + with cls._open_url(url) as readable: + # Make something to count the bytes we get + # We need to put the actual count in a container so our + # nested function can modify it without creating its own + # local with the same name. + size = [0] + def count(l: int) -> None: + size[0] += l + counter = WriteWatchingStream(writable) + counter.onWrite(count) + + # Do the download + shutil.copyfileobj(readable, counter) + return size[0], False + @classmethod @retry( errors=[ @@ -1717,27 +1795,9 @@ def get_size(cls, url: ParseResult) -> Optional[int]: ErrorCondition(error=HTTPError, error_codes=[408, 500, 503]), ] ) - def _read_from_url( - cls, url: ParseResult, writable: Union[IO[bytes], IO[str]] - ) -> Tuple[int, bool]: - # We can only retry on errors that happen as responses to the request. - # If we start getting file data, and the connection drops, we fail. - # So we don't have to worry about writing the start of the file twice. + def _open_url(cls, url: ParseResult) -> IO[bytes]: try: - with closing(urlopen(url.geturl())) as readable: - # Make something to count the bytes we get - # We need to put the actual count in a container so our - # nested function can modify it without creating its own - # local with the same name. - size = [0] - def count(l: int) -> None: - size[0] += l - counter = WriteWatchingStream(writable) - counter.onWrite(count) - - # Do the download - shutil.copyfileobj(readable, counter) - return size[0], False + return cast(IO[bytes], closing(urlopen(url.geturl()))) except HTTPError as e: if e.code == 404: # Translate into a FileNotFoundError for detecting diff --git a/src/toil/jobStores/aws/jobStore.py b/src/toil/jobStores/aws/jobStore.py index fcda110148..28994bd34a 100644 --- a/src/toil/jobStores/aws/jobStore.py +++ b/src/toil/jobStores/aws/jobStore.py @@ -23,7 +23,7 @@ import uuid from contextlib import contextmanager from io import BytesIO -from typing import List, Optional +from typing import List, Optional, IO from urllib.parse import ParseResult, parse_qs, urlencode, urlsplit, urlunsplit import boto.s3.connection @@ -465,7 +465,17 @@ def _export_file(self, otherCls, file_id, uri): super()._default_export_file(otherCls, file_id, uri) @classmethod - def get_size(cls, url): + def _url_exists(cls, url: ParseResult) -> bool: + try: + get_object_for_url(url, existing=True) + return True + except FileNotFoundError: + # Not a file + # Might be a directory. 
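With _url_exists, _get_size, and _open_url implemented per backend, the public classmethods on AbstractJobStore dispatch on the URL scheme, so callers can probe and fetch any supported URL without choosing a job store class themselves. A usage sketch with a made-up URI:

    from toil.jobStores.abstractJobStore import AbstractJobStore

    src = 's3://example-bucket/inputs/reads.fastq'  # hypothetical object
    if AbstractJobStore.url_exists(src):
        size = AbstractJobStore.get_size(src)  # may be None if the backend cannot tell
        with open('reads.fastq', 'wb') as out:
            n_bytes, executable = AbstractJobStore.read_from_url(src, out)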
+ return cls._get_is_directory(url) + + @classmethod + def _get_size(cls, url): return get_object_for_url(url, existing=True).content_length @classmethod @@ -477,6 +487,15 @@ def _read_from_url(cls, url, writable): False # executable bit is always False ) + @classmethod + def _open_url(cls, url: ParseResult) -> IO[bytes]: + src_obj = get_object_for_url(url, existing=True) + response = src_obj.get() + # We should get back a response with a stream in 'Body' + if 'Body' not in response: + raise RuntimeError(f"Could not fetch body stream for {url}") + return response['Body'] + @classmethod def _write_to_url(cls, readable, url, executable=False): dstObj = get_object_for_url(url) diff --git a/src/toil/jobStores/fileJobStore.py b/src/toil/jobStores/fileJobStore.py index 4298e2ed7f..09fe0079d3 100644 --- a/src/toil/jobStores/fileJobStore.py +++ b/src/toil/jobStores/fileJobStore.py @@ -362,7 +362,11 @@ def _move_and_linkback(self, srcPath, destPath, executable): os.chmod(destPath, os.stat(destPath).st_mode | stat.S_IXUSR) @classmethod - def get_size(cls, url): + def _url_exists(cls, url: ParseResult) -> bool: + return os.path.exists(cls._extract_path_from_url(url)) + + @classmethod + def _get_size(cls, url): return os.stat(cls._extract_path_from_url(url)).st_size @classmethod @@ -376,12 +380,18 @@ def _read_from_url(cls, url, writable): """ # we use a ~10Mb buffer to improve speed - with open(cls._extract_path_from_url(url), 'rb') as readable: + with cls._open_url(url) as readable: shutil.copyfileobj(readable, writable, length=cls.BUFFER_SIZE) # Return the number of bytes we read when we reached EOF. executable = os.stat(readable.name).st_mode & stat.S_IXUSR return readable.tell(), executable + @classmethod + def _open_url(cls, url: ParseResult) -> IO[bytes]: + """ + Open a file URL as a binary stream. + """ + return open(cls._extract_path_from_url(url), 'rb') @classmethod def _write_to_url(cls, readable, url, executable=False): diff --git a/src/toil/jobStores/googleJobStore.py b/src/toil/jobStores/googleJobStore.py index b1657ca9ae..09824eaa87 100644 --- a/src/toil/jobStores/googleJobStore.py +++ b/src/toil/jobStores/googleJobStore.py @@ -20,7 +20,7 @@ from contextlib import contextmanager from functools import wraps from io import BytesIO -from typing import List, Optional +from typing import IO, List, Optional from urllib.parse import ParseResult from google.api_core.exceptions import (GoogleAPICallError, @@ -390,7 +390,15 @@ def _get_blob_from_url(cls, url, exists=False): return blob @classmethod - def get_size(cls, url): + def _url_exists(cls, url: ParseResult) -> bool: + try: + cls._get_blob_from_url(url, exists=True) + return True + except NoSuchFileException: + return False + + @classmethod + def _get_size(cls, url): return cls._get_blob_from_url(url, exists=True).size @classmethod @@ -399,6 +407,11 @@ def _read_from_url(cls, url, writable): blob.download_to_file(writable) return blob.size, False + @classmethod + def _open_url(cls, url: ParseResult) -> IO[bytes]: + blob = cls._get_blob_from_url(url, exists=True) + return blob.open("rb") + @classmethod def _supports_url(cls, url, export=False): return url.scheme.lower() == 'gs' diff --git a/src/toil/lib/aws/utils.py b/src/toil/lib/aws/utils.py index 6278ec6d7d..3e26691358 100644 --- a/src/toil/lib/aws/utils.py +++ b/src/toil/lib/aws/utils.py @@ -342,6 +342,8 @@ def get_object_for_url(url: ParseResult, existing: Optional[bool] = None) -> "Ob """ Extracts a key (object) from a given parsed s3:// URL. 
+ If existing is true and the object does not exist, raises FileNotFoundError. + :param bool existing: If True, key is expected to exist. If False, key is expected not to exists and it will be created. If None, the key will be created if it doesn't exist. """ @@ -383,7 +385,7 @@ def get_object_for_url(url: ParseResult, existing: Optional[bool] = None) -> "Ob else: raise if existing is True and not objExists: - raise RuntimeError(f"Key '{key_name}' does not exist in bucket '{bucket_name}'.") + raise FileNotFoundError(f"Key '{key_name}' does not exist in bucket '{bucket_name}'.") elif existing is False and objExists: raise RuntimeError(f"Key '{key_name}' exists in bucket '{bucket_name}'.") diff --git a/src/toil/test/cwl/cwlTest.py b/src/toil/test/cwl/cwlTest.py index a8abf7c54a..cbe5619833 100644 --- a/src/toil/test/cwl/cwlTest.py +++ b/src/toil/test/cwl/cwlTest.py @@ -235,6 +235,7 @@ def _tester(self, cwlfile, jobfile, expect, main_args=[], out_name="output", out # Don't just dump output in the working directory. main_args.extend( [ + "--logDebug", "--outdir", self.outDir ] @@ -413,6 +414,9 @@ def test_download_http(self): def test_download_https(self): self.download("download_https.json", self._tester) + def test_download_https_reference(self): + self.download("download_https.json", partial(self._tester, main_args=["--reference-inputs"])) + def test_download_file(self): self.download("download_file.json", self._tester) @@ -420,6 +424,10 @@ def test_download_file(self): def test_download_directory_s3(self): self.download_directory("download_directory_s3.json", self._tester) + @needs_aws_s3 + def test_download_directory_s3_reference(self): + self.download_directory("download_directory_s3.json", partial(self._tester, main_args=["--reference-inputs"])) + def test_download_directory_file(self): self.download_directory("download_directory_file.json", self._tester) @@ -533,7 +541,7 @@ def path_with_bogus_rev(): pass @needs_aws_s3 - def test_streamable(self): + def test_streamable(self, extra_args: List[str] = None): """ Test that a file with 'streamable'=True is a named pipe. This is a CWL1.2 feature. @@ -546,12 +554,16 @@ def test_streamable(self): st = StringIO() args = [ + "--logDebug", "--outdir", self.outDir, jobstore, os.path.join(self.rootDir, cwlfile), os.path.join(self.rootDir, jobfile), ] + if extra_args: + args = extra_args + args + log.info("Run CWL run: %s", " ".join(args)) cwltoil.main(args, stdout=st) out = json.loads(st.getvalue()) out[out_name].pop("http://commonwl.org/cwltool#generation", None) @@ -561,6 +573,13 @@ def test_streamable(self): with open(out[out_name]["location"][len("file://") :]) as f: self.assertEqual(f.read().strip(), "When is s4 coming out?") + @needs_aws_s3 + def test_streamable_reference(self): + """ + Test that a streamable file is a stream even when passed around by URI. + """ + self.test_streamable(extra_args=["--reference-inputs"]) + def test_preemptible(self): """ Tests that the http://arvados.org/cwl#UsePreemptible extension is supported. diff --git a/src/toil/test/cwl/revsort.cwl b/src/toil/test/cwl/revsort.cwl index a6b2774ade..419027b99a 100644 --- a/src/toil/test/cwl/revsort.cwl +++ b/src/toil/test/cwl/revsort.cwl @@ -10,7 +10,7 @@ cwlVersion: v1.0 # in which the command line tools will execute. 
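For context, the new CWL tests above drive this whole code path end to end. Roughly, and assuming the --reference-inputs option this change adds to the CWL runner, the equivalent invocation looks like the sketch below; the workflow and input file names are placeholders, and other standard Toil options (for example a job store location) may be needed depending on the environment:

    from toil.cwl import cwltoil

    # Keep remote File inputs as URIs instead of importing them into the job store.
    cwltoil.main(['--reference-inputs', '--outdir', 'out',
                  'workflow.cwl', 'inputs.json'])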
hints: - class: DockerRequirement - dockerPull: debian:8 + dockerPull: debian:12 # The inputs array defines the structure of the input object that describes diff --git a/src/toil/test/cwl/revsort2.cwl b/src/toil/test/cwl/revsort2.cwl index 8f4717e3c7..c8e1ecd4c3 100644 --- a/src/toil/test/cwl/revsort2.cwl +++ b/src/toil/test/cwl/revsort2.cwl @@ -10,7 +10,7 @@ cwlVersion: v1.0 # in which the command line tools will execute. hints: - class: DockerRequirement - dockerPull: debian:8 + dockerPull: debian:12 # The inputs array defines the structure of the input object that describes diff --git a/src/toil/test/wdl/wdltoil_test.py b/src/toil/test/wdl/wdltoil_test.py index 7e2225e20b..8d19c69577 100644 --- a/src/toil/test/wdl/wdltoil_test.py +++ b/src/toil/test/wdl/wdltoil_test.py @@ -4,6 +4,8 @@ import subprocess import unittest import uuid +from typing import Any, Dict, List, Optional, Set +from unittest.mock import patch from unittest.mock import patch from typing import Any, Dict, List, Set @@ -107,25 +109,13 @@ def test_MD5sum(self): assert os.path.exists(result['ga4ghMd5.value']) assert os.path.basename(result['ga4ghMd5.value']) == 'md5sum.txt' - def test_empty_file_path(self): - """Test if empty File type inputs are protected against""" - wdl = os.path.abspath('src/toil/test/wdl/md5sum/md5sum.1.0.wdl') - json_file = os.path.abspath('src/toil/test/wdl/md5sum/empty_file.json') - - p = subprocess.Popen(self.base_command + [wdl, json_file, '-o', self.output_dir, '--logDebug'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = p.communicate() - retval = p.wait() - - assert retval != 0 - assert b'Could not find' in stderr - @needs_singularity_or_docker - def test_miniwdl_self_test(self): + def test_miniwdl_self_test(self, extra_args: Optional[List[str]] = None) -> None: """Test if the MiniWDL self test runs and produces the expected output.""" wdl_file = os.path.abspath('src/toil/test/wdl/miniwdl_self_test/self_test.wdl') json_file = os.path.abspath('src/toil/test/wdl/miniwdl_self_test/inputs.json') - result_json = subprocess.check_output(self.base_command + [wdl_file, json_file, '--logDebug', '-o', self.output_dir, '--outputDialect', 'miniwdl']) + result_json = subprocess.check_output(self.base_command + [wdl_file, json_file, '--logDebug', '-o', self.output_dir, '--outputDialect', 'miniwdl'] + (extra_args or [])) result = json.loads(result_json) # Expect MiniWDL-style output with a designated "dir" @@ -149,6 +139,13 @@ def test_miniwdl_self_test(self): assert 'hello_caller.messages' in outputs assert outputs['hello_caller.messages'] == ["Hello, Alyssa P. Hacker!", "Hello, Ben Bitdiddle!"] + @needs_singularity_or_docker + def test_miniwdl_self_test_by_reference(self) -> None: + """ + Test if the MiniWDL self test works when passing input files by URL reference. + """ + self.test_miniwdl_self_test(extra_args=["--referenceInputs=True"]) + @slow @needs_docker_cuda def test_giraffe_deepvariant(self): diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index bfc02e1012..df3943ab89 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -21,6 +21,7 @@ import re import shlex import shutil +import stat import subprocess import sys import uuid @@ -450,15 +451,25 @@ def _call_eager(self, expr: "WDL.Expr.Apply", arguments: List[WDL.Value.Base]) - total_size = 0.0 for uri in file_uris: # Sum up the sizes of all the files, if any. - if uri.startswith(TOIL_URI_SCHEME): - # This is a Toil File ID we encoded; we have the size - # available. 
- file_id, _, _ = unpack_toil_uri(uri) - # Use the encoded size - total_size += file_id.size + if is_url(uri): + if uri.startswith(TOIL_URI_SCHEME): + # This is a Toil File ID we encoded; we have the size + # available. + file_id, _, _ = unpack_toil_uri(uri) + # Use the encoded size + total_size += file_id.size + else: + # This is some other kind of remote file. + # We need to get its size from the URI. + item_size = AbstractJobStore.get_size(uri) + if item_size is None: + # User asked for the size and we can't figure it out efficiently, so bail out. + raise RuntimeError(f"Attempt to check the size of {uri} failed") + total_size += item_size else: - # We need to fetch it and get its size. - total_size += os.path.getsize(self.stdlib._devirtualize_filename(uri)) + # This is actually a file we can use locally. + local_path = self.stdlib._devirtualize_filename(uri) + total_size += os.path.getsize(local_path) if len(arguments) > 1: # Need to convert units. See @@ -472,6 +483,14 @@ def _call_eager(self, expr: "WDL.Expr.Apply", arguments: List[WDL.Value.Base]) - # Return the result as a WDL float value return WDL.Value.Float(total_size) +def is_url(filename: str, schemes: List[str] = ['http:', 'https:', 's3:', 'gs:', TOIL_URI_SCHEME]) -> bool: + """ + Decide if a filename is a known kind of URL + """ + for scheme in schemes: + if filename.startswith(scheme): + return True + return False # Both the WDL code itself **and** the commands that it runs will deal in # "virtualized" filenames. @@ -525,15 +544,6 @@ def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = self._execution_dir = execution_dir - def _is_url(self, filename: str, schemes: List[str] = ['http:', 'https:', 's3:', 'gs:', TOIL_URI_SCHEME]) -> bool: - """ - Decide if a filename is a known kind of URL - """ - for scheme in schemes: - if filename.startswith(scheme): - return True - return False - @memoize def _devirtualize_filename(self, filename: str) -> str: """ @@ -543,31 +553,49 @@ def _devirtualize_filename(self, filename: str) -> str: # TODO: Support people doing path operations (join, split, get parent directory) on the virtualized filenames. # TODO: For task inputs, we are supposed to make sure to put things in the same directory if they came from the same directory. See - if filename.startswith(TOIL_URI_SCHEME): - # This is a reference to the Toil filestore. - # Deserialize the FileID - file_id, parent_id, file_basename = unpack_toil_uri(filename) - - # Decide where it should be put - # This is a URI with the "parent" UUID attached to the filename - # Use UUID as folder name rather than a new temp folder to reduce internal clutter - dir_path = os.path.join(self._file_store.localTempDir, parent_id) - if not os.path.exists(parent_id): + if is_url(filename): + if filename.startswith(TOIL_URI_SCHEME): + # This is a reference to the Toil filestore. + # Deserialize the FileID + file_id, parent_id, file_basename = unpack_toil_uri(filename) + + # Decide where it should be put. + # This is a URI with the "parent" UUID attached to the filename. + # Use UUID as folder name rather than a new temp folder to reduce internal clutter. + # Put the UUID in the destination path in order for tasks to + # see where to put files depending on their parents. + dir_path = os.path.join(self._file_store.localTempDir, parent_id) + + else: + # Parse the URL and extract the basename + file_basename = os.path.basename(urlsplit(filename).path) + # Get the URL to the directory this thing came from. 
Remember + # URLs are interpreted relative to the directory the thing is + # in, not relative to the thing. + parent_url = urljoin(filename, ".") + # Turn it into a string we can make a directory for + dir_path = os.path.join(self._file_store.localTempDir, quote(parent_url, safe='')) + + if not os.path.exists(dir_path): + # Make sure the chosen directory exists os.mkdir(dir_path) - # Put the UUID in the destination path in order for tasks to see where to put files depending on their parents + # And decide the file goes in it. dest_path = os.path.join(dir_path, file_basename) - # And get a local path to the file - result = self._file_store.readGlobalFile(file_id, dest_path) - elif self._is_url(filename): - # This is some other URL that we think Toil knows how to read. - # Import into the job store from here and then download to the node. - # TODO: Can we predict all the URLs that can be used up front and do them all on the leader, where imports are meant to happen? - imported = self._file_store.import_file(filename) - if imported is None: - raise FileNotFoundError(f"Could not import URL {filename}") - # And get a local path to the file - result = self._file_store.readGlobalFile(imported) + if filename.startswith(TOIL_URI_SCHEME): + # Get a local path to the file + result = self._file_store.readGlobalFile(file_id, dest_path) + else: + # Download to a local file with the right name and execute bit. + # Open it exclusively + with open(dest_path, 'xb') as dest_file: + # And save to it + size, executable = AbstractJobStore.read_from_url(filename, dest_file) + if executable: + # Set the execute bit in the file's permissions + os.chmod(dest_path, os.stat(dest_path).st_mode | stat.S_IXUSR) + + result = dest_path else: # This is a local file # To support relative paths, join the execution dir and filename @@ -587,7 +615,8 @@ def _virtualize_filename(self, filename: str) -> str: from a local path in write_dir, 'virtualize' into the filename as it should present in a File value """ - if self._is_url(filename): + + if is_url(filename): # Already virtual logger.debug('Already virtualized %s as WDL file %s', filename, filename) return filename @@ -631,7 +660,7 @@ def _devirtualize_filename(self, filename: str) -> str: Any WDL-side filenames which are paths will be paths in the container. """ - if self._is_url(filename): + if is_url(filename): # We shouldn't have to deal with URLs here; we want to have exactly # two nicely stacked/back-to-back layers of virtualization, joined # on the out-of-container paths. @@ -771,7 +800,7 @@ def _devirtualize_filename(self, filename: str) -> str: Any WDL-side filenames which are relative will be relative to the current directory override, if set. """ - if not self._is_url(filename) and not filename.startswith('/'): + if not is_url(filename) and not filename.startswith('/'): # We are getting a bare relative path from the WDL side. # Find a real path to it relative to the current directory override. work_dir = '.' if not self._current_directory_override else self._current_directory_override @@ -788,7 +817,7 @@ def _virtualize_filename(self, filename: str) -> str: filenames. """ - if not self._is_url(filename) and not filename.startswith('/'): + if not is_url(filename) and not filename.startswith('/'): # We are getting a bare relative path the supposedly devirtualized side. # Find a real path to it relative to the current directory override. work_dir = '.' 
if not self._current_directory_override else self._current_directory_override @@ -816,6 +845,7 @@ def evaluate_named_expression(context: Union[WDL.Error.SourceNode, WDL.Error.Sou # Do the actual evaluation value = expression.eval(environment, stdlib) + logger.debug("Got value %s of type %s", value, value.type) except Exception: # If something goes wrong, dump. logger.exception("Expression evaluation failed for %s: %s", name, expression) @@ -848,7 +878,11 @@ def evaluate_call_inputs(context: Union[WDL.Error.SourceNode, WDL.Error.SourcePo if not v.type.optional and inputs_dict is not None: # This is done to enable passing in a string into a task input of file type expected_type = inputs_dict.get(k, None) - new_bindings = new_bindings.bind(k, evaluate_named_expression(context, k, expected_type, v, environment, stdlib)) + try: + new_bindings = new_bindings.bind(k, evaluate_named_expression(context, k, expected_type, v, environment, stdlib)) + except FileNotFoundError as e: + # MiniWDL's type coercion will raise this when trying to make a File out of Null. + raise WDL.Error.EvalError(context, f"Cannot evaluate expression for {k} with value {v}") return new_bindings def evaluate_defaultable_decl(node: WDL.Tree.Decl, environment: WDLBindings, stdlib: WDL.StdLib.Base) -> WDL.Value.Base: @@ -925,13 +959,16 @@ def add_paths(task_container: TaskContainer, host_paths: Iterable[str]) -> None: task_container.input_path_map[host_path] = container_path task_container.input_path_map_rev[container_path] = host_path -def import_files(environment: WDLBindings, toil: Toil, path: Optional[List[str]] = None) -> WDLBindings: +def import_files(environment: WDLBindings, toil: Toil, path: Optional[List[str]] = None, skip_remote: bool = False) -> WDLBindings: """ Make sure all File values embedded in the given bindings are imported, using the given Toil object. :param path: If set, try resolving input location relative to the URLs or - directories in this list. + directories in this list. + + :param skip_remote: If set, don't try to import files from remote + locations. Leave them as URIs. """ path_to_id: Dict[str, uuid.UUID] = {} @memoize @@ -945,9 +982,23 @@ def import_file_from_uri(uri: str) -> str: # Try each place it could be according to WDL finding logic. tried.append(candidate_uri) try: - # Try to import the file. Don't raise if we can't find it, just - # return None! - imported = toil.import_file(candidate_uri, check_existence=False) + if skip_remote and is_url(candidate_uri): + # Use remote URIs in place. But we need to find the one that exists. + if not AbstractJobStore.url_exists(candidate_uri): + # Wasn't found there + continue + # Now we know this exists, so pass it through + return candidate_uri + else: + # Actually import + # Try to import the file. Don't raise if we can't find it, just + # return None! + imported = toil.import_file(candidate_uri, check_existence=False) + if imported is None: + # Wasn't found there + continue + logger.info('Imported %s', candidate_uri) + except UnimplementedURLException as e: # We can't find anything that can even support this URL scheme. # Report to the user, they are probably missing an extra. @@ -958,6 +1009,7 @@ def import_file_from_uri(uri: str) -> str: # we have no auth. logger.error("Something went wrong importing %s", candidate_uri) raise + if imported is None: # Wasn't found there continue @@ -970,10 +1022,25 @@ def import_file_from_uri(uri: str) -> str: # We can't have files with no basename because we need to # download them at that basename later. 
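Both _devirtualize_filename above and import_files below group remote files by the URL of their containing directory, relying on the fact that urljoin(uri, '.') resolves to the parent "directory" of a URI. For example, with a made-up URL:

    from urllib.parse import urljoin

    url = 'https://example.org/data/reads/sample1.fastq'  # hypothetical input
    assert urljoin(url, '.') == 'https://example.org/data/reads/'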
raise RuntimeError(f"File {candidate_uri} has no basename and so cannot be a WDL File") - + # Was actually found + if is_url(candidate_uri): + # Might be a file URI or other URI. + # We need to make sure file URIs and local paths that point to + # the same place are treated the same. + parsed = urlsplit(candidate_uri) + if parsed.scheme == "file:": + # This is a local file URI. Convert to a path for source directory tracking. + parent_dir = os.path.dirname(unquote(parsed.path)) + else: + # This is some other URL. Get the URL to the parent directory and use that. + parent_dir = urljoin(candidate_uri, ".") + else: + # Must be a local path + parent_dir = os.path.dirname(candidate_uri) + # Pack a UUID of the parent directory - dir_id = path_to_id.setdefault(os.path.dirname(candidate_uri), uuid.uuid4()) + dir_id = path_to_id.setdefault(parent_dir, uuid.uuid4()) return pack_toil_uri(imported, dir_id, file_basename) @@ -997,12 +1064,22 @@ def drop_if_missing(value_type: WDL.Type.Base, filename: str) -> Optional[str]: """ Return None if a file doesn't exist, or its path if it does. """ - effective_path = os.path.abspath(os.path.join(work_dir, filename)) - if os.path.exists(effective_path): - return filename + logger.debug("Consider file %s", filename) + + if is_url(filename): + if filename.startswith(TOIL_URI_SCHEME) or AbstractJobStore.url_exists(filename): + # We assume anything in the filestore actually exists. + return filename + else: + logger.warning('File %s with type %s does not actually exist at its URI', filename, value_type) + return None else: - logger.debug('File %s with type %s does not actually exist at %s', filename, value_type, effective_path) - return None + effective_path = os.path.abspath(os.path.join(work_dir, filename)) + if os.path.exists(effective_path): + return filename + else: + logger.warning('File %s with type %s does not actually exist at %s', filename, value_type, effective_path) + return None return map_over_typed_files_in_bindings(environment, drop_if_missing) @@ -1076,6 +1153,7 @@ def map_over_typed_files_in_value(value: WDL.Value.Base, transform: Callable[[WD if new_path is None: # Assume the transform checked types if we actually care about the # result. + logger.warning("File %s became Null", value) return WDL.Value.Null() else: # Make whatever the value is around the new path. @@ -2562,6 +2640,8 @@ def main() -> None: help=("Directory or URI prefix to save output files at. By default a new directory is created in the current directory.")) parser.add_argument("--outputFile", "-m", dest="output_file", type=str, default=None, help="File or URI to save output JSON to.") + parser.add_argument("--referenceInputs", dest="reference_inputs", type="bool", default=False, # type: ignore + help="Pass input files by URL") options = parser.parse_args(sys.argv[1:]) @@ -2629,7 +2709,7 @@ def main() -> None: inputs_search_path.append(match.group(0)) # Import any files in the bindings - input_bindings = import_files(input_bindings, toil, inputs_search_path) + input_bindings = import_files(input_bindings, toil, inputs_search_path, skip_remote=options.reference_inputs) # TODO: Automatically set a good MINIWDL__SINGULARITY__IMAGE_CACHE ? @@ -2649,32 +2729,37 @@ def devirtualize_output(filename: str) -> str: 'devirtualize' a file using the "toil" object instead of a filestore. Returns its local path. """ - if filename.startswith(TOIL_URI_SCHEME): - # This is a reference to the Toil filestore. 
- # Deserialize the FileID and required basename - file_id, parent_id, file_basename = unpack_toil_uri(filename) + if is_url(filename): + if filename.startswith(TOIL_URI_SCHEME): + # This is a reference to the Toil filestore. + # Deserialize the FileID and required basename + file_id, parent_id, file_basename = unpack_toil_uri(filename) + else: + # Parse the URL and extract the basename + file_basename = os.path.basename(urlsplit(filename).path) + # Figure out where it should go. # If a UUID is included, it will be omitted + # TODO: Deal with name collisions in the export directory dest_name = os.path.join(output_directory, file_basename) - # TODO: Deal with name collisions - # Export the file - toil.export_file(file_id, dest_name) + + if filename.startswith(TOIL_URI_SCHEME): + # Export the file + toil.export_file(file_id, dest_name) + else: + # Download to a local file with the right name and execute bit. + # Open it exclusively + with open(dest_name, 'xb') as dest_file: + # And save to it + size, executable = AbstractJobStore.read_from_url(filename, dest_file) + if executable: + # Set the execute bit in the file's permissions + os.chmod(dest_name, os.stat(dest_name).st_mode | stat.S_IXUSR) + # And return where we put it return dest_name - elif filename.startswith('http:') or filename.startswith('https:') or filename.startswith('s3:') or filename.startswith('gs:'): - # This is a URL that we think Toil knows how to read. - imported = toil.import_file(filename) - if imported is None: - raise FileNotFoundError(f"Could not import URL {filename}") - # Get a basename from the URL. - # TODO: Deal with name collisions - file_basename = os.path.basename(urlsplit(filename).path) - # Do the same as we do for files we actually made. - dest_name = os.path.join(output_directory, file_basename) - toil.export_file(imported, dest_name) - return dest_name else: - # Not a fancy file + # We already had a path return filename # Make all the files local files
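End to end, the WDL runner exposes this behaviour through the new --referenceInputs option, which the MiniWDL self-test above exercises. A rough equivalent of that invocation, with placeholder paths, assuming the toil-wdl-runner entry point is on PATH (the tests' exact base command is not shown here), and noting that other standard Toil options such as --jobStore may be required in some setups:

    import subprocess

    subprocess.check_output([
        'toil-wdl-runner',
        'self_test.wdl', 'inputs.json',    # placeholder workflow and inputs
        '--referenceInputs=True',          # pass File inputs along by URL instead of importing them
        '-o', 'out', '--outputDialect', 'miniwdl',
    ])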