Skip to content

Commit

Permalink
Upgrade kafka to 3.9.0
Browse files Browse the repository at this point in the history
This fixes Kafka tests that fail on Java 14+ because of problems in the
ZooKeeper client code. Upgrading Kafka allows us to deploy an embedded
cluster with KRaft instead of ZooKeeper, which in turn lets us remove
the use of the ZooKeeper client code path in testing.
  • Loading branch information
ZacBlanco committed Jan 7, 2025
1 parent fbee5d3 commit 12df941
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 83 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
<dep.gcs.version>1.9.17</dep.gcs.version>
<dep.alluxio.version>313</dep.alluxio.version>
<dep.slf4j.version>1.7.32</dep.slf4j.version>
<dep.kafka.version>2.3.1</dep.kafka.version>
<dep.kafka.version>3.9.0</dep.kafka.version>
<dep.pinot.version>0.11.0</dep.pinot.version>
<dep.druid.version>30.0.1</dep.druid.version>
<dep.jaxb.version>2.3.1</dep.jaxb.version>
Expand Down Expand Up @@ -1318,7 +1318,7 @@
<dependency>
<groupId>org.pcollections</groupId>
<artifactId>pcollections</artifactId>
<version>2.1.2</version>
<version>4.0.2</version>
</dependency>

<dependency>
Expand Down
94 changes: 85 additions & 9 deletions presto-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,27 @@

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<scope>runtime</scope>
<!-- This is the version used by kafka transitively -->
<version>3.8.4</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>


<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
Expand Down Expand Up @@ -144,11 +156,6 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.101tec</groupId>
Expand All @@ -174,6 +181,58 @@
<scope>test</scope>
</dependency>

<!-- Dependencies required for EmbeddedKafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>3.9.0</version>
<classifier>test</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>3.9.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.9.0</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-server-common</artifactId>
<version>3.9.0</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.10.2</version>
<scope>test</scope>
</dependency>
<!-- End of dependencies required for EmbeddedKafka -->

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
Expand Down Expand Up @@ -241,13 +300,30 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-testng</artifactId>
<version>3.0.0-M7</version>
</dependency>
</dependencies>
<configuration>
<!-- integration tests take a very long time so only run them in the CI server -->
<excludes>
<exclude>**/TestKafkaDistributed.java</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.basepom.maven</groupId>
<artifactId>duplicate-finder-maven-plugin</artifactId>
<configuration>
<ignoredResourcePatterns combine.children="append">
<ignoredResourcePattern>kafka/kafka-version.properties</ignoredResourcePattern>
<ignoredResourcePattern>log4j.properties</ignoredResourcePattern>
</ignoredResourcePatterns>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ public void spinUp()
@AfterMethod(alwaysRun = true)
public void tearDown()
{
queryRunner.close();
queryRunner = null;
if (queryRunner != null) {
queryRunner.close();
queryRunner = null;
}
}

private void createMessages(String topicName, int count)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,28 @@
*/
package com.facebook.presto.kafka.util;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZkUtils;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.LongSerializer;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.facebook.presto.kafka.util.TestUtils.findUnusedPort;
import static com.facebook.presto.kafka.util.TestUtils.toProperties;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
Expand All @@ -45,61 +43,47 @@
public class EmbeddedKafka
implements Closeable
{
private final EmbeddedZookeeper zookeeper;
private final int port;
private final File kafkaDataDir;
private final KafkaServerStartable kafka;
private final KafkaClusterTestKit kafka;

private final AtomicBoolean started = new AtomicBoolean();
private final AtomicBoolean stopped = new AtomicBoolean();

public static EmbeddedKafka createEmbeddedKafka()
throws IOException
{
return new EmbeddedKafka(new EmbeddedZookeeper(), new Properties());
return new EmbeddedKafka();
}

public static EmbeddedKafka createEmbeddedKafka(Properties overrideProperties)
EmbeddedKafka()
throws IOException
{
return new EmbeddedKafka(new EmbeddedZookeeper(), overrideProperties);
}

EmbeddedKafka(EmbeddedZookeeper zookeeper, Properties overrideProperties)
throws IOException
{
this.zookeeper = requireNonNull(zookeeper, "zookeeper is null");
requireNonNull(overrideProperties, "overrideProperties is null");

this.port = findUnusedPort();
this.kafkaDataDir = Files.createTempDir();

Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("broker.id", "0")
.put("host.name", "localhost")
.put("num.partitions", "2")
.put("log.flush.interval.messages", "10000")
.put("log.flush.interval.ms", "1000")
.put("log.retention.minutes", "60")
.put("auto.create.topics.enable", "false")
.put("zookeeper.connection.timeout.ms", "1000000")
.put("port", Integer.toString(port))
.put("log.dirs", kafkaDataDir.getAbsolutePath())
.put("zookeeper.connect", zookeeper.getConnectString())
.put("offsets.topic.replication.factor", "1")
.putAll(Maps.fromProperties(overrideProperties))
TestKitNodes nodes = new TestKitNodes.Builder()
.setNumBrokerNodes(1)
.setNumControllerNodes(1)
.setCombined(true)
.build();

KafkaConfig config = new KafkaConfig(toProperties(properties));
this.kafka = new KafkaServerStartable(config);
try {
this.kafka = new KafkaClusterTestKit.Builder(nodes)
.setConfigProp("log.dirs", kafkaDataDir.getAbsolutePath())
.build();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

public void start()
throws InterruptedException, IOException
throws Exception
{
if (!started.getAndSet(true)) {
zookeeper.start();
kafka.format();
kafka.startup();
kafka.waitForActiveController();
kafka.waitForReadyBrokers();
}
}

Expand All @@ -108,9 +92,12 @@ public void close()
throws IOException
{
if (started.get() && !stopped.getAndSet(true)) {
kafka.shutdown();
kafka.awaitShutdown();
zookeeper.close();
try {
kafka.close();
}
catch (Exception e) {
throw new RuntimeException(e);
}
deleteRecursively(kafkaDataDir.toPath(), ALLOW_INSECURE);
}
}
Expand All @@ -124,14 +111,15 @@ public void createTopics(int partitions, int replication, Properties topicProper
{
checkState(started.get() && !stopped.get(), "not started!");

ZkUtils zkUtils = ZkUtils.apply(getZookeeperConnectString(), 30_000, 30_000, false);
try {
for (String topic : topics) {
AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicProperties, RackAwareMode.Disabled$.MODULE$);
}
}
finally {
zkUtils.close();
try (AdminClient client = AdminClient.create(kafka.clientProperties())) {
client.createTopics(Arrays.stream(topics)
.map(topic -> new NewTopic(topic, partitions, (short) replication)
.configs(topicProperties.stringPropertyNames()
.stream()
.collect(toImmutableMap(
identity(),
prop -> (String) topicProperties.get(prop)))))
.collect(Collectors.toList()));
}
}

Expand All @@ -146,23 +134,8 @@ public KafkaProducer<Long, Object> createProducer()
return new KafkaProducer<>(properties);
}

public int getZookeeperPort()
{
return zookeeper.getPort();
}

public int getPort()
{
return port;
}

public String getConnectString()
{
return "localhost:" + Integer.toString(port);
}

public String getZookeeperConnectString()
{
return zookeeper.getConnectString();
return kafka.bootstrapServers();
}
}

0 comments on commit 12df941

Please sign in to comment.