Skip to content

Commit 8152054

Browse files
committed
feat(storage:s3): multi-part upload: upload parts concurrently
1 parent 96d16ef commit 8152054

File tree

3 files changed

+194
-100
lines changed

3 files changed

+194
-100
lines changed

checkstyle/suppressions.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
<suppress checks="ClassDataAbstractionCoupling" files=".*Test\.java"/>
2323
<suppress checks="ClassFanOutComplexity" files=".*Test\.java"/>
2424
<suppress checks="ClassFanOutComplexity" files="RemoteStorageManager.java"/>
25+
<suppress checks="ClassDataAbstractionCoupling" files="S3MultiPartOutputStream.java"/>
2526
<suppress checks="ClassDataAbstractionCoupling" files="S3StorageConfig.java"/>
2627
<suppress checks="ClassDataAbstractionCoupling" files="RemoteStorageManager.java"/>
2728
<suppress checks="ClassDataAbstractionCoupling" files="Metrics.java"/>

storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java

Lines changed: 83 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818

1919
import java.io.ByteArrayInputStream;
2020
import java.io.IOException;
21-
import java.io.InputStream;
2221
import java.io.OutputStream;
2322
import java.nio.ByteBuffer;
2423
import java.util.ArrayList;
2524
import java.util.List;
25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.ConcurrentLinkedQueue;
27+
import java.util.concurrent.ExecutionException;
28+
import java.util.concurrent.atomic.AtomicInteger;
2629

2730
import com.amazonaws.services.s3.AmazonS3;
2831
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
@@ -31,14 +34,17 @@
3134
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
3235
import com.amazonaws.services.s3.model.PartETag;
3336
import com.amazonaws.services.s3.model.UploadPartRequest;
34-
import com.amazonaws.services.s3.model.UploadPartResult;
3537
import org.slf4j.Logger;
3638
import org.slf4j.LoggerFactory;
3739

