Skip to content

Commit 8b1a686

Browse files
authored
fix: do not signal onComplete when the incoming buffer length is less than the cipher block (#209)
1 parent 5ee8b08 commit 8b1a686

File tree

5 files changed

+166
-9
lines changed

5 files changed

+166
-9
lines changed

src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,21 @@ public void onNext(ByteBuffer byteBuffer) {
5050
if (amountToReadFromByteBuffer > 0) {
5151
byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer);
5252
outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer);
53-
if (outputBuffer == null && amountToReadFromByteBuffer < cipher.getBlockSize()) {
54-
// The underlying data is too short to fill in the block cipher
55-
// This is true at the end of the file, so complete to get the final
56-
// bytes
57-
this.onComplete();
53+
if (outputBuffer == null || outputBuffer.length == 0) {
54+
// The underlying data is too short to fill in the block cipher.
55+
// Note that while the JCE Javadoc specifies that the outputBuffer is null in this case,
56+
// in practice SunJCE and ACCP return an empty buffer instead, hence checks for
57+
// null OR length == 0.
58+
if (contentRead.get() == contentLength) {
59+
// All content has been read, so complete to get the final bytes
60+
this.onComplete();
61+
}
62+
// Otherwise, wait for more bytes. To avoid blocking,
63+
// send an empty buffer to the wrapped subscriber.
64+
wrappedSubscriber.onNext(ByteBuffer.allocate(0));
65+
} else {
66+
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
5867
}
59-
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
6068
} else {
6169
// Do nothing
6270
wrappedSubscriber.onNext(byteBuffer);

src/main/java/software/amazon/encryption/s3/legacy/internal/AdjustedRangeSubscriber.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,22 @@ private void initializeForRead(long rangeBeginning, long rangeEnd) {
3939
this.virtualAvailable = (rangeEnd - rangeBeginning) + 1;
4040
}
4141

42-
4342
@Override
4443
public void onSubscribe(Subscription s) {
44+
// In edge cases where the beginning index exceeds the offset,
45+
// there is never valid data to read, so signal completion immediately.
46+
// Otherwise, the CipherSubscriber tries and fails to read the last block.
47+
// This probably should be an exception, but previous implementations
48+
// return an empty string; signalling onComplete accomplishes this result
49+
// and thus maintains compatibility.
50+
if (virtualAvailable <= 0) {
51+
wrappedSubscriber.onComplete();
52+
}
4553
wrappedSubscriber.onSubscribe(s);
4654
}
4755

4856
@Override
4957
public void onNext(ByteBuffer byteBuffer) {
50-
// In edge cases where the beginning index exceeds the offset,
51-
// there is never valid data to read, so signal completion immediately.
5258
if (virtualAvailable <= 0) {
5359
wrappedSubscriber.onComplete();
5460
}

src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import com.amazonaws.services.s3.model.EncryptionMaterials;
1414
import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
1515
import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
16+
import org.apache.commons.io.IOUtils;
17+
import org.bouncycastle.jce.provider.BouncyCastleProvider;
1618
import org.junit.jupiter.api.BeforeAll;
1719
import org.junit.jupiter.api.Test;
1820
import software.amazon.awssdk.core.ResponseBytes;
@@ -29,18 +31,27 @@
2931
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
3032
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
3133
import software.amazon.awssdk.services.s3.model.S3Exception;
34+
import software.amazon.encryption.s3.utils.BoundedInputStream;
35+
import software.amazon.encryption.s3.utils.TinyBufferAsyncRequestBody;
3236

3337
import javax.crypto.KeyGenerator;
3438
import javax.crypto.SecretKey;
39+
import java.io.IOException;
40+
import java.io.InputStream;
3541
import java.security.NoSuchAlgorithmException;
42+
import java.security.Provider;
43+
import java.security.Security;
3644
import java.util.ArrayList;
3745
import java.util.List;
3846
import java.util.concurrent.CompletableFuture;
3947
import java.util.concurrent.CompletionException;
48+
import java.util.concurrent.ExecutorService;
49+
import java.util.concurrent.Executors;
4050

4151
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
4252
import static org.junit.jupiter.api.Assertions.assertEquals;
4353
import static org.junit.jupiter.api.Assertions.assertThrows;
54+
import static org.junit.jupiter.api.Assertions.assertTrue;
4455
import static software.amazon.encryption.s3.utils.S3EncryptionClientTestResources.BUCKET;
4556
import static software.amazon.encryption.s3.utils.S3EncryptionClientTestResources.appendTestSuffix;
4657
import static software.amazon.encryption.s3.utils.S3EncryptionClientTestResources.deleteObject;
@@ -407,4 +418,50 @@ public void copyObjectTransparentlyAsync() {
407418
v3AsyncClient.close();
408419
}
409420

421+
/**
422+
* Test which artificially limits the size of buffers using {@link TinyBufferAsyncRequestBody}.
423+
* This tests edge cases where network conditions result in buffers with length shorter than
424+
* the cipher's block size.
425+
* @throws IOException
426+
*/
427+
@Test
428+
public void tinyBufferTest() throws IOException {
429+
// BouncyCastle actually returns null buffers, unlike ACCP and SunJCE, which return empty buffers
430+
Security.addProvider(new BouncyCastleProvider());
431+
Provider provider = Security.getProvider("BC");
432+
final String objectKey = appendTestSuffix("tiny-buffer-async");
433+
434+
S3AsyncClient v3AsyncClient = S3AsyncEncryptionClient.builder()
435+
.aesKey(AES_KEY)
436+
.cryptoProvider(provider)
437+
.build();
438+
439+
// need enough data to split up
440+
final long inputLength = 1024;
441+
final InputStream input = new BoundedInputStream(inputLength);
442+
final InputStream inputClean = new BoundedInputStream(inputLength);
443+
444+
final ExecutorService exec = Executors.newSingleThreadExecutor();
445+
446+
// Use this request body to limit the buffer size
447+
TinyBufferAsyncRequestBody tinyBufferAsyncRequestBody = new TinyBufferAsyncRequestBody(AsyncRequestBody.fromInputStream(input, inputLength, exec));
448+
CompletableFuture<PutObjectResponse> futurePut = v3AsyncClient.putObject(builder -> builder
449+
.bucket(BUCKET)
450+
.key(objectKey)
451+
.build(), tinyBufferAsyncRequestBody);
452+
futurePut.join();
453+
454+
CompletableFuture<ResponseBytes<GetObjectResponse>> futureGet = v3AsyncClient.getObject(builder -> builder
455+
.bucket(BUCKET)
456+
.key(objectKey)
457+
.build(), AsyncResponseTransformer.toBytes());
458+
ResponseBytes<GetObjectResponse> getResponse = futureGet.join();
459+
assertTrue(IOUtils.contentEquals(inputClean, getResponse.asInputStream()));
460+
461+
// Cleanup
462+
deleteObject(BUCKET, objectKey, v3AsyncClient);
463+
v3AsyncClient.close();
464+
exec.shutdown();
465+
}
466+
410467
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package software.amazon.encryption.s3.utils;
2+
3+
import org.reactivestreams.Subscriber;
4+
import software.amazon.awssdk.core.async.AsyncRequestBody;
5+
6+
import java.nio.ByteBuffer;
7+
import java.util.Optional;
8+
9+
/**
10+
* AsyncRequestBody which wraps another AsyncRequestBody with a {@link TinyBufferSubscriber}.
11+
* This is useful for testing poor network conditions where buffers may not be larger than
12+
* the cipher's block size.
13+
*/
14+
public class TinyBufferAsyncRequestBody implements AsyncRequestBody {
15+
16+
private final AsyncRequestBody wrappedAsyncRequestBody;
17+
18+
public TinyBufferAsyncRequestBody(final AsyncRequestBody wrappedRequestBody) {
19+
wrappedAsyncRequestBody = wrappedRequestBody;
20+
}
21+
22+
@Override
23+
public Optional<Long> contentLength() {
24+
return wrappedAsyncRequestBody.contentLength();
25+
}
26+
27+
@Override
28+
public void subscribe(Subscriber<? super ByteBuffer> s) {
29+
wrappedAsyncRequestBody.subscribe(new TinyBufferSubscriber(s));
30+
}
31+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package software.amazon.encryption.s3.utils;
2+
3+
import org.reactivestreams.Subscriber;
4+
import org.reactivestreams.Subscription;
5+
import software.amazon.awssdk.utils.BinaryUtils;
6+
7+
import java.nio.ByteBuffer;
8+
9+
/**
10+
* Subscriber which purposefully limits the size of buffers sent to
11+
* the wrapped subscriber. This is useful for simulating adverse network conditions.
12+
*/
13+
public class TinyBufferSubscriber implements Subscriber<ByteBuffer> {
14+
15+
private final Subscriber<? super ByteBuffer> wrappedSubscriber;
16+
17+
public TinyBufferSubscriber(final Subscriber wrappedSubscriber){
18+
this.wrappedSubscriber = wrappedSubscriber;
19+
}
20+
21+
@Override
22+
public void onSubscribe(Subscription s) {
23+
wrappedSubscriber.onSubscribe(s);
24+
}
25+
26+
@Override
27+
public void onNext(ByteBuffer b) {
28+
int i = 0;
29+
// any value below GCM block size works
30+
int chunkSize = 5;
31+
while (b.remaining() > chunkSize) {
32+
ByteBuffer tb = b.slice();
33+
tb.limit(chunkSize);
34+
byte[] intermediateBuf = BinaryUtils.copyBytesFrom(tb, chunkSize);
35+
b.position(i + chunkSize);
36+
i += chunkSize;
37+
wrappedSubscriber.onNext(ByteBuffer.wrap(intermediateBuf));
38+
}
39+
// send the rest of the bytes
40+
ByteBuffer sb = b.slice();
41+
sb.limit(b.remaining());
42+
byte[] intermedBuf = BinaryUtils.copyBytesFrom(sb, chunkSize);
43+
wrappedSubscriber.onNext(ByteBuffer.wrap(intermedBuf));
44+
}
45+
46+
@Override
47+
public void onError(Throwable t) {
48+
wrappedSubscriber.onError(t);
49+
}
50+
51+
@Override
52+
public void onComplete() {
53+
wrappedSubscriber.onComplete();
54+
}
55+
}

0 commit comments

Comments
 (0)