Skip to content

Commit f314ee4

Browse files
garyrussellartembilan
authored andcommitted
GH-2720: Add Predicate to KafkaAdmin
Resolves #2720 **cherry-pick to 2.9.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java
1 parent bb95a84 commit f314ee4

File tree

3 files changed

+68
-5
lines changed

3 files changed

+68
-5
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,16 @@ private KafkaAdmin admin;
152152
----
153153
====
154154

155+
Starting with versions 2.9.10, 3.0.9, you can provide a `Predicate<NewTopic>` which can be used to determine whether a particular `NewTopic` bean should be considered for creation or modification.
156+
This is useful, for example, if you have multiple `KafkaAdmin` instances pointing to different clusters and you wish to select those topics that should be created or modified by each admin.
157+
158+
====
159+
[source, java]
160+
----
161+
admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));
162+
----
163+
====
164+
155165
[[sending-messages]]
156166
==== Sending Messages
157167

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.TimeUnit;
3333
import java.util.concurrent.TimeoutException;
3434
import java.util.concurrent.atomic.AtomicInteger;
35+
import java.util.function.Predicate;
3536
import java.util.stream.Collectors;
3637

3738
import org.apache.commons.logging.LogFactory;
@@ -61,6 +62,7 @@
6162
import org.springframework.core.log.LogAccessor;
6263
import org.springframework.kafka.KafkaException;
6364
import org.springframework.kafka.support.TopicForRetryable;
65+
import org.springframework.util.Assert;
6466

6567
/**
6668
* An admin that delegates to an {@link AdminClient} to create topics defined
@@ -87,6 +89,8 @@ public class KafkaAdmin extends KafkaResourceFactory
8789

8890
private ApplicationContext applicationContext;
8991

92+
private Predicate<NewTopic> createOrModifyTopic = nt -> true;
93+
9094
private Duration closeTimeout = DEFAULT_CLOSE_TIMEOUT;
9195

9296
private int operationTimeout = DEFAULT_OPERATION_TIMEOUT;
@@ -157,6 +161,31 @@ public void setModifyTopicConfigs(boolean modifyTopicConfigs) {
157161
this.modifyTopicConfigs = modifyTopicConfigs;
158162
}
159163

164+
/**
165+
* Set a predicate that returns true if a discovered {@link NewTopic} bean should be
166+
* considered for creation or modification by this admin instance. The default
167+
* predicate returns true for all {@link NewTopic}s. Used by the default
168+
* implementation of {@link #newTopics()}.
169+
* @param createOrModifyTopic the predicate.
170+
* @since 2.9.10
171+
* @see #newTopics()
172+
*/
173+
public void setCreateOrModifyTopic(Predicate<NewTopic> createOrModifyTopic) {
174+
Assert.notNull(createOrModifyTopic, "'createOrModifyTopic' cannot be null");
175+
this.createOrModifyTopic = createOrModifyTopic;
176+
}
177+
178+
/**
179+
* Return the predicate used to determine whether a {@link NewTopic} should be
180+
* considered for creation or modification.
181+
* @return the predicate.
182+
* @since 2.9.10
183+
* @see #newTopics()
184+
*/
185+
protected Predicate<NewTopic> getCreateOrModifyTopic() {
186+
return this.createOrModifyTopic;
187+
}
188+
160189
@Override
161190
public Map<String, Object> getConfigurationProperties() {
162191
Map<String, Object> configs2 = new HashMap<>(this.configs);
@@ -219,10 +248,17 @@ public final boolean initialize() {
219248
return false;
220249
}
221250

222-
/*
223-
* Remove any TopicForRetryable bean if there is also a NewTopic with the same topic name.
251+
/**
252+
* Return a collection of {@link NewTopic}s to create or modify. The default
253+
* implementation retrieves all {@link NewTopic} beans in the application context and
254+
* applies the {@link #setCreateOrModifyTopic(Predicate)} predicate to each one. It
255+
* also removes any {@link TopicForRetryable} bean if there is also a NewTopic with
256+
* the same topic name. This is performed before calling the predicate.
257+
* @return the collection of {@link NewTopic}s.
258+
* @since 2.9.10
259+
* @see #setCreateOrModifyTopic(Predicate)
224260
*/
225-
private Collection<NewTopic> newTopics() {
261+
protected Collection<NewTopic> newTopics() {
226262
Map<String, NewTopic> newTopicsMap = new HashMap<>(
227263
this.applicationContext.getBeansOfType(NewTopic.class, false, false));
228264
Map<String, NewTopics> wrappers = this.applicationContext.getBeansOfType(NewTopics.class, false, false);
@@ -250,6 +286,13 @@ private Collection<NewTopic> newTopics() {
250286
newTopicsMap.remove(entry.getKey());
251287
}
252288
}
289+
Iterator<Entry<String, NewTopic>> iterator = newTopicsMap.entrySet().iterator();
290+
while (iterator.hasNext()) {
291+
Entry<String, NewTopic> next = iterator.next();
292+
if (!this.createOrModifyTopic.test(next.getValue())) {
293+
iterator.remove();
294+
}
295+
}
253296
return new ArrayList<>(newTopicsMap.values());
254297
}
255298

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -85,6 +85,9 @@ public class KafkaAdminTests {
8585
@Autowired
8686
private NewTopic mismatchconfig;
8787

88+
@Autowired
89+
private NewTopic dontCreateThisOne;
90+
8891
@Test
8992
public void testTopicConfigs() {
9093
assertThat(topic1.configs()).containsEntry(
@@ -97,6 +100,7 @@ public void testTopicConfigs() {
97100
.replicas(3)
98101
.build().replicationFactor()).isEqualTo((short) 3);
99102
assertThat(topic3.replicasAssignments()).hasSize(3);
103+
assertThat(admin.newTopics()).doesNotContain(this.dontCreateThisOne);
100104
}
101105

102106
@Test
@@ -269,6 +273,7 @@ public KafkaAdmin admin() {
269273
KafkaAdmin admin = new KafkaAdmin(configs);
270274
admin.setBootstrapServersSupplier(() ->
271275
StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
276+
admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreate"));
272277
return admin;
273278
}
274279

@@ -338,6 +343,11 @@ public NewTopics topics456() {
338343
.build());
339344
}
340345

346+
@Bean
347+
NewTopic dontCreateThisOne() {
348+
return TopicBuilder.name("dontCreate").build();
349+
}
350+
341351
}
342352

343353
}

0 commit comments

Comments
 (0)