Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(s3stream): create S3Operator with credential providers #898

Merged
merged 3 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.1.3-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.17.0-SNAPSHOT</s3stream.version>
<s3stream.version>0.18.0-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.17.0-SNAPSHOT</version>
<version>0.18.0-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
20 changes: 0 additions & 20 deletions s3stream/src/main/java/com/automq/stream/s3/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ public class Config {
private String region;
private String bucket;
private boolean forcePathStyle = false;
private String accessKey;
private String secretKey;
private String walPath = "/tmp/s3stream_wal";
private long walCacheSize = 200 * 1024 * 1024;
private long walCapacity = 1024L * 1024 * 1024;
Expand Down Expand Up @@ -207,14 +205,6 @@ public boolean objectLogEnable() {
return objectLogEnable;
}

public String accessKey() {
return accessKey;
}

public String secretKey() {
return secretKey;
}

public long networkBaselineBandwidth() {
return networkBaselineBandwidth;
}
Expand Down Expand Up @@ -403,16 +393,6 @@ public Config objectLogEnable(boolean s3ObjectLogEnable) {
return this;
}

public Config accessKey(String s3AccessKey) {
this.accessKey = s3AccessKey;
return this;
}

public Config secretKey(String s3SecretKey) {
this.secretKey = s3SecretKey;
return this;
}

public Config networkBaselineBandwidth(long networkBaselineBandwidth) {
this.networkBaselineBandwidth = networkBaselineBandwidth;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
import software.amazon.awssdk.core.async.AsyncRequestBody;
Expand Down Expand Up @@ -113,20 +113,20 @@ public class DefaultS3Operator implements S3Operator {
private final HashedWheelTimer timeoutDetect = new HashedWheelTimer(
ThreadUtils.createThreadFactory("s3-timeout-detect", true), 1, TimeUnit.SECONDS, 100);

public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, String accessKey,
String secretKey) {
this(endpoint, region, bucket, forcePathStyle, accessKey, secretKey, null, null, false);
public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle,
List<AwsCredentialsProvider> credentialsProviders) {
this(endpoint, region, bucket, forcePathStyle, credentialsProviders, null, null, false);
}

public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, String accessKey,
String secretKey,
public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle,
List<AwsCredentialsProvider> credentialsProviders,
AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter,
AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter, boolean readWriteIsolate) {
this.maxMergeReadSparsityRate = Utils.getMaxMergeReadSparsityRate();
this.networkInboundBandwidthLimiter = networkInboundBandwidthLimiter;
this.networkOutboundBandwidthLimiter = networkOutboundBandwidthLimiter;
this.writeS3Client = newS3Client(endpoint, region, forcePathStyle, accessKey, secretKey);
this.readS3Client = readWriteIsolate ? newS3Client(endpoint, region, forcePathStyle, accessKey, secretKey) : writeS3Client;
this.writeS3Client = newS3Client(endpoint, region, forcePathStyle, credentialsProviders);
this.readS3Client = readWriteIsolate ? newS3Client(endpoint, region, forcePathStyle, credentialsProviders) : writeS3Client;
this.inflightWriteLimiter = new Semaphore(50);
this.inflightReadLimiter = readWriteIsolate ? new Semaphore(50) : inflightWriteLimiter;
this.bucket = bucket;
Expand All @@ -137,8 +137,7 @@ public DefaultS3Operator(String endpoint, String region, String bucket, boolean
.setRegion(region)
.setBucketName(bucket)
.setForcePathStyle(forcePathStyle)
.setAccessKey(accessKey)
.setSecretKey(secretKey)
.setCredentialsProviders(credentialsProviders)
.build();
LOGGER.info("You are using s3Context: {}", s3Context);
checkAvailable(s3Context);
Expand Down Expand Up @@ -649,26 +648,30 @@ private void checkAvailable(S3Utils.S3Context s3Context) {
}
}

public S3AsyncClient newS3Client(String endpoint, String region, boolean forcePathStyle, String accessKey,
String secretKey) {
public S3AsyncClient newS3Client(String endpoint, String region, boolean forcePathStyle,
List<AwsCredentialsProvider> credentialsProviders) {
S3AsyncClientBuilder builder = S3AsyncClient.builder().region(Region.of(region));
if (StringUtils.isNotBlank(endpoint)) {
builder.endpointOverride(URI.create(endpoint));
}
builder.serviceConfiguration(c -> c.pathStyleAccessEnabled(forcePathStyle));
builder.credentialsProvider(AwsCredentialsProviderChain.builder()
.reuseLastProviderEnabled(true)
.credentialsProviders(
() -> AwsBasicCredentials.create(accessKey, secretKey),
InstanceProfileCredentialsProvider.create(),
AnonymousCredentialsProvider.create()
).build()
);
builder.credentialsProvider(newCredentialsProviderChain(credentialsProviders));
builder.overrideConfiguration(b -> b.apiCallTimeout(Duration.ofMinutes(1))
.apiCallAttemptTimeout(Duration.ofSeconds(30)));
return builder.build();
}

private AwsCredentialsProvider newCredentialsProviderChain(List<AwsCredentialsProvider> credentialsProviders) {
List<AwsCredentialsProvider> providers = new ArrayList<>(credentialsProviders);
// Add default providers to the end of the chain
providers.add(InstanceProfileCredentialsProvider.create());
providers.add(AnonymousCredentialsProvider.create());
return AwsCredentialsProviderChain.builder()
.reuseLastProviderEnabled(true)
.credentialsProviders(providers)
.build();
}

/**
* Acquire read permit, permit will auto release when cf complete.
*
Expand Down Expand Up @@ -813,8 +816,7 @@ public static class Builder {
private String region;
private String bucket;
private boolean forcePathStyle;
private String accessKey;
private String secretKey;
private List<AwsCredentialsProvider> credentialsProviders;
private AsyncNetworkBandwidthLimiter inboundLimiter;
private AsyncNetworkBandwidthLimiter outboundLimiter;
private boolean readWriteIsolate;
Expand All @@ -839,13 +841,8 @@ public Builder forcePathStyle(boolean forcePathStyle) {
return this;
}

public Builder accessKey(String accessKey) {
this.accessKey = accessKey;
return this;
}

public Builder secretKey(String secretKey) {
this.secretKey = secretKey;
public Builder credentialsProviders(List<AwsCredentialsProvider> credentialsProviders) {
this.credentialsProviders = credentialsProviders;
return this;
}

Expand All @@ -865,7 +862,7 @@ public Builder readWriteIsolate(boolean readWriteIsolate) {
}

public DefaultS3Operator build() {
return new DefaultS3Operator(endpoint, region, bucket, forcePathStyle, accessKey, secretKey,
return new DefaultS3Operator(endpoint, region, bucket, forcePathStyle, credentialsProviders,
inboundLimiter, outboundLimiter, readWriteIsolate);
}
}
Expand Down
57 changes: 21 additions & 36 deletions s3stream/src/main/java/com/automq/stream/utils/S3Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
Expand Down Expand Up @@ -84,35 +85,25 @@ private static String range(long start, long end) {
}

private static S3AsyncClient newS3AsyncClient(String endpoint, String region, boolean forcePathStyle,
String accessKey, String secretKey) {
List<AwsCredentialsProvider> credentialsProviders) {
S3AsyncClientBuilder builder = S3AsyncClient.builder().region(Region.of(region));
if (StringUtils.isNotBlank(endpoint)) {
builder.endpointOverride(URI.create(endpoint));
}
builder.serviceConfiguration(c -> c.pathStyleAccessEnabled(forcePathStyle));
builder.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)));
builder.credentialsProvider(AwsCredentialsProviderChain.builder().credentialsProviders(credentialsProviders).build());
builder.overrideConfiguration(b -> b.apiCallTimeout(Duration.ofMinutes(1))
.apiCallAttemptTimeout(Duration.ofSeconds(30)));
return builder.build();
}

private static String hideSecret(String secret) {
if (secret == null) {
return null;
}
if (secret.length() < 6) {
return "*".repeat(secret.length());
}
return secret.substring(0, 3) + "*".repeat(secret.length() - 6) + secret.substring(secret.length() - 3);
}

private static abstract class S3CheckTask implements AutoCloseable {
protected final S3AsyncClient client;
protected final String bucketName;
private final String taskName;

public S3CheckTask(S3Context context, String taskName) {
this.client = newS3AsyncClient(context.endpoint, context.region, context.forcePathStyle, context.accessKey, context.secretKey);
this.client = newS3AsyncClient(context.endpoint, context.region, context.forcePathStyle, context.credentialsProviders);
this.bucketName = context.bucketName;
this.taskName = taskName;
}
Expand Down Expand Up @@ -363,17 +354,16 @@ public void close() {

public static class S3Context {
private final String endpoint;
private final String accessKey;
private final String secretKey;
private final List<AwsCredentialsProvider> credentialsProviders;
private final String bucketName;
private final String region;
private final boolean forcePathStyle;

public S3Context(String endpoint, String accessKey, String secretKey, String bucketName, String region,
public S3Context(String endpoint, List<AwsCredentialsProvider> credentialsProviders, String bucketName,
String region,
boolean forcePathStyle) {
this.endpoint = endpoint;
this.accessKey = accessKey;
this.secretKey = secretKey;
this.credentialsProviders = credentialsProviders;
this.bucketName = bucketName;
this.region = region;
this.forcePathStyle = forcePathStyle;
Expand Down Expand Up @@ -406,11 +396,13 @@ public List<String> advices() {
}
}
}
if (StringUtils.isBlank(accessKey)) {
advises.add("accessKey is blank. Please supply a valid accessKey.");
if (credentialsProviders == null || credentialsProviders.isEmpty()) {
advises.add("no credentials provider is supplied. Please supply a credentials provider.");
}
if (StringUtils.isBlank(secretKey)) {
advises.add("secretKey is blank. Please supply a valid secretKey.");
try (AwsCredentialsProviderChain chain = AwsCredentialsProviderChain.builder().credentialsProviders(credentialsProviders).build()) {
chain.resolveCredentials();
} catch (SdkClientException e) {
advises.add("all provided credentials providers are invalid. Please supply a valid credentials provider. Error msg: " + e.getMessage());
}
if (StringUtils.isBlank(region)) {
advises.add("region is blank. Please supply a valid region.");
Expand All @@ -425,8 +417,7 @@ public List<String> advices() {
public String toString() {
return "S3CheckContext{" +
"endpoint='" + endpoint + '\'' +
", accessKey='" + hideSecret(accessKey) + '\'' +
", secretKey='" + hideSecret(secretKey) + '\'' +
", credentialsProviders=" + credentialsProviders +
", bucketName='" + bucketName + '\'' +
", region='" + region + '\'' +
", forcePathStyle=" + forcePathStyle +
Expand All @@ -435,8 +426,7 @@ public String toString() {

public static class Builder {
private String endpoint;
private String accessKey;
private String secretKey;
private List<AwsCredentialsProvider> credentialsProviders;
private String bucketName;
private String region;
private boolean forcePathStyle;
Expand All @@ -446,13 +436,8 @@ public Builder setEndpoint(String endpoint) {
return this;
}

public Builder setAccessKey(String accessKey) {
this.accessKey = accessKey;
return this;
}

public Builder setSecretKey(String secretKey) {
this.secretKey = secretKey;
public Builder setCredentialsProviders(List<AwsCredentialsProvider> credentialsProviders) {
this.credentialsProviders = credentialsProviders;
return this;
}

Expand All @@ -472,7 +457,7 @@ public Builder setForcePathStyle(boolean forcePathStyle) {
}

public S3Context build() {
return new S3Context(endpoint, accessKey, secretKey, bucketName, region, forcePathStyle);
return new S3Context(endpoint, credentialsProviders, bucketName, region, forcePathStyle);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import com.automq.stream.s3.metadata.ObjectUtils;
import com.automq.stream.s3.operator.DefaultS3Operator;
import com.automq.stream.s3.operator.S3Operator;
import java.util.List;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;

import static com.automq.rocketmq.store.MessageStoreImpl.KV_NAMESPACE_CHECK_POINT;

Expand All @@ -64,7 +66,7 @@

// S3 object manager, such as trim expired messages, etc.
S3Operator operator = new DefaultS3Operator(s3StreamConfig.s3Endpoint(), s3StreamConfig.s3Region(), s3StreamConfig.s3Bucket(),
s3StreamConfig.s3ForcePathStyle(), s3StreamConfig.s3AccessKey(), s3StreamConfig.s3SecretKey());
s3StreamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(s3StreamConfig.s3AccessKey(), s3StreamConfig.s3SecretKey())));

Check warning on line 69 in store/src/main/java/com/automq/rocketmq/store/MessageStoreBuilder.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/MessageStoreBuilder.java#L69

Added line #L69 was not covered by tests
S3ObjectOperator objectOperator = new S3ObjectOperatorImpl(operator);

TransactionService transactionService = new TransactionService(storeConfig, timerService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;

public class S3StreamStore implements StreamStore {
private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamStore.class);
Expand Down Expand Up @@ -92,7 +93,8 @@
}

S3Operator defaultOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(),
streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey(), networkInboundLimiter, networkOutboundLimiter, true);
streamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(streamConfig.s3AccessKey(), streamConfig.s3SecretKey())),
networkInboundLimiter, networkOutboundLimiter, true);

Check warning on line 97 in store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java#L96-L97

Added lines #L96 - L97 were not covered by tests

WriteAheadLog writeAheadLog = BlockWALService.builder(s3Config.walPath(), s3Config.walCapacity()).config(s3Config).build();
S3BlockCache blockCache = new DefaultS3BlockCache(s3Config, objectManager, defaultOperator);
Expand All @@ -102,7 +104,8 @@

// Build the compaction manager
S3Operator compactionOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(),
streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey(), networkInboundLimiter, networkOutboundLimiter, true);
streamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(streamConfig.s3AccessKey(), streamConfig.s3SecretKey())),
networkInboundLimiter, networkOutboundLimiter, true);

Check warning on line 108 in store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java#L107-L108

Added lines #L107 - L108 were not covered by tests
this.compactionManager = new CompactionManager(s3Config, objectManager, streamManager, compactionOperator);

this.streamClient = new S3StreamClient(streamManager, storage, objectManager, defaultOperator, s3Config, networkInboundLimiter, networkOutboundLimiter);
Expand Down Expand Up @@ -255,8 +258,6 @@
config.bucket(streamConfig.s3Bucket());
config.forcePathStyle(streamConfig.s3ForcePathStyle());
config.walPath(streamConfig.s3WALPath());
config.accessKey(streamConfig.s3AccessKey());
config.secretKey(streamConfig.s3SecretKey());
config.networkBaselineBandwidth(streamConfig.networkBaselineBandwidth());
config.refillPeriodMs(streamConfig.refillPeriodMs());

Expand Down