Skip to content

Commit

Permalink
chore: bump s3stream to 0.18.0
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 committed Jan 18, 2024
1 parent 204e201 commit dc789cc
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.1.3-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.16.0-SNAPSHOT</s3stream.version>
<s3stream.version>0.18.0-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.17.0-SNAPSHOT</version>
<version>0.18.0-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import com.automq.stream.s3.metadata.ObjectUtils;
import com.automq.stream.s3.operator.DefaultS3Operator;
import com.automq.stream.s3.operator.S3Operator;
import java.util.List;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;

import static com.automq.rocketmq.store.MessageStoreImpl.KV_NAMESPACE_CHECK_POINT;

Expand All @@ -64,7 +66,7 @@ public static MessageStoreImpl build(StoreConfig storeConfig, S3StreamConfig s3S

// S3 object manager, such as trim expired messages, etc.
S3Operator operator = new DefaultS3Operator(s3StreamConfig.s3Endpoint(), s3StreamConfig.s3Region(), s3StreamConfig.s3Bucket(),
s3StreamConfig.s3ForcePathStyle(), s3StreamConfig.s3AccessKey(), s3StreamConfig.s3SecretKey());
s3StreamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(s3StreamConfig.s3AccessKey(), s3StreamConfig.s3SecretKey())));
S3ObjectOperator objectOperator = new S3ObjectOperatorImpl(operator);

TransactionService transactionService = new TransactionService(storeConfig, timerService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;

public class S3StreamStore implements StreamStore {
private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamStore.class);
Expand Down Expand Up @@ -92,7 +93,8 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store
}

S3Operator defaultOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(),
streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey(), networkInboundLimiter, networkOutboundLimiter, true);
streamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(streamConfig.s3AccessKey(), streamConfig.s3SecretKey())),
networkInboundLimiter, networkOutboundLimiter, true);

WriteAheadLog writeAheadLog = BlockWALService.builder(s3Config.walPath(), s3Config.walCapacity()).config(s3Config).build();
S3BlockCache blockCache = new DefaultS3BlockCache(s3Config, objectManager, defaultOperator);
Expand All @@ -102,7 +104,8 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store

// Build the compaction manager
S3Operator compactionOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(),
streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey(), networkInboundLimiter, networkOutboundLimiter, true);
streamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(streamConfig.s3AccessKey(), streamConfig.s3SecretKey())),
networkInboundLimiter, networkOutboundLimiter, true);
this.compactionManager = new CompactionManager(s3Config, objectManager, streamManager, compactionOperator);

this.streamClient = new S3StreamClient(streamManager, storage, objectManager, defaultOperator, s3Config, networkInboundLimiter, networkOutboundLimiter);
Expand Down Expand Up @@ -255,8 +258,6 @@ private Config configFrom(S3StreamConfig streamConfig) {
config.bucket(streamConfig.s3Bucket());
config.forcePathStyle(streamConfig.s3ForcePathStyle());
config.walPath(streamConfig.s3WALPath());
config.accessKey(streamConfig.s3AccessKey());
config.secretKey(streamConfig.s3SecretKey());
config.networkBaselineBandwidth(streamConfig.networkBaselineBandwidth());
config.refillPeriodMs(streamConfig.refillPeriodMs());

Expand Down

0 comments on commit dc789cc

Please sign in to comment.