Skip to content

Commit 3706405

Browse files
garyrussellartembilan
authored andcommitted
GH-634: 1.1.0 Client compatibility
* GH-634: 1.1.0 Client compatibility Fixes #634 * Revert version * Fix version check * Add docs * Polishing - PR Comments
1 parent e77e1ef commit 3706405

File tree

4 files changed

+111
-6
lines changed

4 files changed

+111
-6
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.io.File;
22+
import java.lang.reflect.InvocationTargetException;
23+
import java.lang.reflect.Method;
2124
import java.util.ArrayList;
2225
import java.util.Arrays;
2326
import java.util.Collection;
@@ -43,6 +46,7 @@
4346
import org.apache.kafka.clients.consumer.ConsumerRecords;
4447
import org.apache.kafka.common.TopicPartition;
4548
import org.apache.kafka.common.security.auth.SecurityProtocol;
49+
import org.apache.kafka.common.utils.AppInfoParser;
4650
import org.apache.kafka.common.utils.Time;
4751
import org.junit.rules.ExternalResource;
4852

@@ -85,6 +89,29 @@ public class KafkaEmbedded extends ExternalResource implements KafkaRule, Initia
8589

8690
public static final long METADATA_PROPAGATION_TIMEOUT = 10000L;
8791

92+
private static final String clientVersion;
93+
94+
private static final Method testUtilsCreateBrokerConfigMethod;
95+
96+
static {
97+
clientVersion = AppInfoParser.getVersion();
98+
if (clientVersion.startsWith("1.1.")) {
99+
try {
100+
testUtilsCreateBrokerConfigMethod = TestUtils.class.getDeclaredMethod("createBrokerConfig",
101+
int.class, String.class, boolean.class, boolean.class, int.class,
102+
scala.Option.class, scala.Option.class, scala.Option.class,
103+
boolean.class, boolean.class, int.class, boolean.class, int.class, boolean.class,
104+
int.class, scala.Option.class, int.class, boolean.class);
105+
}
106+
catch (NoSuchMethodException | SecurityException e) {
107+
throw new RuntimeException("Failed to determine TestUtils.createBrokerConfig() method");
108+
}
109+
}
110+
else {
111+
testUtilsCreateBrokerConfigMethod = null;
112+
}
113+
}
114+
88115
private final int count;
89116

90117
private final boolean controlledShutdown;
@@ -191,12 +218,7 @@ public void before() throws Exception { //NOSONAR
191218
ZKStringSerializer$.MODULE$);
192219
this.kafkaServers.clear();
193220
for (int i = 0; i < this.count; i++) {
194-
Properties brokerConfigProperties = TestUtils.createBrokerConfig(i, this.zkConnect, this.controlledShutdown,
195-
true, this.kafkaPorts[i],
196-
scala.Option.apply(null),
197-
scala.Option.apply(null),
198-
scala.Option.apply(null),
199-
true, false, 0, false, 0, false, 0, scala.Option.apply(null), 1);
221+
Properties brokerConfigProperties = createBrokerProperties(i);
200222
brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
201223
brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
202224
brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
@@ -222,6 +244,31 @@ public void before() throws Exception { //NOSONAR
222244
System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString());
223245
}
224246

247+
public Properties createBrokerProperties(int i) {
248+
if (testUtilsCreateBrokerConfigMethod == null) {
249+
return TestUtils.createBrokerConfig(i, this.zkConnect, this.controlledShutdown,
250+
true, this.kafkaPorts[i],
251+
scala.Option.apply(null),
252+
scala.Option.apply(null),
253+
scala.Option.apply(null),
254+
true, false, 0, false, 0, false, 0, scala.Option.apply(null), 1);
255+
}
256+
else {
257+
try {
258+
return (Properties) testUtilsCreateBrokerConfigMethod.invoke(null, i, this.zkConnect,
259+
this.controlledShutdown,
260+
true, this.kafkaPorts[i],
261+
scala.Option.<SecurityProtocol>apply(null),
262+
scala.Option.<File>apply(null),
263+
scala.Option.<Properties>apply(null),
264+
true, false, 0, false, 0, false, 0, scala.Option.<String>apply(null), 1, false);
265+
}
266+
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
267+
throw new RuntimeException(e);
268+
}
269+
}
270+
}
271+
225272

226273
@Override
227274
public void destroy() throws Exception {

src/reference/asciidoc/appendix.adoc

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,56 @@
1+
[[deps-for-11x]]
2+
== Override Dependencies to use the 1.1.x kafka-clients
3+
4+
When using `spring-kafka-test` (_version 2.1.x_, starting with _version 2.1.5_) with the 1.1.x `kafka-clients` jar, you will need to override certain transitive dependencies as follows:
5+
6+
[source, xml]
7+
----
8+
<dependency>
9+
<groupId>org.springframework.kafka</groupId>
10+
<artifactId>spring-kafka</artifactId>
11+
<version>${spring.kafka.version}</version>
12+
</dependency>
13+
14+
<dependency>
15+
<groupId>org.springframework.kafka</groupId>
16+
<artifactId>spring-kafka-test</artifactId>
17+
<version>${spring.kafka.version}</version>
18+
<scope>test</scope>
19+
<exclusions>
20+
<exclusion>
21+
<groupId>org.apache.kafka</groupId>
22+
<artifactId>kafka-clients</artifactId>
23+
</exclusion>
24+
</exclusions>
25+
</dependency>
26+
27+
<dependency>
28+
<groupId>org.apache.kafka</groupId>
29+
<artifactId>kafka-clients</artifactId>
30+
<version>1.1.0</version>
31+
</dependency>
32+
33+
<dependency>
34+
<groupId>org.apache.kafka</groupId>
35+
<artifactId>kafka-clients</artifactId>
36+
<version>1.1.0</version>
37+
<classifier>test</classifier>
38+
</dependency>
39+
40+
<dependency>
41+
<groupId>org.apache.kafka</groupId>
42+
<artifactId>kafka_2.11</artifactId>
43+
<version>1.1.0</version>
44+
</dependency>
45+
46+
<dependency>
47+
<groupId>org.apache.kafka</groupId>
48+
<artifactId>kafka_2.11</artifactId>
49+
<version>1.1.0</version>
50+
<classifier>test</classifier>
51+
</dependency>
52+
----
53+
154
[[history]]
255
== Change History
356

src/reference/asciidoc/testing.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
The `spring-kafka-test` jar contains some useful utilities to assist with testing your applications.
77

8+
NOTE: See <<deps-for-11x>> if you wish to use the 1.1.x `kafka-clients` jar with _version 2.1.x_.
9+
810
==== JUnit
911

1012
`o.s.kafka.test.utils.KafkaTestUtils` provides some static methods to set up producer and consumer properties:

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
This version requires the 1.0.0 `kafka-clients` or higher.
66

7+
NOTE: The 1.1.x client is supported, with _version 2.1.5_, but you will need to override dependencies as described in <<deps-for-11x>>.
8+
The 1.1.x client will be supported natively in _version 2.2_.
9+
710
==== JSON Improvements
811

912
The `StringJsonMessageConverter` and `JsonSerializer` now add type information in `Headers`, allowing the converter and `JsonDeserializer` to create specific types on reception, based on the message itself rather than a fixed configured type.

0 commit comments

Comments
 (0)