Skip to content

Commit 96d16ef

Browse files
authored
Merge pull request #282 from aiven/jeqo/s3-abort
feat(storage:s3): abort on stream exception handling
2 parents 3aee1cb + a451b40 commit 96d16ef

File tree

2 files changed

+152
-57
lines changed

2 files changed

+152
-57
lines changed

storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java

Lines changed: 53 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,10 @@
2323
import java.nio.ByteBuffer;
2424
import java.util.ArrayList;
2525
import java.util.List;
26-
import java.util.Objects;
2726

2827
import com.amazonaws.services.s3.AmazonS3;
2928
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
3029
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
31-
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
3230
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
3331
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
3432
import com.amazonaws.services.s3.model.PartETag;
@@ -42,6 +40,8 @@
4240
* Enable uploads to S3 with unknown size by feeding input bytes to multiple parts and upload them on close.
4341
*
4442
* <p>Requires S3 client and starts a multipart transaction when instantiated. Do not reuse.
43+
*
44+
* <p>{@link S3MultiPartOutputStream} is not thread-safe.
4545
*/
4646
public class S3MultiPartOutputStream extends OutputStream {
4747

@@ -80,60 +80,79 @@ public void write(final int b) throws IOException {
8080

8181
@Override
8282
public void write(final byte[] b, final int off, final int len) throws IOException {
83-
if (closed) {
83+
if (isClosed()) {
8484
throw new IllegalStateException("Already closed");
8585
}
8686
if (b.length == 0) {
8787
return;
8888
}
89-
final ByteBuffer source = ByteBuffer.wrap(b, off, len);
90-
while (source.hasRemaining()) {
91-
final int transferred = Math.min(partBuffer.remaining(), source.remaining());
92-
final int offset = source.arrayOffset() + source.position();
93-
// TODO: get rid of this array copying
94-
partBuffer.put(source.array(), offset, transferred);
95-
source.position(source.position() + transferred);
96-
if (!partBuffer.hasRemaining()) {
97-
flushBuffer(0, partSize);
89+
try {
90+
final ByteBuffer source = ByteBuffer.wrap(b, off, len);
91+
while (source.hasRemaining()) {
92+
final int transferred = Math.min(partBuffer.remaining(), source.remaining());
93+
final int offset = source.arrayOffset() + source.position();
94+
// TODO: get rid of this array copying
95+
partBuffer.put(source.array(), offset, transferred);
96+
source.position(source.position() + transferred);
97+
if (!partBuffer.hasRemaining()) {
98+
flushBuffer(0, partSize);
99+
}
98100
}
101+
} catch (final RuntimeException e) {
102+
log.error("Failed to write to stream on upload {}, aborting transaction", uploadId, e);
103+
abortUpload();
104+
throw new IOException(e);
99105
}
100106
}
101107

102108
@Override
103109
public void close() throws IOException {
104-
if (partBuffer.position() > 0) {
105-
flushBuffer(partBuffer.arrayOffset(), partBuffer.position());
106-
}
107-
if (Objects.nonNull(uploadId)) {
110+
if (!isClosed()) {
111+
if (partBuffer.position() > 0) {
112+
try {
113+
flushBuffer(partBuffer.arrayOffset(), partBuffer.position());
114+
} catch (final RuntimeException e) {
115+
log.error("Failed to upload last part {}, aborting transaction", uploadId, e);
116+
abortUpload();
117+
throw new IOException(e);
118+
}
119+
}
108120
if (!partETags.isEmpty()) {
109121
try {
110-
final CompleteMultipartUploadRequest request =
111-
new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETags);
112-
final CompleteMultipartUploadResult result = client.completeMultipartUpload(request);
113-
log.debug("Completed multipart upload {} with result {}", uploadId, result);
114-
} catch (final Exception e) {
122+
completeUpload();
123+
log.debug("Completed multipart upload {}", uploadId);
124+
} catch (final RuntimeException e) {
115125
log.error("Failed to complete multipart upload {}, aborting transaction", uploadId, e);
116-
client.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, key, uploadId));
126+
abortUpload();
127+
throw new IOException(e);
117128
}
118129
} else {
119-
client.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, key, uploadId));
130+
abortUpload();
120131
}
121132
}
133+
}
134+
135+
public boolean isClosed() {
136+
return closed;
137+
}
138+
139+
private void completeUpload() {
140+
final var request = new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETags);
141+
client.completeMultipartUpload(request);
142+
closed = true;
143+
}
144+
145+
private void abortUpload() {
146+
final var request = new AbortMultipartUploadRequest(bucketName, key, uploadId);
147+
client.abortMultipartUpload(request);
122148
closed = true;
123149
}
124150

