-
Notifications
You must be signed in to change notification settings - Fork 947
V4a async payload signing #6475
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| { | ||
| "type": "feature", | ||
| "category": "AWS SDK for Java v2", | ||
| "contributor": "", | ||
| "description": "Add support for signing async payloads in the default `AwsV4aHttpSigner`." | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,13 +20,19 @@ | |
| import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.STREAMING_ECDSA_SIGNED_PAYLOAD; | ||
| import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER; | ||
| import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.STREAMING_UNSIGNED_PAYLOAD_TRAILER; | ||
| import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.X_AMZ_DECODED_CONTENT_LENGTH; | ||
| import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.X_AMZ_TRAILER; | ||
| import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerUtils.computeAndMoveContentLength; | ||
|
|
||
| import java.io.InputStream; | ||
| import java.nio.ByteBuffer; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Optional; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import org.reactivestreams.Publisher; | ||
| import software.amazon.awssdk.annotations.SdkInternalApi; | ||
| import software.amazon.awssdk.checksums.SdkChecksum; | ||
| import software.amazon.awssdk.checksums.spi.ChecksumAlgorithm; | ||
|
|
@@ -35,10 +41,13 @@ | |
| import software.amazon.awssdk.http.SdkHttpRequest; | ||
| import software.amazon.awssdk.http.auth.aws.internal.signer.CredentialScope; | ||
| import software.amazon.awssdk.http.auth.aws.internal.signer.NoOpPayloadChecksumStore; | ||
| import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.AsyncChunkEncodedPayload; | ||
| import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChecksumTrailerProvider; | ||
| import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChunkedEncodedInputStream; | ||
| import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChunkedEncodedPayload; | ||
| import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChunkedEncodedPublisher; | ||
| import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.SyncChunkEncodedPayload; | ||
| import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.TrailerProvider; | ||
| import software.amazon.awssdk.http.auth.aws.internal.signer.io.ChecksumInputStream; | ||
| import software.amazon.awssdk.http.auth.aws.internal.signer.io.ResettableContentStreamProvider; | ||
| import software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerUtils; | ||
| import software.amazon.awssdk.http.auth.spi.signer.PayloadChecksumStore; | ||
|
|
@@ -55,6 +64,8 @@ | |
| @SdkInternalApi | ||
| public final class AwsChunkedV4aPayloadSigner implements V4aPayloadSigner { | ||
| private static final Logger LOG = Logger.loggerFor(AwsChunkedV4aPayloadSigner.class); | ||
| // ;chunk-signature:<sigv4a-ecsda hex signature, 144 bytes> | ||
| private static final int CHUNK_SIGNATURE_EXTENSION_LENGTH = 161; | ||
|
|
||
| private final CredentialScope credentialScope; | ||
| private final int chunkSize; | ||
|
|
@@ -83,59 +94,131 @@ public ContentStreamProvider sign(ContentStreamProvider payload, V4aRequestSigni | |
| .chunkSize(chunkSize) | ||
| .header(chunk -> Integer.toHexString(chunk.remaining()).getBytes(StandardCharsets.UTF_8)); | ||
|
|
||
| preExistingTrailers.forEach(trailer -> chunkedEncodedInputStreamBuilder.addTrailer(() -> trailer)); | ||
| SyncChunkEncodedPayload chunkedPayload = new SyncChunkEncodedPayload(chunkedEncodedInputStreamBuilder); | ||
|
|
||
| signCommon(chunkedPayload, requestSigningResult); | ||
|
|
||
| return new ResettableContentStreamProvider(chunkedEncodedInputStreamBuilder::build); | ||
| } | ||
|
|
||
| /** | ||
| * Given a payload and result of request signing, sign the payload via the SigV4 process. | ||
| */ | ||
| @Override | ||
| public Publisher<ByteBuffer> signAsync(Publisher<ByteBuffer> payload, V4aRequestSigningResult requestSigningResult) { | ||
| ChunkedEncodedPublisher.Builder chunkedStreamBuilder = ChunkedEncodedPublisher.builder() | ||
| .publisher(payload) | ||
| .chunkSize(chunkSize) | ||
| .addEmptyTrailingChunk(true); | ||
| AsyncChunkEncodedPayload chunkedPayload = new AsyncChunkEncodedPayload(chunkedStreamBuilder); | ||
|
|
||
| signCommon(chunkedPayload, requestSigningResult); | ||
|
|
||
| return chunkedStreamBuilder.build(); | ||
| } | ||
|
|
||
| private ChunkedEncodedPayload signCommon(ChunkedEncodedPayload payload, V4aRequestSigningResult requestSigningResult) { | ||
| SdkHttpRequest.Builder request = requestSigningResult.getSignedRequest(); | ||
|
|
||
| payload.decodedContentLength(request.firstMatchingHeader(X_AMZ_DECODED_CONTENT_LENGTH) | ||
| .map(Long::parseLong) | ||
| .orElseThrow(() -> { | ||
| String msg = String.format("Expected header '%s' to be present", | ||
| X_AMZ_DECODED_CONTENT_LENGTH); | ||
| return new RuntimeException(msg); | ||
| })); | ||
|
|
||
| preExistingTrailers.forEach(trailer -> payload.addTrailer(() -> trailer)); | ||
|
|
||
| switch (requestSigningResult.getSigningConfig().getSignedBodyValue()) { | ||
| case STREAMING_ECDSA_SIGNED_PAYLOAD: { | ||
| RollingSigner rollingSigner = new RollingSigner(requestSigningResult.getSignature(), | ||
| requestSigningResult.getSigningConfig()); | ||
| chunkedEncodedInputStreamBuilder.addExtension(new SigV4aChunkExtensionProvider(rollingSigner, credentialScope)); | ||
| payload.addExtension(new SigV4aChunkExtensionProvider(rollingSigner, credentialScope)); | ||
| break; | ||
| } | ||
| case STREAMING_UNSIGNED_PAYLOAD_TRAILER: | ||
| setupChecksumTrailerIfNeeded(chunkedEncodedInputStreamBuilder); | ||
| setupChecksumTrailerIfNeeded(payload); | ||
| break; | ||
| case STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER: { | ||
| RollingSigner rollingSigner = new RollingSigner(requestSigningResult.getSignature(), | ||
| requestSigningResult.getSigningConfig()); | ||
| chunkedEncodedInputStreamBuilder.addExtension(new SigV4aChunkExtensionProvider(rollingSigner, credentialScope)); | ||
| setupChecksumTrailerIfNeeded(chunkedEncodedInputStreamBuilder); | ||
| chunkedEncodedInputStreamBuilder.addTrailer( | ||
| new SigV4aTrailerProvider(chunkedEncodedInputStreamBuilder.trailers(), rollingSigner, credentialScope) | ||
| payload.addExtension(new SigV4aChunkExtensionProvider(rollingSigner, credentialScope)); | ||
| setupChecksumTrailerIfNeeded(payload); | ||
| payload.addTrailer( | ||
| new SigV4aTrailerProvider(payload.trailers(), rollingSigner, credentialScope) | ||
| ); | ||
| break; | ||
| } | ||
| default: | ||
| throw new UnsupportedOperationException(); | ||
| } | ||
|
|
||
| return new ResettableContentStreamProvider(chunkedEncodedInputStreamBuilder::build); | ||
| return payload; | ||
| } | ||
|
|
||
| @Override | ||
| public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider payload, String checksum) { | ||
| long encodedContentLength = 0; | ||
| long contentLength = SignerUtils.computeAndMoveContentLength(request, payload); | ||
| long contentLength = computeAndMoveContentLength(request, payload); | ||
| setupPreExistingTrailers(request); | ||
|
|
||
| // pre-existing trailers | ||
| long encodedContentLength = calculateEncodedContentLength(contentLength, checksum); | ||
|
|
||
| if (checksumAlgorithm != null) { | ||
| String checksumHeaderName = checksumHeaderName(checksumAlgorithm); | ||
| request.appendHeader(X_AMZ_TRAILER, checksumHeaderName); | ||
| } | ||
| request.putHeader(Header.CONTENT_LENGTH, Long.toString(encodedContentLength)); | ||
| // CRT-signed request doesn't expect 'aws-chunked' Content-Encoding, so we don't add it | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<Pair<SdkHttpRequest.Builder, Optional<Publisher<ByteBuffer>>>> beforeSigningAsync( | ||
| SdkHttpRequest.Builder request, Publisher<ByteBuffer> payload, String checksum) { | ||
|
|
||
| return SignerUtils.moveContentLength(request, payload) | ||
| .thenApply(p -> { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: is there any difference between sigv4a and sigv4 for this logic here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope, we can consolidate this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually nm, they're slightly different that it's a bit annoying to consolidate the two. The normal V4a version doesn't have a |
||
| SdkHttpRequest.Builder requestBuilder = p.left(); | ||
| setupPreExistingTrailers(requestBuilder); | ||
|
|
||
| long decodedContentLength = | ||
| requestBuilder.firstMatchingHeader(X_AMZ_DECODED_CONTENT_LENGTH) | ||
| .map(Long::parseLong) | ||
| // should not happen, this header is added by | ||
| // moveContentLength | ||
| .orElseThrow(() -> new IllegalArgumentException( | ||
| X_AMZ_DECODED_CONTENT_LENGTH + " header not present")); | ||
|
|
||
| long encodedContentLength = calculateEncodedContentLength(decodedContentLength, checksum); | ||
|
|
||
| if (checksumAlgorithm != null) { | ||
| String checksumHeaderName = checksumHeaderName(checksumAlgorithm); | ||
| request.appendHeader(X_AMZ_TRAILER, checksumHeaderName); | ||
| } | ||
| request.putHeader(Header.CONTENT_LENGTH, Long.toString(encodedContentLength)); | ||
|
|
||
| return Pair.of(requestBuilder, p.right()); | ||
| }); | ||
| } | ||
|
|
||
| private long calculateEncodedContentLength(long decodedContentLength, String checksum) { | ||
| long encodedContentLength = 0; | ||
|
|
||
| encodedContentLength += calculateExistingTrailersLength(); | ||
|
|
||
| switch (checksum) { | ||
dagnir marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| case STREAMING_ECDSA_SIGNED_PAYLOAD: { | ||
| long extensionsLength = 161; // ;chunk-signature:<sigv4a-ecsda hex signature, 144 bytes> | ||
| encodedContentLength += calculateChunksLength(contentLength, extensionsLength); | ||
| encodedContentLength += calculateChunksLength(decodedContentLength, CHUNK_SIGNATURE_EXTENSION_LENGTH); | ||
| break; | ||
| } | ||
| case STREAMING_UNSIGNED_PAYLOAD_TRAILER: | ||
| if (checksumAlgorithm != null) { | ||
| encodedContentLength += calculateChecksumTrailerLength(checksumHeaderName(checksumAlgorithm)); | ||
| } | ||
| encodedContentLength += calculateChunksLength(contentLength, 0); | ||
| encodedContentLength += calculateChunksLength(decodedContentLength, 0); | ||
| break; | ||
| case STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER: { | ||
| long extensionsLength = 161; // ;chunk-signature:<sigv4a-ecsda hex signature, 144 bytes> | ||
| encodedContentLength += calculateChunksLength(contentLength, extensionsLength); | ||
| encodedContentLength += calculateChunksLength(decodedContentLength, CHUNK_SIGNATURE_EXTENSION_LENGTH); | ||
| if (checksumAlgorithm != null) { | ||
| encodedContentLength += calculateChecksumTrailerLength(checksumHeaderName(checksumAlgorithm)); | ||
| } | ||
|
|
@@ -149,12 +232,7 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider | |
| // terminating \r\n | ||
| encodedContentLength += 2; | ||
|
|
||
| if (checksumAlgorithm != null) { | ||
| String checksumHeaderName = checksumHeaderName(checksumAlgorithm); | ||
| request.appendHeader(X_AMZ_TRAILER, checksumHeaderName); | ||
| } | ||
| request.putHeader(Header.CONTENT_LENGTH, Long.toString(encodedContentLength)); | ||
| // CRT-signed request doesn't expect 'aws-chunked' Content-Encoding, so we don't add it | ||
| return encodedContentLength; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -238,12 +316,7 @@ private long calculateChecksumTrailerLength(String checksumHeaderName) { | |
| return lengthInBytes + 2; | ||
| } | ||
|
|
||
| /** | ||
| * Add the checksum as a trailer to the chunk-encoded stream. | ||
| * <p> | ||
| * If the checksum-algorithm is not present, then nothing is done. | ||
| */ | ||
| private void setupChecksumTrailerIfNeeded(ChunkedEncodedInputStream.Builder builder) { | ||
| private void setupChecksumTrailerIfNeeded(ChunkedEncodedPayload payload) { | ||
| if (checksumAlgorithm == null) { | ||
| return; | ||
| } | ||
|
|
@@ -254,20 +327,17 @@ private void setupChecksumTrailerIfNeeded(ChunkedEncodedInputStream.Builder buil | |
| if (cachedChecksum != null) { | ||
| LOG.debug(() -> String.format("Cached payload checksum available for algorithm %s: %s. Using cached value", | ||
| checksumAlgorithm.algorithmId(), checksumHeaderName)); | ||
| builder.addTrailer(() -> Pair.of(checksumHeaderName, Collections.singletonList(cachedChecksum))); | ||
| payload.addTrailer(() -> Pair.of(checksumHeaderName, Collections.singletonList(cachedChecksum))); | ||
| return; | ||
| } | ||
|
|
||
| SdkChecksum sdkChecksum = fromChecksumAlgorithm(checksumAlgorithm); | ||
| ChecksumInputStream checksumInputStream = new ChecksumInputStream( | ||
| builder.inputStream(), | ||
| Collections.singleton(sdkChecksum) | ||
| ); | ||
| payload.checksumPayload(sdkChecksum); | ||
|
|
||
| TrailerProvider checksumTrailer = | ||
| new ChecksumTrailerProvider(sdkChecksum, checksumHeaderName, checksumAlgorithm, payloadChecksumStore); | ||
|
|
||
| builder.inputStream(checksumInputStream).addTrailer(checksumTrailer); | ||
| payload.addTrailer(checksumTrailer); | ||
| } | ||
|
|
||
| private String getCachedChecksum() { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| /* | ||
| * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"). | ||
| * You may not use this file except in compliance with the License. | ||
| * A copy of the License is located at | ||
| * | ||
| * http://aws.amazon.com/apache2.0 | ||
| * | ||
| * or in the "license" file accompanying this file. This file is distributed | ||
| * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
| * express or implied. See the License for the specific language governing | ||
| * permissions and limitations under the License. | ||
| */ | ||
|
|
||
| package software.amazon.awssdk.http.auth.aws.crt.internal.signer; | ||
|
|
||
| import java.nio.ByteBuffer; | ||
| import org.reactivestreams.Publisher; | ||
| import software.amazon.awssdk.annotations.SdkInternalApi; | ||
| import software.amazon.awssdk.crt.http.HttpRequestBodyStream; | ||
| import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber; | ||
| import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.TransferResult; | ||
|
|
||
| @SdkInternalApi | ||
| public final class CrtRequestBodyAdapter implements HttpRequestBodyStream { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we move this existing class https://github.com/aws/aws-sdk-java-v2/blob/master/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah makes sense! |
||
| private static final int BUFFER_SIZE = 4 * 1024 * 1024; // 4 MB | ||
| private final Publisher<ByteBuffer> requestPublisher; | ||
| private final long contentLength; | ||
| private ByteBufferStoringSubscriber requestBodySubscriber; | ||
|
|
||
| public CrtRequestBodyAdapter(Publisher<ByteBuffer> requestPublisher, long contentLength) { | ||
| this.requestPublisher = requestPublisher; | ||
| this.contentLength = contentLength; | ||
| this.requestBodySubscriber = new ByteBufferStoringSubscriber(BUFFER_SIZE); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean sendRequestBody(ByteBuffer bodyBytesOut) { | ||
| return requestBodySubscriber.transferTo(bodyBytesOut) == TransferResult.END_OF_STREAM; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean resetPosition() { | ||
| requestBodySubscriber = new ByteBufferStoringSubscriber(BUFFER_SIZE); | ||
| requestPublisher.subscribe(requestBodySubscriber); | ||
| return true; | ||
| } | ||
|
|
||
| @Override | ||
| public long getLength() { | ||
| return contentLength; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.