Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Upgrade kafka to 3.9.0
Browse files Browse the repository at this point in the history
This fixes issues with kafka tests failing due to issues in the
zookeeper client code with Java 14+. Upgrading kafka allows
us to deploy an embedded cluster with KRaft instead of ZK
which allows us to remove use of the ZK client codepath in
testing
ZacBlanco committed Jan 9, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 8435a73 commit 1adafde
Showing 5 changed files with 142 additions and 83 deletions.
8 changes: 6 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -90,7 +90,7 @@
<dep.gcs.version>1.9.17</dep.gcs.version>
<dep.alluxio.version>313</dep.alluxio.version>
<dep.slf4j.version>1.7.32</dep.slf4j.version>
<dep.kafka.version>2.3.1</dep.kafka.version>
<dep.kafka.version>3.9.0</dep.kafka.version>
<dep.pinot.version>0.11.0</dep.pinot.version>
<dep.druid.version>30.0.1</dep.druid.version>
<dep.jaxb.version>2.3.1</dep.jaxb.version>
@@ -1327,7 +1327,7 @@
<dependency>
<groupId>org.pcollections</groupId>
<artifactId>pcollections</artifactId>
<version>2.1.2</version>
<version>4.0.2</version>
</dependency>

<dependency>
@@ -2060,6 +2060,10 @@
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
</exclusion>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
94 changes: 85 additions & 9 deletions presto-kafka/pom.xml
Original file line number Diff line number Diff line change
@@ -60,15 +60,27 @@

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<scope>runtime</scope>
<!-- This is the version used by kafka tranitively -->
<version>3.8.4</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>


<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
@@ -144,11 +156,6 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.101tec</groupId>
@@ -174,6 +181,58 @@
<scope>test</scope>
</dependency>

<!-- Dependencies required for EmbeddedKafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>3.9.0</version>
<classifier>test</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>3.9.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.9.0</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-server-common</artifactId>
<version>3.9.0</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.10.2</version>
<scope>test</scope>
</dependency>
<!-- Dependencies required for EmbeddedKafka -->

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
@@ -241,13 +300,30 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-testng</artifactId>
<version>3.0.0-M7</version>
</dependency>
</dependencies>
<configuration>
<!-- integration tests take a very long time so only run them in the CI server -->
<excludes>
<exclude>**/TestKafkaDistributed.java</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.basepom.maven</groupId>
<artifactId>duplicate-finder-maven-plugin</artifactId>
<configuration>
<ignoredResourcePatterns combine.children="append">
<ignoredResourcePattern>kafka/kafka-version.properties</ignoredResourcePattern>
<ignoredResourcePattern>log4j.properties</ignoredResourcePattern>
</ignoredResourcePatterns>
</configuration>
</plugin>
</plugins>
</build>

Original file line number Diff line number Diff line change
@@ -90,8 +90,10 @@ public void spinUp()
@AfterMethod(alwaysRun = true)
public void tearDown()
{
queryRunner.close();
queryRunner = null;
if (queryRunner != null) {
queryRunner.close();
queryRunner = null;
}
}

