Skip to content

Commit 004908c

Browse files
committed
feat(storage:s3): multi-part upload: upload parts concurrently
1 parent 96d16ef commit 004908c

File tree

3 files changed

+188
-100
lines changed

3 files changed

+188
-100
lines changed

checkstyle/suppressions.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
<suppress checks="ClassDataAbstractionCoupling" files=".*Test\.java"/>
2323
<suppress checks="ClassFanOutComplexity" files=".*Test\.java"/>
2424
<suppress checks="ClassFanOutComplexity" files="RemoteStorageManager.java"/>
25+
<suppress checks="ClassDataAbstractionCoupling" files="S3MultiPartOutputStream.java"/>
2526
<suppress checks="ClassDataAbstractionCoupling" files="S3StorageConfig.java"/>
2627
<suppress checks="ClassDataAbstractionCoupling" files="RemoteStorageManager.java"/>
2728
<suppress checks="ClassDataAbstractionCoupling" files="Metrics.java"/>

storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java

Lines changed: 77 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818

1919
import java.io.ByteArrayInputStream;
2020
import java.io.IOException;
21-
import java.io.InputStream;
2221
import java.io.OutputStream;
2322
import java.nio.ByteBuffer;
2423
import java.util.ArrayList;
2524
import java.util.List;
25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.ConcurrentLinkedQueue;
27+
import java.util.concurrent.ExecutionException;
28+
import java.util.concurrent.atomic.AtomicInteger;
2629

2730
import com.amazonaws.services.s3.AmazonS3;
2831
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
@@ -31,14 +34,17 @@
3134
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
3235
import com.amazonaws.services.s3.model.PartETag;
3336
import com.amazonaws.services.s3.model.UploadPartRequest;
34-
import com.amazonaws.services.s3.model.UploadPartResult;
3537
import org.slf4j.Logger;
3638
import org.slf4j.LoggerFactory;
3739

