From 7fc58ca5378fda82b36b0166be8b388a2c6f4ea9 Mon Sep 17 00:00:00 2001 From: Kacey B Date: Thu, 20 Jun 2019 15:02:59 +1000 Subject: [PATCH 1/9] Added a new topic-messages view to show messages of all partitions in that topic. Messages are sorted by their timestamp. --- .../kafdrop/controller/MessageController.java | 35 ++++++++++++ src/main/java/kafdrop/model/MessageVO.java | 4 ++ .../kafdrop/service/CuratorKafkaMonitor.java | 1 + .../resources/templates/message-inspector.ftl | 6 +- src/main/resources/templates/topic-detail.ftl | 4 +- .../resources/templates/topic-messages.ftl | 55 +++++++++++++++++++ 6 files changed, 102 insertions(+), 3 deletions(-) create mode 100644 src/main/resources/templates/topic-messages.ftl diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index 9eb38da1..04eac638 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -53,6 +53,41 @@ public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInsp this.schemaRegistryProperties = schemaRegistryProperties; } + /** + * Human friendly view of reading all topic messages sorted by timestamp. + * @param topicName Name of topic + * @param model + * @return View for seeing all messages in a topic sorted by timestamp. + */ + @RequestMapping(method = RequestMethod.GET, value = "/topic/{name:.+}/allmessages") + public String viewAllMessages(@PathVariable("name") String topicName, + Model model, @RequestParam(name = "count", required = false) Integer count) { + final int size = (count != null? count : 100); + final MessageFormat defaultFormat = messageFormatProperties.getFormat(); + final TopicVO topic = kafkaMonitor.getTopic(topicName) + .orElseThrow(() -> new TopicNotFoundException(topicName)); + + model.addAttribute("topic", topic); + model.addAttribute("defaultFormat", defaultFormat); + model.addAttribute("messageFormats", MessageFormat.values()); + + final var deserializer = getDeserializer(topicName, defaultFormat); + final List messages = new ArrayList<>(); + + for (TopicPartitionVO partition : topic.getPartitions()) { + messages.addAll(messageInspector.getMessages(topicName, + partition.getId(), + partition.getFirstOffset(), + size, + deserializer)); + } + + Collections.sort(messages, Comparator.comparing(MessageVO::getTimestamp)); + model.addAttribute("messages", messages); + + return "topic-messages"; + } + /** * Human friendly view of reading messages. * @param topicName Name of topic diff --git a/src/main/java/kafdrop/model/MessageVO.java b/src/main/java/kafdrop/model/MessageVO.java index fca0fc08..d330d58c 100644 --- a/src/main/java/kafdrop/model/MessageVO.java +++ b/src/main/java/kafdrop/model/MessageVO.java @@ -22,11 +22,15 @@ import java.util.stream.*; public final class MessageVO { + private int partition; private String message; private String key; private Map headers; private Date timestamp; + public int getPartition() { return partition; } + public void setPartition(int partition) { this.partition = partition; } + public String getMessage() { return message; } diff --git a/src/main/java/kafdrop/service/CuratorKafkaMonitor.java b/src/main/java/kafdrop/service/CuratorKafkaMonitor.java index 0959de92..ee345ece 100644 --- a/src/main/java/kafdrop/service/CuratorKafkaMonitor.java +++ b/src/main/java/kafdrop/service/CuratorKafkaMonitor.java @@ -226,6 +226,7 @@ public List getMessages(TopicPartition topicPartition, long offset, i final var messageVos = new ArrayList(); for (var record : records) { final var messageVo = new MessageVO(); + messageVo.setPartition(topicPartition.partition()); messageVo.setKey(record.key()); messageVo.setMessage(record.value()); messageVo.setHeaders(headersToMap(record.headers())); diff --git a/src/main/resources/templates/message-inspector.ftl b/src/main/resources/templates/message-inspector.ftl index 5508ae6e..c1b127d8 100644 --- a/src/main/resources/templates/message-inspector.ftl +++ b/src/main/resources/templates/message-inspector.ftl @@ -18,7 +18,7 @@ <#import "/spring.ftl" as spring /> <@template.header "Topic: ${topic.name}: Messages"> + + +<#setting number_format="0"> + +

Topic Messages: ${topic.name}

+ +
+ <#if messages?? && messages?size gt 0> + <#list messages as msg> +
+ Partition: ${msg.partition}   + + Key: ${msg.key!''}   + Timestamp: ${msg.timestamp?string('yyyy-MM-dd HH:mm:ss.SSS')} + Headers: ${msg.headersFormatted} +
+   +
${msg.message!''}
+
+
+ + + +
+ +<@template.footer/> From 8c46c84c6558f68ee8781b24c12c565e93c90f0e Mon Sep 17 00:00:00 2001 From: Kacey B Date: Fri, 21 Jun 2019 11:45:38 +1000 Subject: [PATCH 2/9] Pass the offset to MessageVO and show this correct offset on the partition and topic message views. --- src/main/java/kafdrop/model/MessageVO.java | 4 ++++ src/main/java/kafdrop/service/CuratorKafkaMonitor.java | 1 + src/main/resources/templates/message-inspector.ftl | 5 ++--- src/main/resources/templates/topic-messages.ftl | 2 +- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/main/java/kafdrop/model/MessageVO.java b/src/main/java/kafdrop/model/MessageVO.java index d330d58c..e0cd80d0 100644 --- a/src/main/java/kafdrop/model/MessageVO.java +++ b/src/main/java/kafdrop/model/MessageVO.java @@ -23,6 +23,7 @@ public final class MessageVO { private int partition; + private long offset; private String message; private String key; private Map headers; @@ -31,6 +32,9 @@ public final class MessageVO { public int getPartition() { return partition; } public void setPartition(int partition) { this.partition = partition; } + public long getOffset() { return offset; } + public void setOffset(long offset) { this.offset = offset; } + public String getMessage() { return message; } diff --git a/src/main/java/kafdrop/service/CuratorKafkaMonitor.java b/src/main/java/kafdrop/service/CuratorKafkaMonitor.java index ee345ece..d70ee3b4 100644 --- a/src/main/java/kafdrop/service/CuratorKafkaMonitor.java +++ b/src/main/java/kafdrop/service/CuratorKafkaMonitor.java @@ -227,6 +227,7 @@ public List getMessages(TopicPartition topicPartition, long offset, i for (var record : records) { final var messageVo = new MessageVO(); messageVo.setPartition(topicPartition.partition()); + messageVo.setOffset(record.offset()); messageVo.setKey(record.key()); messageVo.setMessage(record.value()); messageVo.setHeaders(headersToMap(record.headers())); diff --git a/src/main/resources/templates/message-inspector.ftl b/src/main/resources/templates/message-inspector.ftl index c1b127d8..f2c42424 100644 --- a/src/main/resources/templates/message-inspector.ftl +++ b/src/main/resources/templates/message-inspector.ftl @@ -105,9 +105,8 @@
<#if messages?? && messages?size gt 0> <#list messages as msg> - <#assign offset=messageForm.offset + msg_index> -
- Offset: ${offset}   +
+ Offset: ${msg.offset}   Key: ${msg.key!''}   Timestamp: ${msg.timestamp?string('yyyy-MM-dd HH:mm:ss.SSS')} Headers: ${msg.headersFormatted} diff --git a/src/main/resources/templates/topic-messages.ftl b/src/main/resources/templates/topic-messages.ftl index 921c5251..f5b698bd 100644 --- a/src/main/resources/templates/topic-messages.ftl +++ b/src/main/resources/templates/topic-messages.ftl @@ -38,7 +38,7 @@ <#list messages as msg>
Partition: ${msg.partition}   - + Offset: ${msg.offset}   Key: ${msg.key!''}   Timestamp: ${msg.timestamp?string('yyyy-MM-dd HH:mm:ss.SSS')} Headers: ${msg.headersFormatted} From 8c0a95c7fdca1742a0dfeb7b2e4bfad6faadb690 Mon Sep 17 00:00:00 2001 From: Kacey B Date: Sun, 21 Jul 2019 22:54:42 +1000 Subject: [PATCH 3/9] Add initial impl for getting topic messages. --- .../kafdrop/controller/MessageController.java | 4 +- .../kafdrop/service/CuratorKafkaMonitor.java | 23 +++++++++ .../service/KafkaHighLevelConsumer.java | 51 ++++++++++++++++++- .../java/kafdrop/service/KafkaMonitor.java | 6 +++ .../kafdrop/service/MessageInspector.java | 11 ++++ 5 files changed, 91 insertions(+), 4 deletions(-) diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index 04eac638..7fa78ff6 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -72,7 +72,7 @@ public String viewAllMessages(@PathVariable("name") String topicName, model.addAttribute("messageFormats", MessageFormat.values()); final var deserializer = getDeserializer(topicName, defaultFormat); - final List messages = new ArrayList<>(); + final List messages = messageInspector.getMessages(topicName, size, deserializer); for (TopicPartitionVO partition : topic.getPartitions()) { messages.addAll(messageInspector.getMessages(topicName, @@ -82,7 +82,7 @@ public String viewAllMessages(@PathVariable("name") String topicName, deserializer)); } - Collections.sort(messages, Comparator.comparing(MessageVO::getTimestamp)); + messages.sort(Comparator.comparing(MessageVO::getTimestamp)); model.addAttribute("messages", messages); return "topic-messages"; diff --git a/src/main/java/kafdrop/service/CuratorKafkaMonitor.java b/src/main/java/kafdrop/service/CuratorKafkaMonitor.java index 0b58f5eb..c7fa0368 100644 --- a/src/main/java/kafdrop/service/CuratorKafkaMonitor.java +++ b/src/main/java/kafdrop/service/CuratorKafkaMonitor.java @@ -221,6 +221,29 @@ private Map getTopicMetadata(String... topics) { return kafkaHighLevelConsumer.getTopicsInfo(topics); } + @Override + public List getMessages(String topic, int count, + MessageDeserializer deserializer) { + final var records = + kafkaHighLevelConsumer.getLatestRecords(topic, count, deserializer); + if (records != null) { + final var messageVos = new ArrayList(); + for (var record : records) { + final var messageVo = new MessageVO(); + messageVo.setPartition(record.partition()); + messageVo.setOffset(record.offset()); + messageVo.setKey(record.key()); + messageVo.setMessage(record.value()); + messageVo.setHeaders(headersToMap(record.headers())); + messageVo.setTimestamp(new Date(record.timestamp())); + messageVos.add(messageVo); + } + return messageVos; + } else { + return Collections.emptyList(); + } + } + @Override public List getMessages(TopicPartition topicPartition, long offset, int count, MessageDeserializer deserializer) { diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index 8df02357..c2f68199 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -93,9 +93,9 @@ synchronized List> getLatestRecords(TopicPartitio kafkaConsumer.seek(topicPartition, offset); final var rawRecords = new ArrayList>(count); - while (rawRecords.size() <= count) { + while (rawRecords.size() <= count) { //TODO should stop if get to count or get to the latest offset final var polled = kafkaConsumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)).records(topicPartition); - if (polled.isEmpty()) break; + if (polled.isEmpty()) break; //TODO remove this rawRecords.addAll(polled); } return rawRecords @@ -116,6 +116,53 @@ synchronized List> getLatestRecords(TopicPartitio .collect(Collectors.toList()); } + /** + * Gets records from all partitions of a given topic. + * @param count The maximum number of records getting back. + * @param deserializer Message deserializer + * @return A list of consumer records for a given topic. + */ + synchronized List> getLatestRecords(String topic, + int count, + MessageDeserializer deserializer) { + initializeClient(); + final var partitionInfoSet = kafkaConsumer.partitionsFor(topic); + final var topicPartitions = partitionInfoSet.stream() + .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), + partitionInfo.partition())) + .collect(Collectors.toList()); + kafkaConsumer.assign(topicPartitions); + for (var topicPartition : topicPartitions) { + kafkaConsumer.seek(topicPartition, count); //TODO fix latest offset - count > 0 + } + + final var rawRecords = new ArrayList>(count); + + while (rawRecords.size() <= count) { // TODO find all messages for each partition + final var polled = kafkaConsumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); + for (var record : polled) { + rawRecords.add(record); + } + } + + return rawRecords + .subList(0, Math.min(count, rawRecords.size())) + .stream() + .map(rec -> new ConsumerRecord<>(rec.topic(), + rec.partition(), + rec.offset(), + rec.timestamp(), + rec.timestampType(), + 0L, + rec.serializedKeySize(), + rec.serializedValueSize(), + rec.key(), + deserializer.deserializeMessage(ByteBuffer.wrap(rec.value())), + rec.headers(), + rec.leaderEpoch())) + .collect(Collectors.toList()); + } + synchronized Map getTopicsInfo(String[] topics) { initializeClient(); if (topics.length == 0) { diff --git a/src/main/java/kafdrop/service/KafkaMonitor.java b/src/main/java/kafdrop/service/KafkaMonitor.java index 4fb2d397..fa626cb6 100644 --- a/src/main/java/kafdrop/service/KafkaMonitor.java +++ b/src/main/java/kafdrop/service/KafkaMonitor.java @@ -31,6 +31,12 @@ public interface KafkaMonitor { List getTopics(); + /** + * Returns messages for a given topic. + */ + List getMessages(String topic, int count, + MessageDeserializer deserializer); + List getMessages(TopicPartition topicPartition, long offset, int count, MessageDeserializer deserializer); diff --git a/src/main/java/kafdrop/service/MessageInspector.java b/src/main/java/kafdrop/service/MessageInspector.java index 9e8f8948..986343be 100644 --- a/src/main/java/kafdrop/service/MessageInspector.java +++ b/src/main/java/kafdrop/service/MessageInspector.java @@ -33,9 +33,20 @@ public MessageInspector(KafkaMonitor kafkaMonitor) { this.kafkaMonitor = kafkaMonitor; } + /** + * Gets messages for a given partition. + */ public List getMessages(String topicName, int partitionId, long offset, int count, MessageDeserializer deserializer) { final var topicPartition = new TopicPartition(topicName, partitionId); return kafkaMonitor.getMessages(topicPartition, offset, count, deserializer); } + + /** + * Gets messages for a given topic. + */ + public List getMessages(String topicName, int count, + MessageDeserializer deserializer) { + return kafkaMonitor.getMessages(topicName, count, deserializer); + } } From f7ce1d7956b21266e0129b16c05c066729353e5b Mon Sep 17 00:00:00 2001 From: Kacey B Date: Sun, 1 Sep 2019 11:34:00 +1000 Subject: [PATCH 4/9] Seek to count or latest offset (end - 1) for a partition. --- .../service/KafkaHighLevelConsumer.java | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index c2f68199..773e31ca 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -86,18 +86,35 @@ synchronized Map getPartitionSize(String topic) { return partitionsVo; } + /** + * Retrieves latest records from the given offset. + * @param topicPartition Topic partition + * @param offset Offset to seek from + * @param count Maximum number of records returned + * @param deserializer Message deserialiser + * @return Latest records + */ synchronized List> getLatestRecords(TopicPartition topicPartition, long offset, int count, MessageDeserializer deserializer) { initializeClient(); - kafkaConsumer.assign(Collections.singletonList(topicPartition)); + final var partitionList = Collections.singletonList(topicPartition); + kafkaConsumer.assign(partitionList); kafkaConsumer.seek(topicPartition, offset); final var rawRecords = new ArrayList>(count); - while (rawRecords.size() <= count) { //TODO should stop if get to count or get to the latest offset + final var latestOffset = Math.max(0, kafkaConsumer.endOffsets(partitionList).get(topicPartition) - 1); + var currentOffset = offset; + + // stop if get to count or get to the latest offset + while (rawRecords.size() < count && currentOffset < latestOffset) { final var polled = kafkaConsumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)).records(topicPartition); - if (polled.isEmpty()) break; //TODO remove this - rawRecords.addAll(polled); + + if (!polled.isEmpty()) { + rawRecords.addAll(polled); + currentOffset = polled.get(polled.size() - 1).offset(); + } } + return rawRecords .subList(0, Math.min(count, rawRecords.size())) .stream() From c3670af4f1d71e57c98134be6ef2fabd48be691a Mon Sep 17 00:00:00 2001 From: Kacey B Date: Mon, 23 Sep 2019 20:52:14 +1000 Subject: [PATCH 5/9] Improved getLatest records. --- .../kafdrop/controller/MessageController.java | 4 +- .../service/KafkaHighLevelConsumer.java | 46 ++++++++++++------- .../kafdrop/service/MessageInspector.java | 2 +- .../resources/static/js/topic-messages.js | 5 ++ src/main/resources/templates/topic-detail.ftl | 3 +- 5 files changed, 40 insertions(+), 20 deletions(-) create mode 100644 src/main/resources/static/js/topic-messages.js diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index 7fa78ff6..bff423be 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -59,7 +59,7 @@ public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInsp * @param model * @return View for seeing all messages in a topic sorted by timestamp. */ - @RequestMapping(method = RequestMethod.GET, value = "/topic/{name:.+}/allmessages") + @GetMapping("/topic/{name:.+}/allmessages") public String viewAllMessages(@PathVariable("name") String topicName, Model model, @RequestParam(name = "count", required = false) Integer count) { final int size = (count != null? count : 100); @@ -96,7 +96,7 @@ public String viewAllMessages(@PathVariable("name") String topicName, * @param model * @return View for seeing messages in a partition. */ - @RequestMapping(method = RequestMethod.GET, value = "/topic/{name:.+}/messages") + @GetMapping("/topic/{name:.+}/messages") public String viewMessageForm(@PathVariable("name") String topicName, @Valid @ModelAttribute("messageForm") PartitionOffsetInfo messageForm, BindingResult errors, diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index 773e31ca..abe73a7e 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -88,26 +88,26 @@ synchronized Map getPartitionSize(String topic) { /** * Retrieves latest records from the given offset. - * @param topicPartition Topic partition + * @param partition Topic partition * @param offset Offset to seek from * @param count Maximum number of records returned * @param deserializer Message deserialiser * @return Latest records */ - synchronized List> getLatestRecords(TopicPartition topicPartition, long offset, int count, + synchronized List> getLatestRecords(TopicPartition partition, long offset, int count, MessageDeserializer deserializer) { initializeClient(); - final var partitionList = Collections.singletonList(topicPartition); - kafkaConsumer.assign(partitionList); - kafkaConsumer.seek(topicPartition, offset); + final var partitions = Collections.singletonList(partition); + kafkaConsumer.assign(partitions); + kafkaConsumer.seek(partition, offset); final var rawRecords = new ArrayList>(count); - final var latestOffset = Math.max(0, kafkaConsumer.endOffsets(partitionList).get(topicPartition) - 1); + final var latestOffset = Math.max(0, kafkaConsumer.endOffsets(partitions).get(partition) - 1); var currentOffset = offset; // stop if get to count or get to the latest offset while (rawRecords.size() < count && currentOffset < latestOffset) { - final var polled = kafkaConsumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)).records(topicPartition); + final var polled = kafkaConsumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)).records(partition); if (!polled.isEmpty()) { rawRecords.addAll(polled); @@ -144,27 +144,41 @@ synchronized List> getLatestRecords(String topic, MessageDeserializer deserializer) { initializeClient(); final var partitionInfoSet = kafkaConsumer.partitionsFor(topic); - final var topicPartitions = partitionInfoSet.stream() + final var partitions = partitionInfoSet.stream() .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())) .collect(Collectors.toList()); - kafkaConsumer.assign(topicPartitions); - for (var topicPartition : topicPartitions) { - kafkaConsumer.seek(topicPartition, count); //TODO fix latest offset - count > 0 + kafkaConsumer.assign(partitions); + final var latestOffsets = kafkaConsumer.endOffsets(partitions); + + for (var partition : partitions) { + final var latestOffset = Math.max(0, latestOffsets.get(partition) - 1); + kafkaConsumer.seek(partition, Math.max(0, latestOffset - count)); } - final var rawRecords = new ArrayList>(count); + final var totalCount = count * partitions.size(); + //final var rawRecords = new ArrayList>(totalCount); + final Map>> rawRecords + = partitions.stream().collect(Collectors.toMap(p -> p , p -> new ArrayList<>(count))); - while (rawRecords.size() <= count) { // TODO find all messages for each partition + var moreRecords = true; + while (rawRecords.size() < totalCount && moreRecords) { final var polled = kafkaConsumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); - for (var record : polled) { - rawRecords.add(record); + + moreRecords = false; + for (var partition : polled.partitions()) { + var records = polled.records(partition); + if (!records.isEmpty()) { + rawRecords.get(partition).addAll(records); + moreRecords = records.get(records.size() - 1).offset() < latestOffsets.get(partition) - 1; + } } } return rawRecords - .subList(0, Math.min(count, rawRecords.size())) + .values() .stream() + .flatMap(Collection::stream) .map(rec -> new ConsumerRecord<>(rec.topic(), rec.partition(), rec.offset(), diff --git a/src/main/java/kafdrop/service/MessageInspector.java b/src/main/java/kafdrop/service/MessageInspector.java index 986343be..03e318f5 100644 --- a/src/main/java/kafdrop/service/MessageInspector.java +++ b/src/main/java/kafdrop/service/MessageInspector.java @@ -43,7 +43,7 @@ public List getMessages(String topicName, int partitionId, long offse } /** - * Gets messages for a given topic. + * Gets all messages from all partitions of a given topic. */ public List getMessages(String topicName, int count, MessageDeserializer deserializer) { diff --git a/src/main/resources/static/js/topic-messages.js b/src/main/resources/static/js/topic-messages.js new file mode 100644 index 00000000..46271689 --- /dev/null +++ b/src/main/resources/static/js/topic-messages.js @@ -0,0 +1,5 @@ +//jQuery(document).ready(function () { +// $( "#topic-messages" ).bind( "click", function() { +// +// }); +//}); \ No newline at end of file diff --git a/src/main/resources/templates/topic-detail.ftl b/src/main/resources/templates/topic-detail.ftl index e340820f..7c74af3c 100644 --- a/src/main/resources/templates/topic-detail.ftl +++ b/src/main/resources/templates/topic-detail.ftl @@ -25,6 +25,7 @@ word-break: break-all; } + <#setting number_format="0"> @@ -32,7 +33,7 @@

Topic: ${topic.name}

From ccfcead3a25330dff44a3bdd6150940fbeca412a Mon Sep 17 00:00:00 2001 From: Kacey B Date: Mon, 23 Sep 2019 21:48:36 +1000 Subject: [PATCH 6/9] Fixed bug when requested offset is before first offset. --- src/main/java/kafdrop/service/KafkaHighLevelConsumer.java | 3 +-- src/main/resources/static/js/topic-messages.js | 5 ----- src/main/resources/templates/topic-detail.ftl | 3 +-- 3 files changed, 2 insertions(+), 9 deletions(-) delete mode 100644 src/main/resources/static/js/topic-messages.js diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index abe73a7e..d8627531 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -41,7 +41,7 @@ private void initializeClient() { properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); - properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafdrop-client"); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBrokerConnect()); @@ -157,7 +157,6 @@ synchronized List> getLatestRecords(String topic, } final var totalCount = count * partitions.size(); - //final var rawRecords = new ArrayList>(totalCount); final Map>> rawRecords = partitions.stream().collect(Collectors.toMap(p -> p , p -> new ArrayList<>(count))); diff --git a/src/main/resources/static/js/topic-messages.js b/src/main/resources/static/js/topic-messages.js deleted file mode 100644 index 46271689..00000000 --- a/src/main/resources/static/js/topic-messages.js +++ /dev/null @@ -1,5 +0,0 @@ -//jQuery(document).ready(function () { -// $( "#topic-messages" ).bind( "click", function() { -// -// }); -//}); \ No newline at end of file diff --git a/src/main/resources/templates/topic-detail.ftl b/src/main/resources/templates/topic-detail.ftl index 7c74af3c..7192b19f 100644 --- a/src/main/resources/templates/topic-detail.ftl +++ b/src/main/resources/templates/topic-detail.ftl @@ -25,7 +25,6 @@ word-break: break-all; } - <#setting number_format="0"> @@ -33,7 +32,7 @@

Topic: ${topic.name}

From 75580b8035a6fd66452ffaa4388188d8f094b3d4 Mon Sep 17 00:00:00 2001 From: Kacey B Date: Mon, 23 Sep 2019 22:07:59 +1000 Subject: [PATCH 7/9] Added back link to consumer group. --- src/main/resources/templates/topic-detail.ftl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/resources/templates/topic-detail.ftl b/src/main/resources/templates/topic-detail.ftl index 7192b19f..e7ca9d3e 100644 --- a/src/main/resources/templates/topic-detail.ftl +++ b/src/main/resources/templates/topic-detail.ftl @@ -34,7 +34,7 @@ - +
@@ -109,7 +109,7 @@ <#list topic.partitions as p> - ${p.id} + ${p.id} ${p.firstOffset} ${p.size} ${p.size - p.firstOffset} @@ -136,7 +136,7 @@ <#list consumers![] as c> - ${c.groupId} + ${c.groupId} ${c.getTopic(topic.name).lag} From 67782e64315b4e8d1d9f2de327e6c20dcab9d067 Mon Sep 17 00:00:00 2001 From: Kacey B Date: Mon, 23 Sep 2019 22:34:47 +1000 Subject: [PATCH 8/9] Fix for single-message case --- src/main/java/kafdrop/service/KafkaHighLevelConsumer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index d8627531..d8503a73 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -102,8 +102,8 @@ synchronized List> getLatestRecords(TopicPartitio kafkaConsumer.seek(partition, offset); final var rawRecords = new ArrayList>(count); - final var latestOffset = Math.max(0, kafkaConsumer.endOffsets(partitions).get(partition) - 1); - var currentOffset = offset; + final var latestOffset = kafkaConsumer.endOffsets(partitions).get(partition) - 1; + var currentOffset = offset - 1; // stop if get to count or get to the latest offset while (rawRecords.size() < count && currentOffset < latestOffset) { From 97d06091b9769ec24fe2d54d0df1a6927c79c328 Mon Sep 17 00:00:00 2001 From: Emil Koutanov Date: Tue, 24 Sep 2019 20:49:09 +1000 Subject: [PATCH 9/9] Null-safe deserialization of record values --- .../service/KafkaHighLevelConsumer.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index d8503a73..b625a705 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -37,7 +37,6 @@ private void initializeClient() { if (kafkaConsumer == null) { final var properties = new Properties(); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - properties.put(ConsumerConfig.GROUP_ID_CONFIG, "kafdrop-consumer-group"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); @@ -127,7 +126,7 @@ synchronized List> getLatestRecords(TopicPartitio rec.serializedKeySize(), rec.serializedValueSize(), rec.key(), - deserializer.deserializeMessage(ByteBuffer.wrap(rec.value())), + deserialize(deserializer, rec.value()), rec.headers(), rec.leaderEpoch())) .collect(Collectors.toList()); @@ -187,19 +186,23 @@ synchronized List> getLatestRecords(String topic, rec.serializedKeySize(), rec.serializedValueSize(), rec.key(), - deserializer.deserializeMessage(ByteBuffer.wrap(rec.value())), + deserialize(deserializer, rec.value()), rec.headers(), rec.leaderEpoch())) .collect(Collectors.toList()); } + private static String deserialize(MessageDeserializer deserializer, byte[] bytes) { + return bytes != null ? deserializer.deserializeMessage(ByteBuffer.wrap(bytes)) : "empty"; + } + synchronized Map getTopicsInfo(String[] topics) { initializeClient(); if (topics.length == 0) { final var topicSet = kafkaConsumer.listTopics().keySet(); topics = Arrays.copyOf(topicSet.toArray(), topicSet.size(), String[].class); } - final var topicVos = new HashMap(); + final var topicVos = new HashMap(topics.length, 1f); for (var topic : topics) { topicVos.put(topic, getTopicInfo(topic)); @@ -209,19 +212,19 @@ synchronized Map getTopicsInfo(String[] topics) { } private TopicVO getTopicInfo(String topic) { - final List partitionInfoList = kafkaConsumer.partitionsFor(topic); - final TopicVO topicVo = new TopicVO(topic); - final Map partitions = new TreeMap<>(); + final var partitionInfoList = kafkaConsumer.partitionsFor(topic); + final var topicVo = new TopicVO(topic); + final var partitions = new TreeMap(); - for (PartitionInfo partitionInfo : partitionInfoList) { + for (var partitionInfo : partitionInfoList) { final TopicPartitionVO topicPartitionVo = new TopicPartitionVO(partitionInfo.partition()); - final Node leader = partitionInfo.leader(); + final var leader = partitionInfo.leader(); if (leader != null) { topicPartitionVo.addReplica(new TopicPartitionVO.PartitionReplica(leader.id(), true, true)); } - for (Node node : partitionInfo.replicas()) { + for (var node : partitionInfo.replicas()) { topicPartitionVo.addReplica(new TopicPartitionVO.PartitionReplica(node.id(), true, false)); } partitions.put(partitionInfo.partition(), topicPartitionVo);