
KAFKA-18401: Transaction version 2 does not support commit transaction without records #18448


Merged · 16 commits · Jan 15, 2025
Changes from 10 commits
@@ -381,6 +381,7 @@ public synchronized TransactionalRequestResult sendOffsetsToTransaction(final Ma
if (isTransactionV2Enabled()) {
log.debug("Begin adding offsets {} for consumer group {} to transaction with transaction protocol V2", offsets, groupMetadata);
handler = txnOffsetCommitHandler(null, offsets, groupMetadata);
transactionStarted = true;
} else {
log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, groupMetadata);
AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(
@@ -411,6 +412,7 @@ public synchronized void maybeAddPartition(TopicPartition topicPartition) {
} else if (isTransactionV2Enabled()) {
txnPartitionMap.getOrCreate(topicPartition);
partitionsInTransaction.add(topicPartition);
transactionStarted = true;
} else if (transactionContainsPartition(topicPartition) || isPartitionPendingAdd(topicPartition)) {
return;
} else {
@@ -854,11 +856,16 @@ synchronized TxnRequestHandler nextRequest(boolean hasIncompleteBatches) {
return null;
}

- if (nextRequestHandler.isEndTxn() && (!isTransactionV2Enabled() && !transactionStarted)) {
+ if (nextRequestHandler.isEndTxn() && !transactionStarted) {
Member

minor nit here: I wonder if we can change the log here for tv2 so that we say we didn't attempt to add any partitions.

Member Author

Pardon me, could you give me a hint? Did you mean we need to modify the log at line 862

                log.debug("Not sending EndTxn for completed transaction since no partitions " +
                        "or offsets were successfully added");

to something else when using TV2?

Member

Right -- the implication of successfully added is a little confusing in the TV2 case because it is not a matter of "successfully" adding but a matter of not attempting to add any at all.

Member Author

Got it, I modified the log to "Not sending EndTxn for completed transaction since no send or sendOffsetsToTransaction were triggered" under TV2. Thank you.

nextRequestHandler.result.done();
if (currentState != State.FATAL_ERROR) {
log.debug("Not sending EndTxn for completed transaction since no partitions " +
"or offsets were successfully added");
if (isTransactionV2Enabled) {
log.debug("Not sending EndTxn for completed transaction since no send " +
"or sendOffsetsToTransaction were triggered");
} else {
log.debug("Not sending EndTxn for completed transaction since no partitions " +
"or offsets were successfully added");
}
completeTransaction();
}
nextRequestHandler = pendingRequests.poll();
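For reference, a minimal Scala sketch (not part of the diff) of the client-side sequence this change allows under transaction version 2; it mirrors testTransactionWithAndWithoutSend in the new integration test below, and the cluster.producer helper and config names are the ones that test already uses:

      // Sketch only: assumes a `cluster: ClusterInstance` as in the test below.
      val props = new java.util.HashMap[String, Object]
      props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "empty-commit")
      props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
      val producer: Producer[Array[Byte], Array[Byte]] = cluster.producer(props)
      try {
        producer.initTransactions()
        producer.beginTransaction()
        // No send() or sendOffsetsToTransaction() call, so transactionStarted stays false
        // and with this change the producer skips EndTxn; the empty commit completes under TV2 as well.
        producer.commitTransaction()
      } finally producer.close()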

This file was deleted.

@@ -0,0 +1,232 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.coordinator.transaction

import kafka.network.SocketServer
import kafka.server.IntegrationTestUtils
import org.apache.kafka.clients.admin.{Admin, NewTopic, TransactionState}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecords, OffsetAndMetadata}
import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.RecordTooLargeException
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature, ClusterInstance, ClusterTest, ClusterTestDefaults, ClusterTestExtensions, ClusterTests, Type}
import org.apache.kafka.common.message.InitProducerIdRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{InitProducerIdRequest, InitProducerIdResponse}
import org.apache.kafka.common.test.TestUtils
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{Feature, MetadataVersion}
import org.junit.jupiter.api.Assertions.{assertEquals, assertInstanceOf, assertThrows, assertTrue}
import org.junit.jupiter.api.extension.ExtendWith

import java.time.Duration
import java.util
import java.util.Collections
import java.util.concurrent.ExecutionException
import java.util.stream.{Collectors, IntStream, StreamSupport}
import scala.concurrent.duration.DurationInt
import scala.jdk.CollectionConverters._

@ClusterTestDefaults(types = Array(Type.KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
))
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ProducerIntegrationTest {

@ClusterTests(Array(
new ClusterTest(metadataVersion = MetadataVersion.IBP_3_3_IV0)
))
def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
verifyUniqueIds(clusterInstance)
}

@ClusterTests(Array(
new ClusterTest(features = Array(
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0))),
new ClusterTest(features = Array(
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1))),
new ClusterTest(features = Array(
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2))),
))
def testTransactionWithAndWithoutSend(cluster: ClusterInstance): Unit = {
val properties = new util.HashMap[String, Object]
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar")
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test")
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
val producer: Producer[Array[Byte], Array[Byte]] = cluster.producer(properties)
try {
producer.initTransactions()
producer.beginTransaction()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "key".getBytes, "value".getBytes))
producer.commitTransaction()

producer.beginTransaction()
producer.commitTransaction()
} finally if (producer != null) producer.close()
}

@ClusterTests(Array(
new ClusterTest(
features = Array(
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0))),
new ClusterTest(
features = Array(
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1))),
new ClusterTest(
features = Array(
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2))),
))
def testTransactionWithInvalidSend(cluster: ClusterInstance): Unit = {
val topic = new NewTopic("foobar", 1, 1.toShort).configs(Collections.singletonMap(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "1"))
val admin = cluster.admin()
var txnVersion: Short = 0
try {
txnVersion = Option(admin.describeFeatures().featureMetadata().get().finalizedFeatures().get(Feature.TRANSACTION_VERSION))
.map(finalizedFeatures => finalizedFeatures.maxVersionLevel())
.getOrElse(0)
admin.createTopics(List(topic).asJava)
} finally if (admin != null) admin.close()

val properties = new util.HashMap[String, Object]
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar")
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test")
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")

val producer: Producer[Array[Byte], Array[Byte]] = cluster.producer(properties)
try {
producer.initTransactions()
producer.beginTransaction()
assertInstanceOf(classOf[RecordTooLargeException],
assertThrows(classOf[ExecutionException],
() => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic.name(), "key".getBytes, "value".getBytes)).get()).getCause)

val commitError = assertThrows(classOf[KafkaException], () => producer.commitTransaction()) // fails because the last send failed
assertInstanceOf(classOf[RecordTooLargeException], commitError.getCause)

if (txnVersion == 2) {
Member

nice! happy to see this worked!
small nit -- is there a reason to run this test on the other versions? I think we should either have cases to test for versions 0 and 1 or not run those for this one. WDYT?

Member

Oops. I see your comment now. I will look at the revised version when it is ready, but I think the nit still stands.

Member Author

small nit -- is there a reason to run this test on the other versions? I think we should either have cases to test for versions 0 and 1 or not run those for this one. WDYT?

Yeah... though I think the test case is perhaps wrong. But I observed that abortTransaction in this test case only works under TV_2, not under TV_0 and TV_1. I haven't looked into it yet, not sure why...

Member Author (@brandboat, Jan 14, 2025)

Oops, at first I thought RecordTooLargeException would put the transaction in the FATAL_ERROR state in TransactionManager, but the state is actually ABORTABLE_ERROR, so the test seems to be heading in the right direction.

I'll dig deeper and check why abortTransaction hangs under TV_0 and TV_1 in this test case.

Member Author

Hi @jolshan, I just updated the test and it now seems OK.

small nit -- is there a reason to run this test on the other versions? I think we should either have cases to test for versions 0 and 1 or not run those for this one. WDYT?

After I incremented max.message.bytes to 100, abortTransaction no longer hangs with TV_0 and TV_1, but I haven't found the root cause of why this resolves the hanging issue...

Member

I'm not sure I understand what the ticket would be. I will continue to think about this, but I don't think it needs to block this PR.

Member

I recreated the failure locally and it seems like for TV0 and TV1 we are unable to join the ioThread when trying to close the producer. With TV2, the thread seems to join fine. Were you observing this as well?

Member Author (@brandboat, Jan 17, 2025)

Sorry for the late reply; the comment #18448 (comment) was incorrect, apologies for the misleading input. The behavior I was referring to, where the EndTxnResponse comes back first, followed by the WriteTxnMarkerRequest, is actually the same for TV0 and TV1.

I recreated the failure locally and it seems like for TV0 and TV1 we are unable to join the ioThread when trying to close the producer. With TV2, the thread seems to join fine. Were you observing this as well?

What I observed in TV0 and TV1 is that the producer continuously sends InitProducerId requests [0][1] because it encounters Errors.CONCURRENT_TRANSACTIONS. Since this is a retriable error, the producer enters a retry loop, repeatedly sending InitProducerId requests to the broker. Because the last transaction remains incomplete, the entire process gets stuck in an endless retry loop, causing the test to time out.

The CONCURRENT_TRANSACTIONS error occurs because WriteTxnMarker fails due to a RecordTooLargeException, preventing the transaction state from correctly transitioning to the CompleteAbort state.

Although TV2 passes in this scenario, we can still reproduce the CONCURRENT_TRANSACTIONS error if we add the following lines after abortTransaction(). This happens because WriteTxnMarker failed in the previous abort attempt:

      producer.beginTransaction()
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "key".getBytes, "value".getBytes))
      producer.commitTransaction()

I'm not sure if this is something we need to be aware of...
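(For illustration only, a hypothetical diagnostic for the stuck abort described above, reusing the cluster.admin() helper and the Admin transaction APIs this test file already uses: if WriteTxnMarker keeps failing, the transaction never reaches COMPLETE_ABORT, which is why the next InitProducerId keeps seeing CONCURRENT_TRANSACTIONS.)

      // Hypothetical check, not part of this PR: did the aborting transaction ever reach COMPLETE_ABORT?
      val admin = cluster.admin()
      try {
        val reachedCompleteAbort = admin.listTransactions().all().get().asScala
          .exists(txn => txn.transactionalId == "foobar" && txn.state == TransactionState.COMPLETE_ABORT)
        // In the hanging TV0/TV1 runs this stays false while the producer retries InitProducerId.
      } finally admin.close()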

Member (@jolshan, Jan 17, 2025)

Let me ponder this a bit. Thanks @brandboat for the deeper investigation. Is this because TV0 and TV1 send InitProducerId when they need to bump the epoch? This is a difference between TV0/TV1 and TV2.

If this is correct, the issue is that any request after EndTxn will see concurrent transactions, which makes sense and is expected.

Member Author

If this is correct, the issue is that any request after EndTxn will see concurrent transactions, which makes sense and is expected.

Yep, that's the root cause, glad to hear that this is not an issue. Thanks for the explanation. 😃

producer.abortTransaction() // success under transaction version 2
}
} finally if (producer != null) producer.close()
}

@ClusterTests(Array(
new ClusterTest(features = Array(
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0))),
new ClusterTest(features = Array(
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1))),
new ClusterTest(features = Array(
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2))),
))
def testTransactionWithSendOffset(cluster: ClusterInstance): Unit = {
Member

Does this test new behavior/one related to the change? The one other thing I was curious about was a test where we try to send a record, it fails to be added and we should be able to abort, but a commit (in the case where a partition was not added) will fail. That one might be tricky to code -- especially the part where we fail to add the partition on the server side.

Member Author

Does this test new behavior/one related to the change?

Yes, I want to ensure that if a transaction triggers sendOffsetsToTransaction and no records are sent, then executing commitTransaction should not raise any errors. And on the server side, the transaction state should be COMPLETE_COMMIT, which means the END_TXN request has been sent to the broker.

The one other thing I was curious about was a test where we try to send a record, it fails to be added and we should be able to abort, but a commit (in the case where a partition was not added) will fail. That one might be tricky to code -- especially the part where we fail to add the partition on the server side.

Hmm... I need some time to figure out how to make adding the partition fail on the server side. Once that's done, I think I can come up with the test you mentioned.

Thanks for the suggestion!

Member Author

There is one test, testFencingOnAddPartitions in TransactionTest.scala [0], which seems to do something similar: it creates a scenario where TransactionCoordinator#handleAddPartitionsToTransaction raises Errors.PRODUCER_FENCED due to a mismatch in the producer epoch [1], causing the AddPartitionsToTxn request to fail.

However, in this test, we cannot commit or abort the transaction successfully. Committing the transaction directly fails from the client side since the last error caused by the failed send is not empty[2]. In the case of aborting the transaction, since there is a newer producer (producer2), the broker raises the error Errors.PRODUCER_FENCED, causing the abort to fail.

Perhaps the producer-fenced test case is not what we want? I'm just wondering under what circumstances AddPartitionsToTxn would fail such that the producer can still abort the transaction successfully but committing it would fail.
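(For context, a rough sketch of the fencing sequence being discussed, using only the public client API; names such as `properties` follow the surrounding test, and the exact exception surfaced can vary, but the fenced producer ends up in a fatal state, so neither commit nor abort succeeds:)

      // Hypothetical sketch, not part of this PR: two producers share the same transactional.id.
      val first = cluster.producer(properties)
      val second = cluster.producer(properties)
      try {
        first.initTransactions()
        first.beginTransaction()
        first.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "key".getBytes, "value".getBytes)).get()
        second.initTransactions() // bumps the producer epoch and fences `first`
        // `first` is now fenced (typically ProducerFencedException, which is fatal), so this fails:
        assertThrows(classOf[KafkaException], () => first.commitTransaction())
      } finally {
        first.close()
        second.close()
      }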

Member

This is a bit tricky. Depending on the error, a producer will enter a fatal error state or an abortable error state. In the abortable error state, we can always abort, but not commit. In the fatal state, we can't do anything. The fenced error, I believe, puts the producer in the fatal state. There are a few other errors that are only "abortable", but I'm not sure they can easily come up in test scenarios. There are some auth errors like this, for example, but those may be hard to replicate in a test. KIP-1050 does a pretty good job explaining the current state of transactional errors; you can see the "current handling" columns of the tables in the KIP. https://cwiki.apache.org/confluence/display/KAFKA/KIP-1050:+Consistent+error+handling+for+Transactions

I'm thinking though that this might be a bit complicated for testing, and it is ok to not include if it is too much trouble. I think the main thing I wanted to confirm is that if we attempted to send a record/offset, we can no longer do an empty commit.
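(Sketched below is the standard client-side pattern that distinction implies, using the usual exception types from org.apache.kafka.common.errors; illustrative only, with `record` as a placeholder:)

      // Illustrative sketch of abortable vs. fatal handling around a transactional produce.
      try {
        producer.beginTransaction()
        producer.send(record)
        producer.commitTransaction() // surfaces any send failure as a KafkaException
      } catch {
        case _: ProducerFencedException | _: OutOfOrderSequenceException | _: AuthorizationException =>
          producer.close() // fatal: the producer cannot be used any further
        case _: KafkaException =>
          producer.abortTransaction() // abortable: abort, then optionally retry in a new transaction
      }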

Member Author

I think the main thing I wanted to confirm is that if we attempted to send a record/offset, we can no longer do an empty commit.

Sorry for the delay, I was a little confused by this comment: what does an empty commit mean? Does testTransactionWithAndWithoutSend in ProducerIntegrationTest.scala cover that?

Member Author (@brandboat, Jan 14, 2025)

Or did you mean something like

      producer.initTransactions()
      producer.beginTransaction()
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "key".getBytes, "value".getBytes)).get()

      producer.beginTransaction() // <-- fail TransactionalId foobar: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
      producer.commitTransaction()

In this case, it would actually fail on the client side.

Member (@jolshan, Jan 14, 2025)

Sorry for not being clear. I meant something similar to the above, but we wouldn't call begin transaction again. Something like

      producer.initTransactions()
      producer.beginTransaction()
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "key".getBytes, "value".getBytes)).get() 
      // we would create a case where this fails and we wouldn't actually write a record. I think it would take us to an abortable state

      producer.commitTransaction() // fails

Basically, I just wanted to confirm that once we hit "true" for transactionStarted, we don't skip the EndTxn in the new code that has been added. I guess maybe this is a very specific scenario with the send failing, and it is sufficient to just have the test where, once we send/add offsets, we confirm the EndTxn is sent. (No need for the error.) If we do that, I think your current test may be sufficient.

Member Author

Thanks for the explanation. I think I need to figure out a scenario where producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "key".getBytes, "value".getBytes)).get() fails to send the record. Will update the test ASAP!

Member Author (@brandboat, Jan 14, 2025)

Sorry, I just found that my last test update is still wrong... I'm still using a fatal-error scenario and will try to find another way; please ignore the commit [bded873](bded873).

val inputTopic: String = "my-input-topic"
var producer: Producer[Array[Byte], Array[Byte]] = cluster.producer
try {
for (i <- 0 until 5) {
val key: Array[Byte] = ("key-" + i).getBytes
val value: Array[Byte] = ("value-" + i).getBytes
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](inputTopic, key, value)).get
}
} finally if (producer != null) producer.close()

val txnId: String = "foobar"
val producerProperties: util.Map[String, Object] = new util.HashMap[String, Object]
producerProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txnId)
producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "test")
producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")

val consumerProperties: util.Map[String, Object] = new util.HashMap[String, Object]
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group")
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

producer = cluster.producer(producerProperties)
val consumer: Consumer[Array[Byte], Array[Byte]] = cluster.consumer(consumerProperties)
try {
producer.initTransactions()
producer.beginTransaction()
consumer.subscribe(util.List.of(inputTopic))
var records: ConsumerRecords[Array[Byte], Array[Byte]] = null
TestUtils.waitForCondition(() => {
records = consumer.poll(Duration.ZERO)
records.count == 5
}, "poll records size not match")
val lastRecord = StreamSupport.stream(records.spliterator, false).reduce((_, second) => second).orElse(null)
val offsets = Collections.singletonMap(
new TopicPartition(lastRecord.topic, lastRecord.partition), new OffsetAndMetadata(lastRecord.offset + 1))
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata)
producer.commitTransaction()
} finally {
if (producer != null) producer.close()
if (consumer != null) consumer.close()
}

val admin: Admin = cluster.admin
try {
TestUtils.waitForCondition(() => {
admin.listTransactions.all.get.stream
.filter(txn => txn.transactionalId == txnId)
.anyMatch(txn => txn.state eq TransactionState.COMPLETE_COMMIT)
}, "transaction is not in COMPLETE_COMMIT state")
} finally if (admin != null) admin.close()
}

private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
// Request enough PIDs from each broker to ensure each broker generates two blocks
val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker => {
IntStream.range(0, 1001).parallel().mapToObj( _ =>
nextProducerId(broker, clusterInstance.clientListener())
)}).collect(Collectors.toList[Long]).asScala.toSeq

val brokerCount = clusterInstance.brokerIds.size
val expectedTotalCount = 1001 * brokerCount
assertEquals(expectedTotalCount, ids.size, s"Expected exactly $expectedTotalCount IDs")
assertEquals(expectedTotalCount, ids.distinct.size, "Found duplicate producer IDs")
}

private def nextProducerId(broker: SocketServer, listener: ListenerName): Long = {
// Generating producer ids may fail while waiting for the initial block and also
// when the current block is full and waiting for the prefetched block.
val deadline = 5.seconds.fromNow
var shouldRetry = true
var response: InitProducerIdResponse = null
while (shouldRetry && deadline.hasTimeLeft()) {
val data = new InitProducerIdRequestData()
.setProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
.setProducerId(RecordBatch.NO_PRODUCER_ID)
.setTransactionalId(null)
.setTransactionTimeoutMs(10)
val request = new InitProducerIdRequest.Builder(data).build()

response = IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request,
destination = broker,
listenerName = listener)

shouldRetry = response.data.errorCode == Errors.COORDINATOR_LOAD_IN_PROGRESS.code
}
assertTrue(deadline.hasTimeLeft())
assertEquals(Errors.NONE.code, response.data.errorCode)
response.data().producerId()
}
}