You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
public ConsumerRecords<K, V> poll(long timeoutMillis) {
try {
QueueItem item = receivedMessages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
if (item == null) {
return (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
}
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
int numberOfRecords = 0;
while (item != null) {
TopicName topicName = TopicName.get(item.consumer.getTopic());
String topic = topicName.getPartitionedTopicName();
int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
Message<byte[]> msg = item.message;
MessageId msgId = msg.getMessageId();
if (msgId instanceof TopicMessageIdImpl) {
msgId = ((TopicMessageIdImpl) msgId).getInnerMessageId();
}
long offset = MessageIdUtils.getOffset(msgId);
TopicPartition tp = new TopicPartition(topic, partition);
if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
resetOffsets(tp);
}
// .. other code
// If no interceptor is provided, interceptors list will an empty list, original ConsumerRecords will be return.
return applyConsumerInterceptorsOnConsume(interceptors, new ConsumerRecords<>(records));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
This code can not discriminate partitioned-topic or non-paritioned-topic.
The text was updated successfully, but these errors were encountered:
Reproduce
error
probable reason
PulsarKafkaConsumer -> poll
int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
This code can not discriminate partitioned-topic or non-paritioned-topic.
The text was updated successfully, but these errors were encountered: