Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
a9c79a3
redo implementation to use RecordsSnapshotWriter
mannoopj Oct 20, 2025
2a6f663
NIT fix spacing
mannoopj Oct 20, 2025
752a79e
NIT remove unused import
mannoopj Oct 20, 2025
026da2b
NIT remove extra line
mannoopj Oct 20, 2025
7e303b9
add bootstrap records in formatter
mannoopj Oct 20, 2025
584fe3d
revert RaftClientTestContext
mannoopj Oct 20, 2025
9852262
wip changes
mannoopj Nov 3, 2025
9cb5318
wip remove system.out
mannoopj Nov 3, 2025
ff3c050
wip remove more system.out
mannoopj Nov 3, 2025
f5763d0
remove system out in quorum controller
mannoopj Nov 3, 2025
f3a3e41
change check for emptyLog
mannoopj Nov 4, 2025
0ab8e02
remove prints
mannoopj Nov 4, 2025
1f76fa9
remove default
mannoopj Nov 4, 2025
4de55ce
remove bootstrapRecordsAppended
mannoopj Nov 5, 2025
56435b3
dont format checkpoint for broker only, split bootstrapDirectory read
mannoopj Nov 5, 2025
29affeb
WIP, stop MetadataLoader reading 000.checkpoint
mannoopj Nov 7, 2025
533033b
remove null check for bootstrapMetadata
mannoopj Nov 7, 2025
f35e992
stop replaying metadata records
mannoopj Nov 7, 2025
1c4528e
address comments
mannoopj Nov 18, 2025
625acfb
handleLoadSnapshot changes
mannoopj Nov 18, 2025
cb2ad18
import fixes
mannoopj Nov 18, 2025
3c809c1
temp build changes
mannoopj Nov 19, 2025
1908b2c
BootstrapDirectoryTest fix
mannoopj Nov 19, 2025
7226af0
more tests fixes
mannoopj Nov 19, 2025
029aa6b
Revert "BootstrapDirectoryTest fix"
mannoopj Nov 19, 2025
6dbef44
BootstrapDirectoryTest fix
mannoopj Nov 19, 2025
cc3f0b4
nit: fix indentation
mannoopj Nov 19, 2025
7ad1342
ReconfigurableQuorumIntegrationTest fix
mannoopj Nov 20, 2025
135c126
StorageToolTest fix
mannoopj Nov 20, 2025
383045a
fix KafkaClusterTestKit
mannoopj Nov 24, 2025
b474770
remove writeBootstrapSnapshot var in StorageTool
mannoopj Nov 25, 2025
721fb5c
remove test specific fixes
mannoopj Nov 25, 2025
cb4803d
removed unused import
mannoopj Nov 25, 2025
6a46cf7
testStartupWithNonDefaultKControllerDynamicConfiguration fix
mannoopj Nov 25, 2025
adb1548
add WARN message
mannoopj Dec 1, 2025
a839631
add FileNotFoundException
mannoopj Dec 1, 2025
96bc7f3
add checkpoint file exist checks
mannoopj Dec 1, 2025
4823d30
remove assertCheckpointExists
mannoopj Dec 1, 2025
082d469
move up copier.setWriteErrorHandler
mannoopj Dec 1, 2025
dae3bd9
move testNonDefaultKControllerDynamicConfiguration to DynamicBrokerRe…
mannoopj Dec 1, 2025
8ebfdaf
remove additionalBootstrapRecords
mannoopj Dec 1, 2025
5ba0c9b
convert BootstrapDirectory to interface
mannoopj Dec 1, 2025
a7450e0
fix testFormatterFailsOnUnwritableDirectory
mannoopj Dec 1, 2025
dcd1b0b
move up readFromBinaryFile
mannoopj Dec 2, 2025
67ad6c4
remove writeBinaryFile
mannoopj Dec 2, 2025
0a78d0d
revert gradle changes
mannoopj Dec 2, 2025
de4da73
revert build.gradle
mannoopj Dec 2, 2025
eb4ba90
more build.gradle reverts
mannoopj Dec 2, 2025
1d1e7ba
more build.gradle fixes
mannoopj Dec 2, 2025
dc27d7c
import-control.xml reverts
mannoopj Dec 2, 2025
8469809
KafkaStreamsTelemetryIntegrationTest reverts
mannoopj Dec 2, 2025
2e0c8f0
KafkaStreamsTelemetryIntegrationTest more reverts
mannoopj Dec 2, 2025
a182b12
fix spacing
mannoopj Dec 2, 2025
0da0919
FormatterTest fixes
mannoopj Dec 2, 2025
fd3adf6
move TestBootstrapDirectory
mannoopj Dec 2, 2025
3d589e6
remove extra line
mannoopj Dec 2, 2025
56dae1b
remove SuppressWarnings for testReadFromEmptyConfiguration
mannoopj Dec 2, 2025
b3feaa6
move BootstrapDirectory filenames
mannoopj Dec 4, 2025
03eafae
nit BootstrapDirectory fixes
mannoopj Dec 4, 2025
7e8d2e6
Merge remote-tracking branch 'upstream/trunk' into kip-1170-format
mannoopj Dec 4, 2025
7a9404f
fix incorrect merge
mannoopj Dec 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ class DumpLogSegmentsTest {
)

