Skip to content

Commit 708cc14

Browse files
committed
feat(storage:s3): multi-part upload: upload parts concurrently
1 parent 96d16ef commit 708cc14

File tree

3 files changed

+175
-97
lines changed

3 files changed

+175
-97
lines changed

checkstyle/suppressions.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
<suppress checks="ClassDataAbstractionCoupling" files=".*Test\.java"/>
2323
<suppress checks="ClassFanOutComplexity" files=".*Test\.java"/>
2424
<suppress checks="ClassFanOutComplexity" files="RemoteStorageManager.java"/>
25+
<suppress checks="ClassDataAbstractionCoupling" files="S3MultiPartOutputStream.java"/>
2526
<suppress checks="ClassDataAbstractionCoupling" files="S3StorageConfig.java"/>
2627
<suppress checks="ClassDataAbstractionCoupling" files="RemoteStorageManager.java"/>
2728
<suppress checks="ClassDataAbstractionCoupling" files="Metrics.java"/>

storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java

Lines changed: 67 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818

1919
import java.io.ByteArrayInputStream;
2020
import java.io.IOException;
21-
import java.io.InputStream;
2221
import java.io.OutputStream;
2322
import java.nio.ByteBuffer;
2423
import java.util.ArrayList;
2524
import java.util.List;
25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.ConcurrentLinkedQueue;
27+
import java.util.concurrent.ExecutionException;
28+
import java.util.concurrent.atomic.AtomicInteger;
2629

2730
import com.amazonaws.services.s3.AmazonS3;
2831
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
@@ -31,14 +34,17 @@
3134
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
3235
import com.amazonaws.services.s3.model.PartETag;
3336
import com.amazonaws.services.s3.model.UploadPartRequest;
34-
import com.amazonaws.services.s3.model.UploadPartResult;
3537
import org.slf4j.Logger;
3638
import org.slf4j.LoggerFactory;
3739

