Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
Expand All @@ -35,13 +33,13 @@
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.ObjectStorageClass;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.StorageClass;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Map;

Expand Down Expand Up @@ -163,10 +161,10 @@ private void safeMove(
targetS3Path
);
try {
selfCheckingMove(s3Bucket, targetS3Bucket, s3Path, targetS3Path, copyMsg);
moveObject(s3Bucket, targetS3Bucket, s3Path, targetS3Path, copyMsg);
return null;
}
catch (S3Exception | IOException | SegmentLoadingException e) {
catch (S3Exception | SegmentLoadingException e) {
log.info(e, "Error while trying to move " + copyMsg);
throw e;
}
Expand All @@ -185,90 +183,75 @@ private void safeMove(
}

/**
* Copies an object and after that checks that the object is present at the target location, via a separate API call.
* If it is not, an exception is thrown, and the object is not deleted at the old location. This "paranoid" check
* is added after it was observed that S3 may report a successful move, and the object is not found at the target
* location.
* Copies an S3 object to a target location and deletes the source.
* S3 has been strongly consistent since December 2020, so no post-copy existence check is needed.
*/
private void selfCheckingMove(
private void moveObject(
String s3Bucket,
String targetS3Bucket,
String s3Path,
String targetS3Path,
String copyMsg
) throws IOException, SegmentLoadingException
) throws SegmentLoadingException
{
if (s3Bucket.equals(targetS3Bucket) && s3Path.equals(targetS3Path)) {
log.info("No need to move file[s3://%s/%s] onto itself", s3Bucket, s3Path);
return;
}
final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
if (s3Client.doesObjectExist(s3Bucket, s3Path)) {
ListObjectsV2Request request = ListObjectsV2Request.builder()
.bucket(s3Bucket)
.prefix(s3Path)
.maxKeys(1)
.build();
final ListObjectsV2Response listResult = s3Client.listObjectsV2(request);
// Using contents().size() instead of keyCount as, in some cases
// it is observed that even though the contents returns some data
// keyCount is still zero.
if (listResult.contents().size() == 0) {
// should never happen
throw new ISE("Unable to list object [s3://%s/%s]", s3Bucket, s3Path);
}
final S3Object objectSummary = listResult.contents().get(0);
if (objectSummary.storageClass() != null &&
objectSummary.storageClass().equals(ObjectStorageClass.GLACIER)) {
throw S3Exception.builder()
.message(StringUtils.format(
"Cannot move file[s3://%s/%s] of storage class glacier, skipping.",
s3Bucket,
s3Path
))
.build();
} else {
log.info("Moving file %s", copyMsg);
CopyObjectRequest.Builder copyRequestBuilder = CopyObjectRequest.builder()
.sourceBucket(s3Bucket)
.sourceKey(s3Path)
.destinationBucket(targetS3Bucket)
.destinationKey(targetS3Path);
if (!config.getDisableAcl()) {
final String headerValue = S3Utils.grantFullControlHeaderValue(
S3Utils.grantFullControlToBucketOwner(s3Client, targetS3Bucket)
final HeadObjectResponse sourceMetadata;
try {
sourceMetadata = s3Client.getObjectMetadata(s3Bucket, s3Path);
}
catch (S3Exception e) {
if (e.statusCode() == 404 && "NoSuchKey".equals(S3Utils.getS3ErrorCode(e))) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Treat any HEAD 404 as a missing source

The idempotent already-moved path now only runs when headObject fails with status 404 and error code NoSuchKey. A HEAD response carries no body, so S3 has nowhere to put an XML error code, and HEAD failures for absent objects can surface as a generic 404/NotFound response; other Druid S3 code handles missing HEAD results by status alone. In that case this code rethrows before checking whether the target object already exists, so a retry or competing move where the source was deleted after a successful copy can incorrectly fail instead of returning the moved segment. Please key this fallback off statusCode() == 404 rather than requiring NoSuchKey.

// Source is gone; succeed silently if it already landed at the target.
if (s3Client.doesObjectExist(targetS3Bucket, targetS3Path)) {
log.info(
"Not moving file [s3://%s/%s], already present in target location [s3://%s/%s]",
s3Bucket,
s3Path,
targetS3Bucket,
targetS3Path
);
if (headerValue != null) {
copyRequestBuilder.grantFullControl(headerValue);
}
}
s3Client.copyObject(copyRequestBuilder);
if (!s3Client.doesObjectExist(targetS3Bucket, targetS3Path)) {
throw new IOE(
"After copy was reported as successful the file doesn't exist in the target location [%s]",
} else {
throw new SegmentLoadingException(
"Unable to move file %s, not present in either source or target location",
copyMsg
);
}
deleteWithRetriesSilent(s3Bucket, s3Path);
log.debug("Finished moving file %s", copyMsg);
return;
}
} else {
// ensure object exists in target location
if (s3Client.doesObjectExist(targetS3Bucket, targetS3Path)) {
log.info(
"Not moving file [s3://%s/%s], already present in target location [s3://%s/%s]",
s3Bucket,
s3Path,
targetS3Bucket,
targetS3Path
);
} else {
throw new SegmentLoadingException(
"Unable to move file %s, not present in either source or target location",
copyMsg
);
throw e;
}
final StorageClass sc = sourceMetadata.storageClass();
if (StorageClass.GLACIER.equals(sc) || StorageClass.DEEP_ARCHIVE.equals(sc)) {
throw S3Exception.builder()
.message(StringUtils.format(
"Cannot move file[s3://%s/%s] of storage class [%s], skipping.",
s3Bucket,
s3Path,
sc
))
.build();
}
log.info("Moving file %s", copyMsg);
CopyObjectRequest.Builder copyRequestBuilder = CopyObjectRequest.builder()
.sourceBucket(s3Bucket)
.sourceKey(s3Path)
.destinationBucket(targetS3Bucket)
.destinationKey(targetS3Path);
if (!config.getDisableAcl()) {
final String headerValue = S3Utils.grantFullControlHeaderValue(
s3Client.getBucketOwnerGrant(targetS3Bucket)
);
if (headerValue != null) {
copyRequestBuilder.grantFullControl(headerValue);
}
}
s3Client.copyObject(copyRequestBuilder);
deleteWithRetriesSilent(s3Bucket, s3Path);
log.debug("Finished moving file %s", copyMsg);
}

private void deleteWithRetriesSilent(final String s3Bucket, final String s3Path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ FileUtils.FileCopyResult getSegmentFiles(final CloudObjectLocation s3Coords, fin
FileUtils.mkdirp(outDir);

if (CompressionUtils.isZip(s3Coords.getPath())) {
if (!isObjectInBucket(s3Coords)) {
throw new SegmentLoadingException("IndexFile[%s] does not exist.", s3Coords);
}
final URI uri = s3Coords.toUri(S3StorageDruidModule.SCHEME);
final ByteSource byteSource = getByteSource(uri);
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
Expand All @@ -101,9 +98,6 @@ FileUtils.FileCopyResult getSegmentFiles(final CloudObjectLocation s3Coords, fin
log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outDir.getAbsolutePath());
return result;
} else if (CompressionUtils.isGz(s3Coords.getPath())) {
if (!isObjectInBucket(s3Coords)) {
throw new SegmentLoadingException("IndexFile[%s] does not exist.", s3Coords);
}
final URI uri = s3Coords.toUri(S3StorageDruidModule.SCHEME);
final ByteSource byteSource = getByteSource(uri);
final String fname = Files.getNameWithoutExtension(uri.getPath());
Expand Down Expand Up @@ -235,6 +229,9 @@ public void close() throws IOException
};
}
catch (SdkException e) {
if (e instanceof S3Exception && ((S3Exception) e).statusCode() == 404) {
throw new IOE(e, "IndexFile[s3://%s/%s] does not exist", coords.getBucket(), coords.getPath());
}
throw new IOE(e, "Could not load S3 URI [%s]", uri);
}
}
Expand Down Expand Up @@ -318,18 +315,4 @@ public String getVersion(URI uri) throws IOException
}
}

/**
 * Reports whether the object addressed by {@code coords} is found in its bucket.
 *
 * The lookup is delegated to {@code S3Utils.isObjectInBucketIgnoringPermission} and is run through
 * {@code S3Utils.retryS3Operation}.
 *
 * @param coords bucket and path of the object to look for
 * @return the result of the wrapped lookup
 * @throws SegmentLoadingException if the lookup ends in an {@link S3Exception} or {@link IOException}
 * @throws RuntimeException        wrapping any other exception raised by the lookup
 */
private boolean isObjectInBucket(final CloudObjectLocation coords) throws SegmentLoadingException
{
  final boolean present;
  try {
    present = S3Utils.retryS3Operation(
        () -> S3Utils.isObjectInBucketIgnoringPermission(s3Client, coords.getBucket(), coords.getPath())
    );
  }
  catch (S3Exception | IOException s3Failure) {
    // Storage-level failures are reported as a segment loading problem, keeping the cause attached.
    throw new SegmentLoadingException(s3Failure, "S3 fail! Key[%s]", coords);
  }
  catch (Exception unexpected) {
    throw new RuntimeException(unexpected);
  }
  return present;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,9 @@
import software.amazon.awssdk.services.s3.model.Grantee;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.Permission;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.Type;

import javax.annotation.Nullable;
import java.io.File;
Expand Down Expand Up @@ -240,20 +238,6 @@ static String constructSegmentBasePath(String baseKey, String storageDir)
) + "/";
}

/**
 * Builds a {@code FULL_CONTROL} grant whose grantee is the canonical user that owns {@code bucket}.
 *
 * The owner id is read from the bucket ACL via {@code s3Client.getBucketAcl(bucket)} on every call.
 *
 * @param s3Client client used to fetch the bucket ACL
 * @param bucket   bucket whose owner should receive the grant
 * @return a grant giving the bucket owner full control
 */
static Grant grantFullControlToBucketOwner(ServerSideEncryptingAmazonS3 s3Client, String bucket)
{
  final String bucketOwnerId = s3Client.getBucketAcl(bucket).owner().id();
  final Grantee bucketOwner = Grantee.builder()
                                     .type(Type.CANONICAL_USER)
                                     .id(bucketOwnerId)
                                     .build();
  return Grant.builder()
              .grantee(bucketOwner)
              .permission(Permission.FULL_CONTROL)
              .build();
}

/**
* Builds the header value for {@code x-amz-grant-full-control}.
*
Expand Down Expand Up @@ -427,7 +411,7 @@ static void uploadFileIfPossible(
)
{
log.info("Pushing [%s] to bucket[%s] and key[%s].", file, bucket, key);
service.upload(bucket, key, file, disableAcl ? null : S3Utils.grantFullControlToBucketOwner(service, bucket));
service.upload(bucket, key, file, disableAcl ? null : service.getBucketOwnerGrant(bucket));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.Grant;
import software.amazon.awssdk.services.s3.model.Grantee;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.Permission;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.Type;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
Expand All @@ -56,6 +59,7 @@
import javax.annotation.Nullable;
import java.io.File;
import java.io.InputStream;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/**
Expand All @@ -81,6 +85,9 @@ public static Builder builder()
private final S3TransferManager transferManager;
@Nullable
private final S3AsyncClient s3AsyncClient;
// Bucket ownership is non-transferable in S3, so the owner ID (and therefore the Grant) is stable
// for the lifetime of the bucket. Cache the Grant to avoid a GetBucketAcl call per upload.
private final ConcurrentHashMap<String, Grant> bucketOwnerGrantCache = new ConcurrentHashMap<>();

public ServerSideEncryptingAmazonS3(
S3Client s3Client,
Expand Down Expand Up @@ -139,6 +146,17 @@ public GetBucketAclResponse getBucketAcl(String bucket)
return s3Client.getBucketAcl(builder -> builder.bucket(bucket));
}

/**
 * Returns a {@code FULL_CONTROL} grant for the canonical user that owns {@code bucket}.
 *
 * The grant is built from {@link #getBucketAcl(String)} the first time a bucket is requested and is then
 * served from {@code bucketOwnerGrantCache} for later requests with the same bucket name.
 *
 * @param bucket name of the bucket whose owner should receive the grant
 * @return the grant for the bucket owner, cached per bucket name
 */
public Grant getBucketOwnerGrant(String bucket)
{
  return bucketOwnerGrantCache.computeIfAbsent(
      bucket,
      bucketName -> {
        // Look up the owner first so a failed ACL call surfaces before any grant is assembled.
        final GetBucketAclResponse acl = getBucketAcl(bucketName);
        final Grantee bucketOwner = Grantee.builder()
                                           .type(Type.CANONICAL_USER)
                                           .id(acl.owner().id())
                                           .build();
        return Grant.builder()
                    .grantee(bucketOwner)
                    .permission(Permission.FULL_CONTROL)
                    .build();
      }
  );
}

public HeadObjectResponse getObjectMetadata(String bucket, String key)
{
HeadObjectRequest.Builder requestBuilder = HeadObjectRequest.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import software.amazon.awssdk.services.s3.model.GetBucketAclResponse;
import software.amazon.awssdk.services.s3.model.Grant;
import software.amazon.awssdk.services.s3.model.Grantee;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.Owner;
Expand Down Expand Up @@ -250,6 +251,18 @@ public boolean doesObjectExist(String bucketName, String objectKey)
return (objects != null && objects.contains(objectKey));
}

/**
 * Test double for the object metadata lookup: answers with a {@code STANDARD} storage class response when
 * {@link #doesObjectExist(String, String)} reports the key as present, and otherwise throws an
 * {@link S3Exception} carrying status 404 and error code {@code NoSuchKey}.
 */
@Override
public HeadObjectResponse getObjectMetadata(String bucketName, String key)
{
  if (!doesObjectExist(bucketName, key)) {
    final AwsErrorDetails missingKey = AwsErrorDetails.builder().errorCode("NoSuchKey").build();
    throw (S3Exception) S3Exception.builder()
                                   .awsErrorDetails(missingKey)
                                   .statusCode(404)
                                   .build();
  }
  return HeadObjectResponse.builder().storageClass(StorageClass.STANDARD).build();
}

@Override
public ListObjectsV2Response listObjectsV2(ListObjectsV2Request listObjectsV2Request)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ public void testGZUncompress() throws IOException, SegmentLoadingException

final File tmpDir = temporaryFolder.newFolder("gzTestDir");

EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(bucket), EasyMock.eq(key)))
.andReturn(true)
.once();
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(key)))
.andAnswer(() -> new ResponseInputStream<>(
GetObjectResponse.builder().lastModified(Instant.ofEpochMilli(0)).build(),
Expand Down Expand Up @@ -152,9 +149,6 @@ public void testGZUncompressOn4xxError() throws IOException
.build())
.statusCode(404)
.build();
EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(bucket), EasyMock.eq(key)))
.andReturn(true)
.once();
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(key)))
.andThrow(exception)
.once();
Expand Down Expand Up @@ -201,9 +195,6 @@ public void testGZUncompressOn5xxError() throws IOException, SegmentLoadingExcep
.build())
.statusCode(503)
.build();
EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(bucket), EasyMock.eq(key)))
.andReturn(true)
.once();
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(key)))
.andThrow(exception)
.once();
Expand Down
Loading
Loading