Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
Expand All @@ -67,6 +69,7 @@
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -843,6 +846,144 @@ public void testExplicitAcknowledgeThrowsNotInBatch() {
}
}

@ClusterTest
public void testExplicitOverrideAcknowledgeCorruptedMessage() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
"group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT),
null,
mockErrorDeserializer(3))) {

ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record1);
producer.send(record2);
producer.send(record3);
producer.flush();

shareConsumer.subscribe(Set.of(tp.topic()));

ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
assertEquals(2, records.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();

ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
assertEquals(0L, firstRecord.offset());
assertEquals(1L, secondRecord.offset());
shareConsumer.acknowledge(firstRecord);
shareConsumer.acknowledge(secondRecord);

RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
assertEquals(2, rde.offset());
shareConsumer.commitSync();

// The corrupted record was automatically released, so we can still obtain it.
rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
assertEquals(2, rde.offset());

// Reject this record
shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
shareConsumer.commitSync();

records = shareConsumer.poll(Duration.ZERO);
assertEquals(0, records.count());
verifyShareGroupStateTopicRecordsProduced();
}
}

@ClusterTest
public void testExplicitAcknowledgeOffsetThrowsNotException() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
"group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {

ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record);
producer.flush();

shareConsumer.subscribe(Set.of(tp.topic()));

ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
assertEquals(1, records.count());
ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0);
assertEquals(0L, consumedRecord.offset());

assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(tp.topic(), tp.partition(), consumedRecord.offset(), AcknowledgeType.ACCEPT));

shareConsumer.acknowledge(consumedRecord);
verifyShareGroupStateTopicRecordsProduced();
}
}

@ClusterTest
public void testExplicitAcknowledgeOffsetThrowsParametersError() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
"group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT),
null,
mockErrorDeserializer(2))) {

ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record1);
producer.send(record2);
producer.flush();

shareConsumer.subscribe(Set.of(tp.topic()));

ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
assertEquals(1, records.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();

ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
assertEquals(0L, firstRecord.offset());
shareConsumer.acknowledge(firstRecord);

final RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
assertEquals(1, rde.offset());

assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge("foo", rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT));
assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), 1, rde.offset(), AcknowledgeType.REJECT));
assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), tp2.partition(), 0, AcknowledgeType.REJECT));

// Reject this record.
shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
shareConsumer.commitSync();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And what happens if you repeat the acknowledge call after the commit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I have updated the patch.


// The next acknowledge() should throw an IllegalStateException as the record has been acked.
assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT));

records = shareConsumer.poll(Duration.ZERO);
assertEquals(0, records.count());
verifyShareGroupStateTopicRecordsProduced();
}
}

private ByteArrayDeserializer mockErrorDeserializer(int recordNumber) {
int recordIndex = recordNumber - 1;
return new ByteArrayDeserializer() {
int i = 0;

@Override
public byte[] deserialize(String topic, Headers headers, ByteBuffer data) {
if (i == recordIndex) {
throw new SerializationException();
} else {
i++;
return super.deserialize(topic, headers, data);
}
}
};
}

@ClusterTest
public void testImplicitAcknowledgeFailsExplicit() {
alterShareAutoOffsetReset("group1", "earliest");
Expand Down Expand Up @@ -2794,13 +2935,22 @@ private <K, V> ShareConsumer<K, V> createShareConsumer(String groupId) {
private <K, V> ShareConsumer<K, V> createShareConsumer(
String groupId,
Map<?, ?> additionalProperties
) {
return createShareConsumer(groupId, additionalProperties, null, null);
}

private <K, V> ShareConsumer<K, V> createShareConsumer(
String groupId,
Map<?, ?> additionalProperties,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer
) {
Properties props = new Properties();
props.putAll(additionalProperties);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
Map<String, Object> conf = new HashMap<>();
props.forEach((k, v) -> conf.put((String) k, v));
return cluster.shareConsumer(conf);
return cluster.shareConsumer(conf, keyDeserializer, valueDeserializer);
}

private void warmup() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,28 @@ public void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) {
delegate.acknowledge(record, type);
}

/**
* Acknowledge delivery of a record returned on the last {@link #poll(Duration)} call indicating whether
* it was processed successfully. The acknowledgement is committed on the next {@link #commitSync()},
* {@link #commitAsync()} or {@link #poll(Duration)} call.
* <p>This method can only be used if the consumer is using <b>explicit acknowledgement</b>.
* <p>It provides an alternative to {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for
* situations where the {@link ConsumerRecord} is not available, such as when the record could not be deserialized.
*
* @param topic The topic of the record to acknowledge
* @param partition The partition of the record to acknowledge
* @param offset The offset of the record to acknowledge
* @param type The acknowledge type which indicates whether it was processed successfully
*
* @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer is not using
* explicit acknowledgement
*/

@Override
public void acknowledge(String topic, int partition, long offset, AcknowledgeType type) {
delegate.acknowledge(topic, partition, offset, type);
}

/**
* Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
* the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ public synchronized void acknowledge(ConsumerRecord<K, V> record) {
public synchronized void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) {
}

@Override
public synchronized void acknowledge(String topic, int partition, long offset, AcknowledgeType type) {
}

@Override
public synchronized Map<TopicIdPartition, Optional<KafkaException>> commitSync() {
return new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public interface ShareConsumer<K, V> extends Closeable {
*/
void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type);

