Skip to content

Commit a9c79a3

Browse files
committed
redo implementation to use RecordsSnapshotWriter
1 parent 25de320 commit a9c79a3

File tree

10 files changed

+27
-23
lines changed

10 files changed

+27
-23
lines changed

core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,7 @@ class DumpLogSegmentsTest {
593593
)
594594

595595
val lastContainedLogTimestamp = 10000
596+
val emptyOptional: Optional[java.util.List[ApiMessageAndVersion]] = Optional.empty()
596597

597598
Using.resource(
598599
new RecordsSnapshotWriter.Builder()
@@ -601,7 +602,7 @@ class DumpLogSegmentsTest {
601602
.setRawSnapshotWriter(metadataLog.createNewSnapshot(new OffsetAndEpoch(0, 0)).get)
602603
.setKraftVersion(KRaftVersion.KRAFT_VERSION_1)
603604
.setVoterSet(Optional.of(VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true))))
604-
.build(MetadataRecordSerde.INSTANCE)
605+
.build(MetadataRecordSerde.INSTANCE, emptyOptional)
605606
) { snapshotWriter =>
606607
snapshotWriter.append(metadataRecords)
607608
snapshotWriter.freeze()

metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -437,14 +437,11 @@ void doFormat(BootstrapMetadata bootstrapMetadata) throws Exception {
437437
directoryTypes.get(writeLogDir).description(), writeLogDir,
438438
MetadataVersion.FEATURE_NAME, releaseVersion);
439439
Files.createDirectories(Paths.get(writeLogDir));
440-
BootstrapDirectory bootstrapDirectory = new BootstrapDirectory(writeLogDir);
441-
bootstrapDirectory.writeBinaryFile(bootstrapMetadata);
442-
if (directoryTypes.get(writeLogDir).isDynamicMetadataDirectory()) {
443-
writeDynamicQuorumSnapshot(writeLogDir,
440+
writeBoostrapSnapshot(writeLogDir,
441+
bootstrapMetadata,
444442
initialControllers.get(),
445443
featureLevels.get(KRaftVersion.FEATURE_NAME),
446444
controllerListenerName);
447-
}
448445
});
449446
copier.setWriteErrorHandler((errorLogDir, e) -> {
450447
throw new FormatterException("Error while writing meta.properties file " +
@@ -492,8 +489,9 @@ static DirectoryType calculate(
492489
}
493490
}
494491

495-
static void writeDynamicQuorumSnapshot(
492+
static void writeBoostrapSnapshot(
496493
String writeLogDir,
494+
BootstrapMetadata bootstrapMetadata,
497495
DynamicVoters initialControllers,
498496
short kraftVersion,
499497
String controllerListenerName
@@ -502,6 +500,7 @@ static void writeDynamicQuorumSnapshot(
502500
File clusterMetadataDirectory = new File(parentDir, String.format("%s-%d",
503501
CLUSTER_METADATA_TOPIC_PARTITION.topic(),
504502
CLUSTER_METADATA_TOPIC_PARTITION.partition()));
503+
505504
VoterSet voterSet = initialControllers.toVoterSet(controllerListenerName);
506505
RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder().
507506
setLastContainedLogTimestamp(Time.SYSTEM.milliseconds()).
@@ -511,8 +510,9 @@ static void writeDynamicQuorumSnapshot(
511510
Snapshots.BOOTSTRAP_SNAPSHOT_ID)).
512511
setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion)).
513512
setVoterSet(Optional.of(voterSet));
514-
try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde())) {
513+
try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde(), Optional.of(bootstrapMetadata.records()))) {
515514
writer.freeze();
516515
}
516+
517517
}
518518
}

metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,7 @@ public Optional<SnapshotWriter<ApiMessageAndVersion>> createSnapshot(
729729
.setLastContainedLogTimestamp(lastContainedLogTimestamp)
730730
.setTime(new MockTime())
731731
.setRawSnapshotWriter(createNewSnapshot(snapshotId))
732-
.build(new MetadataRecordSerde())
732+
.build(new MetadataRecordSerde(), Optional.empty())
733733
);
734734
}
735735

raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3726,7 +3726,7 @@ public Optional<SnapshotWriter<T>> createSnapshot(
37263726
.setRawSnapshotWriter(wrappedWriter)
37273727
.setKraftVersion(partitionState.kraftVersionAtOffset(lastContainedLogOffset))
37283728
.setVoterSet(partitionState.voterSetAtOffset(lastContainedLogOffset))
3729-
.build(serde);
3729+
.build(serde, Optional.empty());
37303730
});
37313731
}
37323732

raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.kafka.raft.VoterSet;
3131
import org.apache.kafka.raft.internals.BatchAccumulator;
3232
import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
33+
import org.apache.kafka.server.common.ApiMessageAndVersion;
3334
import org.apache.kafka.server.common.KRaftVersion;
3435
import org.apache.kafka.server.common.OffsetAndEpoch;
3536
import org.apache.kafka.server.common.serialization.RecordSerde;
@@ -191,7 +192,7 @@ public Builder setVoterSet(Optional<VoterSet> voterSet) {
191192
return this;
192193
}
193194

