Skip to content

Commit

Permalink
Actually implement new API in job store implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
adamnovak committed Jan 31, 2025
1 parent 7f96c2b commit abab772
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 24 deletions.
14 changes: 7 additions & 7 deletions src/toil/jobStores/aws/jobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,17 +657,17 @@ def _url_exists(cls, url: ParseResult, config: Config) -> bool:
return cls._get_is_directory(url)

@classmethod
def _get_size(cls, url: ParseResult, config: Config) -> int:
    """
    Return the size in bytes of the object at the given s3:// URL.

    :param url: Parsed URL of an object that must already exist.
    :param config: Toil config; ``aws_anonymous_url_access`` controls
        whether the request is made without credentials.
    """
    obj = get_object_for_url(
        url, existing=True, anonymous=config.aws_anonymous_url_access
    )
    return obj.content_length

@classmethod
def _read_from_url(
    cls, url: ParseResult, writable: IO[bytes], config: Config
) -> tuple[int, bool]:
    """
    Stream the object at the given s3:// URL into ``writable``.

    :param writable: Binary stream to copy the object's bytes into.
    :param config: Toil config; ``aws_anonymous_url_access`` controls
        whether the request is made without credentials.
    :returns: Tuple of (size in bytes, executable flag). S3 objects carry
        no executable bit, so the flag is always False.
    """
    srcObj = get_object_for_url(
        url, existing=True, anonymous=config.aws_anonymous_url_access
    )
    srcObj.download_fileobj(writable)
    return (srcObj.content_length, False)  # executable bit is always False

@classmethod
def _open_url(cls, url: ParseResult) -> IO[bytes]:
def _open_url(cls, url: ParseResult, config: Config) -> IO[bytes]:
src_obj = get_object_for_url(url, existing=True, anonymous=config.aws_anonymous_url_access)
response = src_obj.get()
# We should get back a response with a stream in 'Body'
Expand All @@ -677,7 +677,7 @@ def _open_url(cls, url: ParseResult) -> IO[bytes]:

@classmethod
def _write_to_url(
cls, readable, url: ParseResult, executable: bool = False
cls, readable, url: ParseResult, executable: bool, config: Config
) -> None:
dstObj = get_object_for_url(url, anonymous=config.aws_anonymous_url_access)

Expand All @@ -692,14 +692,14 @@ def _write_to_url(
)

@classmethod
def _list_url(cls, url: ParseResult, config: Config) -> list[str]:
    """
    List the immediate contents of the "directory" at the given s3:// URL.

    :param config: Toil config; ``aws_anonymous_url_access`` controls
        whether the request is made without credentials.
    """
    return list_objects_for_url(url, anonymous=config.aws_anonymous_url_access)

@classmethod
def _get_is_directory(cls, url: ParseResult, config: Config) -> bool:
    """
    Determine whether the given s3:// URL names a "directory".

    We consider it a directory if anything is listed under it.
    """
    # TODO: Can we just get the first item and not the whole list?
    return bool(cls._list_url(url, config))

@classmethod
def _supports_url(cls, url: ParseResult, export: bool = False) -> bool:
Expand Down
16 changes: 8 additions & 8 deletions src/toil/jobStores/fileJobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,15 +396,15 @@ def _move_and_linkback(self, srcPath, destPath, executable):
os.chmod(destPath, os.stat(destPath).st_mode | stat.S_IXUSR)

@classmethod
def _url_exists(cls, url: ParseResult, config: Config) -> bool:
    """
    Return True if the given file:// URL points at an existing path.

    :param config: Accepted for API uniformity with other job stores;
        not needed for local file access.
    """
    return os.path.exists(cls._extract_path_from_url(url))

@classmethod
def _get_size(cls, url: ParseResult, config: Config) -> int:
    """
    Return the size in bytes of the file named by the given file:// URL.

    :raises OSError: if the path does not exist or cannot be stat'd.
    """
    return os.stat(cls._extract_path_from_url(url)).st_size

@classmethod
def _read_from_url(
    cls, url: ParseResult, writable: IO[bytes], config: Config
) -> tuple[int, bool]:
    """
    Copy the contents of the file named by ``url`` into ``writable``,
    using a ~10Mb buffer.

    :param writable: Binary stream to write the file's bytes into.
    :returns: Tuple of (number of bytes copied, whether the source file
        had its user-executable bit set).
    """
    # we use a ~10Mb buffer to improve speed
    with cls._open_url(url, config) as readable:
        shutil.copyfileobj(readable, writable, length=cls.BUFFER_SIZE)
        # Return the number of bytes we read when we reached EOF.
        # bool() so the executable flag matches the annotated return type
        # instead of being a raw permission-bit mask.
        executable = bool(os.stat(readable.name).st_mode & stat.S_IXUSR)
        return readable.tell(), executable

