KAFKA-19471: Enable acknowledgement for a record which could not be deserialized #20148
Conversation
Thanks for the PR. I've only taken a very cursory look so far and have one comment. I'll review more deeply soon.
    return new KafkaShareConsumer<>(configs, null, null);
}

default <K, V> ShareConsumer<K, V> shareConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    Map<String, Object> props = new HashMap<>(configs);
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
If you're providing instances of the deserializers, I don't think you should be setting the class names in the config.
kafka/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
Lines 743 to 744 in 2346c0e
    if (keyDeserializer != null)
        newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
Thanks for your review. During initialization, if keyDeserializer is provided, it overrides the class name in the config. That said, the explicit check makes the logic clearer, so I'll fix it.
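The guarded construction being discussed could look roughly like this. This is a minimal, runnable sketch using a stand-in `Deserializer` interface and a hard-coded config key, not Kafka's actual classes: only fall back to a default deserializer class in the config when no instance was supplied.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for org.apache.kafka.common.serialization.Deserializer (hypothetical).
interface Deserializer<T> {
    T deserialize(byte[] data);
}

class ShareConsumerConfigSketch {
    static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";

    // Only set a default deserializer class in the config when the caller did
    // not supply a deserializer instance; a supplied instance would override
    // the config entry anyway, so setting both just muddies the logic.
    static <K> Map<String, Object> buildProps(Map<String, Object> configs,
                                              Deserializer<K> keyDeserializer) {
        Map<String, Object> props = new HashMap<>(configs);
        if (keyDeserializer == null) {
            props.putIfAbsent(KEY_DESERIALIZER_CLASS_CONFIG, "ByteArrayDeserializer");
        }
        return props;
    }
}
```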
Thanks for the PR. I think it's along the right lines, but the validation of the parameters is too loose as it stands. I suggest introducing some kind of exceptional batch that contains a single record, and then validating the parameters against this.
for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) {
    TopicIdPartition tip = tipBatch.getKey();
    if (tip.topic().equals(topic) && (tip.partition() == partition)) {
        tipBatch.getValue().addAcknowledgement(offset, type);
The checking here is not tight enough. If the user calls acknowledge(ConsumerRecord, AcknowledgeType), the code makes sure that the in-flight records include the offset (see ShareInFlightBatch.acknowledge(ConsumerRecord, AcknowledgeType)). However, in this case, there is no validation of the offset. The handling of an exceptional batch needs to be a bit more sophisticated, I think. It should not be possible to use ShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) with an exceptional batch. It should only be possible to use ShareConsumer.acknowledge(String, int, long, AcknowledgeType), and only for the specific parameters that were retrieved from the RecordDeserializationException.
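The suggested validation could be sketched as follows. This is a hypothetical miniature of the "exceptional batch" idea with simplified types, not the real `ShareInFlightBatch`: record-based acknowledgement is refused outright, and coordinate-based acknowledgement succeeds only for the exact failed record.

```java
// A batch representing a single record that failed deserialization. It refuses
// record-based acknowledgement entirely and accepts coordinate-based
// acknowledgement only for the exact topic/partition/offset reported by the
// deserialization failure.
class ExceptionalBatchSketch {
    private final String topic;
    private final int partition;
    private final long offset;

    ExceptionalBatchSketch(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    // acknowledge(ConsumerRecord, AcknowledgeType) is never valid here:
    // there is no deserialized record to acknowledge.
    boolean acknowledgeRecord() {
        return false;
    }

    // acknowledge(String, int, long, AcknowledgeType) is valid only when the
    // parameters match the failed record exactly.
    boolean acknowledge(String topic, int partition, long offset) {
        return this.topic.equals(topic) && this.partition == partition && this.offset == offset;
    }
}
```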
Good catch! Appreciate the suggestion.
Thanks for the updates. That's looking good now.
TopicIdPartition tip = tipBatch.getKey();
KafkaException shareException = tipBatch.getValue().getException();
if (tip.topic().equals(topic) && (tip.partition() == partition) &&
        shareException instanceof RecordDeserializationException &&
I don't like the instanceof and then the cast. One way around this, I suppose, would be to have ShareInFlightBatch carry an exception of a known type, ShareFetchException I guess. Then ShareFetchException could have methods which extract the topic, partition and offset from the RecordDeserializationException, and then you just need shareException.offset() == offset. wdyt?
ShareFetchException was actually created to address the issue mentioned in the comments of KAFKA-19471, preventing exceptions from being swallowed. Therefore, the cause could be either a SerializationException or a CorruptRecordException. Consequently, if we use ShareFetchException, we might still need to perform instanceof checks followed by casting.
Yes, I was thinking about this case last night and you're absolutely correct. But still, ShareFetchException could perhaps return an offset when appropriate rather than using instanceof and casting. There's got to be a more elegant, OO way of achieving this.
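One shape for that idea, sketched with stand-in exception classes rather than the real Kafka types: let the wrapper exception answer the offset question itself, so call sites just compare offsets. The instanceof check is not eliminated, but it is confined to one place inside the wrapper.

```java
// Stand-in for Kafka's RecordDeserializationException, which knows the failed offset.
class DeserializationFailure extends RuntimeException {
    private final long offset;

    DeserializationFailure(long offset) {
        this.offset = offset;
    }

    long offset() {
        return offset;
    }
}

// Sketch: the fetch exception exposes the failed offset when its cause carries
// one, so callers never need to downcast the cause themselves.
class ShareFetchExceptionSketch extends RuntimeException {
    static final long NO_OFFSET = -1L;

    ShareFetchExceptionSketch(Throwable cause) {
        super(cause);
    }

    // Returns the failed record's offset, or NO_OFFSET when the cause
    // (e.g. a corrupt-record error) does not identify a single record.
    long offset() {
        Throwable cause = getCause();
        return (cause instanceof DeserializationFailure)
                ? ((DeserializationFailure) cause).offset()
                : NO_OFFSET;
    }
}
```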
// Reject this record
shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
shareConsumer.commitSync();
And what happens if you repeat the acknowledge call after the commit?
Good catch! I have updated the patch.
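One plausible shape for the guard being asked about, as a hypothetical model rather than the patch's actual code: once the acknowledgement has been committed, a repeated acknowledge for the same offset is no longer in-flight and fails fast.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of acknowledgement state: offsets leave the in-flight set
// on commitSync(), after which further acknowledge calls for them must fail.
class AckStateSketch {
    private final Set<Long> inFlight = new HashSet<>();

    AckStateSketch(long... offsets) {
        for (long o : offsets) {
            inFlight.add(o);
        }
    }

    void acknowledge(long offset) {
        if (!inFlight.contains(offset)) {
            throw new IllegalStateException("offset " + offset + " is not in-flight");
        }
    }

    void commitSync() {
        // Committed acknowledgements are flushed; nothing remains in-flight.
        inFlight.clear();
    }
}
```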
Thanks for the PR. Looks good to me.
KAFKA-19471: Enable acknowledgement for a record which could not be deserialized (apache#20148)

This patch mainly includes two improvements:

1. Update currentFetch when `pollForFetches()` throws an exception.
2. Add an override `KafkaShareConsumer.acknowledge(String topic, int partition, long offset, AcknowledgeType type)`.

Reviewers: Andrew Schofield <[email protected]>
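Put together, the consumer loop these changes enable looks roughly like the following simulation. All types here are small stand-ins, and the commented line marks where the real `shareConsumer.acknowledge(..., AcknowledgeType.REJECT)` call would go; a null payload stands in for an undeserializable record.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in carrying the coordinates that RecordDeserializationException exposes.
class BadRecord extends RuntimeException {
    final String topic;
    final int partition;
    final long offset;

    BadRecord(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

class PoisonRecordLoop {
    // Simulated deserialization: a null payload throws, mimicking a poisoned record.
    static String deserialize(String topic, int partition, long offset, String raw) {
        if (raw == null) {
            throw new BadRecord(topic, partition, offset);
        }
        return raw;
    }

    // Process one partition's payloads, rejecting poisoned records by their
    // coordinates instead of failing the whole poll loop. Returns the offsets
    // that were rejected; accepted payloads are collected in `accepted`.
    static List<Long> process(List<String> raws, List<String> accepted) {
        List<Long> rejected = new ArrayList<>();
        for (int i = 0; i < raws.size(); i++) {
            try {
                accepted.add(deserialize("t", 0, i, raws.get(i)));
            } catch (BadRecord e) {
                // Real code: shareConsumer.acknowledge(e.topic, e.partition,
                //                e.offset, AcknowledgeType.REJECT);
                rejected.add(e.offset);
            }
        }
        return rejected;
    }
}
```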