BE: Full text search support #1267

Open · germanosin wants to merge 35 commits into main from issues/1087-fts
Commits (35) · changes from all commits
8a03bfd
Use typespec contracts
germanosin Jun 9, 2025
7d5af90
Merge branch 'main' into typespec
germanosin Jun 9, 2025
b8b91fc
Fixed styling
germanosin Jun 10, 2025
155b038
Merge remote-tracking branch 'origin/typespec' into typespec
germanosin Jun 10, 2025
a2175a1
Fixed styling
germanosin Jun 10, 2025
5d4f419
enable typespec by default
germanosin Jun 11, 2025
8cd0a0a
Added info
germanosin Jun 11, 2025
6ce97f8
Frontend use new contracts
germanosin Jun 11, 2025
a7a59d8
Fixes in contracts for backward compatibility
germanosin Jun 11, 2025
d0ef634
use typespec build
germanosin Jun 12, 2025
c29091f
Install pnpm dependencies
germanosin Jun 12, 2025
d67068c
Merged main
germanosin Aug 7, 2025
f114d44
Actualize typespec
germanosin Aug 7, 2025
4279d6c
Actualize typespec
germanosin Aug 7, 2025
a1c261b
fixed frontend build
germanosin Aug 7, 2025
ea75152
fixed frontend build
germanosin Aug 7, 2025
5341c20
fixed frontend build
germanosin Aug 7, 2025
c61da13
fixed frontend build
germanosin Aug 7, 2025
2c670f6
fixed frontend build
germanosin Aug 7, 2025
4bf3779
fixed frontend build
germanosin Aug 7, 2025
fa8b03b
fixed frontend build
germanosin Aug 7, 2025
bcae096
Enabled contract validation
germanosin Aug 7, 2025
db746fa
Removed frontend test report
germanosin Aug 7, 2025
a42e81e
Merge branch 'main' into typespec
germanosin Aug 12, 2025
38ee6c9
Synced with main
germanosin Aug 12, 2025
0474fd7
Synced with main
germanosin Aug 12, 2025
c27b32d
Added lucene
germanosin Aug 13, 2025
153c44d
Close stats
germanosin Aug 13, 2025
c9a6d94
fts
germanosin Aug 14, 2025
3706fb2
Merge branch 'main' into issues/1087-fts
germanosin Aug 14, 2025
d887668
Merge branch 'main' into issues/1087-fts
germanosin Aug 14, 2025
56889da
Merge branch 'main' into issues/1087-fts
germanosin Aug 15, 2025
bb49399
Merge branch 'main' into issues/1087-fts
germanosin Aug 18, 2025
6da3269
Ngram config and topics optimizations
germanosin Aug 21, 2025
41a4b52
Merge branch 'main' into issues/1087-fts
germanosin Aug 21, 2025
4 changes: 4 additions & 0 deletions api/build.gradle
@@ -54,6 +54,10 @@ dependencies {
antlr libs.antlr
implementation libs.antlr.runtime

implementation libs.lucene
implementation libs.lucene.queryparser
implementation libs.lucene.analysis.common

implementation libs.opendatadiscovery.oddrn
implementation(libs.opendatadiscovery.client) {
exclude group: 'org.springframework.boot', module: 'spring-boot-starter-webflux'
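
The three Lucene artifacts back the new n-gram filters under io.kafbat.ui.service.index. As a rough illustration of what n-gram tokenization produces (a hypothetical sketch, not code from this PR), analysis-common ships NGramTokenizer:

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.analysis.ngram.NGramTokenizer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

public class NgramDemo {
  // Emits every substring of length minGram..maxGram; e.g. "orders" with (3, 5)
  // yields "ord", "rde", "der", ..., "orde", ..., "order", "rders", which is
  // what lets a query like "rde" hit the topic name "orders".
  static List<String> ngrams(String text, int minGram, int maxGram) throws IOException {
    List<String> result = new ArrayList<>();
    try (NGramTokenizer tokenizer = new NGramTokenizer(minGram, maxGram)) {
      tokenizer.setReader(new StringReader(text));
      CharTermAttribute term = tokenizer.addAttribute(CharTermAttribute.class);
      tokenizer.reset();
      while (tokenizer.incrementToken()) {
        result.add(term.toString());
      }
      tokenizer.end();
    }
    return result;
  }
}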
13 changes: 13 additions & 0 deletions api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
@@ -41,6 +41,7 @@ public class ClustersProperties {
MetricsStorage defaultMetricsStorage = new MetricsStorage();

CacheProperties cache = new CacheProperties();
FtsProperties fts = new FtsProperties();

@Data
public static class Cluster {
@@ -217,6 +218,18 @@ public static class CacheProperties {
Duration connectClusterCacheExpiry = Duration.ofHours(24);
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class FtsProperties {
boolean enabled = false;
boolean topicsNgramEnabled = false;
int topicsMinNGram = 3;
int topicsMaxNGram = 5;
int filterMinNGram = 1;
int filterMaxNGram = 4;
Contributor comment:

maybe create

class NgramSettings {
 int minNGram = 1;
 int maxNGram = 4;
}

and tune it for each search

public static class FtsProperties {
  ...
  NgramSettings topicsNgram = new NgramSettings(3,5);
  NgramSettings schemasNgram = new NgramSettings(1,4);
  NgramSettings consumerGroupsNgram = new NgramSettings(1,4);
}

}

@PostConstruct
public void validateAndSetDefaults() {
if (clusters != null) {
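
For reference, a complete version of the NgramSettings refactor suggested in the review comment above might look like this (a sketch only, reusing the Lombok annotations FtsProperties already carries):

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class NgramSettings {
  int minNGram = 1;
  int maxNGram = 4;
}

FtsProperties would then expose one NgramSettings per searchable entity, e.g. topicsNgram = new NgramSettings(3, 5) for topics and (1, 4) for schemas and consumer groups, so each search surface can be tuned independently.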
api/src/main/java/io/kafbat/ui/controller/SchemasController.java
@@ -1,8 +1,7 @@
package io.kafbat.ui.controller;

import io.kafbat.ui.api.SchemasApi;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.mapper.KafkaSrMapper;
import io.kafbat.ui.mapper.KafkaSrMapperImpl;
@@ -15,13 +14,13 @@
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.SchemaAction;
import io.kafbat.ui.service.SchemaRegistryService;
import io.kafbat.ui.service.index.SchemasFilter;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.List;
import java.util.Map;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
Expand All @@ -38,6 +37,7 @@ public class SchemasController extends AbstractController implements SchemasApi,
private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl();

private final SchemaRegistryService schemaRegistryService;
private final ClustersProperties clustersProperties;

@Override
protected KafkaCluster getCluster(String clusterName) {
@@ -214,6 +214,8 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
.operationName("getSchemas")
.build();

ClustersProperties.FtsProperties fts = clustersProperties.getFts();

return schemaRegistryService
.getAllSubjectNames(getCluster(clusterName))
.flatMapIterable(l -> l)
@@ -222,10 +224,12 @@
.flatMap(subjects -> {
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
int subjectToSkip = ((pageNum != null && pageNum > 0 ? pageNum : 1) - 1) * pageSize;
SchemasFilter filter =
new SchemasFilter(subjects, fts.getFilterMinNGram(), fts.getFilterMaxNGram(), fts.isEnabled());

List<String> filteredSubjects = filter.find(search);

var totalPages = (filteredSubjects.size() / pageSize)
+ (filteredSubjects.size() % pageSize == 0 ? 0 : 1);
List<String> subjectsToRender = filteredSubjects.stream()
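
SchemasFilter lives in io.kafbat.ui.service.index and its source is not part of this view. Inferred from the call site above, its contract is roughly the following sketch; only the disabled-path substring fallback is implemented here, with the n-gram path left as a comment:

import java.util.Collection;
import java.util.List;
import org.apache.commons.lang3.Strings;

// Hypothetical reconstruction of the SchemasFilter shape; the real class
// builds a Lucene n-gram index over the subject names when enabled is true.
public class SchemasFilter {
  private final Collection<String> subjects;
  private final boolean enabled;

  public SchemasFilter(Collection<String> subjects, int minNGram, int maxNGram, boolean enabled) {
    this.subjects = subjects;
    this.enabled = enabled;
    // minNGram/maxNGram would parameterize the Lucene analyzer when enabled
  }

  public List<String> find(String search) {
    // Fallback path, matching the behavior this PR replaces: a plain
    // case-insensitive substring scan, sorted for stable pagination.
    return subjects.stream()
        .filter(s -> search == null || Strings.CI.contains(s, search))
        .sorted()
        .toList();
    // When enabled, the real implementation would instead query the n-gram
    // index and return hits in relevance order.
  }
}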
24 changes: 18 additions & 6 deletions api/src/main/java/io/kafbat/ui/controller/TopicsController.java
@@ -10,6 +10,7 @@
import static org.apache.commons.lang3.Strings.CI;

import io.kafbat.ui.api.TopicsApi;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.mapper.ClusterMapper;
import io.kafbat.ui.model.InternalTopic;
import io.kafbat.ui.model.InternalTopicConfig;
@@ -55,6 +56,7 @@ public class TopicsController extends AbstractController implements TopicsApi, M
private final TopicsService topicsService;
private final TopicAnalysisService topicAnalysisService;
private final ClusterMapper clusterMapper;
private final ClustersProperties clustersProperties;

@Override
public Mono<ResponseEntity<TopicDTO>> createTopic(
@@ -181,23 +183,30 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
.operationName("getTopics")
.build();

return topicsService.getTopicsForPagination(getCluster(clusterName), search, showInternal)
.flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
.flatMap(topics -> {
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
ClustersProperties.FtsProperties fts = clustersProperties.getFts();
Comparator<InternalTopic> comparatorForTopic = getComparatorForTopic(orderBy, fts.isEnabled());
var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
? comparatorForTopic : comparatorForTopic.reversed();

List<InternalTopic> filtered = fts.isEnabled() ? topics : topics.stream()
.filter(topic -> !topic.isInternal()
|| showInternal != null && showInternal)
.filter(topic -> search == null || CI.contains(topic.getName(), search))
.sorted(comparator)
.toList();

var totalPages = (filtered.size() / pageSize)
+ (filtered.size() % pageSize == 0 ? 0 : 1);

List<String> topicsPage = filtered.stream()
.filter(t -> !t.isInternal() || showInternal != null && showInternal)
.skip(topicsToSkip)
.limit(pageSize)
.map(InternalTopic::getName)
@@ -348,9 +357,12 @@ public Mono<ResponseEntity<Flux<TopicProducerStateDTO>>> getActiveProducerStates(
}

private Comparator<InternalTopic> getComparatorForTopic(
TopicColumnsToSortDTO orderBy,
boolean ftsEnabled) {
var defaultComparator = Comparator.comparing(InternalTopic::getName);
if (orderBy == null && ftsEnabled) {
return (o1, o2) -> 0;
} else if (orderBy == null) {
return defaultComparator;
}
return switch (orderBy) {
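
One subtlety above: getComparatorForTopic returns (o1, o2) -> 0 when FTS is enabled and no column ordering was requested, presumably so any sort applied downstream keeps the relevance order produced by the index, since list sorting in Java is stable. A toy demonstration of that property:

import java.util.ArrayList;
import java.util.List;

public class StableSortDemo {
  public static void main(String[] args) {
    // Pretend this is the relevance order produced by the n-gram index.
    List<String> topics = new ArrayList<>(List.of("orders.v2", "orders", "audit-orders"));
    topics.sort((o1, o2) -> 0);        // stable sort: relevance order preserved
    System.out.println(topics);        // [orders.v2, orders, audit-orders]
    topics.sort(String::compareTo);    // explicit ordering overrides relevance
    System.out.println(topics);        // [audit-orders, orders, orders.v2]
  }
}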
6 changes: 4 additions & 2 deletions api/src/main/java/io/kafbat/ui/model/InternalTopic.java
@@ -113,8 +113,10 @@ public static InternalTopic from(TopicDescription topicDescription,
topic.segmentSize(stats.getSegmentSize());
});

topic.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(topicDescription.name()));
topic.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(topicDescription.name()));
if (metrics != null) {
topic.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(topicDescription.name()));
topic.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(topicDescription.name()));
}

topic.topicConfigs(
configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));
9 changes: 8 additions & 1 deletion api/src/main/java/io/kafbat/ui/model/Statistics.java
@@ -11,7 +11,7 @@

@Value
@Builder(toBuilder = true)
public class Statistics {
public class Statistics implements AutoCloseable {
ServerStatusDTO status;
Throwable lastKafkaException;
String version;
@@ -46,4 +46,11 @@ public Stream<TopicDescription> topicDescriptions() {
public Statistics withClusterState(UnaryOperator<ScrapedClusterState> stateUpdate) {
return toBuilder().clusterState(stateUpdate.apply(clusterState)).build();
}

@Override
public void close() throws Exception {
if (clusterState != null) {
clusterState.close();
}
}
}
api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java
@@ -4,12 +4,14 @@

import com.google.common.collect.Streams;
import com.google.common.collect.Table;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.emitter.EnhancedConsumer;
import io.kafbat.ui.model.ConsumerGroupOrderingDTO;
import io.kafbat.ui.model.InternalConsumerGroup;
import io.kafbat.ui.model.InternalTopicConsumerGroup;
import io.kafbat.ui.model.KafkaCluster;
import io.kafbat.ui.model.SortOrderDTO;
import io.kafbat.ui.service.index.ConsumerGroupFilter;
import io.kafbat.ui.service.rbac.AccessControlService;
import io.kafbat.ui.util.ApplicationMetrics;
import io.kafbat.ui.util.KafkaClientSslPropertiesUtil;
@@ -41,6 +43,7 @@ public class ConsumerGroupService {

private final AdminClientService adminClientService;
private final AccessControlService accessControlService;
private final ClustersProperties clustersProperties;

private Mono<List<InternalConsumerGroup>> getConsumerGroups(
ReactiveAdminClient ac,
@@ -114,11 +117,7 @@ public Mono<ConsumerGroupsPage> getConsumerGroupsPage(
SortOrderDTO sortOrderDto) {
return adminClientService.get(cluster).flatMap(ac ->
ac.listConsumerGroups()
.map(listing -> filterGroups(listing, search)
)
.flatMapIterable(lst -> lst)
.filterWhen(cg -> accessControlService.isConsumerGroupAccessible(cg.groupId(), cluster.getName()))
@@ -131,6 +130,19 @@
(allGroups.size() / perPage) + (allGroups.size() % perPage == 0 ? 0 : 1))))));
}

private Collection<ConsumerGroupListing> filterGroups(Collection<ConsumerGroupListing> groups, String search) {
if (search == null || search.isBlank()) {
return groups;
}
ClustersProperties.FtsProperties fts = clustersProperties.getFts();
if (fts.isEnabled()) {
ConsumerGroupFilter filter = new ConsumerGroupFilter(groups, fts.getFilterMinNGram(), fts.getFilterMaxNGram());
return filter.find(search);
} else {
return groups.stream().filter(g -> CI.contains(g.groupId(), search)).toList();
}
}

private Mono<List<ConsumerGroupDescription>> loadSortedDescriptions(ReactiveAdminClient ac,
List<ConsumerGroupListing> groups,
int pageNum,
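
The two branches of filterGroups have different matching semantics: with FTS off the old case-insensitive substring test is kept, while the n-gram path can also rank queries that are not one contiguous substring. The fallback semantics, concretely (CI is the commons-lang3 helper this class already imports):

import org.apache.commons.lang3.Strings;

public class FallbackMatchDemo {
  public static void main(String[] args) {
    // Case-insensitive substring: what the non-FTS branch does.
    System.out.println(Strings.CI.contains("Payments-Consumer-Group", "consumer"));  // true
    System.out.println(Strings.CI.contains("Payments-Consumer-Group", "pay group")); // false: not one substring
    // An n-gram index built with filterMinNGram/filterMaxNGram can still rank
    // such multi-fragment queries, which is the point of enabling FTS here.
  }
}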
23 changes: 18 additions & 5 deletions api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
@@ -27,6 +27,7 @@
import io.kafbat.ui.model.NewConnectorDTO;
import io.kafbat.ui.model.TaskDTO;
import io.kafbat.ui.model.connect.InternalConnectorInfo;
import io.kafbat.ui.service.index.KafkaConnectNgramFilter;
import io.kafbat.ui.util.ReactiveFailover;
import jakarta.validation.Valid;
import java.util.List;
@@ -151,15 +152,27 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
.topics(tuple.getT4().getTopics())
.build())))
.map(kafkaConnectMapper::fullConnectorInfo)
.collectList()
.map(lst -> filterConnectors(lst, search))
.flatMapMany(Flux::fromIterable);
}

private List<FullConnectorInfoDTO> filterConnectors(List<FullConnectorInfoDTO> connectors, String search) {
if (search == null) {
return connectors;
}
ClustersProperties.FtsProperties fts = clustersProperties.getFts();
if (fts.isEnabled()) {
KafkaConnectNgramFilter filter =
new KafkaConnectNgramFilter(connectors, fts.getFilterMinNGram(), fts.getFilterMaxNGram());
return filter.find(search);
} else {
return connectors.stream()
.filter(connector -> getStringsForSearch(connector)
.anyMatch(string -> CI.contains(string, search)))
.toList();
}
}

private Stream<String> getStringsForSearch(FullConnectorInfoDTO fullConnectorInfo) {
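
Note the structural change in getAllConnectors: the old code filtered the Flux element by element through a Predicate, but an n-gram filter needs the whole result set at once to build its index, hence collectList() followed by flatMapMany(). The same shape in isolation (names purely illustrative):

import reactor.core.publisher.Flux;

public class BatchFilterDemo {
  public static void main(String[] args) {
    Flux<String> filtered = Flux.just("jdbc-sink", "es-sink", "debezium-source")
        .collectList()                                   // Mono<List<String>>
        .map(all -> all.stream()                         // batch filtering step, where
            .filter(n -> n.contains("sink")).toList())   // an index could be built first
        .flatMapMany(Flux::fromIterable);                // back to Flux<String>
    filtered.subscribe(System.out::println);             // jdbc-sink, es-sink
  }
}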
18 changes: 16 additions & 2 deletions api/src/main/java/io/kafbat/ui/service/StatisticsCache.java
@@ -1,16 +1,20 @@
package io.kafbat.ui.service;

import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.model.InternalPartitionsOffsets;
import io.kafbat.ui.model.KafkaCluster;
import io.kafbat.ui.model.ServerStatusDTO;
import io.kafbat.ui.model.Statistics;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.TopicDescription;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class StatisticsCache {

@@ -28,12 +32,22 @@ public synchronized void replace(KafkaCluster c, Statistics stats) {
public synchronized void update(KafkaCluster c,
Map<String, TopicDescription> descriptions,
Map<String, List<ConfigEntry>> configs,
InternalPartitionsOffsets partitionsOffsets,
ClustersProperties clustersProperties) {
var stats = get(c);
replace(
c,
stats.withClusterState(s ->
s.updateTopics(descriptions, configs, partitionsOffsets, clustersProperties)
)
);
try {
if (!stats.getStatus().equals(ServerStatusDTO.INITIALIZING)) {
stats.close();
}
} catch (Exception e) {
log.error("Error closing cluster {} stats", c.getName(), e);
}
}

public synchronized void onTopicDelete(KafkaCluster c, String topic) {
api/src/main/java/io/kafbat/ui/service/StatisticsService.java
@@ -2,6 +2,7 @@

import static io.kafbat.ui.service.ReactiveAdminClient.ClusterDescription;

import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.model.ClusterFeature;
import io.kafbat.ui.model.KafkaCluster;
import io.kafbat.ui.model.Metrics;
@@ -22,6 +23,7 @@ public class StatisticsService {
private final AdminClientService adminClientService;
private final FeatureService featureService;
private final StatisticsCache cache;
private final ClustersProperties clustersProperties;

public Mono<Statistics> updateCache(KafkaCluster c) {
return getStatistics(c).doOnSuccess(m -> cache.replace(c, m));
@@ -62,7 +64,7 @@ private Statistics createStats(ClusterDescription description,

private Mono<ScrapedClusterState> loadClusterState(ClusterDescription clusterDescription,
ReactiveAdminClient ac) {
return ScrapedClusterState.scrape(clusterDescription, ac, clustersProperties);
}

private Mono<Metrics> scrapeMetrics(KafkaCluster cluster,