194-
public <T> RecordsSnapshotWriter<T> build(RecordSerde<T> serde) {
195+
public <T> RecordsSnapshotWriter<T> build(RecordSerde<T> serde, Optional<List<T>> bootstrapRecords) {
195196
if (rawSnapshotWriter.isEmpty()) {
196197
throw new IllegalStateException("Builder::build called without a RawSnapshotWriter");
197198
} else if (rawSnapshotWriter.get().sizeInBytes() != 0) {
@@ -213,6 +214,8 @@ public <T> RecordsSnapshotWriter<T> build(RecordSerde<T> serde) {
213214
serde
214215
);
215216

217+
bootstrapRecords.ifPresent(writer::append);
218+
216219
writer.accumulator.appendControlMessages((baseOffset, epoch, compression, buffer) -> {
217220
long now = time.milliseconds();
218221
try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(

raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2224,7 +2224,7 @@ private static SnapshotWriter<String> snapshotWriter(RaftClientTestContext conte
22242224
return new RecordsSnapshotWriter.Builder()
22252225
.setTime(context.time)
22262226
.setRawSnapshotWriter(snapshot)
2227-
.build(new StringSerde());
2227+
.build(new StringSerde(), Optional.empty());
22282228
}
22292229

22302230
private static final class MemorySnapshotWriter implements RawSnapshotWriter {

raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ Builder withEmptySnapshot(OffsetAndEpoch snapshotId) {
268268
.setTime(time)
269269
.setKraftVersion(KRaftVersion.KRAFT_VERSION_0)
270270
.setRawSnapshotWriter(log.createNewSnapshotUnchecked(snapshotId).get())
271-
.build(SERDE)
271+
.build(SERDE, Optional.empty())
272272
) {
273273
snapshot.freeze();
274274
}
@@ -363,7 +363,7 @@ Builder withBootstrapSnapshot(Optional<VoterSet> voters) {
363363
.setKraftVersion(kraftVersion)
364364
.setVoterSet(voters);
365365

366-
try (RecordsSnapshotWriter<String> writer = builder.build(SERDE)) {
366+
try (RecordsSnapshotWriter<String> writer = builder.build(SERDE, Optional.empty())) {
367367
writer.freeze();
368368
}
369369
} else {

raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ void testUpdateWithEmptySnapshot() {
172172
// Create a snapshot that doesn't have any kraft.version or voter set control records
173173
RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder()
174174
.setRawSnapshotWriter(log.createNewSnapshotUnchecked(new OffsetAndEpoch(10, epoch)).get());
175-
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE)) {
175+
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE, Optional.empty())) {
176176
writer.freeze();
177177
}
178178
log.truncateToLatestSnapshot();
@@ -234,7 +234,7 @@ void testUpdateWithSnapshot() {
234234
.setRawSnapshotWriter(log.createNewSnapshotUnchecked(new OffsetAndEpoch(10, epoch)).get())
235235
.setKraftVersion(kraftVersion)
236236
.setVoterSet(Optional.of(voterSet));
237-
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE)) {
237+
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE, Optional.empty())) {
238238
writer.freeze();
239239
}
240240
log.truncateToLatestSnapshot();
@@ -272,7 +272,7 @@ void testUpdateWithSnapshotAndLogOverride() {
272272
.setRawSnapshotWriter(log.createNewSnapshotUnchecked(snapshotId).get())
273273
.setKraftVersion(kraftVersion)
274274
.setVoterSet(Optional.of(snapshotVoterSet));
275-
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE)) {
275+
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE, Optional.empty())) {
276276
writer.freeze();
277277
}
278278
log.truncateToLatestSnapshot();

raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public void testControlRecordIterationWithKraftVersion0() {
171171
.setRawSnapshotWriter(
172172
new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10), buffer::set)
173173
);
174-
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
174+
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
175175
snapshot.append(List.of("a", "b", "c"));
176176
snapshot.append(List.of("d", "e", "f"));
177177
snapshot.append(List.of("g", "h", "i"));
@@ -221,7 +221,7 @@ public void testControlRecordIterationWithKraftVersion1() {
221221
.setRawSnapshotWriter(
222222
new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10), buffer::set)
223223
);
224-
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
224+
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
225225
snapshot.append(List.of("a", "b", "c"));
226226
snapshot.append(List.of("d", "e", "f"));
227227
snapshot.append(List.of("g", "h", "i"));

raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ void testBuilderKRaftVersion0() {
6060
.setRawSnapshotWriter(
6161
new MockRawSnapshotWriter(snapshotId, buffer::set)
6262
);
63-
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
63+
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
6464
snapshot.freeze();
6565
}
6666

@@ -114,7 +114,7 @@ void testBuilderKRaftVersion0WithVoterSet() {
114114
new MockRawSnapshotWriter(snapshotId, buffer::set)
115115
);
116116

117-
assertThrows(IllegalStateException.class, () -> builder.build(STRING_SERDE));
117+
assertThrows(IllegalStateException.class, () -> builder.build(STRING_SERDE, Optional.empty()));
118118
}
119119

120120
@Test
@@ -133,7 +133,7 @@ void testKBuilderRaftVersion1WithVoterSet() {
133133
.setRawSnapshotWriter(
134134
new MockRawSnapshotWriter(snapshotId, buffer::set)
135135
);
136-
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
136+
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
137137
snapshot.freeze();
138138
}
139139

@@ -191,7 +191,7 @@ void testBuilderKRaftVersion1WithoutVoterSet() {
191191
.setRawSnapshotWriter(
192192
new MockRawSnapshotWriter(snapshotId, buffer::set)
193193
);
194-
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
194+
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
195195
snapshot.freeze();
196196
}
197197

0 commit comments

Comments
 (0)