
Commit 6bfd49b

Mayuresh Gharat authored and committed
[LI-HOTFIX]
Added support to enable passthrough mirroring of data from the V1 message format to the V2 message format (#70). Passthrough mirroring, as it exists today, works only when log.message.format.version is the same on the source and destination clusters (0.10.0 to 0.10.0, 2.0 to 2.0). With mixed message formats between source and destination, KMM throws errors. This patch makes passthrough mirroring work when the source is on the 0.10.0 format and the destination is on the 2.0 format. It does not address the errors that occur when the source is on the 2.0 format and the destination is on the 0.10.0 format. TICKET =
1 parent 456f47e commit 6bfd49b
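
As a rough sketch of the rule this patch implements (not code from the patch; the class and method names are illustrative): the destination producer may lower the batch format to the source's magic value, but it never raises it, which is why 0.10 → 2.0 works while 2.0 → 0.10 still does not.

```java
// Illustrative only: the format-capping rule described in the commit message.
import org.apache.kafka.common.record.RecordBatch;

public class PassThroughMagicRule {
    /** Magic value the destination producer batch should use for a pass-through record. */
    static byte usableMagic(byte destinationMaxUsableMagic, byte sourceMagic) {
        // Cap downward only: a V1 source batch forces a V1 destination batch so the raw
        // payload can be re-emitted unchanged; upgrading a batch format is never attempted.
        return sourceMagic < destinationMaxUsableMagic ? sourceMagic : destinationMaxUsableMagic;
    }

    public static void main(String[] args) {
        // V1 source, V2 destination: supported by this patch (result is V1 = 1).
        System.out.println(usableMagic(RecordBatch.MAGIC_VALUE_V2, RecordBatch.MAGIC_VALUE_V1));
        // V2 source, V1 destination: still unsupported; the batch stays at V1 and cannot hold V2 data.
        System.out.println(usableMagic(RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2));
    }
}
```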

6 files changed: +142, -5 lines changed
clients/src/main/java/org/apache/kafka/clients/consumer/PassThroughConsumerRecord.java

Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.TimestampType;
+
+/**
+ * In case of passthrough, value contains the message batch while the key is null
+ * @param <K> null
+ * @param <V> batch of messages
+ */
+public class PassThroughConsumerRecord<K, V> extends ConsumerRecord<K, V> {
+    private final byte magic;
+
+    public PassThroughConsumerRecord(String topic, int partition, long offset, K key, V value, byte magic) {
+        super(topic, partition, offset, key, value);
+        this.magic = magic;
+    }
+
+    public PassThroughConsumerRecord(String topic, int partition, long offset, long timestamp,
+            TimestampType timestampType, long checksum, int serializedKeySize, int serializedValueSize, K key, V value,
+            byte magic) {
+        super(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize, key,
+            value);
+        this.magic = magic;
+    }
+
+    public PassThroughConsumerRecord(String topic, int partition, long offset, long timestamp,
+            TimestampType timestampType, Long checksum, int serializedKeySize, int serializedValueSize, K key, V value,
+            Headers headers, byte magic) {
+        super(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize, key,
+            value, headers);
+        this.magic = magic;
+    }
+
+    public byte magic() {
+        return magic;
+    }
+}
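
For orientation, here is a minimal, hypothetical sketch of how this record type surfaces to a caller once shallow iteration is enabled. The bootstrap servers, group id, and topic are placeholders, and `enable.shallow.iterator` is the fork-specific consumer property that MirrorMaker sets further down in this diff.

```java
// Illustrative only: consuming raw batches and branching on the source magic value.
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.PassThroughConsumerRecord;
import org.apache.kafka.common.record.RecordBatch;

public class PassThroughReadSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder
        props.setProperty("group.id", "passthrough-sketch");        // placeholder
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Fork-specific switch: fetch whole record batches instead of individual records.
        props.setProperty("enable.shallow.iterator", "true");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("mirror-source-topic")); // placeholder topic
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                if (record instanceof PassThroughConsumerRecord) {
                    // key() is null in pass-through mode; value() holds the raw source batch.
                    byte magic = ((PassThroughConsumerRecord<byte[], byte[]>) record).magic();
                    System.out.println("Fetched a batch written with magic value " + magic
                        + (magic == RecordBatch.MAGIC_VALUE_V1 ? " (V1)" : ""));
                }
            }
        }
    }
}
```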

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

Lines changed: 8 additions & 0 deletions
@@ -26,6 +26,7 @@
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.PassThroughConsumerRecord;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
@@ -1147,6 +1148,13 @@ private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                 (enableShallowIteration ? ((AbstractLegacyRecordBatch) record).outerRecord().buffer() : record.value());
             byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
             V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
+            if (enableShallowIteration) {
+                return new PassThroughConsumerRecord<>(partition.topic(), partition.partition(), offset, timestamp,
+                    timestampType, record.checksumOrNull(),
+                    keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
+                    valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length, key, value, headers,
+                    batch.magic());
+            }
             return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
                 timestamp, timestampType, record.checksumOrNull(),
                 keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java

Lines changed: 16 additions & 0 deletions
@@ -69,6 +69,8 @@
  */
 public final class RecordAccumulator {
 
+    private static final String PASS_THROUGH_MAGIC_VALUE = "__passThroughMagicValue";
+
     private final Logger log;
     private volatile boolean closed;
     private final AtomicInteger flushesInProgress;
@@ -222,6 +224,20 @@ public RecordAppendResult append(TopicPartition tp,
                     return appendResult;
             }
 
+            // HOTFIX for enabling mirroring of data with passthrough compression and mixed message formats.
+            // In passthrough mode, KMM sets a header with key = "__passThroughMagicValue" and value = the magic byte of the message.
+            // This code enables passthrough when the source is on the 0.10 format and the destination is on the 2.0 format (not the other way round).
+            if (compression.equals(CompressionType.PASSTHROUGH)) {
+                for (Header header : headers) {
+                    if (header.key().equals(PASS_THROUGH_MAGIC_VALUE)) {
+                        byte srcMagicValue = header.value()[0];
+                        if (maxUsableMagic > srcMagicValue) {
+                            maxUsableMagic = srcMagicValue;
+                        }
+                    }
+                }
+            }
+
             MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
             ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
             FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
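
The header this loop inspects is attached on the producing side; MirrorMaker.scala below does it in Scala, but here is a hedged Java sketch of the same idea (the class, method, and topic names are illustrative, not part of the patch):

```java
// Illustrative sketch: tagging a pass-through record with the source batch's magic value
// so the accumulator above can cap the destination batch format at the source's format.
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.RecordBatch;

public class PassThroughHeaderSketch {
    private static final String PASS_THROUGH_MAGIC_VALUE = "__passThroughMagicValue";

    static ProducerRecord<byte[], byte[]> tagWithSourceMagic(String topic, byte[] rawBatch, byte sourceMagic) {
        Headers headers = new RecordHeaders()
            .add(new RecordHeader(PASS_THROUGH_MAGIC_VALUE, new byte[]{sourceMagic}));
        // Key is null in pass-through mode; the value carries the raw source batch.
        return new ProducerRecord<>(topic, null, null, null, rawBatch, headers);
    }

    public static void main(String[] args) {
        byte[] rawV1Batch = new byte[0]; // placeholder for a batch fetched with shallow iteration
        ProducerRecord<byte[], byte[]> tagged =
            tagWithSourceMagic("mirror-destination-topic", rawV1Batch, RecordBatch.MAGIC_VALUE_V1);
        System.out.println(tagged.headers());
    }
}
```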

clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java

Lines changed: 5 additions & 1 deletion
@@ -792,8 +792,12 @@ public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Head
 
         // For passthrough V2, ensure one producerBatch only has one DefaultRecordBatch, and since
         // in this case, DefaultRecordBatch is the value part of a DefaultRecord, so we only allow one record
-        if (magic >= RecordBatch.MAGIC_VALUE_V2 && usePassthrough)
+        // For passthrough V1, ideally we could append multiple passthrough records to the same batch, but this
+        // will not work when migrating from the V1 to the V2 message format, as we cannot append V1 and V2 messages
+        // in the same batch.
+        if (usePassthrough) {
             return false;
+        }
 
         final int recordSize;
         if (magic < RecordBatch.MAGIC_VALUE_V2) {

core/src/main/scala/kafka/consumer/BaseConsumerRecord.scala

Lines changed: 2 additions & 1 deletion
@@ -30,4 +30,5 @@ case class BaseConsumerRecord(topic: String,
                               timestampType: TimestampType = TimestampType.NO_TIMESTAMP_TYPE,
                               key: Array[Byte],
                               value: Array[Byte],
-                              headers: Headers = new RecordHeaders())
+                              headers: Headers = new RecordHeaders(),
+                              magic: Byte = -1)

core/src/main/scala/kafka/tools/MirrorMaker.scala

Lines changed: 57 additions & 3 deletions
@@ -28,13 +28,15 @@ import joptsimple.OptionParser
 import kafka.consumer.BaseConsumerRecord
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.{CommandLineUtils, CoreUtils, Logging, Whitelist}
-import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
+import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, PassThroughConsumerRecord}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.header.Headers
+import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
 import org.apache.kafka.common.record.RecordBatch
 
 import scala.collection.JavaConverters._
@@ -69,6 +71,22 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   private var offsetCommitIntervalMs = 0
   private var abortOnSendFailure: Boolean = true
   @volatile private var exitingOnSendFailure: Boolean = false
+  private var passThroughEnabled: Boolean = false
+  private val PASS_THROUGH_MAGIC_VALUE = "__passThroughMagicValue"
+
+  val recordHeadersV1: Headers = {
+    val magicValueV1 = Array[Byte] {
+      RecordBatch.MAGIC_VALUE_V1
+    }
+    new RecordHeaders().add(new RecordHeader(PASS_THROUGH_MAGIC_VALUE, magicValueV1))
+  }
+
+  val recordHeadersV2: Headers = {
+    val magicValueV2 = Array[Byte] {
+      RecordBatch.MAGIC_VALUE_V2
+    }
+    new RecordHeaders().add(new RecordHeader(PASS_THROUGH_MAGIC_VALUE, magicValueV2))
+  }
 
   // If a message send failed after retries are exhausted. The offset of the messages will also be removed from
   // the unacked offset list to avoid offset commit being stuck on that offset. In this case, the offset of that
@@ -214,6 +232,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     if (options.has(passthroughCompressionOpt)) {
+      passThroughEnabled = true
       consumerProps.setProperty("enable.shallow.iterator", "true")
       producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "passthrough")
     }
@@ -253,7 +272,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         else
           CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
       } else {
-        defaultMirrorMakerMessageHandler
+        if (passThroughEnabled) {
+          passThroughMirrorMakerMessageHandler
+        } else {
+          defaultMirrorMakerMessageHandler
+        }
       }
     }
   } catch {
@@ -362,6 +385,17 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         record.value,
         record.headers)
 
+    private def toBaseConsumerRecordWithPassThrough(record: PassThroughConsumerRecord[Array[Byte], Array[Byte]]): BaseConsumerRecord =
+      BaseConsumerRecord(record.topic,
+        record.partition,
+        record.offset,
+        record.timestamp,
+        record.timestampType,
+        record.key,
+        record.value,
+        record.headers,
+        record.magic)
+
     override def run() {
       info("Starting mirror maker thread " + threadName)
       try {
@@ -377,7 +411,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
             } else {
               trace("Sending message with null value and offset %d.".format(data.offset))
             }
-            val records = messageHandler.handle(toBaseConsumerRecord(data))
+            val records = {
+              if (passThroughEnabled) {
+                messageHandler.handle(toBaseConsumerRecordWithPassThrough(data.asInstanceOf[PassThroughConsumerRecord[Array[Byte], Array[Byte]]]))
+              } else {
+                messageHandler.handle(toBaseConsumerRecord(data))
+              }
+            }
             records.asScala.foreach(producer.send)
             maybeFlushAndCommitOffsets()
           }
@@ -587,6 +627,20 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
   }
 
+  private[tools] object passThroughMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
+    override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
+      val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestamp
+      // It is assumed that we no longer have the V0 message format at LinkedIn
+      if (record.magic.equals(RecordBatch.MAGIC_VALUE_V1)) {
+        Collections.singletonList(new ProducerRecord(record.topic, null, timestamp, record.key, record.value, recordHeadersV1))
+      } else if (record.magic.equals(RecordBatch.MAGIC_VALUE_V2)) {
+        Collections.singletonList(new ProducerRecord(record.topic, null, timestamp, record.key, record.value, recordHeadersV2))
+      } else {
+        throw new IllegalArgumentException("Record Batch with magic value : " + record.magic + ", is not supported in PassThrough mode")
+      }
+    }
+  }
+
   private class NoRecordsException extends RuntimeException
 
 }
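
For completeness, a hedged sketch of the effective settings the pass-through option wires up, with the values taken from the hunks above; everything else, including the class name, is illustrative, and the actual command-line flag is defined elsewhere in MirrorMaker.scala.

```java
// Illustrative summary of the consumer/producer settings that pass-through mirroring relies on.
import java.util.Properties;

public class PassThroughMirrorSettingsSketch {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        // Fetch raw record batches from the source cluster instead of decompressing them.
        consumerProps.setProperty("enable.shallow.iterator", "true");

        Properties producerProps = new Properties();
        // Re-emit those batches as-is; RecordAccumulator then caps the batch magic using
        // the __passThroughMagicValue header added by passThroughMirrorMakerMessageHandler.
        producerProps.setProperty("compression.type", "passthrough");

        System.out.println("consumer: " + consumerProps);
        System.out.println("producer: " + producerProps);
    }
}
```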
