Skip to content

Commit 37d35fe

Browse files
Elliot Kennedyartembilan
Elliot Kennedy
authored andcommitted
GH-539: Wait for partitions from embedded topics
Fixes #539 * Add `KafkaStreamsBranchTests` * Move `AddressableEmbeddedBrokerTests` to the `rule` package **Cherry-pick to 2.0.x and 1.3.x** # Conflicts: # spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java # spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/AddressableEmbeddedBrokerTests.java
1 parent ad4e4cf commit 37d35fe

File tree

3 files changed

+181
-23
lines changed

3 files changed

+181
-23
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2017 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -77,6 +77,7 @@
7777
* @author Marius Bogoevici
7878
* @author Artem Bilan
7979
* @author Gary Russell
80+
* @author Elliot Kennedy
8081
*/
8182
public class KafkaEmbedded extends ExternalResource implements KafkaRule, InitializingBean, DisposableBean {
8283

@@ -458,23 +459,7 @@ public boolean isEmbedded() {
458459
* @throws Exception an exception.
459460
*/
460461
public void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer) throws Exception {
461-
final CountDownLatch consumerLatch = new CountDownLatch(1);
462-
consumer.subscribe(Arrays.asList(this.topics), new ConsumerRebalanceListener() {
463-
464-
@Override
465-
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
466-
}
467-
468-
@Override
469-
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
470-
consumerLatch.countDown();
471-
}
472-
473-
});
474-
consumer.poll(0); // force assignment
475-
assertThat(consumerLatch.await(30, TimeUnit.SECONDS))
476-
.as("Failed to be assigned partitions from the embedded topics")
477-
.isTrue();
462+
consumeFromEmbeddedTopics(consumer, this.topics);
478463
}
479464