125151
private void flushBuffer(final int offset,
126-
final int actualPartSize) throws IOException {
127-
try {
128-
final ByteArrayInputStream in = new ByteArrayInputStream(partBuffer.array(), offset, actualPartSize);
129-
uploadPart(in, actualPartSize);
130-
partBuffer.clear();
131-
} catch (final Exception e) {
132-
log.error("Failed to upload part in multipart upload {}, aborting transaction", uploadId, e);
133-
client.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, key, uploadId));
134-
closed = true;
135-
throw new IOException(e);
136-
}
152+
final int actualPartSize) {
153+
final ByteArrayInputStream in = new ByteArrayInputStream(partBuffer.array(), offset, actualPartSize);
154+
uploadPart(in, actualPartSize);
155+
partBuffer.clear();
137156
}
138157

139158
private void uploadPart(final InputStream in, final int actualPartSize) {

storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java

Lines changed: 99 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.mockito.junit.jupiter.MockitoExtension;
4040

4141
import static org.assertj.core.api.Assertions.assertThat;
42+
import static org.assertj.core.api.Assertions.assertThatCode;
4243
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4344
import static org.mockito.ArgumentMatchers.any;
4445
import static org.mockito.Mockito.never;
@@ -76,29 +77,31 @@ void sendAbortForAnyExceptionWhileWriting() {
7677
when(mockedS3.uploadPart(any()))
7778
.thenThrow(testException);
7879

79-
assertThatThrownBy(() -> {
80-
try (final S3MultiPartOutputStream out =
81-
new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3)) {
82-
out.write(new byte[] {1, 2, 3});
83-
}
84-
}).isInstanceOf(IOException.class).hasCause(testException);
80+
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 1, mockedS3);
81+
assertThatThrownBy(() -> out.write(new byte[] {1, 2, 3}))
82+
.isInstanceOf(IOException.class)
83+
.hasRootCause(testException);
84+
85+
assertThat(out.isClosed()).isTrue();
86+
// retry close to validate no exception is thrown and number of calls to complete/upload does not change
87+
assertThatCode(out::close).doesNotThrowAnyException();
8588

8689
verify(mockedS3).initiateMultipartUpload(any(InitiateMultipartUploadRequest.class));
8790
verify(mockedS3).uploadPart(any(UploadPartRequest.class));
91+
verify(mockedS3, never()).completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
8892
verify(mockedS3).abortMultipartUpload(abortMultipartUploadRequestCaptor.capture());
8993

9094
assertAbortMultipartUploadRequest(abortMultipartUploadRequestCaptor.getValue());
9195
}
9296

9397
@Test
94-
void sendAbortForAnyExceptionWhenClose() throws Exception {
98+
void sendAbortForAnyExceptionWhenClosingUpload() throws Exception {
9599
when(mockedS3.initiateMultipartUpload(any()))
96100
.thenReturn(newInitiateMultipartUploadResult());
97-
98101
when(mockedS3.uploadPart(any()))
99102
.thenThrow(RuntimeException.class);
100103

101-
final S3MultiPartOutputStream out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3);
104+
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3);
102105