/**
* @see KafkaShareConsumer#acknowledge(String, int, long, AcknowledgeType)
*/
void acknowledge(String topic, int partition, long offset, AcknowledgeType type);

/**
* @see KafkaShareConsumer#commitSync()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Optional;
import java.util.Set;

/**
* {@link ShareCompletedFetch} represents a {@link RecordBatch batch} of {@link Record records}
Expand Down Expand Up @@ -162,15 +164,15 @@ <K, V> ShareInFlightBatch<K, V> fetchRecords(final Deserializers<K, V> deseriali

if (cachedBatchException != null) {
// If the event that a CRC check fails, reject the entire record batch because it is corrupt.
rejectRecordBatch(inFlightBatch, currentBatch);
inFlightBatch.setException(cachedBatchException);
Set<Long> offsets = rejectRecordBatch(inFlightBatch, currentBatch);
inFlightBatch.setException(new ShareInFlightBatchException(cachedBatchException, offsets));
cachedBatchException = null;
return inFlightBatch;
}

if (cachedRecordException != null) {
inFlightBatch.addAcknowledgement(lastRecord.offset(), AcknowledgeType.RELEASE);
inFlightBatch.setException(cachedRecordException);
inFlightBatch.setException(new ShareInFlightBatchException(cachedRecordException, Set.of(lastRecord.offset())));
cachedRecordException = null;
return inFlightBatch;
}
Expand Down Expand Up @@ -224,16 +226,16 @@ <K, V> ShareInFlightBatch<K, V> fetchRecords(final Deserializers<K, V> deseriali
nextAcquired = nextAcquiredRecord();
if (inFlightBatch.isEmpty()) {
inFlightBatch.addAcknowledgement(lastRecord.offset(), AcknowledgeType.RELEASE);
inFlightBatch.setException(se);
inFlightBatch.setException(new ShareInFlightBatchException(se, Set.of(lastRecord.offset())));
} else {
cachedRecordException = se;
inFlightBatch.setHasCachedException(true);
}
} catch (CorruptRecordException e) {
if (inFlightBatch.isEmpty()) {
// If the event that a CRC check fails, reject the entire record batch because it is corrupt.
rejectRecordBatch(inFlightBatch, currentBatch);
inFlightBatch.setException(e);
Set<Long> offsets = rejectRecordBatch(inFlightBatch, currentBatch);
inFlightBatch.setException(new ShareInFlightBatchException(e, offsets));
} else {
cachedBatchException = e;
inFlightBatch.setHasCachedException(true);
Expand Down Expand Up @@ -261,26 +263,29 @@ private OffsetAndDeliveryCount nextAcquiredRecord() {
return null;
}

private <K, V> void rejectRecordBatch(final ShareInFlightBatch<K, V> inFlightBatch,
private <K, V> Set<Long> rejectRecordBatch(final ShareInFlightBatch<K, V> inFlightBatch,
final RecordBatch currentBatch) {
// Rewind the acquiredRecordIterator to the start, so we are in a known state
acquiredRecordIterator = acquiredRecordList.listIterator();

OffsetAndDeliveryCount nextAcquired = nextAcquiredRecord();
Set<Long> offsets = new HashSet<>();
for (long offset = currentBatch.baseOffset(); offset <= currentBatch.lastOffset(); offset++) {
if (nextAcquired == null) {
// No more acquired records, so we are done
break;
} else if (offset == nextAcquired.offset) {
// It's acquired, so we reject it
inFlightBatch.addAcknowledgement(offset, AcknowledgeType.REJECT);
offsets.add(offset);
} else if (offset < nextAcquired.offset) {
// It's not acquired, so we skip it
continue;
}

nextAcquired = nextAcquiredRecord();
}
return offsets;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ public void unsubscribe() {
* {@inheritDoc}
*/
@Override
@SuppressWarnings("unchecked")
public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
Timer timer = time.timer(timeout);

Expand Down Expand Up @@ -601,6 +602,9 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
} while (timer.notExpired());

return ConsumerRecords.empty();
} catch (ShareFetchException e) {
currentFetch = (ShareFetch<K, V>) e.shareFetch();
throw e.cause();
} finally {
kafkaShareConsumerMetrics.recordPollEnd(timer.currentTimeMs());
release();
Expand Down Expand Up @@ -692,6 +696,19 @@ public void acknowledge(final ConsumerRecord<K, V> record, final AcknowledgeType
}
}

/**
* {@inheritDoc}
*/
public void acknowledge(final String topic, final int partition, final long offset, final AcknowledgeType type) {
acquireAndEnsureOpen();
try {
ensureExplicitAcknowledgement();
currentFetch.acknowledge(topic, partition, offset, type);
} finally {
release();
}
}

/**
* {@inheritDoc}
*/
Expand Down
Loading