Skip to content

Commit

Permalink
Merge pull request #29 from obsidiandynamics/bootstrap
Browse files Browse the repository at this point in the history
Improved getLatestRecords() to handle empty polls
  • Loading branch information
ekoutanov authored Sep 24, 2019
2 parents 4239c83 + ec3f824 commit cd66e71
Show file tree
Hide file tree
Showing 9 changed files with 248 additions and 25 deletions.
37 changes: 36 additions & 1 deletion src/main/java/kafdrop/controller/MessageController.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,41 @@ public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInsp
this.schemaRegistryProperties = schemaRegistryProperties;
}

/**
 * Human friendly view of reading all topic messages sorted by timestamp.
 * @param topicName Name of topic
 * @param model Spring MVC model, populated with the topic and its messages
 * @param count Maximum number of messages fetched per request (defaults to 100)
 * @return View for seeing all messages in a topic sorted by timestamp.
 */
@GetMapping("/topic/{name:.+}/allmessages")
public String viewAllMessages(@PathVariable("name") String topicName,
                              Model model, @RequestParam(name = "count", required = false) Integer count) {
  final int size = (count != null ? count : 100);
  final MessageFormat defaultFormat = messageFormatProperties.getFormat();
  final TopicVO topic = kafkaMonitor.getTopic(topicName)
      .orElseThrow(() -> new TopicNotFoundException(topicName));

  model.addAttribute("topic", topic);
  model.addAttribute("defaultFormat", defaultFormat);
  model.addAttribute("messageFormats", MessageFormat.values());

  final var deserializer = getDeserializer(topicName, defaultFormat);
  // Copy into a mutable list: getMessages() may return an immutable empty list
  // (Collections.emptyList()), which would make the addAll() calls below throw
  // UnsupportedOperationException.
  final List<MessageVO> messages =
      new ArrayList<>(messageInspector.getMessages(topicName, size, deserializer));

  // NOTE(review): this per-partition fetch starts at each partition's first offset and
  // may overlap with the latest records fetched above, producing duplicate entries in
  // the view — confirm whether both fetches are intended.
  for (TopicPartitionVO partition : topic.getPartitions()) {
    messages.addAll(messageInspector.getMessages(topicName,
                                                 partition.getId(),
                                                 partition.getFirstOffset(),
                                                 size,
                                                 deserializer));
  }

  messages.sort(Comparator.comparing(MessageVO::getTimestamp));
  model.addAttribute("messages", messages);

  return "topic-messages";
}

