Skip to content

Commit 41e6322

Browse files
authored
feat: bump s3stream to 0.14.0 (#637)
Signed-off-by: Robin Han <[email protected]>
1 parent 8580d23 commit 41e6322

File tree

13 files changed

+40
-53
lines changed

13 files changed

+40
-53
lines changed

config/kraft/broker.properties

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,6 @@ s3.block.cache.size=104857600
167167
# The execution interval for stream object compaction, default 60 minutes
168168
s3.stream.object.compaction.interval.minutes=60
169169

170-
# The acceptable living time for stream object before being compacted, default 60 minutes
171-
s3.stream.object.compaction.living.time.minutes=60
172-
173170
# The maximum size of stream object allowed to be generated in stream compaction, default 10GB
174171
s3.stream.object.compaction.max.size.bytes=10737418240
175172

config/kraft/server.properties

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,6 @@ s3.block.cache.size=104857600
173173
# The execution interval for stream object compaction, default 60 minutes
174174
s3.stream.object.compaction.interval.minutes=60
175175

176-
# The acceptable living time for stream object before being compacted, default 60 minutes
177-
s3.stream.object.compaction.living.time.minutes=60
178-
179176
# The maximum size of stream object allowed to be generated in stream compaction, default 10GB
180177
s3.stream.object.compaction.max.size.bytes=10737418240
181178

core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ public static Config to(KafkaConfig s) {
4747
.blockCacheSize(s.s3BlockCacheSize())
4848
.streamObjectCompactionIntervalMinutes(s.s3StreamObjectCompactionTaskIntervalMinutes())
4949
.streamObjectCompactionMaxSizeBytes(s.s3StreamObjectCompactionMaxSizeBytes())
50-
.streamObjectCompactionLivingTimeMinutes(s.s3StreamObjectCompactionLivingTimeMinutes())
5150
.controllerRequestRetryMaxCount(s.s3ControllerRequestRetryMaxCount())
5251
.controllerRequestRetryBaseDelayMs(s.s3ControllerRequestRetryBaseDelayMs())
5352
.streamSetObjectCompactionInterval(s.s3StreamSetObjectCompactionInterval())

core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,8 @@ private List<GetObjectsTask> removePendingTasks() {
132132
}
133133
List<GetObjectsTask> retryTasks = new ArrayList<>();
134134
pendingStreamsOffsetRange.forEach(offsetRange -> {
135-
long streamId = offsetRange.getStreamId();
136-
long endOffset = offsetRange.getEndOffset();
135+
long streamId = offsetRange.streamId();
136+
long endOffset = offsetRange.endOffset();
137137
Map<Long, List<GetObjectsTask>> tasks = StreamMetadataManager.this.pendingGetObjectsTasks.get(streamId);
138138
if (tasks == null || tasks.isEmpty()) {
139139
return;
@@ -170,8 +170,8 @@ public synchronized CompletableFuture<InRangeObjects> fetch(long streamId, long
170170
if (offsetRange == null || offsetRange == StreamOffsetRange.INVALID) {
171171
return CompletableFuture.completedFuture(InRangeObjects.INVALID);
172172
}
173-
long streamStartOffset = offsetRange.getStartOffset();
174-
long streamEndOffset = offsetRange.getEndOffset();
173+
long streamStartOffset = offsetRange.startOffset();
174+
long streamEndOffset = offsetRange.endOffset();
175175
if (startOffset < streamStartOffset) {
176176
LOGGER.warn(
177177
"[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and startOffset < streamStartOffset: {}",

core/src/main/scala/kafka/server/KafkaConfig.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,6 @@ object KafkaConfig {
709709
val S3BlockCacheSizeProp = "s3.block.cache.size"
710710
val S3StreamObjectCompactionIntervalMinutesProp = "s3.stream.object.compaction.interval.minutes"
711711
val S3StreamObjectCompactionMaxSizeBytesProp = "s3.stream.object.compaction.max.size.bytes"
712-
val S3StreamObjectCompactionLivingTimeMinutesProp = "s3.stream.object.compaction.living.time.minutes"
713712
val S3ControllerRequestRetryMaxCountProp = "s3.controller.request.retry.max.count"
714713
val S3ControllerRequestRetryBaseDelayMsProp = "s3.controller.request.retry.base.delay.ms"
715714
val S3StreamSetObjectCompactionIntervalProp = "s3.stream.set.object.compaction.interval.minutes"
@@ -758,7 +757,6 @@ object KafkaConfig {
758757
val S3BlockCacheSizeDoc = "The S3 block cache size in MiB."
759758
val S3StreamObjectCompactionIntervalMinutesDoc = "The S3 stream object compaction task interval in minutes."
760759
val S3StreamObjectCompactionMaxSizeBytesDoc = "The S3 stream object compaction max size in bytes."
761-
val S3StreamObjectCompactionLivingTimeMinutesDoc = "The S3 stream object compaction living time threshold in minutes."
762760
val S3ControllerRequestRetryMaxCountDoc = "The S3 controller request retry max count."
763761
val S3ControllerRequestRetryBaseDelayMsDoc = "The S3 controller request retry base delay in milliseconds."
764762
val S3StreamSetObjectCompactionIntervalDoc = "The execution interval of stream set object compaction in minutes."
@@ -1604,7 +1602,6 @@ object KafkaConfig {
16041602
.define(S3BlockCacheSizeProp, LONG, 104857600L, MEDIUM, S3BlockCacheSizeDoc)
16051603
.define(S3StreamObjectCompactionIntervalMinutesProp, INT, 60, MEDIUM, S3StreamObjectCompactionIntervalMinutesDoc)
16061604
.define(S3StreamObjectCompactionMaxSizeBytesProp, LONG, 10737418240L, MEDIUM, S3StreamObjectCompactionMaxSizeBytesDoc)
1607-
.define(S3StreamObjectCompactionLivingTimeMinutesProp, INT, 60, MEDIUM, S3StreamObjectCompactionLivingTimeMinutesDoc)
16081605
.define(S3ControllerRequestRetryMaxCountProp, INT, Integer.MAX_VALUE, MEDIUM, S3ControllerRequestRetryMaxCountDoc)
16091606
.define(S3ControllerRequestRetryBaseDelayMsProp, LONG, 500, MEDIUM, S3ControllerRequestRetryBaseDelayMsDoc)
16101607
.define(S3StreamSetObjectCompactionIntervalProp, INT, Defaults.S3StreamSetObjectCompactionInterval, MEDIUM, S3StreamSetObjectCompactionIntervalDoc)
@@ -2185,7 +2182,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
21852182
val s3BlockCacheSize = getLong(KafkaConfig.S3BlockCacheSizeProp)
21862183
val s3StreamObjectCompactionTaskIntervalMinutes = getInt(KafkaConfig.S3StreamObjectCompactionIntervalMinutesProp)
21872184
val s3StreamObjectCompactionMaxSizeBytes = getLong(KafkaConfig.S3StreamObjectCompactionMaxSizeBytesProp)
2188-
val s3StreamObjectCompactionLivingTimeMinutes = getInt(KafkaConfig.S3StreamObjectCompactionLivingTimeMinutesProp)
21892185
val s3ControllerRequestRetryMaxCount = getInt(KafkaConfig.S3ControllerRequestRetryMaxCountProp)
21902186
val s3ControllerRequestRetryBaseDelayMs = getLong(KafkaConfig.S3ControllerRequestRetryBaseDelayMsProp)
21912187
// TODO: ensure incremental epoch => Store epoch in disk, if timestamp flip back, we could use disk epoch to keep the incremental epoch.

gradle/dependencies.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ versions += [
128128
zookeeper: "3.6.3",
129129
zstd: "1.5.2-1",
130130
commonLang: "3.12.0",
131-
s3stream: "0.12.0-SNAPSHOT",
131+
s3stream: "0.14.0-SNAPSHOT",
132132
opentelemetry: "1.32.0",
133133
opentelemetryAlpha: "1.32.0-alpha",
134134
oshi: "6.4.7"

metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -421,8 +421,8 @@ public ControllerResult<TrimStreamResponse> trimStream(int nodeId, long nodeEpoc
421421
streamMetadata.streamObjects().entrySet().stream().forEach(it -> {
422422
Long objectId = it.getKey();
423423
S3StreamObject streamObject = it.getValue();
424-
long streamStartOffset = streamObject.streamOffsetRange().getStartOffset();
425-
long streamEndOffset = streamObject.streamOffsetRange().getEndOffset();
424+
long streamStartOffset = streamObject.streamOffsetRange().startOffset();
425+
long streamEndOffset = streamObject.streamOffsetRange().endOffset();
426426
if (newStartOffset <= streamStartOffset) {
427427
return;
428428
}
@@ -843,31 +843,31 @@ private Errors streamAdvanceCheck(List<StreamOffsetRange> ranges, int nodeId) {
843843
}
844844
for (StreamOffsetRange range : ranges) {
845845
// verify stream exist
846-
if (!this.streamsMetadata.containsKey(range.getStreamId())) {
847-
log.warn("[streamAdvanceCheck]: streamId={} not exist", range.getStreamId());
846+
if (!this.streamsMetadata.containsKey(range.streamId())) {
847+
log.warn("[streamAdvanceCheck]: streamId={} not exist", range.streamId());
848848
return Errors.STREAM_NOT_EXIST;
849849
}
850850
// check if this stream open
851-
if (this.streamsMetadata.get(range.getStreamId()).currentState() != StreamState.OPENED) {
852-
log.warn("[streamAdvanceCheck]: streamId={} not opened", range.getStreamId());
851+
if (this.streamsMetadata.get(range.streamId()).currentState() != StreamState.OPENED) {
852+
log.warn("[streamAdvanceCheck]: streamId={} not opened", range.streamId());
853853
return Errors.STREAM_NOT_OPENED;
854854
}
855-
RangeMetadata rangeMetadata = this.streamsMetadata.get(range.getStreamId()).currentRangeMetadata();
855+
RangeMetadata rangeMetadata = this.streamsMetadata.get(range.streamId()).currentRangeMetadata();
856856
if (rangeMetadata == null) {
857857
// should not happen
858858
log.error("[streamAdvanceCheck]: streamId={}'s current range={} not exist when stream has been ",
859-
range.getStreamId(), this.streamsMetadata.get(range.getStreamId()).currentRangeIndex());
859+
range.streamId(), this.streamsMetadata.get(range.streamId()).currentRangeIndex());
860860
return Errors.STREAM_INNER_ERROR;
861861
} else if (rangeMetadata.nodeId() != nodeId) {
862862
// should not happen
863863
log.error("[streamAdvanceCheck]: streamId={}'s current range node id not match expected nodeId={} but nodeId={}",
864-
range.getStreamId(), rangeMetadata.nodeId(), nodeId);
864+
range.streamId(), rangeMetadata.nodeId(), nodeId);
865865
return Errors.STREAM_INNER_ERROR;
866866
}
867-
if (rangeMetadata.endOffset() != range.getStartOffset()) {
867+
if (rangeMetadata.endOffset() != range.startOffset()) {
868868
log.warn("[streamAdvanceCheck]: streamId={}'s current range={}'s end offset {} is not equal to request start offset {}",
869-
range.getStreamId(), this.streamsMetadata.get(range.getStreamId()).currentRangeIndex(),
870-
rangeMetadata.endOffset(), range.getStartOffset());
869+
range.streamId(), this.streamsMetadata.get(range.streamId()).currentRangeIndex(),
870+
rangeMetadata.endOffset(), range.startOffset());
871871
return Errors.OFFSET_NOT_MATCHED;
872872
}
873873
}

metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,8 @@ public List<S3StreamObject> getStreamObjects(long streamId, long startOffset, lo
126126
return Collections.emptyList();
127127
}
128128
return streamObjectsMetadata.values().stream().filter(obj -> {
129-
long objectStartOffset = obj.streamOffsetRange().getStartOffset();
130-
long objectEndOffset = obj.streamOffsetRange().getEndOffset();
129+
long objectStartOffset = obj.streamOffsetRange().startOffset();
130+
long objectEndOffset = obj.streamOffsetRange().endOffset();
131131
return objectStartOffset < endOffset && objectEndOffset > startOffset;
132132
}).sorted(Comparator.comparing(S3StreamObject::streamOffsetRange)).limit(limit).collect(Collectors.toCollection(ArrayList::new));
133133
}
@@ -189,14 +189,14 @@ private Queue<S3ObjectMetadataWrapper> rangeOfStreamSetObjects() {
189189
continue;
190190
}
191191
StreamOffsetRange range = ranges.get(index);
192-
if (range.getStartOffset() >= endOffset || range.getEndOffset() < startOffset) {
192+
if (range.startOffset() >= endOffset || range.endOffset() < startOffset) {
193193
continue;
194194
}
195195
S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata(
196196
obj.objectId(), obj.objectType(), ranges, obj.dataTimeInMs(),
197197
obj.orderId());
198-
s3ObjectMetadataList.add(new S3ObjectMetadataWrapper(s3ObjectMetadata, range.getStartOffset(), range.getEndOffset()));
199-
if (range.getEndOffset() >= endOffset) {
198+
s3ObjectMetadataList.add(new S3ObjectMetadataWrapper(s3ObjectMetadata, range.startOffset(), range.endOffset()));
199+
if (range.endOffset() >= endOffset) {
200200
break;
201201
}
202202
}
@@ -209,12 +209,12 @@ private Queue<S3ObjectMetadataWrapper> rangeOfStreamObjects() {
209209
// TODO: refactor to make stream objects in order
210210
if (streamObjectsMetadata != null && !streamObjectsMetadata.isEmpty()) {
211211
return streamObjectsMetadata.values().stream().filter(obj -> {
212-
long objectStartOffset = obj.streamOffsetRange().getStartOffset();
213-
long objectEndOffset = obj.streamOffsetRange().getEndOffset();
212+
long objectStartOffset = obj.streamOffsetRange().startOffset();
213+
long objectEndOffset = obj.streamOffsetRange().endOffset();
214214
return objectStartOffset < endOffset && objectEndOffset > startOffset;
215215
}).sorted(Comparator.comparing(S3StreamObject::streamOffsetRange)).map(obj -> {
216-
long startOffset = obj.streamOffsetRange().getStartOffset();
217-
long endOffset = obj.streamOffsetRange().getEndOffset();
216+
long startOffset = obj.streamOffsetRange().startOffset();
217+
long endOffset = obj.streamOffsetRange().endOffset();
218218
S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata(
219219
obj.objectId(), obj.objectType(), List.of(obj.streamOffsetRange()), obj.dataTimeInMs());
220220
return new S3ObjectMetadataWrapper(s3ObjectMetadata, startOffset, endOffset);
@@ -359,12 +359,12 @@ protected ComparableItem<Long> get(int index) {
359359
return new ComparableItem<>() {
360360
@Override
361361
public boolean isLessThan(Long o) {
362-
return range.getStreamId() < o;
362+
return range.streamId() < o;
363363
}
364364

365365
@Override
366366
public boolean isGreaterThan(Long o) {
367-
return range.getStreamId() > o;
367+
return range.streamId() > o;
368368
}
369369
};
370370
}

metadata/src/main/java/org/apache/kafka/metadata/stream/Convertor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
public class Convertor {
2424
public static S3StreamSetObjectRecord.StreamIndex to(StreamOffsetRange s) {
2525
return new S3StreamSetObjectRecord.StreamIndex()
26-
.setStreamId(s.getStreamId())
27-
.setStartOffset(s.getStartOffset())
28-
.setEndOffset(s.getEndOffset());
26+
.setStreamId(s.streamId())
27+
.setStartOffset(s.startOffset())
28+
.setEndOffset(s.endOffset());
2929
}
3030

3131
public static StreamOffsetRange to(S3StreamSetObjectRecord.StreamIndex s) {

metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ public long dataTimeInMs() {
5555
public ApiMessageAndVersion toRecord() {
5656
return new ApiMessageAndVersion(new S3StreamObjectRecord()
5757
.setObjectId(objectId)
58-
.setStreamId(streamOffsetRange.getStreamId())
59-
.setStartOffset(streamOffsetRange.getStartOffset())
60-
.setEndOffset(streamOffsetRange.getEndOffset())
58+
.setStreamId(streamOffsetRange.streamId())
59+
.setStartOffset(streamOffsetRange.startOffset())
60+
.setEndOffset(streamOffsetRange.endOffset())
6161
.setDataTimeInMs(dataTimeInMs), (short) 0);
6262
}
6363

0 commit comments

Comments
 (0)