
KAFKA-18401: Transaction version 2 does not support commit transaction without records #18448


Merged · 16 commits · Jan 15, 2025
@@ -381,6 +381,7 @@ public synchronized TransactionalRequestResult sendOffsetsToTransaction(final Ma
         if (isTransactionV2Enabled()) {
             log.debug("Begin adding offsets {} for consumer group {} to transaction with transaction protocol V2", offsets, groupMetadata);
             handler = txnOffsetCommitHandler(null, offsets, groupMetadata);
+            transactionStarted = true;
         } else {
             log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, groupMetadata);
             AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(
@@ -411,6 +412,7 @@ public synchronized void maybeAddPartition(TopicPartition topicPartition) {
         } else if (isTransactionV2Enabled()) {
             txnPartitionMap.getOrCreate(topicPartition);
             partitionsInTransaction.add(topicPartition);
+            transactionStarted = true;
         } else if (transactionContainsPartition(topicPartition) || isPartitionPendingAdd(topicPartition)) {
             return;
         } else {
@@ -854,11 +856,16 @@ synchronized TxnRequestHandler nextRequest(boolean hasIncompleteBatches) {
             return null;
         }

-        if (nextRequestHandler.isEndTxn() && (!isTransactionV2Enabled() && !transactionStarted)) {
+        if (nextRequestHandler.isEndTxn() && !transactionStarted) {
Member:

Minor nit here: I wonder if we can change the log here for TV2 so that we say we didn't attempt to add any partitions.

Member Author:

Pardon me, could you give me a hint? Did you mean we need to modify the log at line 862

                log.debug("Not sending EndTxn for completed transaction since no partitions " +
                        "or offsets were successfully added");

to something else when using TV2?

Member:

Right -- the implication of "successfully added" is a little confusing in the TV2 case, because it is not a matter of "successfully" adding but of not attempting to add any at all.

Member Author:

Got it, I modified the log to "Not sending EndTxn for completed transaction since no send or sendOffsetsToTransaction were triggered" under TV2. Thank you.

             nextRequestHandler.result.done();
             if (currentState != State.FATAL_ERROR) {
-                log.debug("Not sending EndTxn for completed transaction since no partitions " +
-                        "or offsets were successfully added");
+                if (isTransactionV2Enabled()) {
+                    log.debug("Not sending EndTxn for completed transaction since no send " +
+                            "or sendOffsetsToTransaction were triggered");
+                } else {
+                    log.debug("Not sending EndTxn for completed transaction since no partitions " +
+                            "or offsets were successfully added");
+                }
                 completeTransaction();
             }
             nextRequestHandler = pendingRequests.poll();
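
Taken together, these changes make the EndTxn skip depend only on whether the transaction ever started, for both protocol versions: transactionStarted now flips to true when a send() adds a partition or when sendOffsetsToTransaction() is triggered. The client-visible effect, as a hedged Java sketch (the bootstrap address, topic, and transactional id are placeholders, not taken from this PR):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class EmptyCommitSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn");        // placeholder
            try (KafkaProducer<byte[], byte[]> producer =
                     new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
                producer.initTransactions();

                // A normal transaction: send() marks the transaction as started,
                // so commitTransaction() results in an EndTxn request.
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("demo-topic", "key".getBytes(), "value".getBytes()));
                producer.commitTransaction();

                // An empty transaction: nothing was sent, so transactionStarted
                // stays false and the client completes the commit locally
                // without sending EndTxn -- the TV2 case this PR fixes.
                producer.beginTransaction();
                producer.commitTransaction();
            }
        }
    }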

This file was deleted.

ProducerIntegrationTest.scala (new file)
@@ -0,0 +1,223 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.coordinator.transaction

import kafka.network.SocketServer
import kafka.server.IntegrationTestUtils
import org.apache.kafka.clients.admin.{Admin, NewTopic, TransactionState}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecords, OffsetAndMetadata}
import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.RecordTooLargeException
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature, ClusterInstance, ClusterTest, ClusterTestDefaults, ClusterTestExtensions, ClusterTests, Type}
import org.apache.kafka.common.message.InitProducerIdRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{InitProducerIdRequest, InitProducerIdResponse}
import org.apache.kafka.common.test.TestUtils
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{Feature, MetadataVersion}
import org.junit.jupiter.api.Assertions.{assertEquals, assertInstanceOf, assertThrows, assertTrue}
import org.junit.jupiter.api.extension.ExtendWith

import java.time.Duration
import java.util
import java.util.Collections
import java.util.concurrent.ExecutionException
import java.util.stream.{Collectors, IntStream, StreamSupport}
import scala.concurrent.duration.DurationInt
import scala.jdk.CollectionConverters._

@ClusterTestDefaults(types = Array(Type.KRAFT), serverProperties = Array(
  new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
  new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
  new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
  new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
  new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
))
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ProducerIntegrationTest {

  @ClusterTests(Array(
    new ClusterTest(metadataVersion = MetadataVersion.IBP_3_3_IV0)
  ))
  def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
    verifyUniqueIds(clusterInstance)
  }

  @ClusterTests(Array(
    new ClusterTest(features = Array(
      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0))),
    new ClusterTest(features = Array(
      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1))),
    new ClusterTest(features = Array(
      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2))),
  ))
  def testTransactionWithAndWithoutSend(cluster: ClusterInstance): Unit = {
    val properties = new util.HashMap[String, Object]
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar")
    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test")
    properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    val producer: Producer[Array[Byte], Array[Byte]] = cluster.producer(properties)
    try {
      producer.initTransactions()
      producer.beginTransaction()
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "key".getBytes, "value".getBytes))
      producer.commitTransaction()

      producer.beginTransaction()
      producer.commitTransaction()
    } finally if (producer != null) producer.close()
  }

  @ClusterTests(Array(
    new ClusterTest(features = Array(
      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0))),
    new ClusterTest(features = Array(
      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1))),
    new ClusterTest(features = Array(
      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2))),
  ))
  def testTransactionWithInvalidSendAndEndTxnRequestSent(cluster: ClusterInstance): Unit = {
    val topic = new NewTopic("foobar", 1, 1.toShort).configs(Collections.singletonMap(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "100"))
    val txnId = "test-txn"
    val properties = new util.HashMap[String, Object]
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txnId)
    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test")
    properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")

    val admin = cluster.admin()
    val producer: Producer[Array[Byte], Array[Byte]] = cluster.producer(properties)
    try {
      admin.createTopics(List(topic).asJava)

      producer.initTransactions()
      producer.beginTransaction()
      assertInstanceOf(classOf[RecordTooLargeException],
        assertThrows(classOf[ExecutionException],
          () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
            topic.name(), Array.fill(100)(0: Byte), Array.fill(100)(0: Byte))).get()).getCause)

      producer.abortTransaction()
    } finally {
      if (admin != null) admin.close()
      if (producer != null) producer.close()
    }
  }

  @ClusterTests(Array(
    new ClusterTest(features = Array(
      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0))),
    new ClusterTest(features = Array(
      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1))),
    new ClusterTest(features = Array(
      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2))),
  ))
  def testTransactionWithSendOffset(cluster: ClusterInstance): Unit = {
Member:

Does this test new behavior, or something related to the change? The other thing I was curious about was a test where we try to send a record and it fails to be added: we should be able to abort, but a commit (in the case where a partition was not added) will fail. That one might be tricky to code -- especially the part where we fail to add the partition on the server side.

Member Author:

> Does this test new behavior, or something related to the change?

Yes, I want to ensure that if sendOffsetsToTransaction is triggered in a transaction and no records are sent, then executing commitTransaction does not raise any errors. And on the server side, the transaction state should be COMPLETE_COMMIT, which means the EndTxn request has been sent to the broker.
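
(For reference, that server-side check can be sketched with the Admin API in Java; the admin configuration and transactional id below are placeholders, and the authoritative version is the waitForCondition block at the end of the test below:)

    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.TransactionState;

    // Hedged sketch: verify the transaction reached COMPLETE_COMMIT, mirroring
    // the Admin-based assertion at the end of testTransactionWithSendOffset.
    static boolean reachedCompleteCommit(Properties adminProps, String txnId) throws Exception {
        try (Admin admin = Admin.create(adminProps)) {           // adminProps is a placeholder
            return admin.listTransactions().all().get().stream()
                .filter(txn -> txn.transactionalId().equals(txnId))
                .anyMatch(txn -> txn.state() == TransactionState.COMPLETE_COMMIT);
        }
    }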

> The other thing I was curious about was a test where we try to send a record and it fails to be added: we should be able to abort, but a commit (in the case where a partition was not added) will fail. That one might be tricky to code -- especially the part where we fail to add the partition on the server side.

Hmm... I need some time to figure out how to make adding the partition fail on the server side. Once that's done, I think I can come up with the test you mentioned.

Thanks for the suggestion!

Member Author:

There is one test, testFencingOnAddPartitions in TransactionsTest.scala [0], which seems to do something similar: it creates a scenario where TransactionCoordinator#handleAddPartitionsToTransaction raises Errors.PRODUCER_FENCED due to a mismatch in the producer epoch [1], causing the AddPartitionsToTxn request to fail.

However, in this test we cannot commit or abort the transaction successfully. Committing the transaction fails directly on the client side, since the last error caused by the failed send is not empty [2]. In the case of aborting the transaction, since there is a newer producer (producer2), the broker raises Errors.PRODUCER_FENCED, causing the abort to fail.

Perhaps the producer-fenced test case is not what we want? I'm just wondering under what circumstances AddPartitionsToTxn would fail in such a way that the producer can abort the transaction successfully but committing it would fail.

Member:

This is a bit tricky. Depending on the error, a producer will enter a fatal error state or an abortable error state. In the abortable error state, we can always abort, but not commit. In the fatal state, we can't do anything. The fenced error, I believe, puts the producer in the fatal state. There are a few other errors that are only "abortable", but I'm not sure they can easily come up in test scenarios. There are some auth errors like this, for example, but those may be hard to replicate in a test. KIP-1050 does a pretty good job explaining the current state of transactional errors; see the "current handling" columns of the tables in the KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1050:+Consistent+error+handling+for+Transactions

I'm thinking, though, that this might be a bit complicated for testing, and it is OK not to include it if it is too much trouble. I think the main thing I wanted to confirm is that if we attempted to send a record/offset, we can no longer do an empty commit.
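
(For context, the two states described here map onto the error-handling pattern shown in the KafkaProducer javadoc; a minimal Java sketch, assuming a transactional producer in scope and with a placeholder topic and record contents:)

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.AuthorizationException;
    import org.apache.kafka.common.errors.OutOfOrderSequenceException;
    import org.apache.kafka.common.errors.ProducerFencedException;

    try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("demo-topic", "key".getBytes(), "value".getBytes()));
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // Fatal state (fenced is one of these): the producer can do nothing
        // further with transactions and must be closed.
        producer.close();
    } catch (KafkaException e) {
        // Abortable state: we can always abort and start over, but not commit.
        producer.abortTransaction();
    }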

Member Author:

> I think the main thing I wanted to confirm is that if we attempted to send a record/offset, we can no longer do an empty commit.

Sorry for the delay; I was a little confused by this comment. What does an empty commit mean? Does testTransactionWithAndWithoutSend in ProducerIntegrationTest.scala cover that?

Member Author (@brandboat), Jan 14, 2025:

Or did you mean something like

      producer.initTransactions()
      producer.beginTransaction()
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "key".getBytes, "value".getBytes)).get()

      producer.beginTransaction() // <-- fail TransactionalId foobar: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
      producer.commitTransaction()

In this case, it would actually fail on the client side.

Member (@jolshan), Jan 14, 2025:

Sorry for not being clear. I meant something similar to the above, but we wouldn't call beginTransaction again. Something like:

      producer.initTransactions()
      producer.beginTransaction()
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "key".getBytes, "value".getBytes)).get() 
      // we would create a case where this fails and we wouldn't actually write a record. I think it would take us to an abortable state

      producer.commitTransaction() // fails

Basically, I just wanted to confirm that once we hit `true` for transactionStarted, we don't skip the EndTxn in the new code that has been added. I guess this is a very specific scenario with the send failing, and it is sufficient to just have the test where, once we send/add offsets, we confirm the EndTxn is sent. (No need for the error.) If we do that, I think your current test may be sufficient.
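
(A hedged Java sketch of the failing-send scenario discussed above, reusing the oversized-record setup from testTransactionWithInvalidSendAndEndTxnRequestSent; the topic name is a placeholder, and the assertThrows on commitTransaction is an assumption based on the abortable-state behavior described earlier, not a test from this PR:)

    // Assumes a transactional producer in scope and a topic "tiny" created
    // with max.message.bytes=100, mirroring the integration test's setup.
    // Static imports assumed: org.junit.jupiter.api.Assertions.*.
    producer.initTransactions();
    producer.beginTransaction();
    ExecutionException thrown = assertThrows(ExecutionException.class,
        () -> producer.send(new ProducerRecord<>("tiny", new byte[200], new byte[200])).get());
    assertInstanceOf(RecordTooLargeException.class, thrown.getCause());

    // The failed send leaves a recorded error on the transaction, so even an
    // "empty" commit is no longer possible; abort is the only valid way out.
    assertThrows(KafkaException.class, producer::commitTransaction); // assumption: commit rejected client-side
    producer.abortTransaction();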

Member Author:

Thanks for the explanation. I think I need to figure out a scenario where producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "key".getBytes, "value".getBytes)).get() fails to send the record. Will update the test ASAP!

Member Author (@brandboat), Jan 14, 2025:

Sorry, I just found that my last test update is still wrong... I'm still using a fatal-error scenario and will try to find another way; please ignore commit bded873.

    val inputTopic: String = "my-input-topic"
    var producer: Producer[Array[Byte], Array[Byte]] = cluster.producer
    try {
      for (i <- 0 until 5) {
        val key: Array[Byte] = ("key-" + i).getBytes
        val value: Array[Byte] = ("value-" + i).getBytes
        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](inputTopic, key, value)).get
      }
    } finally if (producer != null) producer.close()

    val txnId: String = "foobar"
    val producerProperties: util.Map[String, Object] = new util.HashMap[String, Object]
    producerProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txnId)
    producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "test")
    producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")

    val consumerProperties: util.Map[String, Object] = new util.HashMap[String, Object]
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group")
    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    producer = cluster.producer(producerProperties)
    val consumer: Consumer[Array[Byte], Array[Byte]] = cluster.consumer(consumerProperties)
    try {
      producer.initTransactions()
      producer.beginTransaction()
      consumer.subscribe(util.List.of(inputTopic))
      var records: ConsumerRecords[Array[Byte], Array[Byte]] = null
      TestUtils.waitForCondition(() => {
        records = consumer.poll(Duration.ZERO)
        records.count == 5
      }, "poll records size not match")
      val lastRecord = StreamSupport.stream(records.spliterator, false).reduce((_, second) => second).orElse(null)
      val offsets = Collections.singletonMap(
        new TopicPartition(lastRecord.topic, lastRecord.partition), new OffsetAndMetadata(lastRecord.offset + 1))
      producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata)
      producer.commitTransaction()
    } finally {
      if (producer != null) producer.close()
      if (consumer != null) consumer.close()
    }

    val admin: Admin = cluster.admin
    try {
      TestUtils.waitForCondition(() => {
        admin.listTransactions.all.get.stream
          .filter(txn => txn.transactionalId == txnId)
          .anyMatch(txn => txn.state eq TransactionState.COMPLETE_COMMIT)
      }, "transaction is not in COMPLETE_COMMIT state")
    } finally if (admin != null) admin.close()
  }

  private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
    // Request enough PIDs from each broker to ensure each broker generates two blocks
    val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker => {
      IntStream.range(0, 1001).parallel().mapToObj( _ =>
        nextProducerId(broker, clusterInstance.clientListener())
      )}).collect(Collectors.toList[Long]).asScala.toSeq

    val brokerCount = clusterInstance.brokerIds.size
    val expectedTotalCount = 1001 * brokerCount
    assertEquals(expectedTotalCount, ids.size, s"Expected exactly $expectedTotalCount IDs")
    assertEquals(expectedTotalCount, ids.distinct.size, "Found duplicate producer IDs")
  }

  private def nextProducerId(broker: SocketServer, listener: ListenerName): Long = {
    // Generating producer ids may fail while waiting for the initial block and also
    // when the current block is full and waiting for the prefetched block.
    val deadline = 5.seconds.fromNow
    var shouldRetry = true
    var response: InitProducerIdResponse = null
    while (shouldRetry && deadline.hasTimeLeft()) {
      val data = new InitProducerIdRequestData()
        .setProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
        .setProducerId(RecordBatch.NO_PRODUCER_ID)
        .setTransactionalId(null)
        .setTransactionTimeoutMs(10)
      val request = new InitProducerIdRequest.Builder(data).build()

      response = IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request,
        destination = broker,
        listenerName = listener)

      shouldRetry = response.data.errorCode == Errors.COORDINATOR_LOAD_IN_PROGRESS.code
    }
    assertTrue(deadline.hasTimeLeft())
    assertEquals(Errors.NONE.code, response.data.errorCode)
    response.data().producerId()
  }
}