103106
final byte[] buffer = new byte[5];
104107
random.nextBytes(buffer);
@@ -109,12 +112,45 @@ void sendAbortForAnyExceptionWhenClose() throws Exception {
109112
.rootCause()
110113
.isInstanceOf(RuntimeException.class);
111114

115+
assertThat(out.isClosed()).isTrue();
116+
assertThatCode(out::close).doesNotThrowAnyException();
117+
112118
verify(mockedS3, never()).completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
113119
verify(mockedS3).abortMultipartUpload(abortMultipartUploadRequestCaptor.capture());
114120

115121
assertAbortMultipartUploadRequest(abortMultipartUploadRequestCaptor.getValue());
116122
}
117123

124+
@Test
125+
void sendAbortForAnyExceptionWhenClosingComplete() throws Exception {
126+
when(mockedS3.initiateMultipartUpload(any()))
127+
.thenReturn(newInitiateMultipartUploadResult());
128+
when(mockedS3.uploadPart(any()))
129+
.thenReturn(newUploadPartResult(1, "SOME_ETAG#1"));
130+
when(mockedS3.completeMultipartUpload(any()))
131+
.thenThrow(RuntimeException.class);
132+
133+
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3);
134+
135+
final byte[] buffer = new byte[5];
136+
random.nextBytes(buffer);
137+
out.write(buffer, 0, buffer.length);
138+
139+
assertThatThrownBy(out::close)
140+
.isInstanceOf(IOException.class)
141+
.rootCause()
142+
.isInstanceOf(RuntimeException.class);
143+
144+
assertThat(out.isClosed()).isTrue();
145+
assertThatCode(out::close).doesNotThrowAnyException();
146+
147+
verify(mockedS3).uploadPart(any(UploadPartRequest.class));
148+
verify(mockedS3).completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
149+
verify(mockedS3).abortMultipartUpload(abortMultipartUploadRequestCaptor.capture());
150+
151+
assertAbortMultipartUploadRequest(abortMultipartUploadRequestCaptor.getValue());
152+
}
153+
118154
@Test
119155
void writesOneByte() throws Exception {
120156
when(mockedS3.initiateMultipartUpload(any()))
@@ -124,9 +160,12 @@ void writesOneByte() throws Exception {
124160
when(mockedS3.completeMultipartUpload(any()))
125161
.thenReturn(new CompleteMultipartUploadResult());
126162

127-
try (final S3MultiPartOutputStream out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3)) {
128-
out.write(1);
129-
}
163+
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3);
164+
out.write(1);
165+
out.close();
166+
167+
assertThat(out.isClosed()).isTrue();
168+
assertThatCode(out::close).doesNotThrowAnyException();
130169

