Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to use awssdk v2 #47

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.github.alexmojaki</groupId>
<artifactId>s3-stream-upload</artifactId>
<version>2.2.4</version>
<version>2.2.5-SNAPSHOT</version>

<name>S3 Stream Upload</name>
<description>Manages streaming of data to S3 without knowing the size beforehand and without keeping it all in
Expand Down Expand Up @@ -38,19 +38,26 @@

<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sso</artifactId>
</dependency>
<dependency>
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I provide my AWS credentials on the command line using `aws sso login`.

<groupId>software.amazon.awssdk</groupId>
<artifactId>ssooidc</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.gaul</groupId>
<artifactId>s3proxy</artifactId>
<version>1.7.0</version>
<scope>test</scope>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand All @@ -63,9 +70,9 @@
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<version>1.11.782</version>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>${awssdk.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand All @@ -74,6 +81,8 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<slf4j.version>2.0.16</slf4j.version> <!-- Aug 2024 -->
<awssdk.version>2.27.24</awssdk.version> <!-- Sep 2024 -->
</properties>

<build>
Expand All @@ -83,8 +92,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import java.io.OutputStream;
import java.util.concurrent.BlockingQueue;

import static com.amazonaws.services.s3.internal.Constants.MB;
import static alex.mojaki.s3upload.StreamTransferManager.MB;

/**
* An {@code OutputStream} which packages data written to it into discrete {@link StreamPart}s which can be obtained
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/alex/mojaki/s3upload/StreamPart.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package alex.mojaki.s3upload;

import com.amazonaws.util.Base64;
import software.amazon.awssdk.utils.BinaryUtils;

import java.io.InputStream;

Expand Down Expand Up @@ -40,7 +40,7 @@ public long size() {
}

public String getMD5Digest() {
return Base64.encodeAsString(stream.getMD5Digest());
return BinaryUtils.toBase64(stream.getMD5Digest());
}

@Override
Expand Down
153 changes: 95 additions & 58 deletions src/main/java/alex/mojaki/s3upload/StreamTransferManager.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
package alex.mojaki.s3upload;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.*;
import com.amazonaws.util.BinaryUtils;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.utils.BinaryUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.util.ArrayList;
Expand All @@ -15,8 +25,6 @@
import java.util.List;
import java.util.concurrent.*;

import static com.amazonaws.services.s3.internal.Constants.MB;

// @formatter:off
/**
* Manages streaming of data to S3 without knowing the size beforehand and without keeping it all in memory or
Expand Down Expand Up @@ -85,7 +93,7 @@ public void run() {
* on what this class can accomplish. If order of data is important to you, then either use only one stream or ensure
* that you write at least 5 MB to every stream.
* <p>
* While performing the multipart upload this class will create instances of {@link InitiateMultipartUploadRequest},
* While performing the multipart upload this class will create instances of {@link CreateMultipartUploadRequest},
* {@link UploadPartRequest}, and {@link CompleteMultipartUploadRequest}, fill in the essential details, and send them
* off. If you need to add additional details then override the appropriate {@code customise*Request} methods and
* set the required properties within. Note that if no data is written (i.e. the object body is empty) then a normal (not multipart) upload will be performed and {@code customisePutEmptyObjectRequest} will be called instead.
Expand All @@ -105,16 +113,18 @@ public class StreamTransferManager {

private static final Logger log = LoggerFactory.getLogger(StreamTransferManager.class);

public final static int MB = 1024 * 1024;

protected final String bucketName;
protected final String putKey;
protected final AmazonS3 s3Client;
protected final S3Client s3Client;
protected String uploadId;
protected int numStreams = 1;
protected int numUploadThreads = 1;
protected int queueCapacity = 1;
protected int partSize = 5 * MB;
protected boolean checkIntegrity = false;
private final List<PartETag> partETags = Collections.synchronizedList(new ArrayList<PartETag>());
private final List<CompletedPart> completedParts = Collections.synchronizedList(new ArrayList<CompletedPart>());
private List<MultiPartOutputStream> multiPartOutputStreams;
private ExecutorServiceResultsHandler<Void> executorServiceResultsHandler;
private ClosableQueue<StreamPart> queue;
Expand All @@ -126,7 +136,7 @@ public class StreamTransferManager {

public StreamTransferManager(String bucketName,
String putKey,
AmazonS3 s3Client) {
S3Client s3Client) {
this.bucketName = bucketName;
this.putKey = putKey;
this.s3Client = s3Client;
Expand Down Expand Up @@ -292,12 +302,12 @@ private void ensureCanSet() {
}

/**
* Deprecated constructor kept for backward compatibility. Use {@link StreamTransferManager#StreamTransferManager(String, String, AmazonS3)} and then chain the desired setters.
* Deprecated constructor kept for backward compatibility. Use {@link StreamTransferManager#StreamTransferManager(String, String, S3Client)} and then chain the desired setters.
*/
@Deprecated
public StreamTransferManager(String bucketName,
String putKey,
AmazonS3 s3Client,
S3Client s3Client,
int numStreams,
int numUploadThreads,
int queueCapacity,
Expand All @@ -322,10 +332,15 @@ public List<MultiPartOutputStream> getMultiPartOutputStreams() {

queue = new ClosableQueue<StreamPart>(queueCapacity);
log.debug("Initiating multipart upload to {}/{}", bucketName, putKey);
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, putKey);
customiseInitiateRequest(initRequest);
InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);
uploadId = initResponse.getUploadId();
CreateMultipartUploadRequest initRequest = CreateMultipartUploadRequest.builder()
.bucket(bucketName)
.key(putKey)
.applyMutation(this::customiseInitiateRequest)
.build();

CreateMultipartUploadResponse createMultipartUploadResponse = s3Client
.createMultipartUpload(initRequest);
uploadId = createMultipartUploadResponse.uploadId();
log.info("Initiated multipart upload to {}/{} with full ID {}", bucketName, putKey, uploadId);
try {
multiPartOutputStreams = new ArrayList<MultiPartOutputStream>();
Expand Down Expand Up @@ -369,29 +384,33 @@ public void complete() {
log.debug("{}: Leftover uploaded", this);
}
log.debug("{}: Completing", this);
if (partETags.isEmpty()) {
if (completedParts.isEmpty()) {
log.debug("{}: Aborting upload of empty stream", this);
abort();
log.info("{}: Putting empty object", this);
ByteArrayInputStream emptyStream = new ByteArrayInputStream(new byte[]{});
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(0);
PutObjectRequest request = new PutObjectRequest(bucketName, putKey, emptyStream, metadata);
customisePutEmptyObjectRequest(request);
s3Client.putObject(request);
PutObjectRequest request = PutObjectRequest.builder()
.bucket(bucketName)
.key(putKey)
.contentLength(0L)
.applyMutation(this::customisePutEmptyObjectRequest)
.build();
s3Client.putObject(request, RequestBody.empty());
} else {
List<PartETag> sortedParts = new ArrayList<PartETag>(partETags);
List<CompletedPart> sortedParts = new ArrayList<CompletedPart>(completedParts);
Collections.sort(sortedParts, new PartNumberComparator());
CompleteMultipartUploadRequest completeRequest = new
CompleteMultipartUploadRequest(
bucketName,
putKey,
uploadId,
sortedParts);
customiseCompleteRequest(completeRequest);
CompleteMultipartUploadResult completeMultipartUploadResult = s3Client.completeMultipartUpload(completeRequest);

CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
.bucket(bucketName)
.key(putKey)
.uploadId(uploadId)
.multipartUpload(CompletedMultipartUpload.builder().parts(sortedParts).build())
.applyMutation(this::customiseCompleteRequest)
.build();

CompleteMultipartUploadResponse completeMultipartUploadResponse = s3Client
.completeMultipartUpload(completeRequest);
if (checkIntegrity) {
checkCompleteFileIntegrity(completeMultipartUploadResult.getETag(), sortedParts);
checkCompleteFileIntegrity(completeMultipartUploadResponse.eTag(), sortedParts);
}
}
log.info("{}: Completed", this);
Expand All @@ -403,7 +422,7 @@ public void complete() {
}
}

private void checkCompleteFileIntegrity(String s3ObjectETag, List<PartETag> sortedParts) {
private void checkCompleteFileIntegrity(String s3ObjectETag, List<CompletedPart> sortedParts) {
String expectedETag = computeCompleteFileETag(sortedParts);
if (!expectedETag.equals(s3ObjectETag)) {
throw new IntegrityCheckException(String.format(
Expand All @@ -412,16 +431,20 @@ private void checkCompleteFileIntegrity(String s3ObjectETag, List<PartETag> sort
}
}

private String computeCompleteFileETag(List<PartETag> parts) {
private String computeCompleteFileETag(List<CompletedPart> parts) {
// When S3 combines the parts of a multipart upload into the final object, the ETag value is set to the
// hex-encoded MD5 hash of the concatenated binary-encoded (raw bytes) MD5 hashes of each part followed by
// "-" and the number of parts.
MessageDigest md = Utils.md5();
for (PartETag partETag : parts) {
md.update(BinaryUtils.fromHex(partETag.getETag()));
for (CompletedPart partETag : parts) {
String eTag = partETag.eTag();
// The eTag value is wrapped in double quotes, e.g. "de760a5fb2b12a108a5e61a96cd7166c"; strip them to get the bare hex digest.
String hexOnly = eTag.substring(1, eTag.length() - 1);
md.update(BinaryUtils.fromHex(hexOnly));
}
// Represent byte array as a 32-digit number hexadecimal format followed by "-<partCount>".
return String.format("%032x-%d", new BigInteger(1, md.digest()), parts.size());
// Wrapped in double quotes to match the quoted eTag value returned by S3.
return String.format("\"%032x-%d\"", new BigInteger(1, md.digest()), parts.size());
}

/**
Expand Down Expand Up @@ -466,8 +489,12 @@ public void abort() {
}
if (uploadId != null) {
log.debug("{}: Aborting", this);
AbortMultipartUploadRequest abortMultipartUploadRequest = new AbortMultipartUploadRequest(
bucketName, putKey, uploadId);
AbortMultipartUploadRequest abortMultipartUploadRequest = AbortMultipartUploadRequest.builder()
.bucket(bucketName)
.key(putKey)
.uploadId(uploadId)
.build();

s3Client.abortMultipartUpload(abortMultipartUploadRequest);
log.info("{}: Aborted", this);
}
Expand Down Expand Up @@ -544,19 +571,29 @@ part remaining, which S3 can accept. It is uploaded in the complete() method.
private void uploadStreamPart(StreamPart part) {
log.debug("{}: Uploading {}", this, part);

UploadPartRequest uploadRequest = new UploadPartRequest()
.withBucketName(bucketName).withKey(putKey)
.withUploadId(uploadId).withPartNumber(part.getPartNumber())
.withInputStream(part.getInputStream())
.withPartSize(part.size());
UploadPartRequest.Builder builder = UploadPartRequest.builder()
.bucket(bucketName)
.key(putKey)
.uploadId(uploadId)
.partNumber(part.getPartNumber());

if (checkIntegrity) {
uploadRequest.setMd5Digest(part.getMD5Digest());
builder.contentMD5(part.getMD5Digest());
}
customiseUploadPartRequest(uploadRequest);

UploadPartResult uploadPartResult = s3Client.uploadPart(uploadRequest);
PartETag partETag = uploadPartResult.getPartETag();
partETags.add(partETag);
UploadPartRequest uploadRequest = builder
.applyMutation(this::customiseUploadPartRequest)
.build();

UploadPartResponse uploadPartResponse = s3Client.uploadPart(
uploadRequest,
RequestBody.fromInputStream(part.getInputStream(), part.size()));

CompletedPart completedPart = CompletedPart.builder()
.eTag(uploadPartResponse.eTag())
.partNumber(part.getPartNumber())
.build();
completedParts.add(completedPart);
log.info("{}: Finished uploading {}", this, part);
}

Expand All @@ -569,26 +606,26 @@ public String toString() {
// These methods are intended to be overridden for more specific interactions with the AWS API.

@SuppressWarnings("unused")
public void customiseInitiateRequest(InitiateMultipartUploadRequest request) {
public void customiseInitiateRequest(CreateMultipartUploadRequest.Builder requestBuilder) {
}

@SuppressWarnings("unused")
public void customiseUploadPartRequest(UploadPartRequest request) {
public void customiseUploadPartRequest(UploadPartRequest.Builder requestBuilder) {
}

@SuppressWarnings("unused")
public void customiseCompleteRequest(CompleteMultipartUploadRequest request) {
public void customiseCompleteRequest(CompleteMultipartUploadRequest.Builder requestBuilder) {
}

@SuppressWarnings("unused")
public void customisePutEmptyObjectRequest(PutObjectRequest request) {
public void customisePutEmptyObjectRequest(PutObjectRequest.Builder requestBuilder) {
}

private static class PartNumberComparator implements Comparator<PartETag> {
private static class PartNumberComparator implements Comparator<CompletedPart> {
@Override
public int compare(PartETag o1, PartETag o2) {
int partNumber1 = o1.getPartNumber();
int partNumber2 = o2.getPartNumber();
public int compare(CompletedPart o1, CompletedPart o2) {
int partNumber1 = o1.partNumber();
int partNumber2 = o2.partNumber();

if (partNumber1 == partNumber2) {
return 0;
Expand Down
Loading