Commit abbb6b3
KAFKA-19471: Enable acknowledgement for a record which could not be deserialized (#20148)
This patch makes two improvements: 1. Update `currentFetch` when `pollForFetches()` throws an exception. 2. Add an overload `KafkaShareConsumer.acknowledge(String topic, int partition, long offset, AcknowledgeType type)`. Reviewers: Andrew Schofield <[email protected]>
1 parent 1a176be commit abbb6b3
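
For context, the new overload lets an application reject a "poison pill" record that poll() could not deserialize, directly from its poll loop. A minimal sketch, assuming explicit acknowledgement mode; the broker address, topic, and group name are illustrative and not part of this commit:

import java.time.Duration;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RejectUndeserializableRecord {
    public static void main(String[] args) {
        Map<String, Object> config = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",      // illustrative
                ConsumerConfig.GROUP_ID_CONFIG, "my-share-group",               // illustrative
                ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(config)) {
            consumer.subscribe(Set.of("my-topic"));                             // illustrative
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        // Application processing would go here.
                        consumer.acknowledge(record, AcknowledgeType.ACCEPT);
                    }
                } catch (RecordDeserializationException e) {
                    // No ConsumerRecord exists for the bad record, so acknowledge it by
                    // coordinates using the overload added in this commit.
                    consumer.acknowledge(e.topicPartition().topic(), e.topicPartition().partition(),
                            e.offset(), AcknowledgeType.REJECT);
                }
                consumer.commitSync();
            }
        }
    }
}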

File tree

14 files changed (+340 -23 lines)


clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

Lines changed: 151 additions & 1 deletion
@@ -41,6 +41,8 @@
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
@@ -67,6 +69,7 @@
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Timeout;
 
+import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -843,6 +846,144 @@ public void testExplicitAcknowledgeThrowsNotInBatch() {
         }
     }
 
+    @ClusterTest
+    public void testExplicitOverrideAcknowledgeCorruptedMessage() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                     "group1",
+                     Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT),
+                     null,
+                     mockErrorDeserializer(3))) {
+
+            ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record1);
+            producer.send(record2);
+            producer.send(record3);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
+            assertEquals(2, records.count());
+            Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
+
+            ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+            ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+            assertEquals(0L, firstRecord.offset());
+            assertEquals(1L, secondRecord.offset());
+            shareConsumer.acknowledge(firstRecord);
+            shareConsumer.acknowledge(secondRecord);
+
+            RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
+            assertEquals(2, rde.offset());
+            shareConsumer.commitSync();
+
+            // The corrupted record was automatically released, so we can still obtain it.
+            rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
+            assertEquals(2, rde.offset());
+
+            // Reject this record
+            shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
+            shareConsumer.commitSync();
+
+            records = shareConsumer.poll(Duration.ZERO);
+            assertEquals(0, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    @ClusterTest
+    public void testExplicitAcknowledgeOffsetThrowsNotException() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                     "group1",
+                     Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
+
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
+            assertEquals(1, records.count());
+            ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0);
+            assertEquals(0L, consumedRecord.offset());
+
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(tp.topic(), tp.partition(), consumedRecord.offset(), AcknowledgeType.ACCEPT));
+
+            shareConsumer.acknowledge(consumedRecord);
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    @ClusterTest
+    public void testExplicitAcknowledgeOffsetThrowsParametersError() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                     "group1",
+                     Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT),
+                     null,
+                     mockErrorDeserializer(2))) {
+
+            ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record1);
+            producer.send(record2);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
+            assertEquals(1, records.count());
+            Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
+
+            ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+            assertEquals(0L, firstRecord.offset());
+            shareConsumer.acknowledge(firstRecord);
+
+            final RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
+            assertEquals(1, rde.offset());
+
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge("foo", rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT));
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), 1, rde.offset(), AcknowledgeType.REJECT));
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), tp2.partition(), 0, AcknowledgeType.REJECT));
+
+            // Reject this record.
+            shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
+            shareConsumer.commitSync();
+
+            // The next acknowledge() should throw an IllegalStateException as the record has been acked.
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT));
+
+            records = shareConsumer.poll(Duration.ZERO);
+            assertEquals(0, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    private ByteArrayDeserializer mockErrorDeserializer(int recordNumber) {
+        int recordIndex = recordNumber - 1;
+        return new ByteArrayDeserializer() {
+            int i = 0;
+
+            @Override
+            public byte[] deserialize(String topic, Headers headers, ByteBuffer data) {
+                if (i == recordIndex) {
+                    throw new SerializationException();
+                } else {
+                    i++;
+                    return super.deserialize(topic, headers, data);
+                }
+            }
+        };
+    }
+
     @ClusterTest
     public void testImplicitAcknowledgeFailsExplicit() {
         alterShareAutoOffsetReset("group1", "earliest");
@@ -2794,13 +2935,22 @@ private <K, V> ShareConsumer<K, V> createShareConsumer(String groupId) {
     private <K, V> ShareConsumer<K, V> createShareConsumer(
         String groupId,
         Map<?, ?> additionalProperties
+    ) {
+        return createShareConsumer(groupId, additionalProperties, null, null);
+    }
+
+    private <K, V> ShareConsumer<K, V> createShareConsumer(
+        String groupId,
+        Map<?, ?> additionalProperties,
+        Deserializer<K> keyDeserializer,
+        Deserializer<V> valueDeserializer
     ) {
         Properties props = new Properties();
         props.putAll(additionalProperties);
         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         Map<String, Object> conf = new HashMap<>();
         props.forEach((k, v) -> conf.put((String) k, v));
-        return cluster.shareConsumer(conf);
+        return cluster.shareConsumer(conf, keyDeserializer, valueDeserializer);
     }
 
     private void warmup() throws InterruptedException {

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java

Lines changed: 22 additions & 0 deletions
@@ -507,6 +507,28 @@ public void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) {
         delegate.acknowledge(record, type);
     }
 
+    /**
+     * Acknowledge delivery of a record returned on the last {@link #poll(Duration)} call indicating whether
+     * it was processed successfully. The acknowledgement is committed on the next {@link #commitSync()},
+     * {@link #commitAsync()} or {@link #poll(Duration)} call.
+     * <p>This method can only be used if the consumer is using <b>explicit acknowledgement</b>.
+     * <p>It provides an alternative to {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for
+     * situations where the {@link ConsumerRecord} is not available, such as when the record could not be deserialized.
+     *
+     * @param topic The topic of the record to acknowledge
+     * @param partition The partition of the record to acknowledge
+     * @param offset The offset of the record to acknowledge
+     * @param type The acknowledge type which indicates whether it was processed successfully
+     *
+     * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer is not using
+     *                               explicit acknowledgement
+     */
+
+    @Override
+    public void acknowledge(String topic, int partition, long offset, AcknowledgeType type) {
+        delegate.acknowledge(topic, partition, offset, type);
+    }
+
     /**
      * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
      * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or

clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java

Lines changed: 4 additions & 0 deletions
@@ -104,6 +104,10 @@ public synchronized void acknowledge(ConsumerRecord<K, V> record) {
     public synchronized void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) {
     }
 
+    @Override
+    public synchronized void acknowledge(String topic, int partition, long offset, AcknowledgeType type) {
+    }
+
     @Override
     public synchronized Map<TopicIdPartition, Optional<KafkaException>> commitSync() {
         return new HashMap<>();

clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java

Lines changed: 5 additions & 0 deletions
@@ -70,6 +70,11 @@ public interface ShareConsumer<K, V> extends Closeable {
      */
     void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type);
 
+    /**
+     * @see KafkaShareConsumer#acknowledge(String, int, long, AcknowledgeType)
+     */
+    void acknowledge(String topic, int partition, long offset, AcknowledgeType type);
+
     /**
      * @see KafkaShareConsumer#commitSync()
      */

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java

Lines changed: 12 additions & 7 deletions
@@ -41,11 +41,13 @@
 
 import java.io.Closeable;
 import java.nio.ByteBuffer;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Optional;
+import java.util.Set;
 
 /**
  * {@link ShareCompletedFetch} represents a {@link RecordBatch batch} of {@link Record records}
@@ -162,15 +164,15 @@ <K, V> ShareInFlightBatch<K, V> fetchRecords(final Deserializers<K, V> deseriali
 
         if (cachedBatchException != null) {
             // If the event that a CRC check fails, reject the entire record batch because it is corrupt.
-            rejectRecordBatch(inFlightBatch, currentBatch);
-            inFlightBatch.setException(cachedBatchException);
+            Set<Long> offsets = rejectRecordBatch(inFlightBatch, currentBatch);
+            inFlightBatch.setException(new ShareInFlightBatchException(cachedBatchException, offsets));
             cachedBatchException = null;
             return inFlightBatch;
         }
 
         if (cachedRecordException != null) {
             inFlightBatch.addAcknowledgement(lastRecord.offset(), AcknowledgeType.RELEASE);
-            inFlightBatch.setException(cachedRecordException);
+            inFlightBatch.setException(new ShareInFlightBatchException(cachedRecordException, Set.of(lastRecord.offset())));
             cachedRecordException = null;
             return inFlightBatch;
         }
@@ -224,16 +226,16 @@ <K, V> ShareInFlightBatch<K, V> fetchRecords(final Deserializers<K, V> deseriali
                 nextAcquired = nextAcquiredRecord();
                 if (inFlightBatch.isEmpty()) {
                     inFlightBatch.addAcknowledgement(lastRecord.offset(), AcknowledgeType.RELEASE);
-                    inFlightBatch.setException(se);
+                    inFlightBatch.setException(new ShareInFlightBatchException(se, Set.of(lastRecord.offset())));
                 } else {
                     cachedRecordException = se;
                     inFlightBatch.setHasCachedException(true);
                 }
             } catch (CorruptRecordException e) {
                 if (inFlightBatch.isEmpty()) {
                     // If the event that a CRC check fails, reject the entire record batch because it is corrupt.
-                    rejectRecordBatch(inFlightBatch, currentBatch);
-                    inFlightBatch.setException(e);
+                    Set<Long> offsets = rejectRecordBatch(inFlightBatch, currentBatch);
+                    inFlightBatch.setException(new ShareInFlightBatchException(e, offsets));
                 } else {
                     cachedBatchException = e;
                     inFlightBatch.setHasCachedException(true);
@@ -261,26 +263,29 @@ private OffsetAndDeliveryCount nextAcquiredRecord() {
         return null;
     }
 
-    private <K, V> void rejectRecordBatch(final ShareInFlightBatch<K, V> inFlightBatch,
+    private <K, V> Set<Long> rejectRecordBatch(final ShareInFlightBatch<K, V> inFlightBatch,
                                           final RecordBatch currentBatch) {
         // Rewind the acquiredRecordIterator to the start, so we are in a known state
         acquiredRecordIterator = acquiredRecordList.listIterator();
 
         OffsetAndDeliveryCount nextAcquired = nextAcquiredRecord();
+        Set<Long> offsets = new HashSet<>();
         for (long offset = currentBatch.baseOffset(); offset <= currentBatch.lastOffset(); offset++) {
             if (nextAcquired == null) {
                 // No more acquired records, so we are done
                 break;
             } else if (offset == nextAcquired.offset) {
                 // It's acquired, so we reject it
                 inFlightBatch.addAcknowledgement(offset, AcknowledgeType.REJECT);
+                offsets.add(offset);
            } else if (offset < nextAcquired.offset) {
                 // It's not acquired, so we skip it
                 continue;
             }
 
             nextAcquired = nextAcquiredRecord();
         }
+        return offsets;
     }
 
     /**
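
The exception attached to the in-flight batch now carries the set of affected offsets, which is what later lets acknowledge(topic, partition, offset, type) verify that an offset is genuinely waiting to be acknowledged. ShareInFlightBatchException itself is among the 14 changed files not shown on this page; the following is a hypothetical sketch of its shape, inferred only from the two constructor call sites above:

// Hypothetical reconstruction from the call sites above; not the actual source.
public class ShareInFlightBatchException extends KafkaException {
    private final KafkaException cause;
    private final Set<Long> offsets;

    public ShareInFlightBatchException(KafkaException cause, Set<Long> offsets) {
        super(cause);
        this.cause = cause;
        this.offsets = offsets;
    }

    public KafkaException cause() {
        return cause;
    }

    // Offsets whose delivery state was already adjusted (released or rejected)
    // when the deserialization or CRC failure was detected.
    public Set<Long> offsets() {
        return offsets;
    }
}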

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java

Lines changed: 17 additions & 0 deletions
@@ -561,6 +561,7 @@ public void unsubscribe() {
      * {@inheritDoc}
      */
     @Override
+    @SuppressWarnings("unchecked")
     public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
         Timer timer = time.timer(timeout);
 
@@ -601,6 +602,9 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
             } while (timer.notExpired());
 
             return ConsumerRecords.empty();
+        } catch (ShareFetchException e) {
+            currentFetch = (ShareFetch<K, V>) e.shareFetch();
+            throw e.cause();
         } finally {
             kafkaShareConsumerMetrics.recordPollEnd(timer.currentTimeMs());
             release();
@@ -692,6 +696,19 @@ public void acknowledge(final ConsumerRecord<K, V> record, final AcknowledgeType
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    public void acknowledge(final String topic, final int partition, final long offset, final AcknowledgeType type) {
+        acquireAndEnsureOpen();
+        try {
+            ensureExplicitAcknowledgement();
+            currentFetch.acknowledge(topic, partition, offset, type);
+        } finally {
+            release();
+        }
+    }
+
     /**
      * {@inheritDoc}
      */
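
The poll() change above relies on a new internal ShareFetchException that carries the partially-consumed fetch alongside the underlying error: the consumer keeps the fetch as currentFetch, so the failing record remains acknowledgeable by offset, and rethrows the original cause to the application. The class is among the changed files not shown on this page; a hypothetical sketch inferred only from the call site in poll():

// Hypothetical reconstruction from the usage in poll(); not the actual source.
public class ShareFetchException extends KafkaException {
    private final ShareFetch<?, ?> shareFetch;
    private final KafkaException cause;

    public ShareFetchException(ShareFetch<?, ?> shareFetch, KafkaException cause) {
        super(cause);
        this.shareFetch = shareFetch;
        this.cause = cause;
    }

    // The in-progress fetch, retained so the consumer can restore currentFetch
    // before surfacing the error to the caller.
    public ShareFetch<?, ?> shareFetch() {
        return shareFetch;
    }

    public KafkaException cause() {
        return cause;
    }
}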
