From e7a8f71c0c36387b0435f73efaa9657e470cdbf2 Mon Sep 17 00:00:00 2001
From: Mark Zitnik
Date: Tue, 31 Jan 2023 07:48:41 +0200
Subject: [PATCH] Support MSK (#46)

* refactor escapeTopicName to a Utils class

* change confluent version to 6.1.0 in integration tests

* add gson to package

* remove unused code
---
 build.gradle.kts                                   |  4 +-
 ...lickHouseSinkConnectorIntegrationTest.java      | 99 +++++++++----------
 .../db/helper/ClickHouseHelperClientTest.java      |  4 +-
 .../containers/ConfluentPlatform.java              |  6 +-
 .../connect/sink/db/ClickHouseWriter.java          | 16 +--
 .../kafka/connect/sink/db/mapping/Table.java       |  4 +-
 .../clickhouse/kafka/connect/util/Utils.java       |  8 ++
 7 files changed, 71 insertions(+), 70 deletions(-)
 create mode 100644 src/main/java/com/clickhouse/kafka/connect/util/Utils.java

diff --git a/build.gradle.kts b/build.gradle.kts
index 9efe7b42..c1211b21 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -38,7 +38,7 @@ plugins {
 }
 
 group = "com.clickhouse.kafka"
-version = "0.0.4"
+version = "0.0.5"
 description = "The official ClickHouse Apache Kafka Connect Connector."
 
 repositories {
@@ -84,7 +84,7 @@ dependencies {
     clickhouseDependencies("io.lettuce:lettuce-core:6.2.0.RELEASE")
     clickhouseDependencies("com.clickhouse:clickhouse-client:${project.extra["clickHouseDriverVersion"]}")
     clickhouseDependencies("com.clickhouse:clickhouse-http-client:${project.extra["clickHouseDriverVersion"]}")
-
+    clickhouseDependencies("com.google.code.gson:gson:2.10")
 
     // Unit Tests
     testImplementation(platform("org.junit:junit-bom:${project.extra["junitJupiterVersion"]}"))
diff --git a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConnectorIntegrationTest.java b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConnectorIntegrationTest.java
index bc7d7eb0..8ba7ac0c 100644
--- a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConnectorIntegrationTest.java
+++ b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConnectorIntegrationTest.java
@@ -7,6 +7,7 @@
 import org.junit.Ignore;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.ClickHouseContainer;
 import org.testcontainers.containers.ConfluentPlatform;
@@ -178,123 +179,121 @@ private void sleep(long l) {
         }
     }
 
-    @Test
-    @Description("stockGenSingleTaskSchemalessTest")
-    public void stockGenSingleTaskSchemalessTest() throws IOException {
+    @Order(1)
+    @Description("stockGenSingleTask")
+    public void stockGenSingleTaskTest() throws IOException {
         dropStateTable();
         // Create KeeperMap table
         //createStateTable();
-        String topicName = "stock_gen_topic_single_schemaless_task";
-        String flatTableName = String.format("%s_flat", topicName);
+        String topicName = "stock_gen_topic_single_task";
         int parCount = 1;
         String payloadDataGen = String.join("", Files.readAllLines(Paths.get("src/integrationTest/resources/stock_gen.json")));
         confluentPlatform.createTopic(topicName, 1);
-        confluentPlatform.createConnect(String.format(payloadDataGen, "DatagenConnectorConnector_Single_Schemaless", "DatagenConnectorConnector_Single_Schemaless", parCount, topicName));
-        sleep(5 * 1000);
-        String ksqlCreateStreamPayload = String.format("{\"ksql\": \"CREATE STREAM tmp_%s (side STRING, symbol STRING, userid STRING) WITH (KAFKA_TOPIC='%s', VALUE_FORMAT = 'AVRO');\"}", topicName, topicName);
-        System.out.println(ksqlCreateStreamPayload);
-        confluentPlatform.runKsql(ksqlCreateStreamPayload);
-        sleep(5 * 1000);
-        String ksqlCreateStreamJSONPayload = String.format("{\"ksql\": \"CREATE STREAM %s_flat WITH (KAFKA_TOPIC='%s_flat', VALUE_FORMAT = 'JSON') AS SELECT side, symbol FROM tmp_%s EMIT CHANGES;\"}", topicName, topicName, topicName);
-        System.out.println(ksqlCreateStreamJSONPayload);
-        confluentPlatform.runKsql(ksqlCreateStreamJSONPayload);
-        sleep(5 * 1000);
-
+        confluentPlatform.createConnect(String.format(payloadDataGen, "DatagenConnectorConnector_Single", "DatagenConnectorConnector_Single", parCount, topicName));
         // Now let's create the correct table & configure Sink to insert data to ClickHouse
-        dropFlatTable(topicName);
-        createFlatTable(topicName);
-        String payloadClickHouseSink = String.join("", Files.readAllLines(Paths.get("src/integrationTest/resources/clickhouse_sink_json.json")));
+        dropTable(topicName);
+        createTable(topicName);
+        String payloadClickHouseSink = String.join("", Files.readAllLines(Paths.get("src/integrationTest/resources/clickhouse_sink.json")));
         sleep(5 * 1000);
-        confluentPlatform.createConnect(String.format(payloadClickHouseSink, "ClickHouseSinkConnectorConnector_Single_Schemaless", "ClickHouseSinkConnectorConnector_Single_Schemaless", parCount, flatTableName, hostname, port, password));
+        confluentPlatform.createConnect(String.format(payloadClickHouseSink, "ClickHouseSinkConnectorConnector_Single", "ClickHouseSinkConnectorConnector_Single", parCount, topicName, hostname, port, password));
 
         long count = 0;
         while (count < 10000) {
-            sleep(2*1000);
+            sleep(5*1000);
             long endOffset = confluentPlatform.getOffset(topicName, 0 );
             if (endOffset % 100 == 0)
                 System.out.println(endOffset);
-            if (endOffset >= 10000 / 4) {
+            if (endOffset == 10000) {
                 break;
             }
             count+=1;
         }
         // TODO : see the progress of the offset currently it is 1 min
-        sleep(2 * 1000);
+        sleep(30 * 1000);
 
-        count = countRows(flatTableName);
+        count = countRows(topicName);
         System.out.println(count);
-        while (count < 10000 / 10) {
-            long tmpCount = countRows(flatTableName);
+        while (count < 10000) {
+            long tmpCount = countRows(topicName);
             System.out.println(tmpCount);
             sleep(2 * 1000);
             if (tmpCount > count)
                 count = tmpCount;
         }
-        assertTrue(countRows(flatTableName) >= 1000);
-        //assertEquals(10000, countRows(flatTableName));
+        assertEquals(10000, countRows(topicName));
+
     }
 
-    @Ignore
-    @Description("stockGenSingleTask")
-    public void stockGenSingleTaskTest() throws IOException {
+    @Test
+    @Order(2)
+    @Description("stockGenSingleTaskSchemalessTest")
+    public void stockGenSingleTaskSchemalessTest() throws IOException {
         dropStateTable();
         // Create KeeperMap table
         //createStateTable();
-        String topicName = "stock_gen_topic_single_task";
+        String topicName = "stock_gen_topic_single_schemaless_task";
+        String flatTableName = String.format("%s_flat", topicName);
         int parCount = 1;
         String payloadDataGen = String.join("", Files.readAllLines(Paths.get("src/integrationTest/resources/stock_gen.json")));
         confluentPlatform.createTopic(topicName, 1);
-        confluentPlatform.createConnect(String.format(payloadDataGen, "DatagenConnectorConnector_Single", "DatagenConnectorConnector_Single", parCount, topicName));
+        confluentPlatform.createConnect(String.format(payloadDataGen, "DatagenConnectorConnector_Single_Schemaless", "DatagenConnectorConnector_Single_Schemaless", parCount, topicName));
+        sleep(5 * 1000);
+        String ksqlCreateStreamPayload = String.format("{\"ksql\": \"CREATE STREAM tmp_%s (side STRING, symbol STRING, userid STRING) WITH (KAFKA_TOPIC='%s', VALUE_FORMAT = 'AVRO');\"}", topicName, topicName);
+        System.out.println(ksqlCreateStreamPayload);
+        confluentPlatform.runKsql(ksqlCreateStreamPayload);
+        sleep(5 * 1000);
+        String ksqlCreateStreamJSONPayload = String.format("{\"ksql\": \"CREATE STREAM %s_flat WITH (KAFKA_TOPIC='%s_flat', VALUE_FORMAT = 'JSON') AS SELECT side, symbol FROM tmp_%s EMIT CHANGES;\"}", topicName, topicName, topicName);
+        System.out.println(ksqlCreateStreamJSONPayload);
+        confluentPlatform.runKsql(ksqlCreateStreamJSONPayload);
+        sleep(5 * 1000);
+
         // Now let's create the correct table & configure Sink to insert data to ClickHouse
-        dropTable(topicName);
-        createTable(topicName);
-        String payloadClickHouseSink = String.join("", Files.readAllLines(Paths.get("src/integrationTest/resources/clickhouse_sink.json")));
+        dropFlatTable(topicName);
+        createFlatTable(topicName);
+        String payloadClickHouseSink = String.join("", Files.readAllLines(Paths.get("src/integrationTest/resources/clickhouse_sink_json.json")));
         sleep(5 * 1000);
-        confluentPlatform.createConnect(String.format(payloadClickHouseSink, "ClickHouseSinkConnectorConnector_Single", "ClickHouseSinkConnectorConnector_Single", parCount, topicName, hostname, port, password));
+        confluentPlatform.createConnect(String.format(payloadClickHouseSink, "ClickHouseSinkConnectorConnector_Single_Schemaless", "ClickHouseSinkConnectorConnector_Single_Schemaless", parCount, flatTableName, hostname, port, password));
 
         long count = 0;
         while (count < 10000) {
-            sleep(5*1000);
+            sleep(2*1000);
             long endOffset = confluentPlatform.getOffset(topicName, 0 );
             if (endOffset % 100 == 0)
                 System.out.println(endOffset);
-            if (endOffset == 10000) {
+            if (endOffset >= 10000 / 4) {
                 break;
             }
             count+=1;
         }
         // TODO : see the progress of the offset currently it is 1 min
-        sleep(30 * 1000);
+        sleep(2 * 1000);
 
-        count = countRows(topicName);
+        count = countRows(flatTableName);
         System.out.println(count);
-        while (count < 10000) {
-            long tmpCount = countRows(topicName);
+        while (count < 10000 / 10) {
+            long tmpCount = countRows(flatTableName);
             System.out.println(tmpCount);
             sleep(2 * 1000);
             if (tmpCount > count)
                 count = tmpCount;
         }
-        assertEquals(10000, countRows(topicName));
-
+        assertTrue(countRows(flatTableName) >= 1000);
+        //assertEquals(10000, countRows(flatTableName));
     }
 
-
-    @Test
-    @Ignore
     @Description("stockMultiTask")
     public void stockGenMultiTaskTest() throws IOException {
         dropStateTable();
@@ -331,8 +330,8 @@ public void stockGenMultiTaskTest() throws IOException {
 
     }
 
-    @Test
-    @Ignore
+//    @Test
+//    @Ignore
     @Description("stockMultiTaskTopic")
     public void stockGenMultiTaskTopicTest() throws IOException {
         dropStateTable();
diff --git a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClientTest.java b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClientTest.java
index c85d9c2d..403ea9e8 100644
--- a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClientTest.java
+++ b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClientTest.java
@@ -3,6 +3,7 @@
 import com.clickhouse.client.*;
 import com.clickhouse.kafka.connect.sink.db.mapping.Table;
 import com.clickhouse.kafka.connect.sink.db.mapping.Type;
+import com.clickhouse.kafka.connect.util.Utils;
 import jdk.jfr.Description;
 import org.junit.jupiter.api.*;
 import org.testcontainers.containers.ClickHouseContainer;
@@ -43,7 +44,6 @@ public void CreateTableTest() {
         assertEquals(1, tables.size());
         assertEquals(name, tables.get(0));
     }
-
     @Test
     @Order(2)
     @Description("DescTableTest")
@@ -52,7 +52,7 @@ public void DescTableTest() {
         String createTable = String.format("CREATE TABLE %s ( `off` Int8 , `str` String , `double` DOUBLE, `arr` Array(Int8), `bool` BOOLEAN) Engine = MergeTree ORDER BY off", name);
         chc.query(createTable);
         Table table = chc.describeTable(name);
-        assertEquals(name, table.getName());
+        assertEquals(Utils.escapeTopicName(name), table.getName());
         assertEquals(Type.INT8, table.getColumnByName("off").getType());
         assertEquals(Type.STRING, table.getColumnByName("str").getType());
         assertEquals(Type.FLOAT64, table.getColumnByName("double").getType());
diff --git a/src/integrationTest/java/org/testcontainers/containers/ConfluentPlatform.java b/src/integrationTest/java/org/testcontainers/containers/ConfluentPlatform.java
index 6ceb605e..c6fe375e 100644
--- a/src/integrationTest/java/org/testcontainers/containers/ConfluentPlatform.java
+++ b/src/integrationTest/java/org/testcontainers/containers/ConfluentPlatform.java
@@ -19,7 +19,7 @@
 
 public class ConfluentPlatform {
 
-    private static final String CONFLUENT_VERSION = "7.2.1";
+    private static final String CONFLUENT_VERSION = "6.1.0";
     private static final DockerImageName KAFKA_REST_IMAGE = DockerImageName.parse(
             "confluentinc/cp-kafka-rest:" + CONFLUENT_VERSION
     );
@@ -35,9 +35,9 @@ public class ConfluentPlatform {
     private static final DockerImageName CP_SCHEMA_REGISTRY = DockerImageName.parse(
             "confluentinc/cp-schema-registry:" + CONFLUENT_VERSION
     );
-
+// 0.4.0-6.0.1
     private static final DockerImageName CP_DATA_GEN = DockerImageName.parse(
-            "cnfldemos/cp-server-connect-datagen:0.5.3-" + CONFLUENT_VERSION
+            "cnfldemos/cp-server-connect-datagen:0.4.0-" + CONFLUENT_VERSION
     );
 
     private static final DockerImageName CP_CONTROL_CENTER = DockerImageName.parse(
diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java
index 4e7bd174..c68c6c6e 100644
--- a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java
@@ -14,6 +14,7 @@
 import com.clickhouse.kafka.connect.sink.db.mapping.Type;
 import com.clickhouse.kafka.connect.util.Mask;
+import com.clickhouse.kafka.connect.util.Utils;
 
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import org.apache.kafka.connect.data.Field;
@@ -29,7 +30,6 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
-
 public class ClickHouseWriter implements DBWriter{
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseWriter.class);
@@ -95,9 +95,6 @@ public void setBinary(boolean binary) {
         isBinary = binary;
     }
 
-    private String escapeTopicName(String topic) {
-        return String.format("`%s`", topic);
-    }
     // TODO: we need to refactor that
     private String convertHelper(Object v) {
         if (v instanceof List) {
@@ -266,7 +263,7 @@ public void doInsertRawBinary(List<Record> records) {
         Record first = records.get(0);
         String topic = first.getTopic();
         LOGGER.info(String.format("Number of records to insert %d to table name %s", batchSize, topic));
-        Table table = this.mapping.get(escapeTopicName(topic));
+        Table table = this.mapping.get(Utils.escapeTopicName(topic));
         if (table == null) {
             //TODO to pick the correct exception here
             throw new RuntimeException(String.format("Table %s does not exists", topic));
@@ -340,7 +337,7 @@ public void doInsertJson(List<Record> records) {
         Record first = records.get(0);
         String topic = first.getTopic();
         LOGGER.info(String.format("Number of records to insert %d to table name %s", batchSize, topic));
-        Table table = this.mapping.get(escapeTopicName(topic));
+        Table table = this.mapping.get(Utils.escapeTopicName(topic));
         if (table == null) {
             //TODO to pick the correct exception here
             throw new RuntimeException(String.format("Table %s does not exists", topic));
@@ -349,11 +346,6 @@ public void doInsertJson(List<Record> records) {
         if ( !validateDataSchema(table, first, true) )
             throw new RuntimeException();
-
-
-
-
-
 
         try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP)) {
             ClickHouseRequest.Mutation request = client.connect(chc.getServer())
                     .write()
@@ -426,7 +418,7 @@ public void doInsertSimple(List<Record> records) {
         LOGGER.info(String.format("Number of records to insert %d to table name %s", batchSize, topic));
         // Build the insert SQL
         StringBuffer sb = new StringBuffer();
-        sb.append(String.format("INSERT INTO %s ", escapeTopicName(topic)));
+        sb.append(String.format("INSERT INTO %s ", Utils.escapeTopicName(topic)));
         sb.append(extractFields(first.getFields(), "(", ")", ",", ""));
         sb.append(" VALUES ");
         LOGGER.debug("sb {}", sb);
diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Table.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Table.java
index 971a25c9..7aa6824e 100644
--- a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Table.java
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Table.java
@@ -1,5 +1,7 @@
 package com.clickhouse.kafka.connect.sink.db.mapping;
 
+import com.clickhouse.kafka.connect.util.Utils;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.HashMap;
@@ -17,7 +19,7 @@ public Table(String name) {
     }
 
     public String getName() {
-        return String.format("`%s`", name);
+        return Utils.escapeTopicName(name);
     }
 
     public void addColumn(Column column) {
diff --git a/src/main/java/com/clickhouse/kafka/connect/util/Utils.java b/src/main/java/com/clickhouse/kafka/connect/util/Utils.java
new file mode 100644
index 00000000..0dc3a251
--- /dev/null
+++ b/src/main/java/com/clickhouse/kafka/connect/util/Utils.java
@@ -0,0 +1,8 @@
+package com.clickhouse.kafka.connect.util;
+
+public class Utils {
+
+    public static String escapeTopicName(String topic) {
+        return String.format("`%s`", topic);
+    }
+}