From c94f6ba72cbdcb36fc68005aae55652a1ab79782 Mon Sep 17 00:00:00 2001 From: pedro lastra Date: Sat, 8 Aug 2020 16:40:23 +0200 Subject: [PATCH 01/11] Added search message page --- .../kafdrop/controller/MessageController.java | 57 ++++++- .../java/kafdrop/form/SearchMessageForm.java | 77 +++++++++ .../service/KafkaHighLevelConsumer.java | 55 ++++++ .../java/kafdrop/service/KafkaMonitor.java | 2 + .../kafdrop/service/KafkaMonitorImpl.java | 22 +++ .../java/kafdrop/service/MessageSearcher.java | 41 +++++ .../resources/static/js/message-inspector.js | 4 + .../resources/templates/search-message.ftl | 157 ++++++++++++++++++ src/main/resources/templates/topic-detail.ftl | 4 + 9 files changed, 418 insertions(+), 1 deletion(-) create mode 100644 src/main/java/kafdrop/form/SearchMessageForm.java create mode 100644 src/main/java/kafdrop/service/MessageSearcher.java create mode 100644 src/main/resources/templates/search-message.ftl diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index 82e0617c..4d9d961b 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -28,6 +28,8 @@ import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import kafdrop.form.SearchMessageForm; +import kafdrop.service.MessageSearcher; import kafdrop.util.*; import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; @@ -66,6 +68,8 @@ public final class MessageController { private final MessageInspector messageInspector; + private final MessageSearcher messageSearcher; + private final MessageFormatConfiguration.MessageFormatProperties messageFormatProperties; private final MessageFormatConfiguration.MessageFormatProperties keyFormatProperties; @@ -73,13 +77,14 @@ public final class MessageController { private final ProtobufDescriptorConfiguration.ProtobufDescriptorProperties protobufProperties; - public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector, MessageFormatProperties messageFormatProperties, MessageFormatProperties keyFormatProperties, SchemaRegistryProperties schemaRegistryProperties, ProtobufDescriptorProperties protobufProperties) { + public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector, MessageFormatProperties messageFormatProperties, MessageFormatProperties keyFormatProperties, SchemaRegistryProperties schemaRegistryProperties, ProtobufDescriptorProperties protobufProperties, MessageSearcher messageSearcher) { this.kafkaMonitor = kafkaMonitor; this.messageInspector = messageInspector; this.messageFormatProperties = messageFormatProperties; this.keyFormatProperties = keyFormatProperties; this.schemaRegistryProperties = schemaRegistryProperties; this.protobufProperties = protobufProperties; + this.messageSearcher = messageSearcher; } /** @@ -180,6 +185,56 @@ public String viewMessageForm(@PathVariable("name") String topicName, return "message-inspector"; } + /** + * Human friendly view of reading messages. + * @param topicName Name of topic + * @param searchMessageForm Message form for submitting requests to view messages. + * @param errors + * @param model + * @return View for seeing messages in a partition. + */ + @GetMapping("/topic/{name:.+}/search-messages") + public String searchMessageForm(@PathVariable("name") String topicName, + @Valid @ModelAttribute("searchMessageForm") SearchMessageForm searchMessageForm, + BindingResult errors, + Model model) { + final MessageFormat defaultFormat = messageFormatProperties.getFormat(); + final MessageFormat defaultKeyFormat = keyFormatProperties.getFormat(); + + if (searchMessageForm.isEmpty()) { + final SearchMessageForm defaultForm = new SearchMessageForm(); + + defaultForm.setSearchText(""); + defaultForm.setFormat(defaultFormat); + defaultForm.setKeyFormat(defaultFormat); + + model.addAttribute("searchMessageForm", defaultForm); + } + + final TopicVO topic = kafkaMonitor.getTopic(topicName) + .orElseThrow(() -> new TopicNotFoundException(topicName)); + model.addAttribute("topic", topic); + model.addAttribute("defaultFormat", defaultFormat); + model.addAttribute("messageFormats", MessageFormat.values()); + model.addAttribute("defaultKeyFormat", defaultKeyFormat); + model.addAttribute("keyFormats",KeyFormat.values()); + model.addAttribute("descFiles", protobufProperties.getDescFilesList()); + + if (!searchMessageForm.isEmpty() && !errors.hasErrors()) { + + final var deserializers = new Deserializers( + getDeserializer(topicName, searchMessageForm.getKeyFormat(), searchMessageForm.getDescFile(),searchMessageForm.getMsgTypeName()), + getDeserializer(topicName, searchMessageForm.getFormat(), searchMessageForm.getDescFile(), searchMessageForm.getMsgTypeName()) + ); + + model.addAttribute( + "messages", + messageSearcher.search(topicName, searchMessageForm.getSearchText(), deserializers)); + } + + return "search-message"; + } + /** * Returns the selected nessagr format based on the diff --git a/src/main/java/kafdrop/form/SearchMessageForm.java b/src/main/java/kafdrop/form/SearchMessageForm.java new file mode 100644 index 00000000..16626cbd --- /dev/null +++ b/src/main/java/kafdrop/form/SearchMessageForm.java @@ -0,0 +1,77 @@ +package kafdrop.form; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import kafdrop.util.MessageFormat; + +import javax.validation.constraints.NotBlank; + +public class SearchMessageForm { + + @NotBlank + private String searchText; + + private MessageFormat format; + + private MessageFormat keyFormat; + + private String descFile; + + private String msgTypeName; + + public SearchMessageForm(String searchText, MessageFormat format) { + this.searchText = searchText; + this.format = format; + } + + public SearchMessageForm(String searchText) { + this(searchText, MessageFormat.DEFAULT); + } + + public SearchMessageForm() {} + + @JsonIgnore + public boolean isEmpty() { + return searchText == null || searchText.isEmpty(); + } + + public String getSearchText() { + return searchText; + } + + public void setSearchText(String searchText) { + this.searchText = searchText; + } + + public MessageFormat getKeyFormat() { + return keyFormat; + } + + public void setKeyFormat(MessageFormat keyFormat) { + this.keyFormat = keyFormat; + } + + public MessageFormat getFormat() { + return format; + } + + public void setFormat(MessageFormat format) { + this.format = format; + } + + public String getDescFile() { + return descFile; + } + + public void setDescFile(String descFile) { + this.descFile = descFile; + } + + public String getMsgTypeName() { + return msgTypeName; + } + + public void setMsgTypeName(String msgTypeName) { + this.msgTypeName = msgTypeName; + } +} + diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index c714fa17..ca0997d7 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -21,6 +21,7 @@ @Service public final class KafkaHighLevelConsumer { private static final int POLL_TIMEOUT_MS = 200; + private static final int SEARCH_POLL_TIMEOUT_MS = 1000; private static final Logger LOG = LoggerFactory.getLogger(KafkaHighLevelConsumer.class); @@ -195,6 +196,60 @@ synchronized List> getLatestRecords(String topic, .collect(Collectors.toList()); } + /** + * Searches records from all partitions of a given topic containing a given text. + * @param searchString Searched text. + * @param deserializers Key and Value deserializers + * @return A list of consumer records for a given topic. + */ + synchronized List> searchRecords(String topic, + String searchString, + Deserializers deserializers) { + initializeClient(); + final var partitionInfoSet = kafkaConsumer.partitionsFor(topic); + final var partitions = partitionInfoSet.stream() + .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), + partitionInfo.partition())) + .collect(Collectors.toList()); + kafkaConsumer.assign(partitions); + kafkaConsumer.seekToBeginning(partitions); + + final List> rawRecords = new ArrayList<>(); + + boolean moreRecords; + do { + final var polled = kafkaConsumer.poll(Duration.ofMillis(SEARCH_POLL_TIMEOUT_MS)); + + for (var partition : polled.partitions()) { + var records = polled.records(partition); + if (!records.isEmpty()) { + rawRecords.addAll(records); + } + } + + moreRecords = !polled.isEmpty(); + } while (moreRecords); + + String loweredSearchString = searchString.toLowerCase(); + return rawRecords + .stream() + .filter(rec -> deserialize(deserializers.getKeyDeserializer(), rec.key()).toLowerCase().contains(loweredSearchString)) + .map(rec -> new ConsumerRecord<>(rec.topic(), + rec.partition(), + rec.offset(), + rec.timestamp(), + rec.timestampType(), + 0L, + rec.serializedKeySize(), + rec.serializedValueSize(), + deserialize(deserializers.getKeyDeserializer(), rec.key()), + deserialize(deserializers.getValueDeserializer(), rec.value()), + rec.headers(), + rec.leaderEpoch()) + ) + .collect(Collectors.toList()); + } + private static String deserialize(MessageDeserializer deserializer, byte[] bytes) { return bytes != null ? deserializer.deserializeMessage(ByteBuffer.wrap(bytes)) : "empty"; } diff --git a/src/main/java/kafdrop/service/KafkaMonitor.java b/src/main/java/kafdrop/service/KafkaMonitor.java index d4bcc000..2520d51c 100644 --- a/src/main/java/kafdrop/service/KafkaMonitor.java +++ b/src/main/java/kafdrop/service/KafkaMonitor.java @@ -46,6 +46,8 @@ List getMessages(TopicPartition topicPartition, long offset, int coun List getConsumers(Collection topicVos); + List searchMessages(String topic, String searchString, Deserializers deserializers); + /** * Create topic * @param createTopicDto topic params diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index ea403efe..2e5ea091 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -204,6 +204,28 @@ public List getConsumers(Collection topicVos) { return convert(consumerGroupOffsets, topicVos); } + @Override + public List searchMessages(String topic, String searchString, Deserializers deserializers) { + final int count = 10; + final var records = highLevelConsumer.searchRecords(topic, searchString, deserializers); + if (records != null) { + final var messageVos = new ArrayList(); + for (var record : records) { + final var messageVo = new MessageVO(); + messageVo.setPartition(record.partition()); + messageVo.setOffset(record.offset()); + messageVo.setKey(record.key()); + messageVo.setMessage(record.value()); + messageVo.setHeaders(headersToMap(record.headers())); + messageVo.setTimestamp(new Date(record.timestamp())); + messageVos.add(messageVo); + } + return messageVos; + } else { + return Collections.emptyList(); + } + } + @Override public void createTopic(CreateTopicVO createTopicDto) { var newTopic = new NewTopic( diff --git a/src/main/java/kafdrop/service/MessageSearcher.java b/src/main/java/kafdrop/service/MessageSearcher.java new file mode 100644 index 00000000..192ad09f --- /dev/null +++ b/src/main/java/kafdrop/service/MessageSearcher.java @@ -0,0 +1,41 @@ +/* + * Copyright 2017 Kafdrop contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package kafdrop.service; + +import kafdrop.model.MessageVO; +import kafdrop.util.Deserializers; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.Topic; +import org.springframework.stereotype.Service; + +import java.util.List; + +@Service +@RequiredArgsConstructor +public final class MessageSearcher { + private final KafkaMonitor kafkaMonitor; + + /** + * Searches messages from all partitions of a given topic. + */ + public List search(final String topicName, final String searchText, final Deserializers deserializers) { + return kafkaMonitor.searchMessages(topicName, searchText, deserializers); + } +} diff --git a/src/main/resources/static/js/message-inspector.js b/src/main/resources/static/js/message-inspector.js index 2b852371..b1b78a94 100644 --- a/src/main/resources/static/js/message-inspector.js +++ b/src/main/resources/static/js/message-inspector.js @@ -17,6 +17,10 @@ */ jQuery(document).ready(function () { + jQuery(document).on('click', '#searchMessagesBtn', function (e) { + jQuery(this).attr('disabled', true).html(' Searching...'); + }); + jQuery(document).on('click', '.toggle-msg', function (e) { var link = jQuery(this), linkIcon = link.find('.fa'), diff --git a/src/main/resources/templates/search-message.ftl b/src/main/resources/templates/search-message.ftl new file mode 100644 index 00000000..f615dc24 --- /dev/null +++ b/src/main/resources/templates/search-message.ftl @@ -0,0 +1,157 @@ +<#ftl output_format="HTML"> +<#-- + Copyright 2016 Kafdrop contributors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<#import "/spring.ftl" as spring /> +<#import "lib/template.ftl" as template> +<@template.header "Topic: ${topic.name}: Search messages"> + + + + + + +<#setting number_format="0"> + + +

Search messages in: ${topic.name}

+ +<#assign selectedFormat=searchMessageForm.format!defaultFormat> +<#assign selectedKeyFormat=searchMessageForm.keyFormat!defaultFormat> + +
+
+
+ <@spring.bind path="searchMessageForm.searchText"/> +
+ + <@spring.formInput path="searchMessageForm.searchText" attributes='class="form-control" size="10"'/> + <#if spring.status.error> + <@spring.showErrors "
"/>
+ +
+    +
+ + +
+    +
+ + +
+    +
+ <#if descFiles?size != 0> + + + <#else> + No available descriptor, please check. + +
+    +
+ <#if descFiles?size != 0> + + <@spring.formInput path="searchMessageForm.msgTypeName" attributes='class="form-control"'/> + +
+    +
+ +
+
+ +<@spring.bind path="searchMessageForm.*"/> +
+ <#if messages?? && messages?size gt 0> +
+
+ ${messages?size} messages found. +
+
+ <#list messages as msg> +
+ Item: ${msg?index + 1}   + Partition: ${msg.partition}   + Offset: ${msg.offset}   + Key: ${msg.key!''}   + Timestamp: ${msg.timestamp?string('yyyy-MM-dd HH:mm:ss.SSS')} + Headers: ${msg.headersFormatted} +
+   +
${msg.message!''}
+
+
+ + <#elseif !(spring.status.error) && !(searchMessageForm.empty)> + No messages found containing ${(searchMessageForm.searchText)!"No search text"} + +
+ +<@template.footer/> \ No newline at end of file diff --git a/src/main/resources/templates/topic-detail.ftl b/src/main/resources/templates/topic-detail.ftl index e41db08a..c7c84833 100644 --- a/src/main/resources/templates/topic-detail.ftl +++ b/src/main/resources/templates/topic-detail.ftl @@ -44,6 +44,10 @@ View Messages +    + + Search Messages + <#if topicDeleteEnabled>
From 38ff63e710803b9947cf24e14778b6b9578c252a Mon Sep 17 00:00:00 2001 From: Nathan Daniels Date: Thu, 17 Jun 2021 19:59:24 -0400 Subject: [PATCH 02/11] Overhaul search functionality protoype. Improved performance by not reading the entire topic into memory and having a safetly limit on the number of messages to read befor the search aborts. Added a maximum results count. Added a date filter to allow user to specify which timestamp to start searching from. Added better output. --- .gitignore | 1 + .../config/HealthCheckConfiguration.java | 2 +- .../kafdrop/controller/MessageController.java | 23 ++- .../java/kafdrop/form/SearchMessageForm.java | 35 +++- .../java/kafdrop/model/SearchResultsVO.java | 43 +++++ .../service/KafkaHighLevelConsumer.java | 117 ++++++++---- .../java/kafdrop/service/KafkaMonitor.java | 11 +- .../kafdrop/service/KafkaMonitorImpl.java | 57 ++++++ .../java/kafdrop/service/SearchResults.java | 52 ++++++ .../resources/templates/search-message.ftlh | 175 ++++++++++++++++++ 10 files changed, 471 insertions(+), 45 deletions(-) create mode 100644 src/main/java/kafdrop/model/SearchResultsVO.java create mode 100644 src/main/java/kafdrop/service/SearchResults.java create mode 100644 src/main/resources/templates/search-message.ftlh diff --git a/.gitignore b/.gitignore index b8bb7eb3..4797ad35 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ settings.xml kafka.properties* kafka.truststore.jks* kafka.keystore.jks* +/.vs diff --git a/src/main/java/kafdrop/config/HealthCheckConfiguration.java b/src/main/java/kafdrop/config/HealthCheckConfiguration.java index 9c8dcc16..0f9e4e1f 100644 --- a/src/main/java/kafdrop/config/HealthCheckConfiguration.java +++ b/src/main/java/kafdrop/config/HealthCheckConfiguration.java @@ -68,4 +68,4 @@ private String getStatus(Health health) { } } } -} +} \ No newline at end of file diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index 60a41680..c4b44f05 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -21,6 +21,7 @@ import java.io.File; import java.util.ArrayList; import java.util.Comparator; +import java.util.Date; import java.util.List; import javax.validation.Valid; @@ -52,6 +53,7 @@ import kafdrop.config.MessageFormatConfiguration.MessageFormatProperties; import kafdrop.config.ProtobufDescriptorConfiguration.ProtobufDescriptorProperties; import kafdrop.config.SchemaRegistryConfiguration.SchemaRegistryProperties; +import kafdrop.form.SearchMessageForm; import kafdrop.model.MessageVO; import kafdrop.model.TopicPartitionVO; import kafdrop.model.TopicVO; @@ -182,9 +184,9 @@ public String viewMessageForm(@PathVariable("name") String topicName, } /** - * Human friendly view of reading messages. + * Human friendly view of searching messages. * @param topicName Name of topic - * @param searchMessageForm Message form for submitting requests to view messages. + * @param searchMessageForm Message form for submitting requests to search messages. * @param errors * @param model * @return View for seeing messages in a partition. @@ -203,12 +205,14 @@ public String searchMessageForm(@PathVariable("name") String topicName, defaultForm.setSearchText(""); defaultForm.setFormat(defaultFormat); defaultForm.setKeyFormat(defaultFormat); - + defaultForm.setMaximumCount(100); + defaultForm.setStartTimestamp(new Date(0)); model.addAttribute("searchMessageForm", defaultForm); } final TopicVO topic = kafkaMonitor.getTopic(topicName) .orElseThrow(() -> new TopicNotFoundException(topicName)); + model.addAttribute("topic", topic); model.addAttribute("defaultFormat", defaultFormat); model.addAttribute("messageFormats", MessageFormat.values()); @@ -223,15 +227,20 @@ public String searchMessageForm(@PathVariable("name") String topicName, getDeserializer(topicName, searchMessageForm.getFormat(), searchMessageForm.getDescFile(), searchMessageForm.getMsgTypeName()) ); - model.addAttribute( - "messages", - messageSearcher.search(topicName, searchMessageForm.getSearchText(), deserializers)); + var searchResults = kafkaMonitor.searchMessages( + topicName, + searchMessageForm.getSearchText(), + searchMessageForm.getMaximumCount(), + searchMessageForm.getStartTimestamp(), + deserializers); + + model.addAttribute("messages", searchResults.getMessages()); + model.addAttribute("details", searchResults.getCompletionDetails()); } return "search-message"; } - /** * Returns the selected nessagr format based on the * form submission diff --git a/src/main/java/kafdrop/form/SearchMessageForm.java b/src/main/java/kafdrop/form/SearchMessageForm.java index 16626cbd..016f7d26 100644 --- a/src/main/java/kafdrop/form/SearchMessageForm.java +++ b/src/main/java/kafdrop/form/SearchMessageForm.java @@ -1,15 +1,28 @@ package kafdrop.form; import com.fasterxml.jackson.annotation.JsonIgnore; + +import org.springframework.format.annotation.DateTimeFormat; + import kafdrop.util.MessageFormat; +import java.util.Date; + +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; public class SearchMessageForm { @NotBlank private String searchText; + @NotNull + @Min(1) + @Max(1000) + private Integer maximumCount; + private MessageFormat format; private MessageFormat keyFormat; @@ -17,12 +30,23 @@ public class SearchMessageForm { private String descFile; private String msgTypeName; + + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS") + private Date startTimestamp; public SearchMessageForm(String searchText, MessageFormat format) { this.searchText = searchText; this.format = format; } + public Date getStartTimestamp() { + return startTimestamp; + } + + public void setStartTimestamp(Date startTimestamp) { + this.startTimestamp = startTimestamp; + } + public SearchMessageForm(String searchText) { this(searchText, MessageFormat.DEFAULT); } @@ -42,6 +66,14 @@ public void setSearchText(String searchText) { this.searchText = searchText; } + public Integer getMaximumCount() { + return maximumCount; + } + + public void setMaximumCount(Integer maximumCount) { + this.maximumCount = maximumCount; + } + public MessageFormat getKeyFormat() { return keyFormat; } @@ -73,5 +105,4 @@ public String getMsgTypeName() { public void setMsgTypeName(String msgTypeName) { this.msgTypeName = msgTypeName; } -} - +} \ No newline at end of file diff --git a/src/main/java/kafdrop/model/SearchResultsVO.java b/src/main/java/kafdrop/model/SearchResultsVO.java new file mode 100644 index 00000000..cee19727 --- /dev/null +++ b/src/main/java/kafdrop/model/SearchResultsVO.java @@ -0,0 +1,43 @@ +/* + * Copyright 2021 Kafdrop contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + + package kafdrop.model; + +import java.util.List; + +public final class SearchResultsVO { + private List messages; + + private String completionDetails; + + public List getMessages() { + return messages; + } + + public String getCompletionDetails() { + return completionDetails; + } + + public void setCompletionDetails(String completionDetails) { + this.completionDetails = completionDetails; + } + + public void setMessages(List messages) { + this.messages = messages; + } +} diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index ca0997d7..87677dbd 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -2,12 +2,11 @@ import kafdrop.config.*; import kafdrop.model.*; +import kafdrop.service.SearchResults.CompletionReason; import kafdrop.util.*; -import org.apache.kafka.clients.*; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.Node; import org.apache.kafka.common.*; -import org.apache.kafka.common.config.*; import org.apache.kafka.common.serialization.*; import org.slf4j.*; import org.springframework.stereotype.*; @@ -22,7 +21,7 @@ public final class KafkaHighLevelConsumer { private static final int POLL_TIMEOUT_MS = 200; private static final int SEARCH_POLL_TIMEOUT_MS = 1000; - + private static final int SEARCH_MAX_MESSAGES_TO_SCAN = 100000; private static final Logger LOG = LoggerFactory.getLogger(KafkaHighLevelConsumer.class); private KafkaConsumer kafkaConsumer; @@ -195,16 +194,21 @@ synchronized List> getLatestRecords(String topic, rec.leaderEpoch())) .collect(Collectors.toList()); } - + /** * Searches records from all partitions of a given topic containing a given text. + * @param topic The topic * @param searchString Searched text. + * @param maximumCount The maximum number of results to return + * @param startTimestamp The begining message timestamp to search from * @param deserializers Key and Value deserializers * @return A list of consumer records for a given topic. */ - synchronized List> searchRecords(String topic, - String searchString, - Deserializers deserializers) { + synchronized SearchResults searchRecords(String topic, + String searchString, + Integer maximumCount, + Date startTimestamp, + Deserializers deserializers) { initializeClient(); final var partitionInfoSet = kafkaConsumer.partitionsFor(topic); final var partitions = partitionInfoSet.stream() @@ -212,44 +216,93 @@ synchronized List> searchRecords(String topic, partitionInfo.partition())) .collect(Collectors.toList()); kafkaConsumer.assign(partitions); - kafkaConsumer.seekToBeginning(partitions); + + // make the consumer seek to the correct offsets for the start timestamp + final var partitionOffsets = kafkaConsumer.offsetsForTimes(partitions.stream().collect(Collectors.toMap(tp-> tp, tp-> startTimestamp.getTime()))); + + // Seek each partition to that correct offset + for (var partition : partitionOffsets.keySet()) { + var offset = partitionOffsets.get(partition); + + // Old kafka message versions don't have timestamps so the offset would be null for that partition + if (offset == null) { + offset = new OffsetAndTimestamp(0, 0); + } + + kafkaConsumer.seek(partition, offset.offset()); + } + + // Time to search! + final List> foundRecords = new ArrayList<>(); + var moreRecords = true; + var scannedCount = 0; + var loweredSearchString = searchString.toLowerCase(); + var endingTimestamp = Long.MAX_VALUE; - final List> rawRecords = new ArrayList<>(); + while (foundRecords.size() < maximumCount && moreRecords && scannedCount < SEARCH_MAX_MESSAGES_TO_SCAN) { - boolean moreRecords; - do { final var polled = kafkaConsumer.poll(Duration.ofMillis(SEARCH_POLL_TIMEOUT_MS)); + //Loop each partition for (var partition : polled.partitions()) { + + endingTimestamp = Long.MAX_VALUE; + + //Pull records from one partition var records = polled.records(partition); if (!records.isEmpty()) { - rawRecords.addAll(records); + + // Keep track of the lowest timestamp among this batch of records. + // This is what we will report to the user in the event that the search terminates + // early, so that they can perform a new search with this new timestamp as a starting point + var firstTimestamp = records.get(0).timestamp(); + if (firstTimestamp < endingTimestamp){ + endingTimestamp = firstTimestamp; + } + + scannedCount += records.size(); + //Add to found records if it matches the search cretirea + foundRecords.addAll(records.stream() + .filter(rec -> + deserialize(deserializers.getKeyDeserializer(), rec.key()).toLowerCase().contains(loweredSearchString) || + deserialize(deserializers.getValueDeserializer(), rec.value()).toLowerCase().contains(loweredSearchString)) + .map(rec -> new ConsumerRecord<>(rec.topic(), + rec.partition(), + rec.offset(), + rec.timestamp(), + rec.timestampType(), + 0L, + rec.serializedKeySize(), + rec.serializedValueSize(), + deserialize(deserializers.getKeyDeserializer(), rec.key()), + deserialize(deserializers.getValueDeserializer(), rec.value()), + rec.headers(), + rec.leaderEpoch())) + .collect(Collectors.toList())); } } - + + //If no more polled exit the loop moreRecords = !polled.isEmpty(); - } while (moreRecords); + } - String loweredSearchString = searchString.toLowerCase(); - return rawRecords - .stream() - .filter(rec -> deserialize(deserializers.getKeyDeserializer(), rec.key()).toLowerCase().contains(loweredSearchString)) - .map(rec -> new ConsumerRecord<>(rec.topic(), - rec.partition(), - rec.offset(), - rec.timestamp(), - rec.timestampType(), - 0L, - rec.serializedKeySize(), - rec.serializedValueSize(), - deserialize(deserializers.getKeyDeserializer(), rec.key()), - deserialize(deserializers.getValueDeserializer(), rec.value()), - rec.headers(), - rec.leaderEpoch()) - ) - .collect(Collectors.toList()); + SearchResults.CompletionReason completionReason; + if (!moreRecords) { + completionReason = CompletionReason.NO_MORE_MESSAGES_IN_TOPIC; + } + else if (foundRecords.size() >= maximumCount) { + completionReason = CompletionReason.FOUND_REQUESTED_NUMBER_OF_RESULTS; + } + else if (scannedCount >= SEARCH_MAX_MESSAGES_TO_SCAN){ + completionReason = CompletionReason.EXCEEDED_MAX_SCAN_COUNT; + } else { + completionReason = CompletionReason.REACHED_END_OF_TIMESPAN; + } + + return new SearchResults(foundRecords, completionReason, new Date(endingTimestamp), scannedCount); } + private static String deserialize(MessageDeserializer deserializer, byte[] bytes) { return bytes != null ? deserializer.deserializeMessage(ByteBuffer.wrap(bytes)) : "empty"; } diff --git a/src/main/java/kafdrop/service/KafkaMonitor.java b/src/main/java/kafdrop/service/KafkaMonitor.java index 2520d51c..7b83cbdf 100644 --- a/src/main/java/kafdrop/service/KafkaMonitor.java +++ b/src/main/java/kafdrop/service/KafkaMonitor.java @@ -45,9 +45,14 @@ List getMessages(TopicPartition topicPartition, long offset, int coun ClusterSummaryVO getClusterSummary(Collection topics); List getConsumers(Collection topicVos); - - List searchMessages(String topic, String searchString, Deserializers deserializers); - + + /* Seach messages */ + SearchResultsVO searchMessages(String topic, + String searchString, + Integer maximumCount, + Date startTimestamp, + Deserializers deserializers); + /** * Create topic * @param createTopicDto topic params diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index 7026c2de..52790b96 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -195,6 +195,63 @@ private Map getTopicPartitionSizes(TopicVO topic) { return highLevelConsumer.getPartitionSize(topic.getName()); } + @Override + public SearchResultsVO searchMessages(String topic, + String searchString, + Integer maxmuimCount, + Date startTimestamp, + Deserializers deserializers) + { + final var records = highLevelConsumer.searchRecords(topic, searchString, maxmuimCount, startTimestamp, deserializers); + final var results = new SearchResultsVO(); + + if (records != null) { + final var messageVos = new ArrayList(); + results.setMessages(messageVos); + + for (var record : records.getResults()) { + final var messageVo = new MessageVO(); + messageVo.setPartition(record.partition()); + messageVo.setOffset(record.offset()); + messageVo.setKey(record.key()); + messageVo.setMessage(record.value()); + messageVo.setHeaders(headersToMap(record.headers())); + messageVo.setTimestamp(new Date(record.timestamp())); + messageVos.add(messageVo); + } + + switch (records.getCompletionReason()) { + case FOUND_REQUESTED_NUMBER_OF_RESULTS: + results.setCompletionDetails(String.format("Search completed after finding requested number of results. Scanned %d messages.", records.getMessagesScannedCount())); + break; + case EXCEEDED_MAX_SCAN_COUNT: + results.setCompletionDetails( + String.format( + "Search timed out after scanning %d messages. Last scanned message timestamp was %d. Adjust your time span for more results.", + records.getMessagesScannedCount(), + records.getFinalMessageTimestamp())); + break; + case NO_MORE_MESSAGES_IN_TOPIC: + results.setCompletionDetails( + String.format( + "Search reached the end of the topic before finding requested number of results. Scanned %d messages.", + records.getMessagesScannedCount())); + break; + case REACHED_END_OF_TIMESPAN: + results.setCompletionDetails( + String.format( + "Search reached the end of the specified time span before finding requested number of results. Scanned %d messages.", + records.getMessagesScannedCount())); + } + + } else { + results.setCompletionDetails("Unknown error"); + results.setMessages(Collections.emptyList()); + } + + return results; + } + @Override public List getConsumers(Collection topicVos) { final var topics = topicVos.stream().map(TopicVO::getName).collect(Collectors.toSet()); diff --git a/src/main/java/kafdrop/service/SearchResults.java b/src/main/java/kafdrop/service/SearchResults.java new file mode 100644 index 00000000..367f4bf2 --- /dev/null +++ b/src/main/java/kafdrop/service/SearchResults.java @@ -0,0 +1,52 @@ +package kafdrop.service; + +import java.util.Date; +import java.util.List; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public final class SearchResults { + + private Date finalMessageTimestamp; + + private long messagesScannedCount; + + private CompletionReason completionReason; + + private List> results; + + public enum CompletionReason { + NO_MORE_MESSAGES_IN_TOPIC, + FOUND_REQUESTED_NUMBER_OF_RESULTS, + EXCEEDED_MAX_SCAN_COUNT, + REACHED_END_OF_TIMESPAN + } + + public SearchResults( + List> results, + CompletionReason completionReason, + Date finalMessageTimestamp, + long messagesScannedCount) { + + this.finalMessageTimestamp = finalMessageTimestamp; + this.messagesScannedCount = messagesScannedCount; + this.completionReason = completionReason; + this.results = results; + } + + public List> getResults() { + return results; + } + + public CompletionReason getCompletionReason() { + return completionReason; + } + + public long getMessagesScannedCount() { + return messagesScannedCount; + } + + public Date getFinalMessageTimestamp() { + return finalMessageTimestamp; + } +} diff --git a/src/main/resources/templates/search-message.ftlh b/src/main/resources/templates/search-message.ftlh new file mode 100644 index 00000000..ed80384f --- /dev/null +++ b/src/main/resources/templates/search-message.ftlh @@ -0,0 +1,175 @@ +<#-- + Copyright 2021 Kafdrop contributors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<#import "/spring.ftl" as spring /> +<#import "lib/template.ftlh" as template> +<@template.header "Topic: ${topic.name}: Search messages"> + + + + + + +<#setting number_format="0"> + +

Search messages in: ${topic.name}

+ +<#assign selectedFormat=searchMessageForm.format!defaultFormat> +<#assign selectedKeyFormat=searchMessageForm.keyFormat!defaultFormat> + +
+ +
+ <@spring.bind path="searchMessageForm.searchText"/> +
+ + <@spring.formInput path="searchMessageForm.searchText" attributes='class="form-control" size="20"'/> + <#if spring.status.error> + <@spring.showErrors "
"/>
+ +
+    + <@spring.bind path="searchMessageForm.maximumCount"/> +
+ + +
+    + <@spring.bind path="searchMessageForm.startTimestamp"/> +
+ + <@spring.formInput path="searchMessageForm.startTimestamp" attributes='class="form-control ${spring.status.error?string("has-error", "")}" size="20"'/> + <#if spring.status.error> + <@spring.showErrors "
"/>
+ +
+
+
+    +
+ + +
+    +
+ + +
+    +
+ <#if descFiles?size != 0> + + + <#else> + No available descriptor, please check. + +
+    +
+ <#if descFiles?size != 0> + + <@spring.formInput path="searchMessageForm.msgTypeName" attributes='class="form-control"'/> + +
+    +
+ + +
+ +<@spring.bind path="searchMessageForm.*"/> +
+ <#if messages?? && messages?size gt 0> +
+
+ ${details} ${messages?size} messages found. +
+
+ <#list messages as msg> +
+ Item: ${msg?index + 1}   + Partition: ${msg.partition}   + Offset: ${msg.offset}   + Key: ${msg.key!''}   + Timestamp: ${msg.timestamp?string('yyyy-MM-dd HH:mm:ss.SSS')} + Headers: ${msg.headersFormatted} +
+   +
${msg.message!''}
+
+
+ + <#elseif !(spring.status.error) && !(searchMessageForm.empty)> + No messages found containing ${(searchMessageForm.searchText)!"No search text"}. ${details} + +
+ +<@template.footer/> \ No newline at end of file From 379c31cc96c856449e5d4f4fc805a5e8ecbf9d78 Mon Sep 17 00:00:00 2001 From: Nathan Daniels Date: Thu, 17 Jun 2021 20:14:36 -0400 Subject: [PATCH 03/11] Fix bad merge --- .../kafdrop/service/KafkaMonitorImpl.java | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index 52790b96..eb85ef18 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -261,28 +261,6 @@ public List getConsumers(Collection topicVos) { return convert(consumerGroupOffsets, topicVos); } - @Override - public List searchMessages(String topic, String searchString, Deserializers deserializers) { - final int count = 10; - final var records = highLevelConsumer.searchRecords(topic, searchString, deserializers); - if (records != null) { - final var messageVos = new ArrayList(); - for (var record : records) { - final var messageVo = new MessageVO(); - messageVo.setPartition(record.partition()); - messageVo.setOffset(record.offset()); - messageVo.setKey(record.key()); - messageVo.setMessage(record.value()); - messageVo.setHeaders(headersToMap(record.headers())); - messageVo.setTimestamp(new Date(record.timestamp())); - messageVos.add(messageVo); - } - return messageVos; - } else { - return Collections.emptyList(); - } - } - @Override public void createTopic(CreateTopicVO createTopicDto) { var newTopic = new NewTopic( From ddcebe3f4b0608b1cf150c017a95b3bdc41cbdab Mon Sep 17 00:00:00 2001 From: Nathan Daniels Date: Thu, 17 Jun 2021 20:17:17 -0400 Subject: [PATCH 04/11] Remove redundant class --- .../kafdrop/controller/MessageController.java | 7 +--- .../java/kafdrop/service/MessageSearcher.java | 41 ------------------- 2 files changed, 1 insertion(+), 47 deletions(-) delete mode 100644 src/main/java/kafdrop/service/MessageSearcher.java diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index c4b44f05..766cdde9 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -30,7 +30,6 @@ import javax.validation.constraints.NotNull; import kafdrop.form.SearchMessageForm; -import kafdrop.service.MessageSearcher; import kafdrop.util.*; import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; @@ -53,7 +52,6 @@ import kafdrop.config.MessageFormatConfiguration.MessageFormatProperties; import kafdrop.config.ProtobufDescriptorConfiguration.ProtobufDescriptorProperties; import kafdrop.config.SchemaRegistryConfiguration.SchemaRegistryProperties; -import kafdrop.form.SearchMessageForm; import kafdrop.model.MessageVO; import kafdrop.model.TopicPartitionVO; import kafdrop.model.TopicVO; @@ -67,22 +65,19 @@ public final class MessageController { private final MessageInspector messageInspector; - private final MessageSearcher messageSearcher; - private final MessageFormatProperties messageFormatProperties; private final MessageFormatProperties keyFormatProperties; private final SchemaRegistryProperties schemaRegistryProperties; private final ProtobufDescriptorProperties protobufProperties; - public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector, MessageFormatProperties messageFormatProperties, MessageFormatProperties keyFormatProperties, SchemaRegistryProperties schemaRegistryProperties, ProtobufDescriptorProperties protobufProperties, MessageSearcher messageSearcher) { + public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector, MessageFormatProperties messageFormatProperties, MessageFormatProperties keyFormatProperties, SchemaRegistryProperties schemaRegistryProperties, ProtobufDescriptorProperties protobufProperties) { this.kafkaMonitor = kafkaMonitor; this.messageInspector = messageInspector; this.messageFormatProperties = messageFormatProperties; this.keyFormatProperties = keyFormatProperties; this.schemaRegistryProperties = schemaRegistryProperties; this.protobufProperties = protobufProperties; - this.messageSearcher = messageSearcher; } /** diff --git a/src/main/java/kafdrop/service/MessageSearcher.java b/src/main/java/kafdrop/service/MessageSearcher.java deleted file mode 100644 index 192ad09f..00000000 --- a/src/main/java/kafdrop/service/MessageSearcher.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2017 Kafdrop contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * - */ - -package kafdrop.service; - -import kafdrop.model.MessageVO; -import kafdrop.util.Deserializers; -import lombok.RequiredArgsConstructor; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.internals.Topic; -import org.springframework.stereotype.Service; - -import java.util.List; - -@Service -@RequiredArgsConstructor -public final class MessageSearcher { - private final KafkaMonitor kafkaMonitor; - - /** - * Searches messages from all partitions of a given topic. - */ - public List search(final String topicName, final String searchText, final Deserializers deserializers) { - return kafkaMonitor.searchMessages(topicName, searchText, deserializers); - } -} From 4d878511dce5481ce05af4412b8a26033c234f68 Mon Sep 17 00:00:00 2001 From: Nathan Daniels Date: Thu, 17 Jun 2021 20:22:29 -0400 Subject: [PATCH 05/11] Remove redundant file --- .../resources/templates/search-message.ftl | 157 ------------------ 1 file changed, 157 deletions(-) delete mode 100644 src/main/resources/templates/search-message.ftl diff --git a/src/main/resources/templates/search-message.ftl b/src/main/resources/templates/search-message.ftl deleted file mode 100644 index f615dc24..00000000 --- a/src/main/resources/templates/search-message.ftl +++ /dev/null @@ -1,157 +0,0 @@ -<#ftl output_format="HTML"> -<#-- - Copyright 2016 Kafdrop contributors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<#import "/spring.ftl" as spring /> -<#import "lib/template.ftl" as template> -<@template.header "Topic: ${topic.name}: Search messages"> - - - - - - -<#setting number_format="0"> - - -

Search messages in: ${topic.name}

- -<#assign selectedFormat=searchMessageForm.format!defaultFormat> -<#assign selectedKeyFormat=searchMessageForm.keyFormat!defaultFormat> - -
-
-
- <@spring.bind path="searchMessageForm.searchText"/> -
- - <@spring.formInput path="searchMessageForm.searchText" attributes='class="form-control" size="10"'/> - <#if spring.status.error> - <@spring.showErrors "
"/>
- -
-    -
- - -
-    -
- - -
-    -
- <#if descFiles?size != 0> - - - <#else> - No available descriptor, please check. - -
-    -
- <#if descFiles?size != 0> - - <@spring.formInput path="searchMessageForm.msgTypeName" attributes='class="form-control"'/> - -
-    -
- -
-
- -<@spring.bind path="searchMessageForm.*"/> -
- <#if messages?? && messages?size gt 0> -
-
- ${messages?size} messages found. -
-
- <#list messages as msg> -
- Item: ${msg?index + 1}   - Partition: ${msg.partition}   - Offset: ${msg.offset}   - Key: ${msg.key!''}   - Timestamp: ${msg.timestamp?string('yyyy-MM-dd HH:mm:ss.SSS')} - Headers: ${msg.headersFormatted} -
-   -
${msg.message!''}
-
-
- - <#elseif !(spring.status.error) && !(searchMessageForm.empty)> - No messages found containing ${(searchMessageForm.searchText)!"No search text"} - -
- -<@template.footer/> \ No newline at end of file From 1ee983d578c6dbfee341f53547bf7e3960257eef Mon Sep 17 00:00:00 2001 From: Nathan Daniels Date: Thu, 17 Jun 2021 20:50:59 -0400 Subject: [PATCH 06/11] Make pom customizable for our needs --- pom.xml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index fcb5cd8d..9528b9e3 100644 --- a/pom.xml +++ b/pom.xml @@ -5,11 +5,12 @@ com.obsidiandynamics.kafdrop kafdrop - 3.28.0-SNAPSHOT + ${version} For when you have a Kafka cluster to monitor + 3.28.0-SNAPSHOT 2.4.4 -Xdoclint:none @@ -18,6 +19,7 @@ UTF-8 3.11.1 1.14.3 + obsidiandymanics @@ -266,7 +268,7 @@ docker-maven-plugin 1.2.0 - obsidiandynamics/kafdrop + ${docker.repository}/kafdrop true ${project.build.directory}/docker-ready From 0426a9fb66d0e489427c41b1796182d378d2ef23 Mon Sep 17 00:00:00 2001 From: Nathan Daniels Date: Mon, 12 Jul 2021 20:14:14 -0400 Subject: [PATCH 07/11] Fix formatting and input a bit --- src/main/resources/static/js/message-inspector.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/resources/static/js/message-inspector.js b/src/main/resources/static/js/message-inspector.js index b1b78a94..41044bec 100644 --- a/src/main/resources/static/js/message-inspector.js +++ b/src/main/resources/static/js/message-inspector.js @@ -17,8 +17,8 @@ */ jQuery(document).ready(function () { - jQuery(document).on('click', '#searchMessagesBtn', function (e) { - jQuery(this).attr('disabled', true).html(' Searching...'); + jQuery(document).on('submit', '#searchMessageForm', function (e) { + jQuery("#searchMessagesBtn").attr('disabled', true).html(' Searching...'); }); jQuery(document).on('click', '.toggle-msg', function (e) { From 33436068f9201dbca27a935ffcb7fa9b2ef5c45a Mon Sep 17 00:00:00 2001 From: Nathan Daniels Date: Tue, 13 Jul 2021 09:45:26 -0400 Subject: [PATCH 08/11] Update search message page card layout --- .../resources/templates/search-message.ftlh | 141 +++++++++--------- 1 file changed, 69 insertions(+), 72 deletions(-) diff --git a/src/main/resources/templates/search-message.ftlh b/src/main/resources/templates/search-message.ftlh index ed80384f..1cbd9426 100644 --- a/src/main/resources/templates/search-message.ftlh +++ b/src/main/resources/templates/search-message.ftlh @@ -67,81 +67,78 @@ <#assign selectedKeyFormat=searchMessageForm.keyFormat!defaultFormat>
-
-
- <@spring.bind path="searchMessageForm.searchText"/> -
- - <@spring.formInput path="searchMessageForm.searchText" attributes='class="form-control" size="20"'/> - <#if spring.status.error> - <@spring.showErrors "
"/>
- -
-    - <@spring.bind path="searchMessageForm.maximumCount"/> -
- - -
-    - <@spring.bind path="searchMessageForm.startTimestamp"/> -
- - <@spring.formInput path="searchMessageForm.startTimestamp" attributes='class="form-control ${spring.status.error?string("has-error", "")}" size="20"'/> - <#if spring.status.error> - <@spring.showErrors "
"/>
- -
-
-
-    -
- - -
-    -
- - -
-    -
- <#if descFiles?size != 0> - - - <#else> - No available descriptor, please check. - + +
+ <@spring.bind path="searchMessageForm.searchText"/> +
+ + <@spring.formInput path="searchMessageForm.searchText" attributes='class="form-control" size="40"'/> + <#if spring.status.error> + <@spring.showErrors "
"/>
+ +
+ <@spring.bind path="searchMessageForm.startTimestamp"/> +
+ + <@spring.formInput path="searchMessageForm.startTimestamp" attributes='pattern="[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]+" class="form-control ${spring.status.error?string("has-error", "")}" size="30"'/> + yyyy-MM-dd HH:mm:ss.SSS + <#if spring.status.error> + <@spring.showErrors "
"/>
+ +
+ <@spring.bind path="searchMessageForm.maximumCount"/> +
+ + +
-    -
- <#if descFiles?size != 0> - - <@spring.formInput path="searchMessageForm.msgTypeName" attributes='class="form-control"'/> - +
+
+ + +
+
+ + +
+ +
+ <#if descFiles?size != 0> + + + <#else> + No available descriptor, please check. + +
+
+ <#if descFiles?size != 0> + + <@spring.formInput path="searchMessageForm.msgTypeName" attributes='class="form-control"'/> + +
-    +
+
-
From 0653bf07cf60250c17925d8d5143e1fafffc35da Mon Sep 17 00:00:00 2001 From: Bert Roos Date: Tue, 18 Apr 2023 17:24:24 +0200 Subject: [PATCH 09/11] Standardize code formatting --- .../kafdrop/controller/MessageController.java | 21 +-- .../java/kafdrop/form/SearchMessageForm.java | 144 +++++++++--------- .../java/kafdrop/model/SearchResultsVO.java | 30 ++-- .../service/KafkaHighLevelConsumer.java | 86 +++++------ .../java/kafdrop/service/KafkaMonitor.java | 4 +- .../kafdrop/service/KafkaMonitorImpl.java | 25 +-- .../java/kafdrop/service/SearchResults.java | 90 +++++------ 7 files changed, 202 insertions(+), 198 deletions(-) diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index 6260eeb1..d9604797 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -197,7 +197,8 @@ public String viewMessageForm(@PathVariable("name") String topicName, /** * Human friendly view of searching messages. - * @param topicName Name of topic + * + * @param topicName Name of topic * @param searchMessageForm Message form for submitting requests to search messages. * @param errors * @param model @@ -205,9 +206,9 @@ public String viewMessageForm(@PathVariable("name") String topicName, */ @GetMapping("/topic/{name:.+}/search-messages") public String searchMessageForm(@PathVariable("name") String topicName, - @Valid @ModelAttribute("searchMessageForm") SearchMessageForm searchMessageForm, - BindingResult errors, - Model model) { + @Valid @ModelAttribute("searchMessageForm") SearchMessageForm searchMessageForm, + BindingResult errors, + Model model) { final MessageFormat defaultFormat = messageFormatProperties.getFormat(); final MessageFormat defaultKeyFormat = keyFormatProperties.getFormat(); @@ -223,20 +224,22 @@ public String searchMessageForm(@PathVariable("name") String topicName, } final TopicVO topic = kafkaMonitor.getTopic(topicName) - .orElseThrow(() -> new TopicNotFoundException(topicName)); + .orElseThrow(() -> new TopicNotFoundException(topicName)); model.addAttribute("topic", topic); model.addAttribute("defaultFormat", defaultFormat); model.addAttribute("messageFormats", MessageFormat.values()); model.addAttribute("defaultKeyFormat", defaultKeyFormat); - model.addAttribute("keyFormats",KeyFormat.values()); + model.addAttribute("keyFormats", KeyFormat.values()); model.addAttribute("descFiles", protobufProperties.getDescFilesList()); if (!searchMessageForm.isEmpty() && !errors.hasErrors()) { final var deserializers = new Deserializers( - getDeserializer(topicName, searchMessageForm.getKeyFormat(), searchMessageForm.getDescFile(),searchMessageForm.getMsgTypeName()), - getDeserializer(topicName, searchMessageForm.getFormat(), searchMessageForm.getDescFile(), searchMessageForm.getMsgTypeName()) + getDeserializer(topicName, searchMessageForm.getKeyFormat(), searchMessageForm.getDescFile(), + searchMessageForm.getMsgTypeName()), + getDeserializer(topicName, searchMessageForm.getFormat(), searchMessageForm.getDescFile(), + searchMessageForm.getMsgTypeName()) ); var searchResults = kafkaMonitor.searchMessages( @@ -245,7 +248,7 @@ public String searchMessageForm(@PathVariable("name") String topicName, searchMessageForm.getMaximumCount(), searchMessageForm.getStartTimestamp(), deserializers); - + model.addAttribute("messages", searchResults.getMessages()); model.addAttribute("details", searchResults.getCompletionDetails()); } diff --git a/src/main/java/kafdrop/form/SearchMessageForm.java b/src/main/java/kafdrop/form/SearchMessageForm.java index 016f7d26..d9ffb490 100644 --- a/src/main/java/kafdrop/form/SearchMessageForm.java +++ b/src/main/java/kafdrop/form/SearchMessageForm.java @@ -1,108 +1,106 @@ package kafdrop.form; import com.fasterxml.jackson.annotation.JsonIgnore; - -import org.springframework.format.annotation.DateTimeFormat; - import kafdrop.util.MessageFormat; - -import java.util.Date; +import org.springframework.format.annotation.DateTimeFormat; import javax.validation.constraints.Max; import javax.validation.constraints.Min; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; +import java.util.Date; public class SearchMessageForm { - @NotBlank - private String searchText; + @NotBlank + private String searchText; + + @NotNull + @Min(1) + @Max(1000) + private Integer maximumCount; - @NotNull - @Min(1) - @Max(1000) - private Integer maximumCount; + private MessageFormat format; - private MessageFormat format; + private MessageFormat keyFormat; - private MessageFormat keyFormat; + private String descFile; - private String descFile; + private String msgTypeName; - private String msgTypeName; - - @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS") - private Date startTimestamp; + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS") + private Date startTimestamp; - public SearchMessageForm(String searchText, MessageFormat format) { - this.searchText = searchText; - this.format = format; - } + public SearchMessageForm(String searchText, MessageFormat format) { + this.searchText = searchText; + this.format = format; + } - public Date getStartTimestamp() { - return startTimestamp; - } + public Date getStartTimestamp() { + return startTimestamp; + } - public void setStartTimestamp(Date startTimestamp) { - this.startTimestamp = startTimestamp; - } + public void setStartTimestamp(Date startTimestamp) { + this.startTimestamp = startTimestamp; + } - public SearchMessageForm(String searchText) { - this(searchText, MessageFormat.DEFAULT); - } + public SearchMessageForm(String searchText) { + this(searchText, MessageFormat.DEFAULT); + } - public SearchMessageForm() {} + public SearchMessageForm() { + } - @JsonIgnore - public boolean isEmpty() { - return searchText == null || searchText.isEmpty(); - } + @JsonIgnore + public boolean isEmpty() { + return searchText == null || searchText.isEmpty(); + } - public String getSearchText() { - return searchText; - } + public String getSearchText() { + return searchText; + } - public void setSearchText(String searchText) { - this.searchText = searchText; - } + public void setSearchText(String searchText) { + this.searchText = searchText; + } - public Integer getMaximumCount() { - return maximumCount; - } + public Integer getMaximumCount() { + return maximumCount; + } - public void setMaximumCount(Integer maximumCount) { - this.maximumCount = maximumCount; - } + public void setMaximumCount(Integer maximumCount) { + this.maximumCount = maximumCount; + } - public MessageFormat getKeyFormat() { - return keyFormat; - } + public MessageFormat getKeyFormat() { + return keyFormat; + } - public void setKeyFormat(MessageFormat keyFormat) { - this.keyFormat = keyFormat; - } + public void setKeyFormat(MessageFormat keyFormat) { + this.keyFormat = keyFormat; + } - public MessageFormat getFormat() { - return format; - } + public MessageFormat getFormat() { + return format; + } - public void setFormat(MessageFormat format) { - this.format = format; - } + public void setFormat(MessageFormat format) { + this.format = format; + } - public String getDescFile() { - return descFile; - } + public String getDescFile() { + return descFile; + } - public void setDescFile(String descFile) { - this.descFile = descFile; - } + public void setDescFile(String descFile) { + this.descFile = descFile; + } - public String getMsgTypeName() { - return msgTypeName; - } + public String getMsgTypeName() { + return msgTypeName; + } - public void setMsgTypeName(String msgTypeName) { - this.msgTypeName = msgTypeName; - } -} \ No newline at end of file + public void setMsgTypeName(String msgTypeName) { + this.msgTypeName = msgTypeName; + } +} diff --git a/src/main/java/kafdrop/model/SearchResultsVO.java b/src/main/java/kafdrop/model/SearchResultsVO.java index cee19727..8e632721 100644 --- a/src/main/java/kafdrop/model/SearchResultsVO.java +++ b/src/main/java/kafdrop/model/SearchResultsVO.java @@ -16,28 +16,28 @@ * */ - package kafdrop.model; +package kafdrop.model; import java.util.List; public final class SearchResultsVO { - private List messages; + private List messages; - private String completionDetails; + private String completionDetails; - public List getMessages() { - return messages; - } + public List getMessages() { + return messages; + } - public String getCompletionDetails() { - return completionDetails; - } + public String getCompletionDetails() { + return completionDetails; + } - public void setCompletionDetails(String completionDetails) { - this.completionDetails = completionDetails; - } + public void setCompletionDetails(String completionDetails) { + this.completionDetails = completionDetails; + } - public void setMessages(List messages) { - this.messages = messages; - } + public void setMessages(List messages) { + this.messages = messages; + } } diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index d53b3598..17a97c80 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -211,14 +211,15 @@ synchronized List> getLatestRecords(String topic, rec.leaderEpoch())) .collect(Collectors.toList()); } - + /** * Searches records from all partitions of a given topic containing a given text. - * @param topic The topic - * @param searchString Searched text. - * @param maximumCount The maximum number of results to return + * + * @param topic The topic + * @param searchString Searched text. + * @param maximumCount The maximum number of results to return * @param startTimestamp The begining message timestamp to search from - * @param deserializers Key and Value deserializers + * @param deserializers Key and Value deserializers * @return A list of consumer records for a given topic. */ synchronized SearchResults searchRecords(String topic, @@ -229,14 +230,15 @@ synchronized SearchResults searchRecords(String topic, initializeClient(); final var partitionInfoSet = kafkaConsumer.partitionsFor(topic); final var partitions = partitionInfoSet.stream() - .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), - partitionInfo.partition())) - .collect(Collectors.toList()); + .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), + partitionInfo.partition())) + .collect(Collectors.toList()); kafkaConsumer.assign(partitions); - + // make the consumer seek to the correct offsets for the start timestamp - final var partitionOffsets = kafkaConsumer.offsetsForTimes(partitions.stream().collect(Collectors.toMap(tp-> tp, tp-> startTimestamp.getTime()))); - + final var partitionOffsets = kafkaConsumer.offsetsForTimes(partitions.stream().collect(Collectors.toMap(tp -> tp, + tp -> startTimestamp.getTime()))); + // Seek each partition to that correct offset for (var partition : partitionOffsets.keySet()) { var offset = partitionOffsets.get(partition); @@ -269,36 +271,36 @@ synchronized SearchResults searchRecords(String topic, var records = polled.records(partition); if (!records.isEmpty()) { - // Keep track of the lowest timestamp among this batch of records. - // This is what we will report to the user in the event that the search terminates - // early, so that they can perform a new search with this new timestamp as a starting point - var firstTimestamp = records.get(0).timestamp(); - if (firstTimestamp < endingTimestamp){ - endingTimestamp = firstTimestamp; - } - - scannedCount += records.size(); - //Add to found records if it matches the search cretirea - foundRecords.addAll(records.stream() - .filter(rec -> - deserialize(deserializers.getKeyDeserializer(), rec.key()).toLowerCase().contains(loweredSearchString) || - deserialize(deserializers.getValueDeserializer(), rec.value()).toLowerCase().contains(loweredSearchString)) - .map(rec -> new ConsumerRecord<>(rec.topic(), - rec.partition(), - rec.offset(), - rec.timestamp(), - rec.timestampType(), - 0L, - rec.serializedKeySize(), - rec.serializedValueSize(), - deserialize(deserializers.getKeyDeserializer(), rec.key()), - deserialize(deserializers.getValueDeserializer(), rec.value()), - rec.headers(), - rec.leaderEpoch())) - .collect(Collectors.toList())); + // Keep track of the lowest timestamp among this batch of records. + // This is what we will report to the user in the event that the search terminates + // early, so that they can perform a new search with this new timestamp as a starting point + var firstTimestamp = records.get(0).timestamp(); + if (firstTimestamp < endingTimestamp) { + endingTimestamp = firstTimestamp; + } + + scannedCount += records.size(); + //Add to found records if it matches the search cretirea + foundRecords.addAll(records.stream() + .filter(rec -> + deserialize(deserializers.getKeyDeserializer(), rec.key()).toLowerCase().contains(loweredSearchString) || + deserialize(deserializers.getValueDeserializer(), rec.value()).toLowerCase().contains(loweredSearchString)) + .map(rec -> new ConsumerRecord<>(rec.topic(), + rec.partition(), + rec.offset(), + rec.timestamp(), + rec.timestampType(), + 0L, + rec.serializedKeySize(), + rec.serializedValueSize(), + deserialize(deserializers.getKeyDeserializer(), rec.key()), + deserialize(deserializers.getValueDeserializer(), rec.value()), + rec.headers(), + rec.leaderEpoch())) + .collect(Collectors.toList())); } } - + //If no more polled exit the loop moreRecords = !polled.isEmpty(); } @@ -306,11 +308,9 @@ synchronized SearchResults searchRecords(String topic, SearchResults.CompletionReason completionReason; if (!moreRecords) { completionReason = CompletionReason.NO_MORE_MESSAGES_IN_TOPIC; - } - else if (foundRecords.size() >= maximumCount) { + } else if (foundRecords.size() >= maximumCount) { completionReason = CompletionReason.FOUND_REQUESTED_NUMBER_OF_RESULTS; - } - else if (scannedCount >= SEARCH_MAX_MESSAGES_TO_SCAN){ + } else if (scannedCount >= SEARCH_MAX_MESSAGES_TO_SCAN) { completionReason = CompletionReason.EXCEEDED_MAX_SCAN_COUNT; } else { completionReason = CompletionReason.REACHED_END_OF_TIMESPAN; diff --git a/src/main/java/kafdrop/service/KafkaMonitor.java b/src/main/java/kafdrop/service/KafkaMonitor.java index c4b45ecc..5cb95d9e 100644 --- a/src/main/java/kafdrop/service/KafkaMonitor.java +++ b/src/main/java/kafdrop/service/KafkaMonitor.java @@ -56,9 +56,9 @@ List getMessages(TopicPartition topicPartition, long offset, int coun List getConsumersByTopics(Collection topicVos); - SearchResultsVO searchMessages(String topic, + SearchResultsVO searchMessages(String topic, String searchString, - Integer maximumCount, + Integer maximumCount, Date startTimestamp, Deserializers deserializers); diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index 0cad96fc..f9123544 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -247,13 +247,13 @@ public List getConsumersByTopics(Collection topicVos) { } @Override - public SearchResultsVO searchMessages(String topic, + public SearchResultsVO searchMessages(String topic, String searchString, - Integer maxmuimCount, + Integer maxmuimCount, Date startTimestamp, - Deserializers deserializers) - { - final var records = highLevelConsumer.searchRecords(topic, searchString, maxmuimCount, startTimestamp, deserializers); + Deserializers deserializers) { + final var records = highLevelConsumer.searchRecords(topic, searchString, maxmuimCount, startTimestamp, + deserializers); final var results = new SearchResultsVO(); if (records != null) { @@ -269,29 +269,32 @@ public SearchResultsVO searchMessages(String topic, messageVo.setHeaders(headersToMap(record.headers())); messageVo.setTimestamp(new Date(record.timestamp())); messageVos.add(messageVo); - } + } switch (records.getCompletionReason()) { case FOUND_REQUESTED_NUMBER_OF_RESULTS: - results.setCompletionDetails(String.format("Search completed after finding requested number of results. Scanned %d messages.", records.getMessagesScannedCount())); + results.setCompletionDetails(String.format("Search completed after finding requested number of results. " + + "Scanned %d messages.", records.getMessagesScannedCount())); break; case EXCEEDED_MAX_SCAN_COUNT: results.setCompletionDetails( String.format( - "Search timed out after scanning %d messages. Last scanned message timestamp was %d. Adjust your time span for more results.", + "Search timed out after scanning %d messages. Last scanned message timestamp was %d. Adjust your time" + + " span for more results.", records.getMessagesScannedCount(), records.getFinalMessageTimestamp())); break; case NO_MORE_MESSAGES_IN_TOPIC: results.setCompletionDetails( String.format( - "Search reached the end of the topic before finding requested number of results. Scanned %d messages.", + "Search reached the end of the topic before finding requested number of results. Scanned %d messages.", records.getMessagesScannedCount())); break; case REACHED_END_OF_TIMESPAN: results.setCompletionDetails( String.format( - "Search reached the end of the specified time span before finding requested number of results. Scanned %d messages.", + "Search reached the end of the specified time span before finding requested number of results. Scanned" + + " %d messages.", records.getMessagesScannedCount())); } @@ -302,7 +305,7 @@ public SearchResultsVO searchMessages(String topic, return results; } - + @Override public void createTopic(CreateTopicVO createTopicDto) { var newTopic = new NewTopic( diff --git a/src/main/java/kafdrop/service/SearchResults.java b/src/main/java/kafdrop/service/SearchResults.java index 367f4bf2..8e636a81 100644 --- a/src/main/java/kafdrop/service/SearchResults.java +++ b/src/main/java/kafdrop/service/SearchResults.java @@ -1,52 +1,52 @@ package kafdrop.service; +import org.apache.kafka.clients.consumer.ConsumerRecord; + import java.util.Date; import java.util.List; -import org.apache.kafka.clients.consumer.ConsumerRecord; - public final class SearchResults { - - private Date finalMessageTimestamp; - - private long messagesScannedCount; - - private CompletionReason completionReason; - - private List> results; - - public enum CompletionReason { - NO_MORE_MESSAGES_IN_TOPIC, - FOUND_REQUESTED_NUMBER_OF_RESULTS, - EXCEEDED_MAX_SCAN_COUNT, - REACHED_END_OF_TIMESPAN - } - - public SearchResults( - List> results, - CompletionReason completionReason, - Date finalMessageTimestamp, - long messagesScannedCount) { - - this.finalMessageTimestamp = finalMessageTimestamp; - this.messagesScannedCount = messagesScannedCount; - this.completionReason = completionReason; - this.results = results; - } - - public List> getResults() { - return results; - } - - public CompletionReason getCompletionReason() { - return completionReason; - } - - public long getMessagesScannedCount() { - return messagesScannedCount; - } - - public Date getFinalMessageTimestamp() { - return finalMessageTimestamp; - } + + private Date finalMessageTimestamp; + + private long messagesScannedCount; + + private CompletionReason completionReason; + + private List> results; + + public enum CompletionReason { + NO_MORE_MESSAGES_IN_TOPIC, + FOUND_REQUESTED_NUMBER_OF_RESULTS, + EXCEEDED_MAX_SCAN_COUNT, + REACHED_END_OF_TIMESPAN + } + + public SearchResults( + List> results, + CompletionReason completionReason, + Date finalMessageTimestamp, + long messagesScannedCount) { + + this.finalMessageTimestamp = finalMessageTimestamp; + this.messagesScannedCount = messagesScannedCount; + this.completionReason = completionReason; + this.results = results; + } + + public List> getResults() { + return results; + } + + public CompletionReason getCompletionReason() { + return completionReason; + } + + public long getMessagesScannedCount() { + return messagesScannedCount; + } + + public Date getFinalMessageTimestamp() { + return finalMessageTimestamp; + } } From fb5ece2b5ec877676bc56f39d06ef537eeae21f8 Mon Sep 17 00:00:00 2001 From: Bert Roos Date: Wed, 19 Apr 2023 15:42:20 +0200 Subject: [PATCH 10/11] Addressed compilation errors and refactored Refactored to increase maintainability --- .../config/HealthCheckConfiguration.java | 2 +- .../kafdrop/controller/MessageController.java | 11 +- .../java/kafdrop/form/SearchMessageForm.java | 8 +- .../service/KafkaHighLevelConsumer.java | 207 +++++++++--------- .../java/kafdrop/service/KafkaMonitor.java | 2 + .../kafdrop/service/KafkaMonitorImpl.java | 1 + 6 files changed, 119 insertions(+), 112 deletions(-) diff --git a/src/main/java/kafdrop/config/HealthCheckConfiguration.java b/src/main/java/kafdrop/config/HealthCheckConfiguration.java index 2e7d5064..ebe011b8 100644 --- a/src/main/java/kafdrop/config/HealthCheckConfiguration.java +++ b/src/main/java/kafdrop/config/HealthCheckConfiguration.java @@ -72,4 +72,4 @@ private String getStatus(Health health) { } } } -} \ No newline at end of file +} diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index d9604797..deae0f06 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -210,7 +210,7 @@ public String searchMessageForm(@PathVariable("name") String topicName, BindingResult errors, Model model) { final MessageFormat defaultFormat = messageFormatProperties.getFormat(); - final MessageFormat defaultKeyFormat = keyFormatProperties.getFormat(); + final MessageFormat defaultKeyFormat = messageFormatProperties.getKeyFormat(); if (searchMessageForm.isEmpty()) { final SearchMessageForm defaultForm = new SearchMessageForm(); @@ -237,9 +237,11 @@ public String searchMessageForm(@PathVariable("name") String topicName, final var deserializers = new Deserializers( getDeserializer(topicName, searchMessageForm.getKeyFormat(), searchMessageForm.getDescFile(), - searchMessageForm.getMsgTypeName()), + searchMessageForm.getMsgTypeName(), + protobufProperties.getParseAnyProto()), getDeserializer(topicName, searchMessageForm.getFormat(), searchMessageForm.getDescFile(), - searchMessageForm.getMsgTypeName()) + searchMessageForm.getMsgTypeName(), + protobufProperties.getParseAnyProto()) ); var searchResults = kafkaMonitor.searchMessages( @@ -257,8 +259,7 @@ public String searchMessageForm(@PathVariable("name") String topicName, } /** - * Returns the selected nessagr format based on the - * form submission + * Returns the selected message format based on the form submission * * @param format String representation of format name * @return diff --git a/src/main/java/kafdrop/form/SearchMessageForm.java b/src/main/java/kafdrop/form/SearchMessageForm.java index d9ffb490..6127539a 100644 --- a/src/main/java/kafdrop/form/SearchMessageForm.java +++ b/src/main/java/kafdrop/form/SearchMessageForm.java @@ -1,13 +1,13 @@ package kafdrop.form; import com.fasterxml.jackson.annotation.JsonIgnore; +import jakarta.validation.constraints.Max; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; import kafdrop.util.MessageFormat; import org.springframework.format.annotation.DateTimeFormat; -import javax.validation.constraints.Max; -import javax.validation.constraints.Min; -import javax.validation.constraints.NotBlank; -import javax.validation.constraints.NotNull; import java.util.Date; public class SearchMessageForm { diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index 17a97c80..07e470bf 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -10,11 +10,14 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @@ -25,6 +28,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.Optional; @@ -70,14 +74,14 @@ synchronized void setTopicPartitionSizes(List topics) { Map> allTopics = topics.stream().map(topicVO -> { List topicPartitions = topicVO.getPartitions().stream().map(topicPartitionVO -> new TopicPartition(topicVO.getName(), topicPartitionVO.getId()) - ).collect(Collectors.toList()); + ).toList(); return Pair.of(topicVO, topicPartitions); }).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); List allTopicPartitions = allTopics.values().stream() .flatMap(Collection::stream) - .collect(Collectors.toList()); + .toList(); kafkaConsumer.assign(allTopicPartitions); Map beginningOffset = kafkaConsumer.beginningOffsets(allTopicPartitions); @@ -104,7 +108,7 @@ synchronized void setTopicPartitionSizes(List topics) { * @param partition Topic partition * @param offset Offset to seek from * @param count Maximum number of records returned - * @param deserializers Key and Value deserialiser + * @param deserializers Key and Value deserializer * @return Latest records */ synchronized List> getLatestRecords(TopicPartition partition, long offset, int count, @@ -138,18 +142,8 @@ synchronized List> getLatestRecords(TopicPartitio return rawRecords .subList(0, Math.min(count, rawRecords.size())) .stream() - .map(rec -> new ConsumerRecord<>(rec.topic(), - rec.partition(), - rec.offset(), - rec.timestamp(), - rec.timestampType(), - rec.serializedKeySize(), - rec.serializedValueSize(), - deserialize(deserializers.getKeyDeserializer(), rec.key()), - deserialize(deserializers.getValueDeserializer(), rec.value()), - rec.headers(), - rec.leaderEpoch())) - .collect(Collectors.toList()); + .map(rec -> createConsumerRecord(rec, deserializers)) + .toList(); } /** @@ -163,11 +157,7 @@ synchronized List> getLatestRecords(String topic, int count, Deserializers deserializers) { initializeClient(); - final var partitionInfoSet = kafkaConsumer.partitionsFor(topic); - final var partitions = partitionInfoSet.stream() - .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), - partitionInfo.partition())) - .collect(Collectors.toList()); + final List partitions = determinePartitionsForTopic(topic); kafkaConsumer.assign(partitions); final var latestOffsets = kafkaConsumer.endOffsets(partitions); @@ -198,18 +188,8 @@ synchronized List> getLatestRecords(String topic, .values() .stream() .flatMap(Collection::stream) - .map(rec -> new ConsumerRecord<>(rec.topic(), - rec.partition(), - rec.offset(), - rec.timestamp(), - rec.timestampType(), - rec.serializedKeySize(), - rec.serializedValueSize(), - deserialize(deserializers.getKeyDeserializer(), rec.key()), - deserialize(deserializers.getValueDeserializer(), rec.value()), - rec.headers(), - rec.leaderEpoch())) - .collect(Collectors.toList()); + .map(rec -> createConsumerRecord(rec, deserializers)) + .toList(); } /** @@ -218,7 +198,7 @@ synchronized List> getLatestRecords(String topic, * @param topic The topic * @param searchString Searched text. * @param maximumCount The maximum number of results to return - * @param startTimestamp The begining message timestamp to search from + * @param startTimestamp The message timestamp to search from * @param deserializers Key and Value deserializers * @return A list of consumer records for a given topic. */ @@ -228,95 +208,118 @@ synchronized SearchResults searchRecords(String topic, Date startTimestamp, Deserializers deserializers) { initializeClient(); - final var partitionInfoSet = kafkaConsumer.partitionsFor(topic); - final var partitions = partitionInfoSet.stream() - .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), - partitionInfo.partition())) - .collect(Collectors.toList()); + final List partitions = determinePartitionsForTopic(topic); kafkaConsumer.assign(partitions); + seekToTimestamp(partitions, startTimestamp); - // make the consumer seek to the correct offsets for the start timestamp - final var partitionOffsets = kafkaConsumer.offsetsForTimes(partitions.stream().collect(Collectors.toMap(tp -> tp, - tp -> startTimestamp.getTime()))); - - // Seek each partition to that correct offset - for (var partition : partitionOffsets.keySet()) { - var offset = partitionOffsets.get(partition); - - // Old kafka message versions don't have timestamps so the offset would be null for that partition - if (offset == null) { - offset = new OffsetAndTimestamp(0, 0); - } + return searchRecords(searchString, maximumCount, deserializers); + } - kafkaConsumer.seek(partition, offset.offset()); - } + private void seekToTimestamp(List partitions, Date startTimestamp) { + kafkaConsumer.offsetsForTimes(partitions.stream() + .collect(Collectors.toMap(tp -> tp, tp -> startTimestamp.getTime()))) + .forEach(this::seekToOffset); + } - // Time to search! + @NotNull + private SearchResults searchRecords(String searchString, Integer maximumCount, Deserializers deserializers) { final List> foundRecords = new ArrayList<>(); - var moreRecords = true; - var scannedCount = 0; + boolean moreRecords; var loweredSearchString = searchString.toLowerCase(); - var endingTimestamp = Long.MAX_VALUE; - - while (foundRecords.size() < maximumCount && moreRecords && scannedCount < SEARCH_MAX_MESSAGES_TO_SCAN) { + var scanStatus = new ScanStatus(Long.MAX_VALUE, 0); + do { final var polled = kafkaConsumer.poll(Duration.ofMillis(SEARCH_POLL_TIMEOUT_MS)); + scanStatus = searchPolledRecords(polled, deserializers, loweredSearchString, foundRecords, scanStatus); - //Loop each partition - for (var partition : polled.partitions()) { - - endingTimestamp = Long.MAX_VALUE; - - //Pull records from one partition - var records = polled.records(partition); - if (!records.isEmpty()) { - - // Keep track of the lowest timestamp among this batch of records. - // This is what we will report to the user in the event that the search terminates - // early, so that they can perform a new search with this new timestamp as a starting point - var firstTimestamp = records.get(0).timestamp(); - if (firstTimestamp < endingTimestamp) { - endingTimestamp = firstTimestamp; - } - - scannedCount += records.size(); - //Add to found records if it matches the search cretirea - foundRecords.addAll(records.stream() - .filter(rec -> - deserialize(deserializers.getKeyDeserializer(), rec.key()).toLowerCase().contains(loweredSearchString) || - deserialize(deserializers.getValueDeserializer(), rec.value()).toLowerCase().contains(loweredSearchString)) - .map(rec -> new ConsumerRecord<>(rec.topic(), - rec.partition(), - rec.offset(), - rec.timestamp(), - rec.timestampType(), - 0L, - rec.serializedKeySize(), - rec.serializedValueSize(), - deserialize(deserializers.getKeyDeserializer(), rec.key()), - deserialize(deserializers.getValueDeserializer(), rec.value()), - rec.headers(), - rec.leaderEpoch())) - .collect(Collectors.toList())); - } - } - - //If no more polled exit the loop moreRecords = !polled.isEmpty(); - } + } while (foundRecords.size() < maximumCount + && moreRecords + && scanStatus.scannedCount() < SEARCH_MAX_MESSAGES_TO_SCAN); + - SearchResults.CompletionReason completionReason; + CompletionReason completionReason; if (!moreRecords) { completionReason = CompletionReason.NO_MORE_MESSAGES_IN_TOPIC; } else if (foundRecords.size() >= maximumCount) { completionReason = CompletionReason.FOUND_REQUESTED_NUMBER_OF_RESULTS; - } else if (scannedCount >= SEARCH_MAX_MESSAGES_TO_SCAN) { + } else if (scanStatus.scannedCount() >= SEARCH_MAX_MESSAGES_TO_SCAN) { completionReason = CompletionReason.EXCEEDED_MAX_SCAN_COUNT; } else { + // TODO: This situation cannot occur. There is no exit criterion on end of timespan completionReason = CompletionReason.REACHED_END_OF_TIMESPAN; } - return new SearchResults(foundRecords, completionReason, new Date(endingTimestamp), scannedCount); + return new SearchResults(foundRecords, completionReason, new Date(scanStatus.endingTimestamp()), + scanStatus.scannedCount()); + } + + private ScanStatus searchPolledRecords(ConsumerRecords polled, Deserializers deserializers, + String loweredSearchString, + List> foundRecords, ScanStatus scanStatus) { + for (var partition : polled.partitions()) { + scanStatus = searchPolledRecordsOfPartition(polled, partition, deserializers, loweredSearchString, foundRecords, + scanStatus); + } + return scanStatus; + } + + private ScanStatus searchPolledRecordsOfPartition(ConsumerRecords polled, + TopicPartition partition, Deserializers deserializers, + String loweredSearchString, + List> foundRecords, + ScanStatus scanStatus) { + var records = polled.records(partition); + if (!records.isEmpty()) { + + scanStatus = ScanStatus.createInstance(scanStatus, records.get(0).timestamp(), records.size()); + // Add to found records if it matches the search criteria + foundRecords.addAll(records.stream() + .filter(rec -> + containsValue(deserializers.getKeyDeserializer(), rec.key(), loweredSearchString) || + containsValue(deserializers.getValueDeserializer(), rec.value(), loweredSearchString)) + .map(rec -> createConsumerRecord(rec, deserializers)) + .toList()); + } + return scanStatus; + } + + private record ScanStatus(long endingTimestamp, int scannedCount) { + public static ScanStatus createInstance(ScanStatus previousScanStatus, + long firstTimestampOfBLock, + int sizeOfBlock) { + // Keep track of the lowest timestamp among this batch of records. + // This is what we will report to the user in the event that the search terminates + // early, so that they can perform a new search with this new timestamp as a starting point + return new ScanStatus((firstTimestampOfBLock < previousScanStatus.endingTimestamp) ? + firstTimestampOfBLock : + previousScanStatus.endingTimestamp(), previousScanStatus.scannedCount() + sizeOfBlock); + } + } + + @NotNull + private List determinePartitionsForTopic(String topic) { + return kafkaConsumer.partitionsFor(topic).stream() + .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())) + .toList(); + } + + private void seekToOffset(TopicPartition partition, OffsetAndTimestamp offset) { + // Old kafka message versions don't have timestamps so the offset would be null for that partition + kafkaConsumer.seek(partition, (offset == null) ? 0 : offset.offset()); + } + + @NotNull + private static ConsumerRecord createConsumerRecord(ConsumerRecord rec, + Deserializers deserializers) { + return new ConsumerRecord<>(rec.topic(), rec.partition(), rec.offset(), rec.timestamp(), rec.timestampType(), + rec.serializedKeySize(), rec.serializedValueSize(), deserialize(deserializers.getKeyDeserializer(), rec.key()), + deserialize(deserializers.getValueDeserializer(), rec.value()), rec.headers(), + rec.leaderEpoch()); + } + + private boolean containsValue(MessageDeserializer deserializer, byte[] value, String loweredSearchString) { + return deserialize(deserializer, value).toLowerCase().contains(loweredSearchString); } diff --git a/src/main/java/kafdrop/service/KafkaMonitor.java b/src/main/java/kafdrop/service/KafkaMonitor.java index 5cb95d9e..c45a195a 100644 --- a/src/main/java/kafdrop/service/KafkaMonitor.java +++ b/src/main/java/kafdrop/service/KafkaMonitor.java @@ -24,11 +24,13 @@ import kafdrop.model.ConsumerVO; import kafdrop.model.CreateTopicVO; import kafdrop.model.MessageVO; +import kafdrop.model.SearchResultsVO; import kafdrop.model.TopicVO; import kafdrop.util.Deserializers; import org.apache.kafka.common.TopicPartition; import java.util.Collection; +import java.util.Date; import java.util.List; import java.util.Optional; diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index f9123544..483f050f 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -26,6 +26,7 @@ import kafdrop.model.ConsumerVO; import kafdrop.model.CreateTopicVO; import kafdrop.model.MessageVO; +import kafdrop.model.SearchResultsVO; import kafdrop.model.TopicPartitionVO; import kafdrop.model.TopicVO; import kafdrop.util.Deserializers; From df47e04bc70e52f995cd0277aadb939b1db44fff Mon Sep 17 00:00:00 2001 From: Bert Roos Date: Tue, 18 Jul 2023 20:35:31 +0200 Subject: [PATCH 11/11] Removed impossible end state REACHED_END_OF_TIMESPAN --- src/main/java/kafdrop/service/KafkaHighLevelConsumer.java | 3 +-- src/main/java/kafdrop/service/KafkaMonitorImpl.java | 6 ------ src/main/java/kafdrop/service/SearchResults.java | 3 +-- 3 files changed, 2 insertions(+), 10 deletions(-) diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index 07e470bf..fa0f0b29 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -246,8 +246,7 @@ private SearchResults searchRecords(String searchString, Integer maximumCount, D } else if (scanStatus.scannedCount() >= SEARCH_MAX_MESSAGES_TO_SCAN) { completionReason = CompletionReason.EXCEEDED_MAX_SCAN_COUNT; } else { - // TODO: This situation cannot occur. There is no exit criterion on end of timespan - completionReason = CompletionReason.REACHED_END_OF_TIMESPAN; + throw new IllegalStateException("This situation is unexpected"); } return new SearchResults(foundRecords, completionReason, new Date(scanStatus.endingTimestamp()), diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index 483f050f..89aacf6a 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -291,12 +291,6 @@ public SearchResultsVO searchMessages(String topic, "Search reached the end of the topic before finding requested number of results. Scanned %d messages.", records.getMessagesScannedCount())); break; - case REACHED_END_OF_TIMESPAN: - results.setCompletionDetails( - String.format( - "Search reached the end of the specified time span before finding requested number of results. Scanned" + - " %d messages.", - records.getMessagesScannedCount())); } } else { diff --git a/src/main/java/kafdrop/service/SearchResults.java b/src/main/java/kafdrop/service/SearchResults.java index 8e636a81..9d8e4f3c 100644 --- a/src/main/java/kafdrop/service/SearchResults.java +++ b/src/main/java/kafdrop/service/SearchResults.java @@ -18,8 +18,7 @@ public final class SearchResults { public enum CompletionReason { NO_MORE_MESSAGES_IN_TOPIC, FOUND_REQUESTED_NUMBER_OF_RESULTS, - EXCEEDED_MAX_SCAN_COUNT, - REACHED_END_OF_TIMESPAN + EXCEEDED_MAX_SCAN_COUNT } public SearchResults(