Skip to content

Commit

Permalink
Using multiple successive polls to get as many messages as needed
Browse files Browse the repository at this point in the history
  • Loading branch information
ekoutanov committed Jun 10, 2019
1 parent 56ddcab commit 7e0bc62
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.obsidiandynamics.kafdrop</groupId>
<artifactId>kafdrop</artifactId>
<version>3.4.0-SNAPSHOT</version>
<version>3.3.1</version>

<description>For when you have a Kafka cluster to monitor</description>

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/kafdrop/controller/MessageController.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public String viewMessageForm(@PathVariable("name") String topicName,
messageInspector.getMessages(topicName,
messageForm.getPartition(),
messageForm.getOffset(),
messageForm.getCount(),
messageForm.getCount().intValue(),
deserializer));

}
Expand All @@ -119,7 +119,7 @@ List<Object> getPartitionOrMessages(
@PathVariable("name") String topicName,
@RequestParam(name = "partition", required = false) Integer partition,
@RequestParam(name = "offset", required = false) Long offset,
@RequestParam(name = "count", required = false) Long count
@RequestParam(name = "count", required = false) Integer count
) {
if (partition == null || offset == null || count == null) {
final TopicVO topic = kafkaMonitor.getTopic(topicName)
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/kafdrop/service/CuratorKafkaMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ private Map<String, TopicVO> getTopicMetadata(String... topics) {
}

@Override
public List<MessageVO> getMessages(TopicPartition topicPartition, long offset, long count,
public List<MessageVO> getMessages(TopicPartition topicPartition, long offset, int count,
MessageDeserializer deserializer) {
final List<ConsumerRecord<String, String>> records =
kafkaHighLevelConsumer.getLatestRecords(topicPartition, offset, count, deserializer);
Expand Down
14 changes: 9 additions & 5 deletions src/main/java/kafdrop/service/KafkaHighLevelConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,20 @@ synchronized Map<Integer, TopicPartitionVO> getPartitionSize(String topic) {
return partitionsVo;
}

synchronized List<ConsumerRecord<String, String>> getLatestRecords(TopicPartition topicPartition, long offset, Long count,
synchronized List<ConsumerRecord<String, String>> getLatestRecords(TopicPartition topicPartition, long offset, int count,
MessageDeserializer deserializer) {
initializeClient();
kafkaConsumer.assign(Collections.singletonList(topicPartition));
kafkaConsumer.seek(topicPartition, offset);

final var rawRecords = kafkaConsumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
final var numRecords = rawRecords.count();
return rawRecords.records(topicPartition)
.subList(0, Math.min(count.intValue(), numRecords))
final var rawRecords = new ArrayList<ConsumerRecord<String, byte[]>>(count);
while (rawRecords.size() <= count) {
final var polled = kafkaConsumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)).records(topicPartition);
if (polled.isEmpty()) break;
rawRecords.addAll(polled);
}
return rawRecords
.subList(0, Math.min(count, rawRecords.size()))
.stream()
.map(rec -> new ConsumerRecord<>(rec.topic(),
rec.partition(),
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/kafdrop/service/KafkaMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface KafkaMonitor {

List<TopicVO> getTopics();

List<MessageVO> getMessages(TopicPartition topicPartition, long offset, long count,
List<MessageVO> getMessages(TopicPartition topicPartition, long offset, int count,
MessageDeserializer deserializer);

Optional<TopicVO> getTopic(String topic);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/kafdrop/service/MessageInspector.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public MessageInspector(KafkaMonitor kafkaMonitor) {
this.kafkaMonitor = kafkaMonitor;
}

public List<MessageVO> getMessages(String topicName, int partitionId, long offset, long count,
public List<MessageVO> getMessages(String topicName, int partitionId, long offset, int count,
MessageDeserializer deserializer) {
final var topicPartition = new TopicPartition(topicName, partitionId);
return kafkaMonitor.getMessages(topicPartition, offset, count, deserializer);
Expand Down

0 comments on commit 7e0bc62

Please sign in to comment.