Skip to content

Commit a275312

Browse files
garyrussellartembilan
authored andcommitted
GH-2891: Fix isAckAfterHandle with No group.id
Resolves #2891 When using manual partition assignment and `AckMode.MANUAL`, it is possible to have a `null` `group.id`, whereby Kafka does not maintain any committed offsets. In this case, we should not attempt to commit the offset after recovery, even if the error handler `ackAfterHandle` property is true. **cherry-pick to 3.0.x, 2.9.x** (cherry picked from commit 9f1050a)
1 parent dae8a42 commit a275312

File tree

1 file changed

+6
-6
lines changed

1 file changed

+6
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2295,7 +2295,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
22952295
try {
22962296
this.batchFailed = true;
22972297
invokeBatchErrorHandler(records, recordList, e);
2298-
commitOffsetsIfNeeded(records);
2298+
commitOffsetsIfNeededAfterHandlingError(records);
22992299
}
23002300
catch (KafkaException ke) {
23012301
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
@@ -2316,8 +2316,8 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
23162316
return null;
23172317
}
23182318

2319-
private void commitOffsetsIfNeeded(final ConsumerRecords<K, V> records) {
2320-
if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle())
2319+
private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecords<K, V> records) {
2320+
if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle() && this.consumerGroupId != null)
23212321
|| this.producer != null) {
23222322
if (this.remainingRecords != null) {
23232323
ConsumerRecord<K, V> firstUncommitted = this.remainingRecords.iterator().next();
@@ -2781,7 +2781,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
27812781
}
27822782
try {
27832783
invokeErrorHandler(cRecord, iterator, e);
2784-
commitOffsetsIfNeeded(cRecord);
2784+
commitOffsetsIfNeededAfterHandlingError(cRecord);
27852785
}
27862786
catch (KafkaException ke) {
27872787
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
@@ -2800,8 +2800,8 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
28002800
});
28012801
}
28022802

2803-
private void commitOffsetsIfNeeded(final ConsumerRecord<K, V> cRecord) {
2804-
if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle())
2803+
private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V> cRecord) {
2804+
if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle() && this.consumerGroupId != null)
28052805
|| this.producer != null) {
28062806
if (this.isManualAck) {
28072807
this.commitRecovered = true;

0 commit comments

Comments
 (0)