/**
* Human friendly view of reading messages.
* @param topicName Name of topic
Expand All @@ -61,7 +96,7 @@ public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInsp
* @param model
* @return View for seeing messages in a partition.
*/
@RequestMapping(method = RequestMethod.GET, value = "/topic/{name:.+}/messages")
@GetMapping("/topic/{name:.+}/messages")
public String viewMessageForm(@PathVariable("name") String topicName,
@Valid @ModelAttribute("messageForm") PartitionOffsetInfo messageForm,
BindingResult errors,
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/kafdrop/model/MessageVO.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,19 @@
import java.util.stream.*;

public final class MessageVO {
private int partition;
private long offset;
private String message;
private String key;
private Map<String, String> headers;
private Date timestamp;

// Partition this message was read from.
public int getPartition() { return partition; }
public void setPartition(int partition) { this.partition = partition; }

// Offset of this message within its partition.
public long getOffset() { return offset; }
public void setOffset(long offset) { this.offset = offset; }

// Deserialised message payload.
public String getMessage() {
return message;
}
Expand Down
25 changes: 25 additions & 0 deletions src/main/java/kafdrop/service/CuratorKafkaMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,29 @@ private Map<String, TopicVO> getTopicMetadata(String... topics) {
return kafkaHighLevelConsumer.getTopicsInfo(topics);
}

/**
 * Returns the latest messages for the given topic, converted to view objects.
 * Always returns a mutable list (possibly empty, never null) so that callers —
 * e.g. MessageController, which calls addAll() on the result — behave consistently
 * regardless of which branch produced it. The previous version returned the
 * immutable Collections.emptyList() when no records were available, which made
 * subsequent mutation throw UnsupportedOperationException.
 */
@Override
public List<MessageVO> getMessages(String topic, int count,
                                   MessageDeserializer deserializer) {
  final var records =
      kafkaHighLevelConsumer.getLatestRecords(topic, count, deserializer);
  final var messageVos = new ArrayList<MessageVO>();
  if (records != null) {
    for (var record : records) {
      final var messageVo = new MessageVO();
      messageVo.setPartition(record.partition());
      messageVo.setOffset(record.offset());
      messageVo.setKey(record.key());
      messageVo.setMessage(record.value());
      messageVo.setHeaders(headersToMap(record.headers()));
      messageVo.setTimestamp(new Date(record.timestamp()));
      messageVos.add(messageVo);
    }
  }
  return messageVos;
}

@Override
public List<MessageVO> getMessages(TopicPartition topicPartition, long offset, int count,
MessageDeserializer deserializer) {
Expand All @@ -228,6 +251,8 @@ public List<MessageVO> getMessages(TopicPartition topicPartition, long offset, i
final var messageVos = new ArrayList<MessageVO>();
for (var record : records) {
final var messageVo = new MessageVO();
messageVo.setPartition(topicPartition.partition());
messageVo.setOffset(record.offset());
messageVo.setKey(record.key());
messageVo.setMessage(record.value());
messageVo.setHeaders(headersToMap(record.headers()));
Expand Down
114 changes: 97 additions & 17 deletions src/main/java/kafdrop/service/KafkaHighLevelConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ private void initializeClient() {
if (kafkaConsumer == null) {
final var properties = new Properties();
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "kafdrop-consumer-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafdrop-client");
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBrokerConnect());

Expand Down Expand Up @@ -86,18 +85,35 @@ synchronized Map<Integer, TopicPartitionVO> getPartitionSize(String topic) {
return partitionsVo;
}

synchronized List<ConsumerRecord<String, String>> getLatestRecords(TopicPartition topicPartition, long offset, int count,
/**
* Retrieves latest records from the given offset.
* @param partition Topic partition
* @param offset Offset to seek from
* @param count Maximum number of records returned
* @param deserializer Message deserialiser
* @return Latest records
*/
synchronized List<ConsumerRecord<String, String>> getLatestRecords(TopicPartition partition, long offset, int count,
MessageDeserializer deserializer) {
initializeClient();
kafkaConsumer.assign(Collections.singletonList(topicPartition));
kafkaConsumer.seek(topicPartition, offset);
final var partitions = Collections.singletonList(partition);
kafkaConsumer.assign(partitions);
kafkaConsumer.seek(partition, offset);

final var rawRecords = new ArrayList<ConsumerRecord<String, byte[]>>(count);
while (rawRecords.size() <= count) {
final var polled = kafkaConsumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)).records(topicPartition);
if (polled.isEmpty()) break;
rawRecords.addAll(polled);
final var latestOffset = kafkaConsumer.endOffsets(partitions).get(partition) - 1;
var currentOffset = offset - 1;

// stop if get to count or get to the latest offset
while (rawRecords.size() < count && currentOffset < latestOffset) {
final var polled = kafkaConsumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)).records(partition);

if (!polled.isEmpty()) {
rawRecords.addAll(polled);
currentOffset = polled.get(polled.size() - 1).offset();
}
}

return rawRecords
.subList(0, Math.min(count, rawRecords.size()))
.stream()
Expand All @@ -110,19 +126,83 @@ synchronized List<ConsumerRecord<String, String>> getLatestRecords(TopicPartitio
rec.serializedKeySize(),
rec.serializedValueSize(),
rec.key(),
deserializer.deserializeMessage(ByteBuffer.wrap(rec.value())),
deserialize(deserializer, rec.value()),
rec.headers(),
rec.leaderEpoch()))
.collect(Collectors.toList());
}

/**
 * Gets the latest records from all partitions of a given topic.
 * @param topic Topic to read from
 * @param count The maximum number of records fetched per partition.
 * @param deserializer Message deserializer
 * @return A list of consumer records for a given topic.
 */
synchronized List<ConsumerRecord<String, String>> getLatestRecords(String topic,
                                                                   int count,
                                                                   MessageDeserializer deserializer) {
  initializeClient();
  final var partitionInfoSet = kafkaConsumer.partitionsFor(topic);
  final var partitions = partitionInfoSet.stream()
      .map(partitionInfo -> new TopicPartition(partitionInfo.topic(),
                                               partitionInfo.partition()))
      .collect(Collectors.toList());
  kafkaConsumer.assign(partitions);
  final var latestOffsets = kafkaConsumer.endOffsets(partitions);

  // Rewind each partition so that at most the last `count` records are fetched from it.
  for (var partition : partitions) {
    final var latestOffset = Math.max(0, latestOffsets.get(partition) - 1);
    kafkaConsumer.seek(partition, Math.max(0, latestOffset - count));
  }

  final var totalCount = count * partitions.size();
  final Map<TopicPartition, List<ConsumerRecord<String, byte[]>>> rawRecords
      = partitions.stream().collect(Collectors.toMap(p -> p, p -> new ArrayList<>(count)));

  // Bug fix: the previous loop condition compared rawRecords.size() — the number of
  // partitions in the map, which never changes — against totalCount, so the record
  // budget was never enforced. Track the accumulated record count explicitly.
  var fetchedCount = 0;
  var moreRecords = true;
  while (fetchedCount < totalCount && moreRecords) {
    final var polled = kafkaConsumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));

    moreRecords = false;
    for (var partition : polled.partitions()) {
      var records = polled.records(partition);
      if (!records.isEmpty()) {
        rawRecords.get(partition).addAll(records);
        fetchedCount += records.size();
        // Keep polling while this partition still has records below its latest offset.
        moreRecords = records.get(records.size() - 1).offset() < latestOffsets.get(partition) - 1;
      }
    }
  }

  return rawRecords
      .values()
      .stream()
      .flatMap(Collection::stream)
      .map(rec -> new ConsumerRecord<>(rec.topic(),
          rec.partition(),
          rec.offset(),
          rec.timestamp(),
          rec.timestampType(),
          0L,
          rec.serializedKeySize(),
          rec.serializedValueSize(),
          rec.key(),
          deserialize(deserializer, rec.value()),
          rec.headers(),
          rec.leaderEpoch()))
      .collect(Collectors.toList());
}

