Skip to content

Commit

Permalink
Restore old DRS copy_batch API; separate copy_batch with manifest (#374)
Browse files Browse the repository at this point in the history
* Restore old DRS copy_batch API; separate copy_batch with manifest

* Respond to review feedback

* Respond to review feedback from @DailyDreaming

* Respond to review feedback from @DailyDreaming
  • Loading branch information
jessebrennan authored Nov 29, 2021
1 parent c351379 commit ae862f4
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 13 deletions.
1 change: 1 addition & 0 deletions terra_notebook_utils/blobstore/copy_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def _do_copy(src_blob: AnyBlob, dst_blob: AnyBlob, multipart_threshold: int, ind
logger.debug(f"Copied {src_blob.url} to {dst_blob.url}")
except Exception:
logger.exception(f"copy failed: '{src_blob.url}' to '{dst_blob.url}'")
# FIXME: The finally block is useless, but what was the intent? Probably idempotent delete
try:
dst_blob.delete()
finally:
Expand Down
13 changes: 7 additions & 6 deletions terra_notebook_utils/cli/commands/drs.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,20 @@ def drs_copy_batch(args: argparse.Namespace):
}
]
"""
args.workspace, args.workspace_namespace = CLIConfig.resolve(args.workspace, args.workspace_namespace)
kwargs: Dict[str, Any] = dict(workspace_name=args.workspace, workspace_namespace=args.workspace_namespace)
if CLIConfig.progress_indicator_type() is not None:
kwargs['indicator_type'] = CLIConfig.progress_indicator_type()
if args.drs_uris:
assert args.manifest is None, "Cannot use 'drs_uris' with '--manifest'"
manifest = [dict(drs_uri=uri, dst=args.dst) for uri in args.drs_uris]
assert args.dst is not None, "Must specify a destination with '--dst'"
drs.copy_batch_urls(args.drs_uris, args.dst, **kwargs)
elif args.manifest:
with open(args.manifest) as fh:
manifest = json.loads(fh.read())
drs.copy_batch_manifest(manifest, **kwargs)
else:
raise RuntimeError("Must supply either 'drs_uris' or '--manifest'")
args.workspace, args.workspace_namespace = CLIConfig.resolve(args.workspace, args.workspace_namespace)
kwargs: Dict[str, Any] = dict(workspace_name=args.workspace, workspace_namespace=args.workspace_namespace)
if CLIConfig.progress_indicator_type() is not None:
kwargs['indicator_type'] = CLIConfig.progress_indicator_type()
drs.copy_batch(manifest, **kwargs)

@drs_cli.command("head", arguments={
"drs_url": dict(type=str),
Expand Down
45 changes: 40 additions & 5 deletions terra_notebook_utils/drs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from functools import lru_cache
from collections import namedtuple
from typing import Dict, List, Tuple, Optional, Union
from typing import Dict, List, Tuple, Optional, Union, Iterable
from requests import Response

from terra_notebook_utils import WORKSPACE_BUCKET, WORKSPACE_NAME, MARTHA_URL, WORKSPACE_NAMESPACE, \
Expand Down Expand Up @@ -230,6 +230,10 @@ def _resolve_bucket_target(url: str, info: DRSInfo) -> Tuple[str, str]:
return bucket_name, key

def _resolve_local_target(filepath: str, info: DRSInfo) -> str:
# Checking for a file also prevents a batch_copy from overwriting
# the same file over and over again.
if os.path.isfile(filepath):
raise FileExistsError(f'Cannot copy {info.name} to {filepath} because file already exists')
if filepath.endswith(os.path.sep) or os.path.isdir(filepath):
filename = info.name or info.key.rsplit("/", 1)[-1]
filepath = os.path.join(os.path.abspath(filepath), filename)
Expand Down Expand Up @@ -290,6 +294,37 @@ def copy_to_bucket(drs_uri: str,
dst_url += f"/{dst_key}"
copy(drs_uri, dst_url, indicator_type, workspace_name, workspace_namespace)

def copy_batch(manifest: Optional[List[Dict[str, str]]] = None,
               drs_urls: Optional[Iterable[str]] = None,
               dst_pfx: Optional[str] = None,
               indicator_type: Indicator = Indicator.notebook_bar if is_notebook() else Indicator.log,
               workspace_name: Optional[str] = WORKSPACE_NAME,
               workspace_namespace: Optional[str] = WORKSPACE_NAMESPACE):
    """Copy a batch of DRS objects described by a manifest or by a list of DRS URLs.

    Exactly one of ``manifest`` and ``drs_urls`` must be supplied:

    * ``manifest`` is forwarded to ``copy_batch_manifest``; ``dst_pfx`` must
      not be given in that case.
    * ``drs_urls`` is forwarded, together with the required ``dst_pfx``, to
      ``copy_batch_urls``.

    Because ``manifest`` is the first positional parameter, the URL form has
    to be invoked with keywords (``drs_urls=..., dst_pfx=...``); a positional
    ``copy_batch(urls, dst)`` call binds ``urls`` to ``manifest`` and ``dst``
    to ``drs_urls`` and is rejected below.

    Raises:
        ValueError: if neither or both of ``manifest`` and ``drs_urls`` are
            given, if ``dst_pfx`` accompanies a manifest, or if ``dst_pfx`` is
            missing for ``drs_urls``.
    """
    # Neither source or both sources: the request is ambiguous either way.
    if (manifest is None) == (drs_urls is None):
        raise ValueError("Specify either 'manifest' or 'drs_urls' and 'dst_pfx'")

    common = dict(indicator_type=indicator_type,
                  workspace_name=workspace_name,
                  workspace_namespace=workspace_namespace)

    if manifest is not None:
        if dst_pfx is not None:
            raise ValueError('dst_pfx not compatible with manifest')
        copy_batch_manifest(manifest, **common)
        return

    # From here on only ``drs_urls`` can be set, so no further branch is needed.
    if dst_pfx is None:
        raise ValueError('dst_pfx required with drs_urls')
    copy_batch_urls(drs_urls, dst_pfx, **common)

def copy_batch_urls(drs_urls: Iterable[str],
                    dst_pfx: str,
                    indicator_type: Indicator = Indicator.notebook_bar if is_notebook() else Indicator.log,
                    workspace_name: Optional[str] = WORKSPACE_NAME,
                    workspace_namespace: Optional[str] = WORKSPACE_NAMESPACE):
    """Submit every URL in ``drs_urls`` to a ``DRSCopyClient`` with the shared destination ``dst_pfx``.

    ``enable_requester_pays`` is called for the workspace before the client is
    opened. The client is used as a context manager and is configured with the
    given workspace name and namespace before any copy is requested.
    """
    enable_requester_pays(workspace_name, workspace_namespace)
    with DRSCopyClient(indicator_type=indicator_type) as client:
        client.workspace = workspace_name
        client.workspace_namespace = workspace_namespace
        # Same destination for every source; an empty iterable requests no copies.
        for source_url in drs_urls:
            client.copy(source_url, dst_pfx)

manifest_schema = {
"type": "array",
"items": {
Expand All @@ -302,10 +337,10 @@ def copy_to_bucket(drs_uri: str,
},
}

def copy_batch(manifest: List[Dict[str, str]],
indicator_type: Indicator=Indicator.notebook_bar if is_notebook() else Indicator.log,
workspace_name: Optional[str]=WORKSPACE_NAME,
workspace_namespace: Optional[str]=WORKSPACE_NAMESPACE):
def copy_batch_manifest(manifest: List[Dict[str, str]],
indicator_type: Indicator=Indicator.notebook_bar if is_notebook() else Indicator.log,
workspace_name: Optional[str]=WORKSPACE_NAME,
workspace_namespace: Optional[str]=WORKSPACE_NAMESPACE):
from jsonschema import validate
validate(instance=manifest, schema=manifest_schema)
enable_requester_pays(workspace_name, workspace_namespace)
Expand Down
30 changes: 28 additions & 2 deletions tests/test_drs.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,32 @@ def _gs_obj_exists(self, key: str) -> bool:

@testmode("controlled_access")
def test_copy_batch(self):
    """Exercise the URL-list form of ``drs.copy_batch`` for a bucket and a local directory target."""
    # Maps the expected destination file name to the DRS URL it is copied from.
    drs_urls = {
        # The "name" property disappeared from the DRS response for this object :(
        "NWD522743.b38.irc.v1.cram.crai": "drs://dg.4503/95cc4ae1-dee7-4266-8b97-77cf46d83d35",  # 1631686 bytes
        "data_phs001237.v2.p1.c1.avro.gz": "drs://dg.4503/26e11149-5deb-4cd7-a475-16997a825655",  # 1115092 bytes
        "RootStudyConsentSet_phs001237.TOPMed_WGS_WHI.v2.p1.c1.HMB-IRB.tar.gz":
            "drs://dg.4503/e9c2caf2-b2a1-446d-92eb-8d5389e99ee3",  # 332237 bytes

        # Much larger objects, left disabled:
        # "NWD961306.freeze5.v1.vcf.gz": "drs://dg.4503/6e73a376-f7fd-47ed-ac99-0567bb5a5993",  # 2679331445 bytes
        # "NWD531899.freeze5.v1.vcf.gz": "drs://dg.4503/651a4ad1-06b5-4534-bb2c-1f8ed51134f6",  # 2679411265 bytes
    }
    pfx = f"test-batch-copy/{uuid4()}"
    bucket = gs.get_client().bucket(WORKSPACE_BUCKET)
    with self.subTest("gs bucket"):
        # ``manifest`` is the first positional parameter of ``copy_batch``, so the
        # URL form must be selected with keywords.
        # NOTE(review): the results are read back through WORKSPACE_BUCKET, so this
        # hard-coded bucket presumably has to be the same one -- confirm.
        drs.copy_batch(drs_urls=list(drs_urls.values()),
                       dst_pfx=f"gs://fc-9169fcd1-92ce-4d60-9d2d-d19fd326ff10/{pfx}")
        for name in drs_urls:
            blob = bucket.get_blob(f"{pfx}/{name}")
            # Fail with a readable message rather than an AttributeError on ``None``.
            self.assertIsNotNone(blob, f"{pfx}/{name} was not copied")
            self.assertGreater(blob.size, 0)
    with self.subTest("local filesystem"):
        with tempfile.TemporaryDirectory() as dirname:
            drs.copy_batch(drs_urls=list(drs_urls.values()), dst_pfx=dirname)
            names = [os.path.basename(path) for path in _list_tree(dirname)]
            self.assertEqual(sorted(names), sorted(drs_urls))

@testmode("controlled_access")
def test_copy_batch_manifest(self):
drs_uris = {
"CCDG_13607_B01_GRM_WGS_2019-02-19_chr2.recalibrated_variants.annotated.clinical.txt": DRS_URI_003_MB,
"CCDG_13607_B01_GRM_WGS_2019-02-19_chr9.recalibrated_variants.annotated.clinical.txt": DRS_URI_370_KB,
Expand All @@ -341,7 +367,7 @@ def test_copy_batch(self):
manifest.extend([dict(drs_uri=uri, dst=dirname) for uri in drs_uris.values()])
manifest.extend([dict(drs_uri=uri, dst=os.path.join(dirname, name))
for name, uri in named_drs_uris.items()])
drs.copy_batch(manifest)
drs.copy_batch_manifest(manifest)
for name in dict(**drs_uris, **named_drs_uris):
blob = bucket.get_blob(f"{pfx}/{name}")
self.assertGreater(blob.size, 0)
Expand All @@ -351,7 +377,7 @@ def test_copy_batch(self):
with self.subTest("malformed manifest"):
manifest = [dict(a="b"), dict(drs_uri="drs://foo", dst=".")]
with self.assertRaises(jsonschema.exceptions.ValidationError):
drs.copy_batch(manifest)
drs.copy_batch_manifest(manifest)

@testmode("controlled_access")
def test_extract_tar_gz(self):
Expand Down

0 comments on commit ae862f4

Please sign in to comment.