Skip to content

Commit e7258eb

Browse files
brandboatmjsax
authored andcommitted
KAFKA-18401: Transaction version 2 does not support commit transaction without records (apache#18448)
Fix the issue where producer.commitTransaction under transaction version 2 throws error if no partition or offset is added to transaction. The solution is to avoid sending the endTxnRequest unless producer.send or producer.sendOffsetsToTransaction is triggered. Reviewers: Justine Olshan <[email protected]>
1 parent 677aafa commit e7258eb

File tree

3 files changed

+233
-91
lines changed

3 files changed

+233
-91
lines changed

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,7 @@ public synchronized TransactionalRequestResult sendOffsetsToTransaction(final Ma
381381
if (isTransactionV2Enabled()) {
382382
log.debug("Begin adding offsets {} for consumer group {} to transaction with transaction protocol V2", offsets, groupMetadata);
383383
handler = txnOffsetCommitHandler(null, offsets, groupMetadata);
384+
transactionStarted = true;
384385
} else {
385386
log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, groupMetadata);
386387
AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(
@@ -411,6 +412,7 @@ public synchronized void maybeAddPartition(TopicPartition topicPartition) {
411412
} else if (isTransactionV2Enabled()) {
412413
txnPartitionMap.getOrCreate(topicPartition);
413414
partitionsInTransaction.add(topicPartition);
415+
transactionStarted = true;
414416
} else if (transactionContainsPartition(topicPartition) || isPartitionPendingAdd(topicPartition)) {
415417
return;
416418
} else {
@@ -854,11 +856,16 @@ synchronized TxnRequestHandler nextRequest(boolean hasIncompleteBatches) {
854856
return null;
855857
}
856858

857-
if (nextRequestHandler.isEndTxn() && (!isTransactionV2Enabled() && !transactionStarted)) {
859+
if (nextRequestHandler.isEndTxn() && !transactionStarted) {
858860
nextRequestHandler.result.done();
859861
if (currentState != State.FATAL_ERROR) {
860-
log.debug("Not sending EndTxn for completed transaction since no partitions " +
861-
"or offsets were successfully added");
862+
if (isTransactionV2Enabled) {
863+
log.debug("Not sending EndTxn for completed transaction since no send " +
864+
"or sendOffsetsToTransaction were triggered");
865+
} else {
866+
log.debug("Not sending EndTxn for completed transaction since no partitions " +
867+
"or offsets were successfully added");
868+
}
862869
completeTransaction();
863870
}
864871
nextRequestHandler = pendingRequests.poll();

core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala

Lines changed: 0 additions & 88 deletions
This file was deleted.
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package kafka.coordinator.transaction
19+
20+
import kafka.network.SocketServer
21+
import kafka.server.IntegrationTestUtils
22+
import org.apache.kafka.clients.admin.{Admin, NewTopic, TransactionState}
23+
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecords, OffsetAndMetadata}
24+
import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerRecord}
25+
import org.apache.kafka.common.config.TopicConfig
26+
import org.apache.kafka.common.errors.RecordTooLargeException
27+
import org.apache.kafka.common.TopicPartition
28+
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature, ClusterInstance, ClusterTest, ClusterTestDefaults, ClusterTestExtensions, ClusterTests, Type}
29+
import org.apache.kafka.common.message.InitProducerIdRequestData
30+
import org.apache.kafka.common.network.ListenerName
31+
import org.apache.kafka.common.protocol.Errors
32+
import org.apache.kafka.common.record.RecordBatch
33+
import org.apache.kafka.common.requests.{InitProducerIdRequest, InitProducerIdResponse}
34+
import org.apache.kafka.common.test.TestUtils
35+
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
36+
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
37+
import org.apache.kafka.server.common.{Feature, MetadataVersion}
38+
import org.junit.jupiter.api.Assertions.{assertEquals, assertInstanceOf, assertThrows, assertTrue}
39+
import org.junit.jupiter.api.extension.ExtendWith
40+
41+
import java.time.Duration
42+
import java.util
43+
import java.util.Collections
44+
import java.util.concurrent.ExecutionException
45+
import java.util.stream.{Collectors, IntStream, StreamSupport}
46+
import scala.concurrent.duration.DurationInt
47+
import scala.jdk.CollectionConverters._
48+
49+
@ClusterTestDefaults(types = Array(Type.KRAFT), serverProperties = Array(
50+
new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
51+
new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
52+
new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
53+
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
54+
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
55+
))
56+
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
57+
class ProducerIntegrationTest {
58+
59+
@ClusterTests(Array(
60+
new ClusterTest(metadataVersion = MetadataVersion.IBP_3_3_IV0)
61+
))
62+
def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
63+
verifyUniqueIds(clusterInstance)
64+
}
65+
66+
@ClusterTests(Array(
67+
new ClusterTest(features = Array(
68+
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0))),
69+
new ClusterTest(features = Array(
70+
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1))),
71+
new ClusterTest(features = Array(
72+
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2))),
73+
))
74+
def testTransactionWithAndWithoutSend(cluster: ClusterInstance): Unit = {
75+
val properties = new util.HashMap[String, Object]
76+
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar")
77+
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test")
78+
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
79+
val producer: Producer[Array[Byte], Array[Byte]] = cluster.producer(properties)
80+
try {
81+
producer.initTransactions()
82+
producer.beginTransaction()
83+
producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "key".getBytes, "value".getBytes))
84+
producer.commitTransaction()
85+
86+
producer.beginTransaction()
87+
producer.commitTransaction()
88+
} finally if (producer != null) producer.close()
89+
}
90+
91+
@ClusterTests(Array(
92+
new ClusterTest(features = Array(
93+
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0))),
94+
new ClusterTest(features = Array(
95+
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1))),
96+
new ClusterTest(features = Array(
97+
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2))),
98+
))
99+
def testTransactionWithInvalidSendAndEndTxnRequestSent(cluster: ClusterInstance): Unit = {
100+
val topic = new NewTopic("foobar", 1, 1.toShort).configs(Collections.singletonMap(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "100"))
101+
val txnId = "test-txn"
102+
val properties = new util.HashMap[String, Object]
103+
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txnId)
104+
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test")
105+
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
106+
107+
val admin = cluster.admin()
108+
val producer: Producer[Array[Byte], Array[Byte]] = cluster.producer(properties)
109+
try {
110+
admin.createTopics(List(topic).asJava)
111+
112+
producer.initTransactions()
113+
producer.beginTransaction()
114+
assertInstanceOf(classOf[RecordTooLargeException],
115+
assertThrows(classOf[ExecutionException],
116+
() => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
117+
topic.name(), Array.fill(100)(0: Byte), Array.fill(100)(0: Byte))).get()).getCause)
118+
119+
producer.abortTransaction()
120+
} finally {
121+
if (admin != null) admin.close()
122+
if (producer != null) producer.close()
123+
}
124+
}
125+
126+
@ClusterTests(Array(
127+
new ClusterTest(features = Array(
128+
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0))),
129+
new ClusterTest(features = Array(
130+
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1))),
131+
new ClusterTest(features = Array(
132+
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2))),
133+
))
134+
def testTransactionWithSendOffset(cluster: ClusterInstance): Unit = {
135+
val inputTopic: String = "my-input-topic"
136+
var producer: Producer[Array[Byte], Array[Byte]] = cluster.producer
137+
try {
138+
for (i <- 0 until 5) {
139+
val key: Array[Byte] = ("key-" + i).getBytes
140+
val value: Array[Byte] = ("value-" + i).getBytes
141+
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](inputTopic, key, value)).get
142+
}
143+
} finally if (producer != null) producer.close()
144+
145+
val txnId: String = "foobar"
146+
val producerProperties: util.Map[String, Object] = new util.HashMap[String, Object]
147+
producerProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txnId)
148+
producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "test")
149+
producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
150+
151+
val consumerProperties: util.Map[String, Object] = new util.HashMap[String, Object]
152+
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group")
153+
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
154+
155+
producer = cluster.producer(producerProperties)
156+
val consumer: Consumer[Array[Byte], Array[Byte]] = cluster.consumer(consumerProperties)
157+
try {
158+
producer.initTransactions()
159+
producer.beginTransaction()
160+
consumer.subscribe(util.List.of(inputTopic))
161+
var records: ConsumerRecords[Array[Byte], Array[Byte]] = null
162+
TestUtils.waitForCondition(() => {
163+
records = consumer.poll(Duration.ZERO)
164+
records.count == 5
165+
}, "poll records size not match")
166+
val lastRecord = StreamSupport.stream(records.spliterator, false).reduce((_, second) => second).orElse(null)
167+
val offsets = Collections.singletonMap(
168+
new TopicPartition(lastRecord.topic, lastRecord.partition), new OffsetAndMetadata(lastRecord.offset + 1))
169+
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata)
170+
producer.commitTransaction()
171+
} finally {
172+
if (producer != null) producer.close()
173+
if (consumer != null) consumer.close()
174+
}
175+
176+
val admin: Admin = cluster.admin
177+
try {
178+
TestUtils.waitForCondition(() => {
179+
admin.listTransactions.all.get.stream
180+
.filter(txn => txn.transactionalId == txnId)
181+
.anyMatch(txn => txn.state eq TransactionState.COMPLETE_COMMIT)
182+
}, "transaction is not in COMPLETE_COMMIT state")
183+
} finally if (admin != null) admin.close()
184+
}
185+
186+
private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
187+
// Request enough PIDs from each broker to ensure each broker generates two blocks
188+
val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker => {
189+
IntStream.range(0, 1001).parallel().mapToObj( _ =>
190+
nextProducerId(broker, clusterInstance.clientListener())
191+
)}).collect(Collectors.toList[Long]).asScala.toSeq
192+
193+
val brokerCount = clusterInstance.brokerIds.size
194+
val expectedTotalCount = 1001 * brokerCount
195+
assertEquals(expectedTotalCount, ids.size, s"Expected exactly $expectedTotalCount IDs")
196+
assertEquals(expectedTotalCount, ids.distinct.size, "Found duplicate producer IDs")
197+
}
198+
199+
private def nextProducerId(broker: SocketServer, listener: ListenerName): Long = {
200+
// Generating producer ids may fail while waiting for the initial block and also
201+
// when the current block is full and waiting for the prefetched block.
202+
val deadline = 5.seconds.fromNow
203+
var shouldRetry = true
204+
var response: InitProducerIdResponse = null
205+
while (shouldRetry && deadline.hasTimeLeft()) {
206+
val data = new InitProducerIdRequestData()
207+
.setProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
208+
.setProducerId(RecordBatch.NO_PRODUCER_ID)
209+
.setTransactionalId(null)
210+
.setTransactionTimeoutMs(10)
211+
val request = new InitProducerIdRequest.Builder(data).build()
212+
213+
response = IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request,
214+
destination = broker,
215+
listenerName = listener)
216+
217+
shouldRetry = response.data.errorCode == Errors.COORDINATOR_LOAD_IN_PROGRESS.code
218+
}
219+
assertTrue(deadline.hasTimeLeft())
220+
assertEquals(Errors.NONE.code, response.data.errorCode)
221+
response.data().producerId()
222+
}
223+
}

0 commit comments

Comments
 (0)