3840
/**
3941
* S3 multipart output stream.
4042
* Enable uploads to S3 with unknown size by feeding input bytes to multiple parts and upload them on close.
4143
*
44+
* <p>OutputStream is used to write sequentially, but
45+
* uploading parts happens asynchronously to reduce full upload latency.
46+
* Concurrency happens within the output stream implementation and does not require changes in the callers.
47+
*
4248
* <p>Requires S3 client and starts a multipart transaction when instantiated. Do not reuse.
4349
*
4450
* <p>{@link S3MultiPartOutputStream} is not thread-safe.
@@ -54,8 +60,11 @@ public class S3MultiPartOutputStream extends OutputStream {
5460
final int partSize;
5561

5662
private final String uploadId;
57-
private final List<PartETag> partETags = new ArrayList<>();
63+
private final AtomicInteger partNumber = new AtomicInteger(0);
5864

65+
// holds async part upload operations building a list of partETags required when committing
66+
private CompletableFuture<ConcurrentLinkedQueue<PartETag>> partUploads =
67+
CompletableFuture.completedFuture(new ConcurrentLinkedQueue<>());
5968
private boolean closed;
6069

6170
public S3MultiPartOutputStream(final String bucketName,
@@ -87,15 +96,23 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti
8796
return;
8897
}
8998
try {
90-
final ByteBuffer source = ByteBuffer.wrap(b, off, len);
91-
while (source.hasRemaining()) {
92-
final int transferred = Math.min(partBuffer.remaining(), source.remaining());
93-
final int offset = source.arrayOffset() + source.position();
94-
// TODO: get rid of this array copying
95-
partBuffer.put(source.array(), offset, transferred);
96-
source.position(source.position() + transferred);
99+
final ByteBuffer currentBatch = ByteBuffer.wrap(b, off, len);
100+
while (currentBatch.hasRemaining()) {
101+
// copy batch to part buffer
102+
final int toCopy = Math.min(partBuffer.remaining(), currentBatch.remaining());
103+
final int positionAfterCopying = currentBatch.position() + toCopy;
104+
currentBatch.limit(positionAfterCopying);
105+
partBuffer.put(currentBatch.slice());
106+
107+
// prepare current batch for next part
108+
currentBatch.clear(); // reset limit
109+
currentBatch.position(positionAfterCopying);
110+
97111
if (!partBuffer.hasRemaining()) {
98-
flushBuffer(0, partSize);
112+
partBuffer.position(0);
113+
partBuffer.limit(partSize);
114+
uploadPart(partBuffer.slice(), partSize);
115+
partBuffer.clear();
99116
}
100117
}
101118
} catch (final RuntimeException e) {
@@ -105,26 +122,48 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti
105122
}
106123
}
107124

125+
/**
126+
* Completes pending part uploads
127+
*
128+
* @throws IOException if any part upload fails; the multipart transaction is aborted before throwing
129+
*/
108130
@Override
109-
public void close() throws IOException {
110-
if (!isClosed()) {
131+
public void flush() throws IOException {
132+
try {
111133
if (partBuffer.position() > 0) {
112-
try {
113-
flushBuffer(partBuffer.arrayOffset(), partBuffer.position());
114-
} catch (final RuntimeException e) {
115-
log.error("Failed to upload last part {}, aborting transaction", uploadId, e);
116-
abortUpload();
117-
throw new IOException(e);
118-
}
134+
// flush missing bytes
135+
final int actualPartSize = partBuffer.position();
136+
partBuffer.position(0);
137+
partBuffer.limit(actualPartSize);
138+
uploadPart(partBuffer.slice(), actualPartSize);
139+
partBuffer.clear();
119140
}
120-
if (!partETags.isEmpty()) {
141+
142+
// wait for requests to be processed
143+
partUploads.join();
144+
} catch (final RuntimeException e) {
145+
log.error("Failed to upload parts {}, aborting transaction", uploadId, e);
146+
abortUpload();
147+
throw new IOException("Failed to flush upload part operations", e);
148+
}
149+
}
150+
151+
@Override
152+
public void close() throws IOException {
153+
if (!isClosed()) {
154+
flush();
155+
if (partNumber.get() > 0) {
121156
try {
122-
completeUpload();
157+
// wait for all uploads to complete successfully before committing
158+
final ConcurrentLinkedQueue<PartETag> tagsQueue = partUploads.get(); // TODO: maybe set a timeout?
159+
final ArrayList<PartETag> partETags = new ArrayList<>(tagsQueue);
160+
161+
completeUpload(partETags);
123162
log.debug("Completed multipart upload {}", uploadId);
124-
} catch (final RuntimeException e) {
163+
} catch (final RuntimeException | InterruptedException | ExecutionException e) {
125164
log.error("Failed to complete multipart upload {}, aborting transaction", uploadId, e);
126165
abortUpload();
127-
throw new IOException(e);
166+
throw new IOException("Failed to complete upload transaction", e);
128167
}
129168
} else {
130169
abortUpload();
@@ -136,7 +175,7 @@ public boolean isClosed() {
136175
return closed;
137176
}
138177

139-
private void completeUpload() {
178+
private void completeUpload(final List<PartETag> partETags) {
140179
final var request = new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETags);
141180
client.completeMultipartUpload(request);
142181
closed = true;
@@ -148,24 +187,24 @@ private void abortUpload() {
148187
closed = true;
149188
}
150189

151-
private void flushBuffer(final int offset,
152-
final int actualPartSize) {
153-
final ByteArrayInputStream in = new ByteArrayInputStream(partBuffer.array(), offset, actualPartSize);
154-
uploadPart(in, actualPartSize);
155-
partBuffer.clear();
156-
}
157-
158-
private void uploadPart(final InputStream in, final int actualPartSize) {
159-
final int partNumber = partETags.size() + 1;
160-
final UploadPartRequest uploadPartRequest =
161-
new UploadPartRequest()
162-
.withBucketName(bucketName)
163-
.withKey(key)
164-
.withUploadId(uploadId)
165-
.withPartSize(actualPartSize)
166-
.withPartNumber(partNumber)
167-
.withInputStream(in);
168-
final UploadPartResult uploadResult = client.uploadPart(uploadPartRequest);
169-
partETags.add(uploadResult.getPartETag());
190+
private void uploadPart(final ByteBuffer partBuffer, final int actualPartSize) {
191+
final byte[] partContent = new byte[actualPartSize];
192+
partBuffer.get(partContent, 0, actualPartSize);
193+
194+
final var uploadPartRequest = new UploadPartRequest()
195+
.withBucketName(bucketName)
196+
.withKey(key)
197+
.withUploadId(uploadId)
198+
.withPartSize(actualPartSize)
199+
.withPartNumber(partNumber.incrementAndGet())
200+
.withInputStream(new ByteArrayInputStream(partContent));
201+
202+
// Run request async
203+
partUploads = partUploads.thenCombine(
204+
CompletableFuture.supplyAsync(() -> client.uploadPart(uploadPartRequest)),
205+
(partETags, result) -> {
206+
partETags.add(result.getPartETag());
207+
return partETags;
208+
});
170209
}
171210
}

0 commit comments

Comments
 (0)