@classmethod
def _open_url(cls, url: ParseResult, config: Config) -> IO[bytes]:
    """
    Open the file named by the given file:// URL as a binary read stream.

    :param config: Accepted for API uniformity with other job stores;
        not needed for local file access.
    """
    return open(cls._extract_path_from_url(url), "rb")

@classmethod
def _write_to_url(cls, readable, url, executable=False):
def _write_to_url(cls, readable: IO[bytes], url: ParseResult, executable: bool, config: Config):
"""
Writes the contents of a file to a source (writes readable to url)
using a ~10Mb buffer.
Expand All @@ -445,7 +445,7 @@ def _write_to_url(cls, readable, url, executable=False):
)

@classmethod
def _list_url(cls, url: ParseResult) -> list[str]:
def _list_url(cls, url: ParseResult, config: Config) -> list[str]:
path = cls._extract_path_from_url(url)
listing = []
for p in os.listdir(path):
Expand All @@ -458,7 +458,7 @@ def _list_url(cls, url: ParseResult) -> list[str]:
return listing

@classmethod
def _get_is_directory(cls, url: ParseResult, config: Config) -> bool:
    """
    Return True if the given file:// URL names an existing directory.
    """
    path = cls._extract_path_from_url(url)
    return os.path.isdir(path)

Expand Down
14 changes: 7 additions & 7 deletions src/toil/jobStores/googleJobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,25 +441,25 @@ def _get_blob_from_url(cls, url, exists=False):
return blob

@classmethod
def _url_exists(cls, url: ParseResult, config: Config) -> bool:
    """
    Return True if a blob exists at the given gs:// URL.

    :param config: Accepted for API uniformity with other job stores;
        not currently consulted for Google bucket access.
    """
    try:
        cls._get_blob_from_url(url, exists=True)
        return True
    except NoSuchFileException:
        return False

@classmethod
def _get_size(cls, url: ParseResult, config: Config) -> int:
    """
    Return the size in bytes of the blob at the given gs:// URL.
    """
    return cls._get_blob_from_url(url, exists=True).size

@classmethod
def _read_from_url(
    cls, url: ParseResult, writable: IO[bytes], config: Config
) -> tuple[int, bool]:
    """
    Download the blob at the given gs:// URL into ``writable``.

    :returns: Tuple of (size in bytes, executable flag). GCS blobs carry
        no executable bit, so the flag is always False.
    """
    blob = cls._get_blob_from_url(url, exists=True)
    blob.download_to_file(writable)
    return blob.size, False

@classmethod
def _open_url(cls, url: ParseResult, config: Config) -> IO[bytes]:
    """
    Open the blob at the given gs:// URL as a binary read stream.
    """
    blob = cls._get_blob_from_url(url, exists=True)
    return blob.open("rb")

Expand All @@ -468,18 +468,18 @@ def _supports_url(cls, url, export=False):
return url.scheme.lower() == "gs"

@classmethod
def _write_to_url(
    cls, readable: IO[bytes], url: ParseResult, executable: bool, config: Config
) -> None:
    """
    Upload the contents of ``readable`` to the blob named by ``url``.

    Annotations corrected to match actual use and the sibling job store
    implementations: ``readable`` is a binary stream (it is passed to
    ``upload_from_file``), and ``url`` is a parsed URL, not a plain string.

    :param executable: Ignored; GCS blobs have no executable bit.
    :param config: Accepted for API uniformity; not currently consulted.
    """
    blob = cls._get_blob_from_url(url)
    blob.upload_from_file(readable)

@classmethod
def _list_url(cls, url: ParseResult) -> list[str]:
def _list_url(cls, url: ParseResult, config: Config) -> list[str]:
raise NotImplementedError(
"Listing files in Google buckets is not yet implemented!"
)

@classmethod
def _get_is_directory(cls, url: ParseResult) -> bool:
def _get_is_directory(cls, url: ParseResult, config: Config) -> bool:
raise NotImplementedError(
"Checking directory status in Google buckets is not yet implemented!"
)
Expand Down
7 changes: 5 additions & 2 deletions src/toil/wdl/wdltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,10 +518,13 @@ async def toil_read_source(
# TODO: this is probably sync work that would be better as async work here
job_store.read_from_url(candidate_uri, destination_buffer)
except Exception as e:
# TODO: we need to assume any error is just a not-found,
# We need to assume any arbitrary error is just a not-found,
# because the exceptions thrown by read_from_url()
# implementations are not specified.
logger.debug("Tried to fetch %s from %s but got %s", uri, candidate_uri, e)
logger.debug("Tried to fetch %s from %s but got %s: %s", uri, candidate_uri, type(e), e)
if isinstance(e, TypeError):
# This is probably actually a bug
raise
continue
# If we get here, we got it probably.
try:
Expand Down

0 comments on commit abab772

Please sign in to comment.