Skip to content

Commit f414093

Browse files
author
dsamaey
committed
Issue #747 fixed chunking
1 parent 69af9a8 commit f414093

File tree

3 files changed

+20
-19
lines changed

3 files changed

+20
-19
lines changed

openeo/rest/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
# TODO: get from config file
66
DEFAULT_DOWNLOAD_CHUNK_SIZE = 10_000_000 # 10MB
7-
7+
DEFAULT_DOWNLOAD_RANGE_SIZE = 500_000_000 # 500MB
88

99
DEFAULT_JOB_STATUS_POLL_INTERVAL_MAX = 60
1010
DEFAULT_JOB_STATUS_POLL_CONNECTION_RETRY_INTERVAL = 30

openeo/rest/job.py

+17-16
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from openeo.internal.warnings import deprecated, legacy_alias
1717
from openeo.rest import (
1818
DEFAULT_DOWNLOAD_CHUNK_SIZE,
19+
DEFAULT_DOWNLOAD_RANGE_SIZE,
1920
DEFAULT_JOB_STATUS_POLL_CONNECTION_RETRY_INTERVAL,
2021
DEFAULT_JOB_STATUS_POLL_INTERVAL_MAX,
2122
DEFAULT_JOB_STATUS_POLL_SOFT_ERROR_MAX,
@@ -36,7 +37,7 @@
3637

3738

3839
DEFAULT_JOB_RESULTS_FILENAME = "job-results.json"
39-
MAX_RETRIES_PER_CHUNK = 3
40+
MAX_RETRIES_PER_RANGE = 3
4041
RETRIABLE_STATUSCODES = [408, 429, 500, 501, 502, 503, 504]
4142

4243
class BatchJob:
@@ -384,7 +385,7 @@ def __repr__(self):
384385
)
385386

