Skip to content

Commit

Permalink
Allow working with remote files in CWL and WDL workflows (#4690)
Browse files Browse the repository at this point in the history
* Start implementing real ToilFsAccess URL operations

* Implement URL opening for CWL

* Implement other ToilFsAccess operations without local copies

* Remove getSize spelling and pass mypy

* Add missing import

* Remove check for extremely old setuptools

* Add --reference-inputs option to toil-cwl-runner

* Allow files to be gotten by URI on the nodes

* Add some tests to exercise URL references

* Implement URI access and import logic in WDL interpreter

* Remove duplicated test

* Fixc some merge problems

* Satisfy MyPy

* Spell default correctly

* Actually hook up import bypass flag

* Actually pass self test when using URLs

* Make file job store volunteer for non-schemed URIs

* Revert "Make file job store volunteer for non-schemed URIs"

This reverts commit 3d1e8f6.

* Handle size requests for bare filenames

* Handle polling for URL existence

* Add a make test_debug target for getting test logs

* Add more logging to CWL streaming tests

* Contemplate multi-threaded access to the CachingFileStore from user code

* Allow downloading URLs in structures, and poll AWS directory existence right

* Update tests to a Debian with ARM Docker images

* Undo permission changes

* Add missing import

---------

Co-authored-by: Michael R. Crusoe <[email protected]>
  • Loading branch information
adamnovak and mr-c authored Dec 7, 2023
1 parent f60382e commit e80db2c
Show file tree
Hide file tree
Showing 16 changed files with 1,240 additions and 772 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ test: check_venv check_build_reqs
TOIL_OWNER_TAG="shared" \
python -m pytest --durations=0 --strict-markers --log-level DEBUG --log-cli-level INFO -r s $(cov) -n $(threads) --dist loadscope $(tests) -m "$(marker)"

test_debug: check_venv check_build_reqs
TOIL_OWNER_TAG="$(whoami)" \
python -m pytest --durations=0 --strict-markers --log-level DEBUG -s -o log_cli=true --log-cli-level DEBUG -r s $(tests) -m "$(marker)" --tb=native --maxfail=1


# This target will skip building docker and all docker based tests
# these are our travis tests; rename?
Expand Down
380 changes: 272 additions & 108 deletions src/toil/cwl/cwltoil.py

Large diffs are not rendered by default.

59 changes: 49 additions & 10 deletions src/toil/cwl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

import logging
import os
from pathlib import PurePosixPath
import posixpath
import stat
from typing import (
Any,
Callable,
Expand All @@ -31,6 +34,7 @@

from toil.fileStores import FileID
from toil.fileStores.abstractFileStore import AbstractFileStore
from toil.jobStores.abstractJobStore import AbstractJobStore

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -128,6 +132,32 @@ def visit_cwl_class_and_reduce(

DirectoryStructure = Dict[str, Union[str, "DirectoryStructure"]]

def get_from_structure(dir_dict: DirectoryStructure, path: str) -> Union[str, DirectoryStructure, None]:
"""
Given a relative path, follow it in the given directory structure.
Return the string URI for files, the directory dict for
subdirectories, or None for nonexistent things.
"""

# Resolve .. and split into path components
parts = PurePosixPath(posixpath.normpath(path)).parts
if len(parts) == 0:
return dir_dict
if parts[0] in ('..', '/'):
raise RuntimeError(f"Path {path} not resolvable in virtual directory")
found: Union[str, DirectoryStructure] = dir_dict
for part in parts:
# Go down by each path component in turn
if isinstance(found, str):
# Looking for a subdirectory of a file, which doesn't exist
return None
if part not in found:
return None
found = found[part]
# Now we're at the place we want to be.
return found


def download_structure(
file_store: AbstractFileStore,
Expand All @@ -140,11 +170,12 @@ def download_structure(
Download nested dictionary from the Toil file store to a local path.
Guaranteed to fill the structure with real files, and not symlinks out of
it to elsewhere.
it to elsewhere. File URIs may be toilfile: URIs or any other URI that
Toil's job store system can read.
:param file_store: The Toil file store to download from.
:param index: Maps from downloaded file path back to input Toil URI.
:param index: Maps from downloaded file path back to input URI.
:param existing: Maps from file_store_id URI to downloaded file path.
Expand All @@ -171,16 +202,24 @@ def download_structure(
# This must be a file path uploaded to Toil.
if not isinstance(value, str):
raise RuntimeError(f"Did not find a file at {value}.")
if not value.startswith("toilfile:"):
raise RuntimeError(f"Did not find a filestore file at {value}")

logger.debug("Downloading contained file '%s'", name)
dest_path = os.path.join(into_dir, name)
# So download the file into place.
# Make sure to get a real copy of the file because we may need to
# mount the directory into a container as a whole.
file_store.readGlobalFile(
FileID.unpack(value[len("toilfile:") :]), dest_path, symlink=False
)

if value.startswith("toilfile:"):
# So download the file into place.
# Make sure to get a real copy of the file because we may need to
# mount the directory into a container as a whole.
file_store.readGlobalFile(
FileID.unpack(value[len("toilfile:") :]), dest_path, symlink=False
)
else:
# We need to download from some other kind of URL.
size, executable = AbstractJobStore.read_from_url(value, open(dest_path, 'wb'))
if executable:
# Make the written file executable
os.chmod(dest_path, os.stat(dest_path).st_mode | stat.S_IXUSR)

# Update the index dicts
# TODO: why?
index[dest_path] = value
Expand Down
19 changes: 18 additions & 1 deletion src/toil/fileStores/abstractFileStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
Generator,
Iterator,
List,
Literal,
Optional,
Set,
Tuple,
Type,
Union,
cast)
cast,
overload)

import dill

Expand Down Expand Up @@ -413,6 +415,21 @@ def readGlobalFile(
"""
raise NotImplementedError()

@overload
def readGlobalFileStream(
self,
fileStoreID: str,
encoding: Literal[None] = None,
errors: Optional[str] = None,
) -> ContextManager[IO[bytes]]:
...

@overload
def readGlobalFileStream(
self, fileStoreID: str, encoding: str, errors: Optional[str] = None
) -> ContextManager[IO[str]]:
...

@abstractmethod
def readGlobalFileStream(
self,
Expand Down
Loading

0 comments on commit e80db2c

Please sign in to comment.