/**
 * Deserialises a raw record value, mapping a null (tombstone) payload to the
 * literal string {@code "empty"}.
 */
private static String deserialize(MessageDeserializer deserializer, byte[] bytes) {
  if (bytes == null) {
    return "empty";
  }
  return deserializer.deserializeMessage(ByteBuffer.wrap(bytes));
}

synchronized Map<String, TopicVO> getTopicsInfo(String[] topics) {
initializeClient();
if (topics.length == 0) {
final var topicSet = kafkaConsumer.listTopics().keySet();
topics = Arrays.copyOf(topicSet.toArray(), topicSet.size(), String[].class);
}
final var topicVos = new HashMap<String, TopicVO>();
final var topicVos = new HashMap<String, TopicVO>(topics.length, 1f);

for (var topic : topics) {
topicVos.put(topic, getTopicInfo(topic));
Expand All @@ -132,19 +212,19 @@ synchronized Map<String, TopicVO> getTopicsInfo(String[] topics) {
}

private TopicVO getTopicInfo(String topic) {
final List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor(topic);
final TopicVO topicVo = new TopicVO(topic);
final Map<Integer, TopicPartitionVO> partitions = new TreeMap<>();
final var partitionInfoList = kafkaConsumer.partitionsFor(topic);
final var topicVo = new TopicVO(topic);
final var partitions = new TreeMap<Integer, TopicPartitionVO>();

for (PartitionInfo partitionInfo : partitionInfoList) {
for (var partitionInfo : partitionInfoList) {
final TopicPartitionVO topicPartitionVo = new TopicPartitionVO(partitionInfo.partition());

final Node leader = partitionInfo.leader();
final var leader = partitionInfo.leader();
if (leader != null) {
topicPartitionVo.addReplica(new TopicPartitionVO.PartitionReplica(leader.id(), true, true));
}

for (Node node : partitionInfo.replicas()) {
for (var node : partitionInfo.replicas()) {
topicPartitionVo.addReplica(new TopicPartitionVO.PartitionReplica(node.id(), true, false));
}
partitions.put(partitionInfo.partition(), topicPartitionVo);
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/kafdrop/service/KafkaMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ public interface KafkaMonitor {

List<TopicVO> getTopics();

/**
* Returns messages for a given topic.
*/
List<MessageVO> getMessages(String topic, int count,
MessageDeserializer deserializer);

List<MessageVO> getMessages(TopicPartition topicPartition, long offset, int count,
MessageDeserializer deserializer);

Expand Down
11 changes: 11 additions & 0 deletions src/main/java/kafdrop/service/MessageInspector.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,20 @@ public MessageInspector(KafkaMonitor kafkaMonitor) {
this.kafkaMonitor = kafkaMonitor;
}

/**
 * Gets messages for a given partition.
 * @param topicName Topic to read from
 * @param partitionId Partition of the topic to read from
 * @param offset Offset to start reading at
 * @param count Maximum number of messages to return
 * @param deserializer Used to decode raw record values
 * @return Messages read from the partition, delegated to the Kafka monitor.
 */
public List<MessageVO> getMessages(String topicName, int partitionId, long offset, int count,
MessageDeserializer deserializer) {
final var topicPartition = new TopicPartition(topicName, partitionId);
return kafkaMonitor.getMessages(topicPartition, offset, count, deserializer);
}

/**
 * Gets messages from all partitions of a given topic.
 * @param topicName Topic to read from
 * @param count Maximum number of messages to return
 * @param deserializer Used to decode raw record values
 * @return Messages for the topic, delegated to the Kafka monitor.
 */
public List<MessageVO> getMessages(String topicName, int count,
MessageDeserializer deserializer) {
return kafkaMonitor.getMessages(topicName, count, deserializer);
}
}
11 changes: 7 additions & 4 deletions src/main/resources/templates/message-inspector.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<#import "lib/template.ftl" as template>
<@template.header "Topic: ${topic.name}: Messages">
<style type="text/css">
h1 {
h2 {
margin-bottom: 16px;
}
Expand All @@ -30,6 +30,10 @@
margin-left: 16px;
}
.badge {
margin-right: 5px;
}
.toggle-msg {
float: left;
}
Expand Down Expand Up @@ -101,9 +105,8 @@
<div id="message-display" class="container">
<#if messages?? && messages?size gt 0>
<#list messages as msg>
<#assign offset=messageForm.offset + msg_index>
<div data-offset="${offset}" class="message-detail">
<span class="badge badge-light">Offset:</span> ${offset} &nbsp;
<div class="message-detail">
<span class="badge badge-light">Offset:</span> ${msg.offset} &nbsp;
<span class="badge badge-light">Key:</span> ${msg.key!''} &nbsp;
<span class="badge badge-light">Timestamp:</span> ${msg.timestamp?string('yyyy-MM-dd HH:mm:ss.SSS')}
<span class="badge badge-light">Headers:</span> ${msg.headersFormatted}
Expand Down
5 changes: 2 additions & 3 deletions src/main/resources/templates/topic-detail.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@
<h2>Topic: ${topic.name}</h2>

<div id="action-bar" class="container">
<a class="btn btn-outline-light" href="<@spring.url '/topic/${topic.name}/messages'/>"><i class="fa fa-eye"></i> View Messages</a>
<a id="topic-messages" class="btn btn-outline-light" href="<@spring.url '/topic/${topic.name}/messages'/>"><i class="fa fa-eye"></i> View Messages</a>
</div>
<br/>

<div class="container-fluid">
<div class="row">

Expand Down Expand Up @@ -110,7 +109,7 @@
<tbody>
<#list topic.partitions as p>
<tr>
<td>${p.id}</td>
<td><a href="<@spring.url '/topic/${topic.name}/messages?partition=${p.id}&offset=${p.firstOffset}&count=${p.size - p.firstOffset}'/>">${p.id}</a></td>
<td>${p.firstOffset}</td>
<td>${p.size}</td>
<td>${p.size - p.firstOffset}</td>
Expand Down
Loading

0 comments on commit cd66e71

Please sign in to comment.