Skip to content

Commit bd9eab6

Browse files
Add delete payload batch feature.
1 parent d9c73b6 commit bd9eab6

File tree

9 files changed

+290
-3
lines changed

9 files changed

+290
-3
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>software.amazon.payloadoffloading</groupId>
88
<artifactId>payloadoffloading-common</artifactId>
9-
<version>2.2.0</version>
9+
<version>2.2.1</version>
1010
<packaging>jar</packaging>
1111
<name>Payload offloading common library for AWS</name>
1212
<description>Common library between extended Amazon AWS clients to save payloads up to 2GB on Amazon S3.</description>

src/main/java/software/amazon/payloadoffloading/PayloadStore.java

+17
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package software.amazon.payloadoffloading;
22

3+
import java.util.Collection;
34
import software.amazon.awssdk.core.exception.SdkClientException;
45
import software.amazon.awssdk.services.s3.model.S3Exception;
56

@@ -60,4 +61,20 @@ public interface PayloadStore {
6061
* a server side issue.
6162
*/
6263
void deleteOriginalPayload(String payloadPointer);
64+
65+
/**
66+
* Deletes original payloads using the given payloadPointers. The pointers must
67+
* have been obtained using {@link storeOriginalPayload}
68+
* <p>
69+
* This call will be more efficient than deleting payloads one at a time if the payloads
70+
* are in the same S3 bucket.
71+
*
72+
* @param payloadPointers
73+
* @throws SdkClientException If any internal errors are encountered on the client side while
74+
* attempting to make the request or handle the response to/from PayloadStore.
75+
* For example, if payloadPointer is invalid or a network connection is not available.
76+
* @throws S3Exception If an error response is returned by actual PayloadStore indicating
77+
* a server side issue.
78+
*/
79+
void deleteOriginalPayloads(Collection<String> payloadPointers);
6380
}

src/main/java/software/amazon/payloadoffloading/PayloadStoreAsync.java

+21
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package software.amazon.payloadoffloading;
22

3+
import java.util.Collection;
34
import java.util.concurrent.CompletableFuture;
45
import software.amazon.awssdk.core.exception.SdkClientException;
56
import software.amazon.awssdk.services.s3.model.S3Exception;
@@ -75,4 +76,24 @@ public interface PayloadStoreAsync {
7576
* a server side issue.
7677
*/
7778
CompletableFuture<Void> deleteOriginalPayload(String payloadPointer);
79+
80+
/**
81+
* Deletes the original payload using the given payloadPointer. The pointer must
82+
* have been obtained using {@link #storeOriginalPayload(String)}
83+
* <p>
84+
* This call will be more efficient than deleting payloads one at a time if the payloads
85+
* are in the same S3 bucket.
86+
* <p>
87+
* This call is asynchronous, and so documented return values and exceptions are propagated through
88+
* the returned {@link CompletableFuture}.
89+
*
90+
* @param payloadPointers
91+
* @return future value that completes when the delete operation finishes
92+
* @throws SdkClientException If any internal errors are encountered on the client side while
93+
* attempting to make the request or handle the response to/from PayloadStore.
94+
* For example, if payloadPointer is invalid or a network connection is not available.
95+
* @throws S3Exception If an error response is returned by actual PayloadStore indicating
96+
* a server side issue.
97+
*/
98+
CompletableFuture<Void> deleteOriginalPayloads(Collection<String> payloadPointers);
7899
}

src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java

+34
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package software.amazon.payloadoffloading;
22

33
import java.io.UncheckedIOException;
4+
import java.util.Collection;
45
import java.util.concurrent.CompletableFuture;
56
import java.util.concurrent.CompletionException;
7+
import java.util.stream.Collectors;
68
import org.slf4j.Logger;
79
import org.slf4j.LoggerFactory;
810
import software.amazon.awssdk.core.ResponseBytes;
@@ -11,9 +13,12 @@
1113
import software.amazon.awssdk.core.exception.SdkClientException;
1214
import software.amazon.awssdk.core.exception.SdkException;
1315
import software.amazon.awssdk.services.s3.S3AsyncClient;
16+
import software.amazon.awssdk.services.s3.model.Delete;
1417
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
18+
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
1519
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
1620
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
21+
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
1722
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
1823