131170
verify(mockedS3).initiateMultipartUpload(any(InitiateMultipartUploadRequest.class));
132171
verify(mockedS3).uploadPart(uploadPartRequestCaptor.capture());
@@ -136,7 +175,8 @@ void writesOneByte() throws Exception {
136175
uploadPartRequestCaptor.getValue(),
137176
1,
138177
1,
139-
new byte[] {1});
178+
new byte[] {1}
179+
);
140180
assertCompleteMultipartUploadRequest(
141181
completeMultipartUploadRequestCaptor.getValue(),
142182
List.of(new PartETag(1, "SOME_ETAG"))
@@ -159,14 +199,16 @@ void writesMultipleMessages() throws Exception {
159199
.thenReturn(new CompleteMultipartUploadResult());
160200

161201
final List<byte[]> expectedMessagesList = new ArrayList<>();
162-
try (final S3MultiPartOutputStream out =
163-
new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, bufferSize, mockedS3)) {
164-
for (int i = 0; i < 3; i++) {
165-
random.nextBytes(message);
166-
out.write(message, 0, message.length);
167-
expectedMessagesList.add(message);
168-
}
202+
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, bufferSize, mockedS3);
203+
for (int i = 0; i < 3; i++) {
204+
random.nextBytes(message);
205+
out.write(message, 0, message.length);
206+
expectedMessagesList.add(message);
169207
}
208+
out.close();
209+
210+
assertThat(out.isClosed()).isTrue();
211+
assertThatCode(out::close).doesNotThrowAnyException();
170212

171213
verify(mockedS3).initiateMultipartUpload(any(InitiateMultipartUploadRequest.class));
172214
verify(mockedS3, times(3)).uploadPart(uploadPartRequestCaptor.capture());
@@ -215,8 +257,7 @@ void writesTailMessages() throws Exception {
215257
final byte[] expectedFullMessage = new byte[messageSize + 10];
216258
final byte[] expectedTailMessage = new byte[10];
217259

218-
final S3MultiPartOutputStream
219-
out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, messageSize + 10, mockedS3);
260+
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, messageSize + 10, mockedS3);
220261
random.nextBytes(message);
221262
out.write(message);
222263
System.arraycopy(message, 0, expectedFullMessage, 0, message.length);
@@ -226,16 +267,51 @@ void writesTailMessages() throws Exception {
226267
System.arraycopy(message, 10, expectedTailMessage, 0, 10);
227268
out.close();
228269

270+
assertThat(out.isClosed()).isTrue();
271+
assertThatCode(out::close).doesNotThrowAnyException();
272+
229273
assertUploadPartRequest(uploadPartRequests.get(0), 30, 1, expectedFullMessage);
230274
assertUploadPartRequest(uploadPartRequests.get(1), 10, 2, expectedTailMessage);
231275

232276
verify(mockedS3).initiateMultipartUpload(any(InitiateMultipartUploadRequest.class));
233277
verify(mockedS3, times(2)).uploadPart(any(UploadPartRequest.class));
234-
verify(mockedS3).completeMultipartUpload(completeMultipartUploadRequestCaptor.capture());
278+
verify(mockedS3, times(1)).completeMultipartUpload(completeMultipartUploadRequestCaptor.capture());
235279
assertCompleteMultipartUploadRequest(completeMultipartUploadRequestCaptor.getValue(),
236280
List.of(new PartETag(1, "SOME_ETAG#1"), new PartETag(2, "SOME_ETAG#2")));
237281
}
238282

283+
@Test
284+
void sendAbortIfNoWritingHappened() throws IOException {
285+
when(mockedS3.initiateMultipartUpload(any()))
286+
.thenReturn(newInitiateMultipartUploadResult());
287+
288+
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3);
289+
out.close();
290+
291+
verify(mockedS3).abortMultipartUpload(abortMultipartUploadRequestCaptor.capture());
292+
assertAbortMultipartUploadRequest(abortMultipartUploadRequestCaptor.getValue());
293+
assertThat(out.isClosed()).isTrue();
294+
assertThatCode(out::close).doesNotThrowAnyException();
295+
}
296+
297+
@Test
298+
void failWhenUploadingPartAfterStreamIsClosed() throws IOException {
299+
when(mockedS3.initiateMultipartUpload(any()))
300+
.thenReturn(newInitiateMultipartUploadResult());
301+
302+
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3);
303+
out.close();
304+
305+
verify(mockedS3).abortMultipartUpload(abortMultipartUploadRequestCaptor.capture());
306+
assertAbortMultipartUploadRequest(abortMultipartUploadRequestCaptor.getValue());
307+
assertThat(out.isClosed()).isTrue();
308+
assertThatCode(out::close).doesNotThrowAnyException();
309+
310+
assertThatThrownBy(() -> out.write(1))
311+
.isInstanceOf(IllegalStateException.class)
312+
.hasMessage("Already closed");
313+
}
314+
239315
private static InitiateMultipartUploadResult newInitiateMultipartUploadResult() {
240316
final InitiateMultipartUploadResult initiateMultipartUploadResult = new InitiateMultipartUploadResult();
241317
initiateMultipartUploadResult.setUploadId(UPLOAD_ID);

0 commit comments

Comments
 (0)