196 changes: 130 additions & 66 deletions s3fs/core.py
@@ -1,12 +1,13 @@
# -*- coding: utf-8 -*-
import math
import asyncio
import errno
import io
import logging
import mimetypes
import os
import socket
from typing import Tuple, Optional
from typing import IO, Tuple, Optional
import weakref
import re

@@ -68,6 +69,9 @@ def setup_logging(level=None):
FSTimeoutError,
ResponseParserError,
)
MIN_CHUNK_SIZE = 5 * 2**20 # minimum part size for multipart upload is 5MiB
MAX_CHUNK_SIZE = 5 * 2**30 # maximum part size for S3 multipart upload is 5GiB
MAX_UPLOAD_PARTS = 10_000 # maximum number of parts for S3 multipart upload

if ClientPayloadError is not None:
S3_RETRYABLE_ERRORS += (ClientPayloadError,)
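
The three constants added above encode S3's multipart limits: parts must be at least 5 MiB, at most 5 GiB, and a single upload may have at most 10,000 parts. A minimal standalone sketch of how those limits interact with the default 50 MiB part size, assuming a hypothetical 1 TiB upload (the same arithmetic appears in _upload_file_part_concurrent further down):

import math

MIN_CHUNK_SIZE = 5 * 2**20                     # 5 MiB
MAX_UPLOAD_PARTS = 10_000

filesize = 2**40                               # hypothetical 1 TiB file
chunksize = max(50 * 2**20, MIN_CHUNK_SIZE)    # default 50 MiB parts
if math.ceil(filesize / chunksize) > MAX_UPLOAD_PARTS:
    # 1 TiB / 50 MiB would need 20,972 parts, over the limit,
    # so the part size is grown to fit within 10,000 parts
    chunksize = math.ceil(filesize / MAX_UPLOAD_PARTS)
print(chunksize)                               # 109_951_163 bytes, roughly 105 MiB per part
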
@@ -1230,7 +1234,7 @@ async def _put_file(
lpath,
rpath,
callback=_DEFAULT_CALLBACK,
chunksize=50 * 2**20,
chunksize=None,
max_concurrency=None,
mode="overwrite",
**kwargs,
@@ -1258,43 +1262,47 @@ async def _put_file(
if content_type is not None:
kwargs["ContentType"] = content_type

with open(lpath, "rb") as f0:
if size < min(5 * 2**30, 2 * chunksize):
chunk = f0.read()
if size < min(5 * 2**30, 2 * (chunksize or 5 * 2**20)):
with open(lpath, "rb") as f0:
await self._call_s3(
"put_object", Bucket=bucket, Key=key, Body=chunk, **kwargs, **match
"put_object",
Bucket=bucket,
Key=key,
Body=f0,
**kwargs,
**match,
)
callback.relative_update(size)
else:

mpu = await self._call_s3(
"create_multipart_upload", Bucket=bucket, Key=key, **kwargs
else:
mpu = await self._call_s3(
"create_multipart_upload", Bucket=bucket, Key=key, **kwargs
)
try:
out = await self._upload_file_part_concurrent(
bucket,
key,
mpu,
lpath,
size,
callback=callback,
chunksize=chunksize,
max_concurrency=max_concurrency,
)
try:
out = await self._upload_file_part_concurrent(
bucket,
key,
mpu,
f0,
callback=callback,
chunksize=chunksize,
max_concurrency=max_concurrency,
)
parts = [
{"PartNumber": i + 1, "ETag": o["ETag"]}
for i, o in enumerate(out)
]
await self._call_s3(
"complete_multipart_upload",
Bucket=bucket,
Key=key,
UploadId=mpu["UploadId"],
MultipartUpload={"Parts": parts},
**match,
)
except Exception:
await self._abort_mpu(bucket, key, mpu["UploadId"])
raise
parts = [
{"PartNumber": i + 1, "ETag": o["ETag"]} for i, o in enumerate(out)
]
await self._call_s3(
"complete_multipart_upload",
Bucket=bucket,
Key=key,
UploadId=mpu["UploadId"],
MultipartUpload={"Parts": parts},
**match,
)
except Exception:
await self._abort_mpu(bucket, key, mpu["UploadId"])
raise

while rpath:
self.invalidate_cache(rpath)
rpath = self._parent(rpath)
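
With chunksize now defaulting to None, callers get the adaptive part sizing automatically, while both knobs are still accepted. A hypothetical usage sketch through the public put_file wrapper (bucket and file names are made up):

import s3fs

fs = s3fs.S3FileSystem()
fs.put_file(
    "big-input.bin",             # hypothetical local file
    "my-bucket/big-input.bin",   # hypothetical destination
    chunksize=64 * 2**20,        # optional: force 64 MiB parts
    max_concurrency=8,           # optional: upload up to 8 parts at a time
)
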
@@ -1304,45 +1312,51 @@ async def _upload_file_part_concurrent(
bucket,
key,
mpu,
f0,
path,
filesize,
callback=_DEFAULT_CALLBACK,
chunksize=50 * 2**20,
chunksize=None,
max_concurrency=None,
):
max_concurrency = max_concurrency or self.max_concurrency
if max_concurrency < 1:
raise ValueError("max_concurrency must be >= 1")

async def _upload_chunk(chunk, part_number):
result = await self._call_s3(
"upload_part",
Bucket=bucket,
PartNumber=part_number,
UploadId=mpu["UploadId"],
Body=chunk,
Key=key,
)
callback.relative_update(len(chunk))
return result

out = []
while True:
chunks = []
for i in range(max_concurrency):
chunk = f0.read(chunksize)
if chunk:
chunks.append(chunk)
if not chunks:
break
out.extend(
await asyncio.gather(
*[
_upload_chunk(chunk, len(out) + i)
for i, chunk in enumerate(chunks, 1)
]
default_chunksize = 50 * 2**20 # 50 MiB
chunksize = max(chunksize or default_chunksize, MIN_CHUNK_SIZE)
required_chunks = math.ceil(filesize / chunksize)
# adjust chunksize to fit within the MAX_UPLOAD_PARTS limit
if required_chunks > MAX_UPLOAD_PARTS:
chunksize = math.ceil(filesize / MAX_UPLOAD_PARTS)

num_parts = math.ceil(filesize / chunksize)
logger.debug(
"uploading %d parts with a chunksize of %d and a concurrency of %d",
num_parts,
chunksize,
max_concurrency,
)

async def _upload_part(part_number):
with open(path, mode="rb") as f:
start = chunksize * (part_number - 1)
f.seek(start)
end = min(start + chunksize, filesize)
size = end - start
file_chunk = FileChunk(f, start, size, f.name)
result = await self._call_s3(
"upload_part",
Bucket=bucket,
PartNumber=part_number,
UploadId=mpu["UploadId"],
Body=file_chunk,
Key=key,
)
)
return out
callback.relative_update(size)
return result

coros = [_upload_part(part_number) for part_number in range(1, num_parts + 1)]
return await _run_coros_in_chunks(coros, batch_size=max_concurrency)

async def _get_file(
self, rpath, lpath, callback=_DEFAULT_CALLBACK, version_id=None, **kwargs
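
Each part now re-opens the file by path, seeks to its own offset, and wraps that window in a FileChunk, with _run_coros_in_chunks keeping at most max_concurrency part uploads in flight. A small sketch of just the boundary arithmetic used by _upload_part above (file and part sizes are assumptions):

import math

chunksize = 5 * 2**20                  # assumed part size
filesize = 12 * 2**20                  # assumed 12 MiB file -> 3 parts
num_parts = math.ceil(filesize / chunksize)
for part_number in range(1, num_parts + 1):
    start = chunksize * (part_number - 1)
    size = min(start + chunksize, filesize) - start
    print(part_number, start, size)    # part sizes: 5 MiB, 5 MiB, 2 MiB
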
@@ -2575,3 +2589,53 @@ async def _call_and_read():
resp["Body"].close()

return await _error_wrapper(_call_and_read, retries=fs.retries)


class FileChunk(io.RawIOBase):
def __init__(self, fileobj: IO[bytes], offset: int, size: int, name: str):
self.fileobj = fileobj
self.offset = offset
self.size = size
self.position = 0
self.name = name

def readable(self):
return True

def writable(self):
return False

def seekable(self):
return self.fileobj.seekable()

def tell(self):
return self.position

def seek(self, position, whence=io.SEEK_SET):
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
self.fileobj.seek(self.offset + self.position)
return self.position

def readinto(self, b):
max_size = self.size - self.position
if max_size <= 0:
return 0

if len(b) > max_size:
b = memoryview(b)[:max_size]
res = self.fileobj.readinto(b)
if res != len(b):
raise RuntimeError("unexpected end of data")
self.position += res
assert self.fileobj.tell() == (self.offset + self.position)
return res
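
FileChunk presents a fixed window of an already-open file as its own seekable, read-only file object, so each part body handed to upload_part can be rewound and re-read on retry without buffering the whole part in memory. A minimal sketch of the intended use, assuming a hypothetical local file large enough to hold the window (_upload_part above performs the equivalent seek before wrapping):

with open("big-input.bin", "rb") as f:        # hypothetical local file
    offset, size = 5 * 2**20, 5 * 2**20       # second 5 MiB part
    f.seek(offset)                            # position the underlying file first
    part = FileChunk(f, offset, size, f.name)
    data = part.read()                        # reads exactly the window's bytes
    part.seek(0)                              # rewinds within the window only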