private void createMessages(String topicName, int count)
Original file line number Diff line number Diff line change
@@ -13,30 +13,28 @@
*/
package com.facebook.presto.kafka.util;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZkUtils;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.LongSerializer;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.facebook.presto.kafka.util.TestUtils.findUnusedPort;
import static com.facebook.presto.kafka.util.TestUtils.toProperties;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
@@ -45,61 +43,47 @@
public class EmbeddedKafka
implements Closeable
{
private final EmbeddedZookeeper zookeeper;
private final int port;
private final File kafkaDataDir;
private final KafkaServerStartable kafka;
private final KafkaClusterTestKit kafka;

private final AtomicBoolean started = new AtomicBoolean();
private final AtomicBoolean stopped = new AtomicBoolean();

public static EmbeddedKafka createEmbeddedKafka()
throws IOException
{
return new EmbeddedKafka(new EmbeddedZookeeper(), new Properties());
return new EmbeddedKafka();
}

public static EmbeddedKafka createEmbeddedKafka(Properties overrideProperties)
EmbeddedKafka()
throws IOException
{
return new EmbeddedKafka(new EmbeddedZookeeper(), overrideProperties);
}

EmbeddedKafka(EmbeddedZookeeper zookeeper, Properties overrideProperties)
throws IOException
{
this.zookeeper = requireNonNull(zookeeper, "zookeeper is null");
requireNonNull(overrideProperties, "overrideProperties is null");

this.port = findUnusedPort();
this.kafkaDataDir = Files.createTempDir();

Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("broker.id", "0")
.put("host.name", "localhost")
.put("num.partitions", "2")
.put("log.flush.interval.messages", "10000")
.put("log.flush.interval.ms", "1000")
.put("log.retention.minutes", "60")
.put("auto.create.topics.enable", "false")
.put("zookeeper.connection.timeout.ms", "1000000")
.put("port", Integer.toString(port))
.put("log.dirs", kafkaDataDir.getAbsolutePath())
.put("zookeeper.connect", zookeeper.getConnectString())
.put("offsets.topic.replication.factor", "1")
.putAll(Maps.fromProperties(overrideProperties))
TestKitNodes nodes = new TestKitNodes.Builder()
.setNumBrokerNodes(1)
.setNumControllerNodes(1)
.setCombined(true)
.build();

KafkaConfig config = new KafkaConfig(toProperties(properties));
this.kafka = new KafkaServerStartable(config);
try {
this.kafka = new KafkaClusterTestKit.Builder(nodes)
.setConfigProp("log.dirs", kafkaDataDir.getAbsolutePath())
.build();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

public void start()
throws InterruptedException, IOException
throws Exception
{
if (!started.getAndSet(true)) {
zookeeper.start();
kafka.format();
kafka.startup();
kafka.waitForActiveController();
kafka.waitForReadyBrokers();
}
}

@@ -108,9 +92,12 @@ public void close()
throws IOException
{
if (started.get() && !stopped.getAndSet(true)) {
kafka.shutdown();
kafka.awaitShutdown();
zookeeper.close();
try {
kafka.close();
}
catch (Exception e) {
throw new RuntimeException(e);
}
deleteRecursively(kafkaDataDir.toPath(), ALLOW_INSECURE);
}
}
@@ -124,14 +111,15 @@ public void createTopics(int partitions, int replication, Properties topicProper
{
checkState(started.get() && !stopped.get(), "not started!");

ZkUtils zkUtils = ZkUtils.apply(getZookeeperConnectString(), 30_000, 30_000, false);
try {
for (String topic : topics) {
AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicProperties, RackAwareMode.Disabled$.MODULE$);
}
}
finally {
zkUtils.close();
try (AdminClient client = AdminClient.create(kafka.clientProperties())) {
client.createTopics(Arrays.stream(topics)
.map(topic -> new NewTopic(topic, partitions, (short) replication)
.configs(topicProperties.stringPropertyNames()
.stream()
.collect(toImmutableMap(
identity(),
prop -> (String) topicProperties.get(prop)))))
.collect(Collectors.toList()));
}
}

@@ -146,23 +134,8 @@ public KafkaProducer<Long, Object> createProducer()
return new KafkaProducer<>(properties);
}

public int getZookeeperPort()
{
return zookeeper.getPort();
}

public int getPort()
{
return port;
}

public String getConnectString()
{
return "localhost:" + Integer.toString(port);
}

public String getZookeeperConnectString()
{
return zookeeper.getConnectString();
return kafka.bootstrapServers();
}
}
4 changes: 4 additions & 0 deletions presto-product-tests/pom.xml
Original file line number Diff line number Diff line change
@@ -179,6 +179,10 @@
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>

0 comments on commit 1adafde

Please sign in to comment.