val lastContainedLogTimestamp = 10000
val emptyOptional: Optional[java.util.List[ApiMessageAndVersion]] = Optional.empty()

Using.resource(
new RecordsSnapshotWriter.Builder()
Expand All @@ -601,7 +602,7 @@ class DumpLogSegmentsTest {
.setRawSnapshotWriter(metadataLog.createNewSnapshot(new OffsetAndEpoch(0, 0)).get)
.setKraftVersion(KRaftVersion.KRAFT_VERSION_1)
.setVoterSet(Optional.of(VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true))))
.build(MetadataRecordSerde.INSTANCE)
.build(MetadataRecordSerde.INSTANCE, emptyOptional)
Copy link
Contributor

@kevin-wu24 kevin-wu24 Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to remove this. This applies to other instances where we build the RecordsSnapshotWriter.

) { snapshotWriter =>
snapshotWriter.append(metadataRecords)
snapshotWriter.freeze()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,14 +437,11 @@ void doFormat(BootstrapMetadata bootstrapMetadata) throws Exception {
directoryTypes.get(writeLogDir).description(), writeLogDir,
MetadataVersion.FEATURE_NAME, releaseVersion);
Files.createDirectories(Paths.get(writeLogDir));
BootstrapDirectory bootstrapDirectory = new BootstrapDirectory(writeLogDir);
bootstrapDirectory.writeBinaryFile(bootstrapMetadata);
if (directoryTypes.get(writeLogDir).isDynamicMetadataDirectory()) {
writeDynamicQuorumSnapshot(writeLogDir,
initialControllers.get(),
featureLevels.get(KRaftVersion.FEATURE_NAME),
controllerListenerName);
}
writeBoostrapSnapshot(writeLogDir,
bootstrapMetadata,
initialControllers.get(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to pass the optional for initialControllers here. We can only do a .get() if initialControllers.isPresent().

featureLevels.get(KRaftVersion.FEATURE_NAME),
controllerListenerName);
});
copier.setWriteErrorHandler((errorLogDir, e) -> {
throw new FormatterException("Error while writing meta.properties file " +
Expand Down Expand Up @@ -492,8 +489,9 @@ static DirectoryType calculate(
}
}

static void writeDynamicQuorumSnapshot(
static void writeBoostrapSnapshot(
String writeLogDir,
BootstrapMetadata bootstrapMetadata,
DynamicVoters initialControllers,
short kraftVersion,
String controllerListenerName
Expand All @@ -511,7 +509,7 @@ static void writeDynamicQuorumSnapshot(
Snapshots.BOOTSTRAP_SNAPSHOT_ID)).
setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion)).
setVoterSet(Optional.of(voterSet));
try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde())) {
try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde(), Optional.of(bootstrapMetadata.records()))) {
writer.freeze();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde(), Optional.of(bootstrapMetadata.records()))) {
writer.freeze();
try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde()))) {
writer.append(bootstrapMetadata.records());
writer.freeze();

Copy link
Contributor Author

@mannoopj mannoopj Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this write the bootstrap records after the control records, since in RecordsSnapshotWriter.build() is where we append the kraft records and we would be calling that ahead of writer.append in this scenario? we want the bootstrap records ahead correct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't matter what order we write them in because KRaft only reads the control records, and the metadata module will only read the "data" records. When we read the 0-0.checkpoint back into memory, we only deal with either its control records or its data records, not both in the same code.

}
}
Copy link
Contributor