3840
/**
3941
* S3 multipart output stream.
4042
* Enable uploads to S3 with unknown size by feeding input bytes to multiple parts and upload them on close.
4143
*
44+
* <p>OutputStream is used to write sequentially, but
45+
uploading parts happens asynchronously to reduce full upload latency.
46+
Concurrency happens within the output stream implementation and does not require changes by callers.
47+
*
4248
* <p>Requires S3 client and starts a multipart transaction when instantiated. Do not reuse.
4349
*
4450
* <p>{@link S3MultiPartOutputStream} is not thread-safe.
@@ -54,8 +60,11 @@ public class S3MultiPartOutputStream extends OutputStream {
5460
final int partSize;
5561

5662
private final String uploadId;
57-
private final List<PartETag> partETags = new ArrayList<>();
63+
private final AtomicInteger partNumber = new AtomicInteger(0);
5864

65+
// holds async part upload operations building a list of partETags required when committing
66+
private CompletableFuture<ConcurrentLinkedQueue<PartETag>> partUploads =
67+
CompletableFuture.completedFuture(new ConcurrentLinkedQueue<>());
5968
private boolean closed;
6069

6170
public S3MultiPartOutputStream(final String bucketName,
@@ -87,15 +96,22 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti
8796
return;
8897
}
8998
try {
90-
final ByteBuffer source = ByteBuffer.wrap(b, off, len);
91-
while (source.hasRemaining()) {
92-
final int transferred = Math.min(partBuffer.remaining(), source.remaining());
93-
final int offset = source.arrayOffset() + source.position();
94-
// TODO: get rid of this array copying
95-
partBuffer.put(source.array(), offset, transferred);
96-
source.position(source.position() + transferred);
99+
final ByteBuffer currentBatch = ByteBuffer.wrap(b, off, len);
100+
while (currentBatch.hasRemaining()) {
101+
// copy batch to part buffer
102+
final int toCopy = Math.min(partBuffer.remaining(), currentBatch.remaining());
103+
final int positionAfterCopying = currentBatch.position() + toCopy;
104+
currentBatch.limit(positionAfterCopying);
105+
partBuffer.put(currentBatch.slice());
106+
107+
// prepare current batch for next part
108+
currentBatch.clear(); // reset limit
109+
currentBatch.position(positionAfterCopying);
97110
if (!partBuffer.hasRemaining()) {
98-
flushBuffer(0, partSize);
111+
partBuffer.position(0);
112+
partBuffer.limit(partSize);
113+
uploadPart(partBuffer.slice(), partSize);
114+
partBuffer.clear();
99115
}
100116
}
101117
} catch (final RuntimeException e) {
@@ -105,26 +121,43 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti
105121
}
106122
}
107123

124+
/**
125+
* Completes pending part uploads.
126+
* @throws IOException if any part upload fails, after aborting the multipart transaction
127+
*/
108128
@Override
109-
public void close() throws IOException {
110-
if (!isClosed()) {
129+
public void flush() throws IOException {
130+
try {
111131
if (partBuffer.position() > 0) {
112-
try {
113-
flushBuffer(partBuffer.arrayOffset(), partBuffer.position());
114-
} catch (final RuntimeException e) {
115-
log.error("Failed to upload last part {}, aborting transaction", uploadId, e);
116-
abortUpload();
117-
throw new IOException(e);
118-
}
132+
final int actualPartSize = partBuffer.position();
133+
partBuffer.position(0);
134+
partBuffer.limit(actualPartSize);
135+
uploadPart(partBuffer.slice(), actualPartSize);
136+
partBuffer.clear();
119137
}
120-
if (!partETags.isEmpty()) {
138+
partUploads.join();
139+
} catch (final RuntimeException e) {
140+
log.error("Failed to upload parts {}, aborting transaction", uploadId, e);
141+
abortUpload();
142+
throw new IOException("Failed to flush upload part operations", e);
143+
}
144+
}
145+
146+
@Override
147+
public void close() throws IOException {
148+
if (!isClosed()) {
149+
flush();
150+
if (partNumber.get() > 0) {
121151
try {
122-
completeUpload();
152+
// wait for all uploads to complete successfully before committing
153+
final ConcurrentLinkedQueue<PartETag> tagsQueue = partUploads.get(); // TODO: maybe set a timeout?
154+
final ArrayList<PartETag> partETags = new ArrayList<>(tagsQueue);
155+
completeUpload(partETags);
123156
log.debug("Completed multipart upload {}", uploadId);
124-
} catch (final RuntimeException e) {
157+
} catch (final RuntimeException | InterruptedException | ExecutionException e) {
125158
log.error("Failed to complete multipart upload {}, aborting transaction", uploadId, e);
126159
abortUpload();
127-
throw new IOException(e);
160+
throw new IOException("Failed to complete upload transaction", e);
128161
}
129162
} else {
130163
abortUpload();
@@ -136,7 +169,7 @@ public boolean isClosed() {
136169
return closed;
137170
}
138171

139-
private void completeUpload() {
172+
private void completeUpload(final List<PartETag> partETags) {
140173
final var request = new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETags);
141174
client.completeMultipartUpload(request);
142175
closed = true;
@@ -148,24 +181,24 @@ private void abortUpload() {
148181
closed = true;
149182
}
150183

151-
private void flushBuffer(final int offset,
152-
final int actualPartSize) {
153-
final ByteArrayInputStream in = new ByteArrayInputStream(partBuffer.array(), offset, actualPartSize);
154-
uploadPart(in, actualPartSize);
155-
partBuffer.clear();
156-
}
157-
158-
private void uploadPart(final InputStream in, final int actualPartSize) {
159-
final int partNumber = partETags.size() + 1;
160-
final UploadPartRequest uploadPartRequest =
161-
new UploadPartRequest()
162-
.withBucketName(bucketName)
163-
.withKey(key)
164-
.withUploadId(uploadId)
165-
.withPartSize(actualPartSize)
166-
.withPartNumber(partNumber)
167-
.withInputStream(in);
168-
final UploadPartResult uploadResult = client.uploadPart(uploadPartRequest);
169-
partETags.add(uploadResult.getPartETag());
184+
private void uploadPart(final ByteBuffer partBuffer, final int actualPartSize) {
185+
final byte[] partContent = new byte[actualPartSize];
186+
partBuffer.get(partContent, 0, actualPartSize);
187+
188+
final var uploadPartRequest = new UploadPartRequest()
189+
.withBucketName(bucketName)
190+
.withKey(key)
191+
.withUploadId(uploadId)
192+
.withPartSize(actualPartSize)
193+
.withPartNumber(partNumber.incrementAndGet())
194+
.withInputStream(new ByteArrayInputStream(partContent));
195+
196+
// Run request async
197+
partUploads = partUploads.thenCombine(
198+
CompletableFuture.supplyAsync(() -> client.uploadPart(uploadPartRequest)),
199+
(partETags, result) -> {
200+
partETags.add(result.getPartETag());
201+
return partETags;
202+
});
170203
}
171204
}

0 commit comments

Comments
 (0)