1924
/**
@@ -115,4 +120,33 @@ public CompletableFuture<Void> deletePayloadFromS3(String s3BucketName, String s
115120
return null;
116121
});
117122
}
123+
124+
public CompletableFuture<Void> deletePayloadsFromS3(String s3BucketName, Collection<String> s3Keys) {
125+
DeleteObjectsRequest deleteObjectsRequest = DeleteObjectsRequest.builder()
126+
.bucket(s3BucketName)
127+
.delete(Delete.builder()
128+
.objects(s3Keys.stream()
129+
.map(s3Key -> ObjectIdentifier.builder()
130+
.key(s3Key)
131+
.build())
132+
.collect(Collectors.toList()))
133+
.build())
134+
.build();
135+
136+
return s3Client.deleteObjects(deleteObjectsRequest)
137+
.handle((v, tIn) -> {
138+
if (tIn != null) {
139+
Throwable t = Util.unwrapFutureException(tIn);
140+
if (t instanceof SdkException) {
141+
String errorMessage = "Failed to delete the S3 object which contains the payload";
142+
LOG.error(errorMessage, t);
143+
throw SdkException.create(errorMessage, t);
144+
}
145+
throw new CompletionException(t);
146+
}
147+
148+
LOG.info("S3 object deleted, Bucket name: " + s3BucketName + ", Object keys: " + s3Keys + ".");
149+
return null;
150+
});
151+
}
118152
}

src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStore.java

+20
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package software.amazon.payloadoffloading;
22

3+
import java.util.Collection;
4+
import java.util.List;
5+
import java.util.Map;
6+
import java.util.stream.Collectors;
37
import org.slf4j.Logger;
48
import org.slf4j.LoggerFactory;
59

@@ -56,4 +60,20 @@ public void deleteOriginalPayload(String payloadPointer) {
5660
String s3Key = s3Pointer.getS3Key();
5761
s3Dao.deletePayloadFromS3(s3BucketName, s3Key);
5862
}
63+
64+
@Override
65+
public void deleteOriginalPayloads(Collection<String> payloadPointers) {
66+
// Sort by S3 bucket.
67+
Map<String, List<PayloadS3Pointer>> offloadedMessages = payloadPointers.stream()
68+
.map(PayloadS3Pointer::fromJson)
69+
.collect(Collectors.groupingBy(PayloadS3Pointer::getS3BucketName));
70+
71+
for (Map.Entry<String, List<PayloadS3Pointer>> bucket : offloadedMessages.entrySet()) {
72+
String s3BucketName = bucket.getKey();
73+
List<String> s3Keys = bucket.getValue().stream()
74+
.map(PayloadS3Pointer::getS3Key)
75+
.collect(Collectors.toList());
76+
s3Dao.deletePayloadsFromS3(s3BucketName, s3Keys);
77+
}
78+
}
5979
}

src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsync.java

+35-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package software.amazon.payloadoffloading;
22

3+
import java.util.ArrayList;
4+
import java.util.Collection;
5+
import java.util.List;
6+
import java.util.Map;
37
import java.util.UUID;
48
import java.util.concurrent.CompletableFuture;
59
import java.util.concurrent.CompletionException;
10+
import java.util.stream.Collectors;
611
import org.slf4j.Logger;
712
import org.slf4j.LoggerFactory;
8-
import software.amazon.payloadoffloading.PayloadS3Pointer;
913

1014
/**
1115
* S3 based implementation for PayloadStoreAsync.
@@ -74,4 +78,34 @@ public CompletableFuture<Void> deleteOriginalPayload(String payloadPointer) {
7478
return futureEx;
7579
}
7680
}
81+
82+
@Override
83+
public CompletableFuture<Void> deleteOriginalPayloads(Collection<String> payloadPointers) {
84+
// Skip the delete if there are no payloads to delete.
85+
if (payloadPointers.isEmpty()) {
86+
return CompletableFuture.completedFuture(null);
87+
}
88+
89+
try {
90+
// Sort by S3 bucket.
91+
Map<String, List<PayloadS3Pointer>> offloadedMessages = payloadPointers.stream()
92+
.map(PayloadS3Pointer::fromJson)
93+
.collect(Collectors.groupingBy(PayloadS3Pointer::getS3BucketName));
94+
95+
List<CompletableFuture<Void>> deleteFutures = new ArrayList<>(offloadedMessages.size());
96+
for (Map.Entry<String, List<PayloadS3Pointer>> bucket : offloadedMessages.entrySet()) {
97+
String s3BucketName = bucket.getKey();
98+
List<String> s3Keys = bucket.getValue().stream()
99+
.map(PayloadS3Pointer::getS3Key)
100+
.collect(Collectors.toList());
101+
deleteFutures.add(s3Dao.deletePayloadsFromS3(s3BucketName, s3Keys));
102+
}
103+
104+
return CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0]));
105+
} catch (Exception e) {
106+
CompletableFuture<Void> futureEx = new CompletableFuture<>();
107+
futureEx.completeExceptionally((e instanceof RuntimeException) ? e : new CompletionException(e));
108+
return futureEx;
109+
}
110+
}
77111
}

src/main/java/software/amazon/payloadoffloading/S3Dao.java

+27
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
11
package software.amazon.payloadoffloading;
22

3+
import java.util.Collection;
4+
import java.util.stream.Collectors;
35
import org.slf4j.Logger;
46
import org.slf4j.LoggerFactory;
57
import software.amazon.awssdk.core.ResponseInputStream;
68
import software.amazon.awssdk.core.exception.SdkClientException;
79
import software.amazon.awssdk.core.exception.SdkException;
810
import software.amazon.awssdk.core.sync.RequestBody;
911
import software.amazon.awssdk.services.s3.S3Client;
12+
import software.amazon.awssdk.services.s3.model.Delete;
1013
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
14+
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
1115
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
1216
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
1317
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
18+
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
1419
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
1520
import software.amazon.awssdk.utils.IoUtils;
1621

@@ -104,4 +109,26 @@ public void deletePayloadFromS3(String s3BucketName, String s3Key) {
104109

105110
LOG.info("S3 object deleted, Bucket name: " + s3BucketName + ", Object key: " + s3Key + ".");
106111
}
112+
113+
public void deletePayloadsFromS3(String s3BucketName, Collection<String> s3Keys) {
114+
try {
115+
DeleteObjectsRequest deleteObjectsRequest = DeleteObjectsRequest.builder()
116+
.bucket(s3BucketName)
117+
.delete(Delete.builder()
118+
.objects(s3Keys.stream()
119+
.map(s3Key -> ObjectIdentifier.builder()
120+
.key(s3Key)
121+
.build())
122+
.collect(Collectors.toList()))
123+
.build())
124+
.build();
125+
s3Client.deleteObjects(deleteObjectsRequest);
126+
} catch (SdkException e) {
127+
String errorMessage = "Failed to delete the S3 object which contains the payload";
128+
LOG.error(errorMessage, e);
129+
throw SdkException.create(errorMessage, e);
130+
}
131+
132+
LOG.info("S3 object deleted, Bucket name: " + s3BucketName + ", Object keys: " + s3Keys + ".");
133+
}
107134
}

src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsyncTest.java

+73
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@
1212
import static org.mockito.Mockito.verifyNoInteractions;
1313
import static org.mockito.Mockito.when;
1414

15+
import java.util.ArrayList;
16+
import java.util.Arrays;
17+
import java.util.Collection;
18+
import java.util.Collections;
19+
import java.util.List;
1520
import java.util.concurrent.CompletableFuture;
1621
import java.util.concurrent.CompletionException;
1722
import org.junit.jupiter.api.BeforeEach;
@@ -22,8 +27,10 @@
2227

2328
public class S3BackedPayloadStoreAsyncTest {
2429
private static final String S3_BUCKET_NAME = "test-bucket-name";
30+
private static final String OTHER_S3_BUCKET_NAME = "other-bucket-name";
2531
private static final String ANY_PAYLOAD = "AnyPayload";
2632
private static final String ANY_S3_KEY = "AnyS3key";
33+
private static final String ANY_OTHER_S3_KEY = "AnyOtherS3key";
2734
private static final String INCORRECT_POINTER_EXCEPTION_MSG = "Failed to read the S3 object pointer from given string";
2835
private PayloadStoreAsync payloadStore;
2936
private S3AsyncDao s3AsyncDao;
@@ -175,4 +182,70 @@ public void testDeleteOriginalPayloadIncorrectPointer() {
175182
verifyNoInteractions(s3AsyncDao);
176183
}
177184

185+
@Test
186+
public void testDeleteOriginalPayloadsOnSuccess() {
187+
when(s3AsyncDao.deletePayloadsFromS3(any(), any())).thenReturn(CompletableFuture.completedFuture(null));
188+
189+
List<String> payloadPointers = new ArrayList<>();
190+
payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson());
191+
payloadStore.deleteOriginalPayloads(payloadPointers).join();
192+
193+
ArgumentCaptor<String> bucketNameCaptor = ArgumentCaptor.forClass(String.class);
194+
ArgumentCaptor<Collection> keyCaptor = ArgumentCaptor.forClass(Collection.class);
195+
verify(s3AsyncDao, times(1)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture());
196+
197+
assertEquals(Collections.singletonList(ANY_S3_KEY), keyCaptor.getValue());
198+
assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getValue());
199+
}
200+
201+
@Test
202+
public void testDeleteOriginalPayloadsSameBucket() {
203+
when(s3AsyncDao.deletePayloadsFromS3(any(), any())).thenReturn(CompletableFuture.completedFuture(null));
204+
205+
List<String> payloadPointers = new ArrayList<>();
206+
payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson());
207+
payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_OTHER_S3_KEY).toJson());
208+
payloadStore.deleteOriginalPayloads(payloadPointers).join();
209+
210+
ArgumentCaptor<String> bucketNameCaptor = ArgumentCaptor.forClass(String.class);
211+
ArgumentCaptor<Collection> keyCaptor = ArgumentCaptor.forClass(Collection.class);
212+
verify(s3AsyncDao, times(1)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture());
213+
214+
assertEquals(Arrays.asList(ANY_S3_KEY, ANY_OTHER_S3_KEY), keyCaptor.getValue());
215+
assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getValue());
216+
}
217+
218+
@Test
219+
public void testDeleteOriginalPayloadsDifferentBuckets() {
220+
when(s3AsyncDao.deletePayloadsFromS3(any(), any())).thenReturn(CompletableFuture.completedFuture(null));
221+
222+
List<String> payloadPointers = new ArrayList<>();
223+
payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson());
224+
payloadPointers.add(new PayloadS3Pointer(OTHER_S3_BUCKET_NAME, ANY_OTHER_S3_KEY).toJson());
225+
payloadStore.deleteOriginalPayloads(payloadPointers).join();
226+
227+
ArgumentCaptor<String> bucketNameCaptor = ArgumentCaptor.forClass(String.class);
228+
ArgumentCaptor<Collection> keyCaptor = ArgumentCaptor.forClass(Collection.class);
229+
verify(s3AsyncDao, times(2)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture());
230+
231+
assertEquals(Collections.singletonList(ANY_S3_KEY), keyCaptor.getAllValues().get(0));
232+
assertEquals(Collections.singletonList(ANY_OTHER_S3_KEY), keyCaptor.getAllValues().get(1));
233+
assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getAllValues().get(0));
234+
assertEquals(OTHER_S3_BUCKET_NAME, bucketNameCaptor.getAllValues().get(1));
235+
}
236+
237+
@Test
238+
public void testDeleteOriginalPayloadsIncorrectPointer() {
239+
List<String> payloadPointers = new ArrayList<>();
240+
payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson());
241+
payloadPointers.add("IncorrectPointer");
242+
243+
CompletionException exception = assertThrows(CompletionException.class, () -> {
244+
payloadStore.deleteOriginalPayloads(payloadPointers).join();
245+
});
246+
247+
assertTrue(exception.getMessage().contains(INCORRECT_POINTER_EXCEPTION_MSG));
248+
verifyNoInteractions(s3AsyncDao);
249+
}
250+
178251
}

0 commit comments

Comments
 (0)