3840
/**
3941
* S3 multipart output stream.
4042
* Enables uploads to S3 of unknown size by feeding input bytes into multiple parts and uploading them on close.
4143
*
44+
* <p>OutputStream is used to write sequentially, but
45+
* uploading parts happens asynchronously to reduce full upload latency.
46+
* Concurrency happens within the output stream implementation and does not require changes in the callers.
47+
*
4248
* <p>Requires S3 client and starts a multipart transaction when instantiated. Do not reuse.
4349
*
4450
* <p>{@link S3MultiPartOutputStream} is not thread-safe.
@@ -54,8 +60,11 @@ public class S3MultiPartOutputStream extends OutputStream {
5460
final int partSize;
5561

5662
private final String uploadId;
57-
private final List<PartETag> partETags = new ArrayList<>();
63+
private final AtomicInteger partNumber = new AtomicInteger(0);
5864

65+
// holds async part upload operations building a list of partETags required when committing
66+
private CompletableFuture<ConcurrentLinkedQueue<PartETag>> partUploads =
67+
CompletableFuture.completedFuture(new ConcurrentLinkedQueue<>());
5968
private boolean closed;
6069

6170
public S3MultiPartOutputStream(final String bucketName,
@@ -89,13 +98,17 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti
8998
try {
9099
final ByteBuffer source = ByteBuffer.wrap(b, off, len);
91100
while (source.hasRemaining()) {
92-
final int transferred = Math.min(partBuffer.remaining(), source.remaining());
93-
final int offset = source.arrayOffset() + source.position();
94-
// TODO: get rid of this array copying
95-
partBuffer.put(source.array(), offset, transferred);
96-
source.position(source.position() + transferred);
101+
final int toCopy = Math.min(partBuffer.remaining(), source.remaining());
102+
final int positionAfterCopying = source.position() + toCopy;
103+
source.limit(positionAfterCopying);
104+
partBuffer.put(source.slice());
105+
source.clear(); // reset limit
106+
source.position(positionAfterCopying);
97107
if (!partBuffer.hasRemaining()) {
98-
flushBuffer(0, partSize);
108+
partBuffer.position(0);
109+
partBuffer.limit(partSize);
110+
partUpload(partBuffer.slice(), partSize);
111+
partBuffer.clear();
99112
}
100113
}
101114
} catch (final RuntimeException e) {
@@ -106,22 +119,35 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti
106119
}
107120

108121
@Override
109-
public void close() throws IOException {
110-
if (!isClosed()) {
122+
public void flush() throws IOException {
123+
try {
111124
if (partBuffer.position() > 0) {
112-
try {
113-
flushBuffer(partBuffer.arrayOffset(), partBuffer.position());
114-
} catch (final RuntimeException e) {
115-
log.error("Failed to upload last part {}, aborting transaction", uploadId, e);
116-
abortUpload();
117-
throw new IOException(e);
118-
}
125+
final int actualPartSize = partBuffer.position();
126+
partBuffer.position(0);
127+
partBuffer.limit(actualPartSize);
128+
partUpload(partBuffer.slice(), actualPartSize);
129+
partBuffer.clear();
119130
}
120-
if (!partETags.isEmpty()) {
131+
partUploads.join();
132+
} catch (final RuntimeException e) {
133+
log.error("Failed to upload parts {}, aborting transaction", uploadId, e);
134+
abortUpload();
135+
throw new IOException("Failed to flush operations", e);
136+
}
137+
}
138+
139+
@Override
140+
public void close() throws IOException {
141+
if (!isClosed()) {
142+
flush();
143+
if (partNumber.get() > 0) {
121144
try {
122-
completeUpload();
145+
// wait for all uploads to complete successfully before committing
146+
final ConcurrentLinkedQueue<PartETag> tagsQueue = partUploads.get(); // TODO: maybe set a timeout?
147+
final ArrayList<PartETag> partETags = new ArrayList<>(tagsQueue);
148+
completeUpload(partETags);
123149
log.debug("Completed multipart upload {}", uploadId);
124-
} catch (final RuntimeException e) {
150+
} catch (final RuntimeException | InterruptedException | ExecutionException e) {
125151
log.error("Failed to complete multipart upload {}, aborting transaction", uploadId, e);
126152
abortUpload();
127153
throw new IOException(e);
@@ -136,7 +162,7 @@ public boolean isClosed() {
136162
return closed;
137163
}
138164

139-
private void completeUpload() {
165+
private void completeUpload(final List<PartETag> partETags) {
140166
final var request = new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETags);
141167
client.completeMultipartUpload(request);
142168
closed = true;
@@ -148,24 +174,24 @@ private void abortUpload() {
148174
closed = true;
149175
}
150176

151-
private void flushBuffer(final int offset,
152-
final int actualPartSize) {
153-
final ByteArrayInputStream in = new ByteArrayInputStream(partBuffer.array(), offset, actualPartSize);
154-
uploadPart(in, actualPartSize);
155-
partBuffer.clear();
156-
}
157-
158-
private void uploadPart(final InputStream in, final int actualPartSize) {
159-
final int partNumber = partETags.size() + 1;
160-
final UploadPartRequest uploadPartRequest =
161-
new UploadPartRequest()
162-
.withBucketName(bucketName)
163-
.withKey(key)
164-
.withUploadId(uploadId)
165-
.withPartSize(actualPartSize)
166-
.withPartNumber(partNumber)
167-
.withInputStream(in);
168-
final UploadPartResult uploadResult = client.uploadPart(uploadPartRequest);
169-
partETags.add(uploadResult.getPartETag());
177+
private void partUpload(final ByteBuffer partBuffer, final int actualPartSize) {
178+
final byte[] partContent = new byte[actualPartSize];
179+
partBuffer.get(partContent, 0, actualPartSize);
180+
181+
final var uploadPartRequest = new UploadPartRequest()
182+
.withBucketName(bucketName)
183+
.withKey(key)
184+
.withUploadId(uploadId)
185+
.withPartSize(actualPartSize)
186+
.withPartNumber(partNumber.incrementAndGet())
187+
.withInputStream(new ByteArrayInputStream(partContent));
188+
189+
// Run request async
190+
partUploads = partUploads.thenCombine(
191+
CompletableFuture.supplyAsync(() -> client.uploadPart(uploadPartRequest)),
192+
(partETags, result) -> {
193+
partETags.add(result.getPartETag());
194+
return partETags;
195+
});
170196
}
171197
}

0 commit comments

Comments
 (0)