480465
/**
@@ -507,13 +492,17 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
507492
@Override
508493
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
509494
consumerLatch.countDown();
495+
if (logger.isDebugEnabled()) {
496+
logger.debug("partitions assigned: " + partitions);
497+
}
510498
}
511499

512500
});
513501
consumer.poll(0); // force assignment
514502
assertThat(consumerLatch.await(30, TimeUnit.SECONDS))
515-
.as("Failed to be assigned partitions from the embedded topics")
516-
.isTrue();
503+
.as("Failed to be assigned partitions from the embedded topics")
504+
.isTrue();
505+
logger.debug("Subscription Initiated");
517506
}
518507

519508
}
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.test.hamcrest;
17+
package org.springframework.kafka.test.rule;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

@@ -29,11 +29,11 @@
2929
import org.springframework.beans.factory.annotation.Autowired;
3030
import org.springframework.context.annotation.Bean;
3131
import org.springframework.context.annotation.Configuration;
32-
import org.springframework.kafka.test.rule.KafkaEmbedded;
3332
import org.springframework.test.context.junit4.SpringRunner;
3433

3534
/**
3635
* @author Gary Russell
36+
* @author Elliot Kennedy
3737
* @since 1.3
3838
*
3939
*/
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.kstream;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.ArrayList;
22+
import java.util.HashMap;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.UUID;
26+
27+
import org.apache.kafka.clients.consumer.Consumer;
28+
import org.apache.kafka.clients.consumer.ConsumerConfig;
29+
import org.apache.kafka.clients.consumer.ConsumerRecords;
30+
import org.apache.kafka.common.serialization.Serdes;
31+
import org.apache.kafka.common.serialization.StringDeserializer;
32+
import org.apache.kafka.streams.Consumed;
33+
import org.apache.kafka.streams.StreamsBuilder;
34+
import org.apache.kafka.streams.StreamsConfig;
35+
import org.apache.kafka.streams.kstream.KStream;
36+
import org.apache.kafka.streams.kstream.Produced;
37+
import org.junit.Test;
38+
import org.junit.runner.RunWith;
39+
40+
import org.springframework.beans.factory.annotation.Autowired;
41+
import org.springframework.beans.factory.annotation.Value;
42+
import org.springframework.context.annotation.Bean;
43+
import org.springframework.context.annotation.Configuration;
44+
import org.springframework.kafka.annotation.EnableKafkaStreams;
45+
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
46+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
47+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
48+
import org.springframework.kafka.core.KafkaTemplate;
49+
import org.springframework.kafka.core.ProducerFactory;
50+
import org.springframework.kafka.test.context.EmbeddedKafka;
51+
import org.springframework.kafka.test.rule.KafkaEmbedded;
52+
import org.springframework.kafka.test.utils.KafkaTestUtils;
53+
import org.springframework.test.annotation.DirtiesContext;
54+
import org.springframework.test.context.junit4.SpringRunner;
55+
56+
/**
57+
* @author Elliot Kennedy
58+
* @author Artem Bilan
59+
* @since 1.3.3
60+
*/
61+
@RunWith(SpringRunner.class)
62+
@DirtiesContext
63+
@EmbeddedKafka(partitions = 1,
64+
topics = {
65+
KafkaStreamsBranchTests.TRUE_TOPIC,
66+
KafkaStreamsBranchTests.FALSE_TOPIC,
67+
KafkaStreamsBranchTests.TRUE_FALSE_INPUT_TOPIC })
68+
public class KafkaStreamsBranchTests {
69+
70+
public static final String TRUE_TOPIC = "true-output-topic";
71+
72+
public static final String FALSE_TOPIC = "false-output-topic";
73+
74+
public static final String TRUE_FALSE_INPUT_TOPIC = "input-topic";
75+
76+
@Autowired
77+
private KafkaTemplate<String, String> kafkaTemplate;
78+
79+
@Autowired
80+
private KafkaEmbedded kafkaEmbedded;
81+
82+
@Test
83+
public void testBranchingStream() throws Exception {
84+
Consumer<String, String> falseConsumer = createConsumer();
85+
this.kafkaEmbedded.consumeFromAnEmbeddedTopic(falseConsumer, FALSE_TOPIC);
86+
87+
Consumer<String, String> trueConsumer = createConsumer();
88+
this.kafkaEmbedded.consumeFromAnEmbeddedTopic(trueConsumer, TRUE_TOPIC);
89+
90+
this.kafkaTemplate.sendDefault(String.valueOf(true));
91+
this.kafkaTemplate.sendDefault(String.valueOf(true));
92+
this.kafkaTemplate.sendDefault(String.valueOf(false));
93+
94+
ConsumerRecords<String, String> trueRecords = KafkaTestUtils.getRecords(trueConsumer);
95+
ConsumerRecords<String, String> falseRecords = KafkaTestUtils.getRecords(falseConsumer);
96+
97+
List<String> trueValues = new ArrayList<>();
98+
trueRecords.forEach(trueRecord -> trueValues.add(trueRecord.value()));
99+
100+
List<String> falseValues = new ArrayList<>();
101+
falseRecords.forEach(falseRecord -> falseValues.add(falseRecord.value()));
102+
103+
assertThat(trueValues).containsExactly("true", "true");
104+
assertThat(falseValues).containsExactly("false");
105+
}
106+
107+
private Consumer<String, String> createConsumer() {
108+
Map<String, Object> consumerProps =
109+
KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "false", this.kafkaEmbedded);
110+
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
111+
112+
DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory =
113+
new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), new StringDeserializer());
114+
return kafkaConsumerFactory.createConsumer();
115+
}
116+
117+
@Configuration
118+
@EnableKafkaStreams
119+
public static class Config {
120+
121+
@Value("${" + KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
122+
private String brokerAddresses;
123+
124+
@Bean
125+
public ProducerFactory<Integer, String> producerFactory() {
126+
return new DefaultKafkaProducerFactory<>(producerConfigs());
127+
}
128+
129+
@Bean
130+
public Map<String, Object> producerConfigs() {
131+
return KafkaTestUtils.senderProps(this.brokerAddresses);
132+
}
133+
134+
@Bean
135+
public KafkaTemplate<?, ?> kafkaTemplate() {
136+
KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
137+
kafkaTemplate.setDefaultTopic(TRUE_FALSE_INPUT_TOPIC);
138+
return kafkaTemplate;
139+
}
140+
141+
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
142+
public StreamsConfig kStreamsConfigs() {
143+
Map<String, Object> props = new HashMap<>();
144+
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
145+
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
146+
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
147+
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
148+
return new StreamsConfig(props);
149+
}
150+
151+
@Bean
152+
@SuppressWarnings("unchecked")
153+
public KStream<String, String> trueFalseStream(StreamsBuilder streamsBuilder) {
154+
KStream<String, String> trueFalseStream = streamsBuilder
155+
.stream(TRUE_FALSE_INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
156+
157+
KStream<String, String>[] branches =
158+
trueFalseStream.branch((key, value) -> String.valueOf(true).equals(value),
159+
(key, value) -> String.valueOf(false).equals(value));
160+
161+
branches[0].to(TRUE_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
162+
branches[1].to(FALSE_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
163+
164+
return trueFalseStream;
165+
}
166+
167+
}
168+
169+
}

0 commit comments

Comments
 (0)