@kevin-wu24 kevin-wu24 Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a clearer way to make these changes. This method is what writes the 0-0.checkpoint currently. We should pass the bootstrap metadata object here and append the metadata records using writer.append.append(bootstrapMetadata.records()) before calling writer.freeze().

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ public Optional<SnapshotWriter<ApiMessageAndVersion>> createSnapshot(
.setLastContainedLogTimestamp(lastContainedLogTimestamp)
.setTime(new MockTime())
.setRawSnapshotWriter(createNewSnapshot(snapshotId))
.build(new MetadataRecordSerde())
.build(new MetadataRecordSerde(), Optional.empty())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3726,7 +3726,7 @@ public Optional<SnapshotWriter<T>> createSnapshot(
.setRawSnapshotWriter(wrappedWriter)
.setKraftVersion(partitionState.kraftVersionAtOffset(lastContainedLogOffset))
.setVoterSet(partitionState.voterSetAtOffset(lastContainedLogOffset))
.build(serde);
.build(serde, Optional.empty());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public Builder setVoterSet(Optional<VoterSet> voterSet) {
return this;
}

public <T> RecordsSnapshotWriter<T> build(RecordSerde<T> serde) {
public <T> RecordsSnapshotWriter<T> build(RecordSerde<T> serde, Optional<List<T>> bootstrapRecords) {
if (rawSnapshotWriter.isEmpty()) {
throw new IllegalStateException("Builder::build called without a RawSnapshotWriter");
} else if (rawSnapshotWriter.get().sizeInBytes() != 0) {
Expand All @@ -213,6 +213,8 @@ public <T> RecordsSnapshotWriter<T> build(RecordSerde<T> serde) {
serde
);

bootstrapRecords.ifPresent(writer::append);

writer.accumulator.appendControlMessages((baseOffset, epoch, compression, buffer) -> {
long now = time.milliseconds();
try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2224,7 +2224,7 @@ private static SnapshotWriter<String> snapshotWriter(RaftClientTestContext conte
return new RecordsSnapshotWriter.Builder()
.setTime(context.time)
.setRawSnapshotWriter(snapshot)
.build(new StringSerde());
.build(new StringSerde(), Optional.empty());
}

private static final class MemorySnapshotWriter implements RawSnapshotWriter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ Builder withEmptySnapshot(OffsetAndEpoch snapshotId) {
.setTime(time)
.setKraftVersion(KRaftVersion.KRAFT_VERSION_0)
.setRawSnapshotWriter(log.createNewSnapshotUnchecked(snapshotId).get())
.build(SERDE)
.build(SERDE, Optional.empty())
) {
snapshot.freeze();
}
Expand Down Expand Up @@ -363,7 +363,7 @@ Builder withBootstrapSnapshot(Optional<VoterSet> voters) {
.setKraftVersion(kraftVersion)
.setVoterSet(voters);

try (RecordsSnapshotWriter<String> writer = builder.build(SERDE)) {
try (RecordsSnapshotWriter<String> writer = builder.build(SERDE, Optional.empty())) {
writer.freeze();
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ void testUpdateWithEmptySnapshot() {
// Create a snapshot that doesn't have any kraft.version or voter set control records
RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder()
.setRawSnapshotWriter(log.createNewSnapshotUnchecked(new OffsetAndEpoch(10, epoch)).get());
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE)) {
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE, Optional.empty())) {
writer.freeze();
}
log.truncateToLatestSnapshot();
Expand Down Expand Up @@ -234,7 +234,7 @@ void testUpdateWithSnapshot() {
.setRawSnapshotWriter(log.createNewSnapshotUnchecked(new OffsetAndEpoch(10, epoch)).get())
.setKraftVersion(kraftVersion)
.setVoterSet(Optional.of(voterSet));
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE)) {
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE, Optional.empty())) {
writer.freeze();
}
log.truncateToLatestSnapshot();
Expand Down Expand Up @@ -272,7 +272,7 @@ void testUpdateWithSnapshotAndLogOverride() {
.setRawSnapshotWriter(log.createNewSnapshotUnchecked(snapshotId).get())
.setKraftVersion(kraftVersion)
.setVoterSet(Optional.of(snapshotVoterSet));
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE)) {
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE, Optional.empty())) {
writer.freeze();
}
log.truncateToLatestSnapshot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void testControlRecordIterationWithKraftVersion0() {
.setRawSnapshotWriter(
new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10), buffer::set)
);
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
snapshot.append(List.of("a", "b", "c"));
snapshot.append(List.of("d", "e", "f"));
snapshot.append(List.of("g", "h", "i"));
Expand Down Expand Up @@ -221,7 +221,7 @@ public void testControlRecordIterationWithKraftVersion1() {
.setRawSnapshotWriter(
new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10), buffer::set)
);
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
snapshot.append(List.of("a", "b", "c"));
snapshot.append(List.of("d", "e", "f"));
snapshot.append(List.of("g", "h", "i"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void testBuilderKRaftVersion0() {
.setRawSnapshotWriter(
new MockRawSnapshotWriter(snapshotId, buffer::set)
);
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
snapshot.freeze();
}

Expand Down Expand Up @@ -114,7 +114,7 @@ void testBuilderKRaftVersion0WithVoterSet() {
new MockRawSnapshotWriter(snapshotId, buffer::set)
);

assertThrows(IllegalStateException.class, () -> builder.build(STRING_SERDE));
assertThrows(IllegalStateException.class, () -> builder.build(STRING_SERDE, Optional.empty()));
}

@Test
Expand All @@ -133,7 +133,7 @@ void testKBuilderRaftVersion1WithVoterSet() {
.setRawSnapshotWriter(
new MockRawSnapshotWriter(snapshotId, buffer::set)
);
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
snapshot.freeze();
}

Expand Down Expand Up @@ -191,7 +191,7 @@ void testBuilderKRaftVersion1WithoutVoterSet() {
.setRawSnapshotWriter(
new MockRawSnapshotWriter(snapshotId, buffer::set)
);
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
snapshot.freeze();
}

Expand Down