Skip to content

Commit

Permalink
upgrade kafka to 3.9.0 and remove zookeeper
Browse files Browse the repository at this point in the history
  • Loading branch information
ZacBlanco committed Feb 19, 2025
1 parent 7b02b6c commit fb7aa8f
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 54 deletions.
4 changes: 1 addition & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ ext.versions = [
cassandra : '3.4.0',
commons_cli : '1.3.1',
thrift : '0.9.3',
kafka : '0.11.0.2',
zkclient : '0.10'
kafka : '3.9.0',
]

ext.libraries = [
Expand Down Expand Up @@ -105,7 +104,6 @@ ext.libraries = [
thrift : "org.apache.thrift:libthrift:${versions.thrift}",
kafka_clients : "org.apache.kafka:kafka-clients:${versions.kafka}",
kafka : "org.apache.kafka:kafka_2.12:${versions.kafka}",
zkclient : "com.101tec:zkclient:${versions.zkclient}"
]

ext.tempto_core = project(':tempto-core')
Expand Down
20 changes: 16 additions & 4 deletions tempto-examples/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,23 @@ services:

kafka:
hostname: kafka
image: spotify/kafka
image: apache/kafka
ports:
- 9092:9092
- 2181:2181
command: bash -c "sed -i 's/#delete.topic.enable=true/delete.topic.enable=true/' /opt/kafka_2.11-0.10.1.0/config/server.properties; exec supervisord -n"
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
KAFKA_DELETE_TOPIC_ENABLE: true


ssh:
Expand Down
3 changes: 0 additions & 3 deletions tempto-examples/src/main/resources/tempto-configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,6 @@ databases:
broker:
host: kafka
port: 9092
zookeeper:
host: kafka
port: 2181
presto_database_name: presto
presto_kafka_catalog: kafka
table_manager_type: kafka
Expand Down
1 change: 0 additions & 1 deletion tempto-kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ apply plugin: 'java'
dependencies {
implementation tempto_core
implementation libraries.kafka
implementation libraries.zkclient
implementation libraries.guava
implementation libraries.guice
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.prestodb.tempto.fulfillment.table.kafka;

import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.name.Names;
Expand All @@ -24,12 +25,8 @@
import io.prestodb.tempto.internal.fulfillment.table.TableName;
import io.prestodb.tempto.query.QueryExecutor;
import io.prestodb.tempto.query.QueryResult;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -38,10 +35,13 @@
import javax.inject.Named;
import javax.inject.Singleton;

import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

Expand All @@ -55,25 +55,19 @@ public class KafkaTableManager
private final String brokerHost;
private final Integer brokerPort;
private final String prestoKafkaCatalog;
private final String zookeeperHost;
private final Integer zookeeperPort;

@Inject
public KafkaTableManager(
@Named("databaseName") String databaseName,
@Named("broker.host") String brokerHost,
@Named("broker.port") int brokerPort,
@Named("zookeeper.host") String zookeeperHost,
@Named("zookeeper.port") int zookeeperPort,
@Named("presto_database_name") String prestoDatabaseName,
@Named("presto_kafka_catalog") String prestoKafkaCatalog,
Injector injector)
{
this.databaseName = requireNonNull(databaseName, "databaseName is null");
this.brokerHost = requireNonNull(brokerHost, "brokerHost is null");
this.brokerPort = brokerPort;
this.zookeeperHost = requireNonNull(zookeeperHost, "zookeeperHost is null");
this.zookeeperPort = zookeeperPort;
requireNonNull(injector, "injector is null");
requireNonNull(prestoDatabaseName, "prestoDatabaseName is null");
this.prestoQueryExecutor = injector.getInstance(Key.get(QueryExecutor.class, Names.named(prestoDatabaseName)));
Expand Down Expand Up @@ -106,32 +100,51 @@ private void verifyTableExistsInPresto(String schema, String name)

private void deleteTopic(String topic)
{
withZookeeper(zkUtils -> {
if (AdminUtils.topicExists(zkUtils, topic)) {
AdminUtils.deleteTopic(zkUtils, topic);

for (int checkTry = 0; checkTry < 5; ++checkTry) {
if (!AdminUtils.topicExists(zkUtils, topic)) {
return;
}
try {
Thread.sleep(1_000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("could not delete topic " + topic);
}

withAdminClient(adminClient -> {
Supplier<Boolean> topicExists = () -> {
try {
return adminClient.listTopics()
.names()
.get()
.stream()
.anyMatch(topic::equals);
}
catch (Exception e) {
throw new RuntimeException(e);
}
};

if (topicExists.get()) {
adminClient.deleteTopics(Collections.singletonList(topic));
}
for (int checkTry = 0; checkTry < 5; ++checkTry) {
if (!topicExists.get()) {
return;
}
try {
Thread.sleep(1_000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("could not delete topic " + topic);
}
throw new RuntimeException("could not delete topic " + topic);
}
});
}

private void createTopic(String topic, int partitionsCount, int replicationLevel)
{
withZookeeper(zkUtils -> {
withAdminClient(adminClient -> {
Properties topicConfiguration = new Properties();
AdminUtils.createTopic(zkUtils, topic, partitionsCount, replicationLevel, topicConfiguration, RackAwareMode.Disabled$.MODULE$);
adminClient.createTopics(Collections.singletonList(
new NewTopic(topic, partitionsCount, (short) replicationLevel)
.configs(topicConfiguration.stringPropertyNames()
.stream().collect(
toImmutableMap(
key -> key,
topicConfiguration::getProperty)
))));
});
}

Expand Down Expand Up @@ -164,22 +177,21 @@ private void insertDataIntoTopic(String topic, KafkaDataSource dataSource)
}
}

private void withZookeeper(Consumer<ZkUtils> routine)
private void withAdminClient(Consumer<AdminClient> routine)
{
int sessionTimeOutInMs = 15_000;
int connectionTimeOutInMs = 10_000;
String zookeeperHosts = zookeeperHost + ":" + zookeeperPort;

ZkClient zkClient = new ZkClient(zookeeperHosts,
sessionTimeOutInMs,
connectionTimeOutInMs,
ZKStringSerializer$.MODULE$);
try {
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
routine.accept(zkUtils);
}
finally {
zkClient.close();

String bootstrapHosts = brokerHost + ":" + brokerPort;
Properties clientConfig = new Properties();
clientConfig.putAll(ImmutableMap.of(
"bootstrap.servers", bootstrapHosts,
"request.timeout.ms", String.valueOf(sessionTimeOutInMs),
"connections.max.idle.ms", String.valueOf(connectionTimeOutInMs)
));

try (AdminClient adminClient = AdminClient.create(clientConfig)) {
routine.accept(adminClient);
}
}

Expand Down

0 comments on commit fb7aa8f

Please sign in to comment.