
Commit 4a8a063

KAFKA-18723; Better handle invalid records during replication (apache#18852)
In the KRaft implementation there is a race between the network thread, which reads bytes from the log segments, and the KRaft driver thread, which truncates the log and appends records to it. This race can cause the network thread to send corrupted or inconsistent records. The corrupted-records case is handled by catching and logging the CorruptRecordException. The inconsistent-records case is handled by appending only record batches whose partition leader epoch is less than or equal to the fetching replica's epoch, and only when the epoch did not change between the request and the response.

In the ISR implementation there is a similar race between the network thread and the replica fetcher thread, which truncates the log and appends records to it. This race can likewise cause the network thread to send corrupted or inconsistent records. The replica fetcher thread already handles the corrupted-records case. The inconsistent-records case is handled by appending only record batches whose partition leader epoch is less than or equal to the leader epoch in the FETCH request.

Reviewers: Jun Rao <[email protected]>, Alyssa Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 1fed928 commit 4a8a063

29 files changed, +1298 −302 lines
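The core of the fix is an epoch check on the follower append path: a batch is appended only if its partition leader epoch is at most the epoch the fetch was made with. Below is a minimal sketch of that check, with a hypothetical validateEpochs helper; the commit's real validation lives inside the appendAsFollower implementations, which may skip offending batches rather than throw.

import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;

final class EpochValidationSketch {
    // Sketch only: a batch with an epoch newer than the fetch epoch can only
    // appear if the leader's log changed underneath the network thread, so
    // appending it would leave the follower inconsistent.
    static void validateEpochs(MemoryRecords records, int fetchEpoch) {
        for (MutableRecordBatch batch : records.batches()) {
            if (batch.partitionLeaderEpoch() > fetchEpoch) {
                // Placeholder handling; the real code decides whether to
                // truncate the batch set or reject the append.
                throw new IllegalStateException(
                    "Batch epoch " + batch.partitionLeaderEpoch() +
                    " is newer than fetch epoch " + fetchEpoch
                );
            }
        }
    }
}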

build.gradle (+8)

@@ -1037,6 +1037,7 @@ project(':core') {
     testImplementation project(':test-common:test-common-util')
     testImplementation libs.bcpkix
     testImplementation libs.mockitoCore
+    testImplementation libs.jqwik
     testImplementation(libs.apacheda) {
       exclude group: 'xml-apis', module: 'xml-apis'
       // `mina-core` is a transitive dependency for `apacheds` and `apacheda`.
@@ -1231,6 +1232,12 @@ project(':core') {
     )
   }

+  test {
+    useJUnitPlatform {
+      includeEngines 'jqwik', 'junit-jupiter'
+    }
+  }
+
   tasks.create(name: "copyDependantTestLibs", type: Copy) {
     from (configurations.testRuntimeClasspath) {
       include('*.jar')
@@ -1802,6 +1809,7 @@ project(':clients') {
     testImplementation libs.jacksonJakartarsJsonProvider
     testImplementation libs.jose4j
     testImplementation libs.junitJupiter
+    testImplementation libs.jqwik
     testImplementation libs.spotbugs
     testImplementation libs.mockitoCore
     testImplementation libs.mockitoJunitJupiter // supports MockitoExtension

clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java (+2 −1)

@@ -159,7 +159,7 @@ public void ensureValid() {

     /**
      * Gets the base timestamp of the batch which is used to calculate the record timestamps from the deltas.
-     *
+     *
      * @return The base timestamp
      */
     public long baseTimestamp() {
@@ -502,6 +502,7 @@ public static void writeHeader(ByteBuffer buffer,
     public String toString() {
         return "RecordBatch(magic=" + magic() + ", offsets=[" + baseOffset() + ", " + lastOffset() + "], " +
             "sequence=[" + baseSequence() + ", " + lastSequence() + "], " +
+            "partitionLeaderEpoch=" + partitionLeaderEpoch() + ", " +
             "isTransactional=" + isTransactional() + ", isControlBatch=" + isControlBatch() + ", " +
             "compression=" + compressionType() + ", timestampType=" + timestampType() + ", crc=" + checksum() + ")";
     }

clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java (+1 −5)

@@ -32,9 +32,6 @@
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Utils;

-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
@@ -49,7 +46,6 @@
  * or one of the {@link #builder(ByteBuffer, byte, Compression, TimestampType, long)} variants.
  */
 public class MemoryRecords extends AbstractRecords {
-    private static final Logger log = LoggerFactory.getLogger(MemoryRecords.class);
     public static final MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));

     private final ByteBuffer buffer;
@@ -596,7 +592,7 @@ public static MemoryRecords withRecords(byte magic, long initialOffset, Compress
         return withRecords(magic, initialOffset, compression, TimestampType.CREATE_TIME, records);
     }

-    public static MemoryRecords withRecords(long initialOffset, Compression compression, Integer partitionLeaderEpoch, SimpleRecord... records) {
+    public static MemoryRecords withRecords(long initialOffset, Compression compression, int partitionLeaderEpoch, SimpleRecord... records) {
         return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compression, TimestampType.CREATE_TIME, RecordBatch.NO_PRODUCER_ID,
             RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch, false, records);
     }
clients/src/test/java/org/apache/kafka/common/record/ArbitraryMemoryRecords.java (new file, +39)

@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import net.jqwik.api.Arbitraries;
+import net.jqwik.api.Arbitrary;
+import net.jqwik.api.ArbitrarySupplier;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+public final class ArbitraryMemoryRecords implements ArbitrarySupplier<MemoryRecords> {
+    @Override
+    public Arbitrary<MemoryRecords> get() {
+        return Arbitraries.randomValue(ArbitraryMemoryRecords::buildRandomRecords);
+    }
+
+    private static MemoryRecords buildRandomRecords(Random random) {
+        int size = random.nextInt(128) + 1;
+        byte[] bytes = new byte[size];
+        random.nextBytes(bytes);
+
+        return MemoryRecords.readableRecords(ByteBuffer.wrap(bytes));
+    }
+}
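A supplier like this plugs into jqwik through the supplier attribute of @ForAll. A hedged usage sketch follows (hypothetical test class, not part of this commit):

import net.jqwik.api.ForAll;
import net.jqwik.api.Property;

import org.apache.kafka.common.record.ArbitraryMemoryRecords;
import org.apache.kafka.common.record.MemoryRecords;

final class MemoryRecordsPropertySketch {
    // jqwik calls ArbitraryMemoryRecords.get() to generate random byte
    // sequences wrapped as MemoryRecords, most of which are invalid, and
    // runs the property once per generated value.
    @Property
    void logSurvivesArbitraryBytes(@ForAll(supplier = ArbitraryMemoryRecords.class) MemoryRecords records) {
        // exercise the append/validation path with `records` here
    }
}

This is why build.gradle above adds includeEngines 'jqwik', 'junit-jupiter': the jqwik engine must be enabled for @Property methods to run alongside the existing JUnit 5 tests.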
clients/src/test/java/org/apache/kafka/common/record/InvalidMemoryRecordsProvider.java (new file, +132)

@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.errors.CorruptRecordException;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+public final class InvalidMemoryRecordsProvider implements ArgumentsProvider {
+    // Use a baseOffset that's not zero so that it is less likely to match the LEO
+    private static final long BASE_OFFSET = 1234;
+    private static final int EPOCH = 4321;
+
+    /**
+     * Returns a stream of arguments for invalid memory records and the expected exception.
+     *
+     * The first object in the {@code Arguments} is a {@code MemoryRecords}.
+     *
+     * The second object in the {@code Arguments} is an {@code Optional<Class<Exception>>} which is
+     * the expected exception from the log layer.
+     */
+    @Override
+    public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
+        return Stream.of(
+            Arguments.of(MemoryRecords.readableRecords(notEnoughBytes()), Optional.empty()),
+            Arguments.of(MemoryRecords.readableRecords(recordsSizeTooSmall()), Optional.of(CorruptRecordException.class)),
+            Arguments.of(MemoryRecords.readableRecords(notEnoughBytesToMagic()), Optional.empty()),
+            Arguments.of(MemoryRecords.readableRecords(negativeMagic()), Optional.of(CorruptRecordException.class)),
+            Arguments.of(MemoryRecords.readableRecords(largeMagic()), Optional.of(CorruptRecordException.class)),
+            Arguments.of(MemoryRecords.readableRecords(lessBytesThanRecordSize()), Optional.empty())
+        );
+    }
+
+    private static ByteBuffer notEnoughBytes() {
+        var buffer = ByteBuffer.allocate(Records.LOG_OVERHEAD - 1);
+        buffer.limit(buffer.capacity());
+
+        return buffer;
+    }
+
+    private static ByteBuffer recordsSizeTooSmall() {
+        var buffer = ByteBuffer.allocate(256);
+        // Write the base offset
+        buffer.putLong(BASE_OFFSET);
+        // Write record size
+        buffer.putInt(LegacyRecord.RECORD_OVERHEAD_V0 - 1);
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+
+        return buffer;
+    }
+
+    private static ByteBuffer notEnoughBytesToMagic() {
+        var buffer = ByteBuffer.allocate(256);
+        // Write the base offset
+        buffer.putLong(BASE_OFFSET);
+        // Write record size
+        buffer.putInt(buffer.capacity() - Records.LOG_OVERHEAD);
+        buffer.position(0);
+        buffer.limit(Records.HEADER_SIZE_UP_TO_MAGIC - 1);
+
+        return buffer;
+    }
+
+    private static ByteBuffer negativeMagic() {
+        var buffer = ByteBuffer.allocate(256);
+        // Write the base offset
+        buffer.putLong(BASE_OFFSET);
+        // Write record size
+        buffer.putInt(buffer.capacity() - Records.LOG_OVERHEAD);
+        // Write the epoch
+        buffer.putInt(EPOCH);
+        // Write magic
+        buffer.put((byte) -1);
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+
+        return buffer;
+    }
+
+    private static ByteBuffer largeMagic() {
+        var buffer = ByteBuffer.allocate(256);
+        // Write the base offset
+        buffer.putLong(BASE_OFFSET);
+        // Write record size
+        buffer.putInt(buffer.capacity() - Records.LOG_OVERHEAD);
+        // Write the epoch
+        buffer.putInt(EPOCH);
+        // Write magic
+        buffer.put((byte) (RecordBatch.CURRENT_MAGIC_VALUE + 1));
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+
+        return buffer;
+    }
+
+    private static ByteBuffer lessBytesThanRecordSize() {
+        var buffer = ByteBuffer.allocate(256);
+        // Write the base offset
+        buffer.putLong(BASE_OFFSET);
+        // Write record size
+        buffer.putInt(buffer.capacity() - Records.LOG_OVERHEAD);
+        // Write the epoch
+        buffer.putInt(EPOCH);
+        // Write magic
+        buffer.put(RecordBatch.CURRENT_MAGIC_VALUE);
+        buffer.position(0);
+        buffer.limit(buffer.capacity() - Records.LOG_OVERHEAD - 1);
+
+        return buffer;
+    }
+}
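The provider is designed for JUnit 5's @ParameterizedTest with @ArgumentsSource. A sketch of a consuming test (hypothetical, not part of this diff):

import org.apache.kafka.common.record.InvalidMemoryRecordsProvider;
import org.apache.kafka.common.record.MemoryRecords;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

import java.util.Optional;

final class InvalidRecordsAppendSketch {
    // Each invocation receives one invalid MemoryRecords instance and the
    // exception, if any, the log layer is expected to raise when appending it.
    @ParameterizedTest
    @ArgumentsSource(InvalidMemoryRecordsProvider.class)
    void testAppendInvalidRecords(MemoryRecords records, Optional<Class<Exception>> expected) {
        // append `records` and assert the outcome matches `expected`
    }
}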

core/src/main/scala/kafka/cluster/Partition.scala (+14 −6)

@@ -1302,27 +1302,35 @@ class Partition(val topicPartition: TopicPartition,
     }
   }

-  private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Option[LogAppendInfo] = {
+  private def doAppendRecordsToFollowerOrFutureReplica(
+    records: MemoryRecords,
+    isFuture: Boolean,
+    partitionLeaderEpoch: Int
+  ): Option[LogAppendInfo] = {
     if (isFuture) {
       // The read lock is needed to handle race condition if request handler thread tries to
       // remove future replica after receiving AlterReplicaLogDirsRequest.
       inReadLock(leaderIsrUpdateLock) {
         // Note the replica may be undefined if it is removed by a non-ReplicaAlterLogDirsThread before
         // this method is called
-        futureLog.map { _.appendAsFollower(records) }
+        futureLog.map { _.appendAsFollower(records, partitionLeaderEpoch) }
       }
     } else {
       // The lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
       // is executing maybeReplaceCurrentWithFutureReplica() to replace follower replica with the future replica.
       futureLogLock.synchronized {
-        Some(localLogOrException.appendAsFollower(records))
+        Some(localLogOrException.appendAsFollower(records, partitionLeaderEpoch))
       }
     }
   }

-  def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Option[LogAppendInfo] = {
+  def appendRecordsToFollowerOrFutureReplica(
+    records: MemoryRecords,
+    isFuture: Boolean,
+    partitionLeaderEpoch: Int
+  ): Option[LogAppendInfo] = {
     try {
-      doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
+      doAppendRecordsToFollowerOrFutureReplica(records, isFuture, partitionLeaderEpoch)
     } catch {
       case e: UnexpectedAppendOffsetException =>
         val log = if (isFuture) futureLocalLogOrException else localLogOrException
@@ -1340,7 +1348,7 @@ class Partition(val topicPartition: TopicPartition,
         info(s"Unexpected offset in append to $topicPartition. First offset ${e.firstOffset} is less than log start offset ${log.logStartOffset}." +
           s" Since this is the first record to be appended to the $replicaName's log, will start the log from offset ${e.firstOffset}.")
         truncateFullyAndStartAt(e.firstOffset, isFuture)
-        doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
+        doAppendRecordsToFollowerOrFutureReplica(records, isFuture, partitionLeaderEpoch)
       } else
         throw e
     }