386387
def download(
387-
self, target: Optional[Union[Path, str]] = None, *, chunk_size: int = DEFAULT_DOWNLOAD_CHUNK_SIZE
388+
self, target: Optional[Union[Path, str]] = None, *, chunk_size: int = DEFAULT_DOWNLOAD_CHUNK_SIZE, range_size: int=DEFAULT_DOWNLOAD_RANGE_SIZE
388389
) -> Path:
389390
"""
390391
Download asset to given location
@@ -399,7 +400,7 @@ def download(
399400
target = target / self.name
400401
ensure_dir(target.parent)
401402
logger.info("Downloading Job result asset {n!r} from {h!s} to {t!s}".format(n=self.name, h=self.href, t=target))
402-
self._download_to_file(url=self.href, target=target, chunk_size=chunk_size)
403+
self._download_to_file(url=self.href, target=target, chunk_size=chunk_size, range_size=range_size)
403404
return target
404405

405406
def _get_response(self, stream=True) -> requests.Response:
@@ -418,26 +419,26 @@ def load_bytes(self) -> bytes:
418419
# TODO: more `load` methods e.g.: load GTiff asset directly as numpy array
419420

420421

421-
def _download_to_file(self, url: str, target: Path, chunk_size: int):
422+
def _download_to_file(self, url: str, target: Path, *, chunk_size: int=DEFAULT_DOWNLOAD_CHUNK_SIZE, range_size: int=DEFAULT_DOWNLOAD_RANGE_SIZE):
422423
head = self.job.connection.head(url, stream=True)
423-
if head.ok and head.headers.get("Accept-Ranges") == "bytes":
424+
if head.ok and head.headers.get("Accept-Ranges") == "bytes" and 'Content-Length' in head.headers:
424425
file_size = int(head.headers['Content-Length'])
425-
self._download_chunked(url=url, target=target, file_size=file_size, chunk_size=chunk_size)
426+
self._download_ranged(url=url, target=target, file_size=file_size, chunk_size=chunk_size, range_size=range_size)
426427
else:
427-
self._download_unchunked(url=url, target=target)
428+
self._download_all_at_once(url=url, target=target, chunk_size=chunk_size)
428429

429430

430-
def _download_chunked(self, url: str, target: Path, file_size: int, chunk_size: int):
431+
def _download_ranged(self, url: str, target: Path, file_size: int, *, chunk_size: int=DEFAULT_DOWNLOAD_CHUNK_SIZE, range_size: int=DEFAULT_DOWNLOAD_RANGE_SIZE):
431432
with target.open('wb') as f:
432-
for from_byte_index in range(0, file_size, chunk_size):
433-
to_byte_index = min(from_byte_index + chunk_size - 1, file_size - 1)
434-
tries_left = MAX_RETRIES_PER_CHUNK
433+
for from_byte_index in range(0, file_size, range_size):
434+
to_byte_index = min(from_byte_index + range_size - 1, file_size - 1)
435+
tries_left = MAX_RETRIES_PER_RANGE
435436
while tries_left > 0:
436437
try:
437438
range_headers = {"Range": f"bytes={from_byte_index}-{to_byte_index}"}
438439
with self.job.connection.get(path=url, headers=range_headers, stream=True) as r:
439440
r.raise_for_status()
440-
shutil.copyfileobj(r.raw, f)
441+
shutil.copyfileobj(fsrc=r.raw, fdst=f, length=chunk_size)
441442
break
442443
except OpenEoApiPlainError as error:
443444
tries_left -= 1
@@ -448,11 +449,11 @@ def _download_chunked(self, url: str, target: Path, file_size: int, chunk_size:
448449
raise error
449450

450451

451-
def _download_unchunked(self, url: str, target: Path):
452+
def _download_all_at_once(self, url: str, target: Path, *, chunk_size: int=DEFAULT_DOWNLOAD_CHUNK_SIZE):
452453
with self.job.connection.get(path=url, stream=True) as r:
453454
r.raise_for_status()
454455
with target.open("wb") as f:
455-
shutil.copyfileobj(r.raw, f)
456+
shutil.copyfileobj(fsrc=r.raw, fdst=f, length=chunk_size)
456457

457458

458459
class MultipleAssetException(OpenEoClientException):
@@ -532,7 +533,7 @@ def get_asset(self, name: str = None) -> ResultAsset:
532533
"No asset {n!r} in: {a}".format(n=name, a=[a.name for a in assets])
533534
)
534535

535-
def download_file(self, target: Union[Path, str] = None, name: str = None, chunk_size=DEFAULT_DOWNLOAD_CHUNK_SIZE) -> Path:
536+
def download_file(self, target: Union[Path, str] = None, name: str = None, *, chunk_size=DEFAULT_DOWNLOAD_CHUNK_SIZE, range_size: int=DEFAULT_DOWNLOAD_RANGE_SIZE) -> Path:
536537
"""
537538
Download single asset. Can be used when there is only one asset in the
538539
:py:class:`JobResults`, or when the desired asset name is given explicitly.
@@ -544,7 +545,7 @@ def download_file(self, target: Union[Path, str] = None, name: str = None, chunk
544545
:return: path of downloaded asset
545546
"""
546547
try:
547-
return self.get_asset(name=name).download(target=target, chunk_size=chunk_size)
548+
return self.get_asset(name=name).download(target=target, chunk_size=chunk_size, range_size=range_size)
548549
except MultipleAssetException:
549550
raise OpenEoClientException(
550551
"Can not use `download_file` with multiple assets. Use `download_files` instead.")

tests/rest/test_job.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -600,10 +600,10 @@ def test_get_results_download_file(job_with_1_asset: BatchJob, tmp_path):
600600
with target.open("rb") as f:
601601
assert f.read() == TIFF_CONTENT
602602

603-
def test_get_results_download_chunked_file_using_head(job_with_chunked_asset_using_head: BatchJob, tmp_path):
603+
def test_get_results_download_file_ranged(job_with_chunked_asset_using_head: BatchJob, tmp_path):
604604
job = job_with_chunked_asset_using_head
605605
target = tmp_path / "result.tiff"
606-
res = job.get_results().download_file(target, chunk_size=1000)
606+
res = job.get_results().download_file(target, range_size=1000)
607607
assert res == target
608608
with target.open("rb") as f:
609609
assert f.read() == TIFF_CONTENT

0 commit comments

Comments
 (0)