From 5c2af87e4eacd7dee6c146e30c188be739774d06 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Wed, 20 Nov 2024 13:36:46 +0200 Subject: [PATCH] Removed RedisStateProvider + optimze imports for project --- CHANGELOG.md | 1 + build.gradle.kts | 7 ++- .../connect/sink/ClickHouseCloudTest.java | 7 ++- ...lickHouseSinkConnectorIntegrationTest.java | 16 +++++-- .../kafka/connect/sink/ExactlyOnceTest.java | 16 +++++-- .../connect/sink/helper/ClickHouseAPI.java | 14 ++++-- .../sink/helper/ClickHouseTestHelpers.java | 6 ++- .../sink/helper/ConfluentPlatform.java | 8 ++-- .../connect/sink/helper/SchemaTestData.java | 14 +++++- .../sink/helper/SchemalessTestData.java | 7 ++- .../connect/ClickHouseSinkConnector.java | 2 - .../connect/sink/ClickHouseSinkConfig.java | 6 --- .../connect/sink/ClickHouseSinkTask.java | 21 +-------- .../kafka/connect/sink/ProxySinkTask.java | 4 +- .../kafka/connect/sink/data/Record.java | 4 +- .../connect/sink/data/StructToJsonMap.java | 9 +++- .../data/convert/EmptyRecordConvertor.java | 4 -- .../sink/data/convert/RecordConvertor.java | 4 +- .../data/convert/SchemaRecordConvertor.java | 2 - .../convert/SchemalessRecordConvertor.java | 2 - .../connect/sink/db/ClickHouseWriter.java | 43 ++++++++++------- .../kafka/connect/sink/db/DBWriter.java | 1 - .../connect/sink/db/InMemoryDBWriter.java | 1 - .../sink/db/TableMappingRefresher.java | 1 + .../db/helper/ClickHouseHelperClient.java | 7 ++- .../kafka/connect/sink/db/mapping/Column.java | 7 ++- .../kafka/connect/sink/db/mapping/Table.java | 5 +- .../connect/sink/processing/Processing.java | 3 -- .../sink/state/provider/InMemoryState.java | 1 - .../state/provider/KeeperStateProvider.java | 8 +++- .../state/provider/RedisStateProvider.java | 47 ------------------- .../connect/transforms/ExtractTopic.java | 10 ++-- .../transforms/ExtractTopicConfig.java | 6 +-- .../kafka/connect/transforms/KeyToValue.java | 2 +- .../clickhouse/kafka/connect/util/Utils.java | 4 -- .../connect/util/jmx/MBeanServerUtils.java | 8 ++-- .../kafka/connect/sink/ClickHouseBase.java | 3 -- .../ClickHouseSinkJdbcPropertiesTest.java | 21 +++------ .../sink/ClickHouseSinkTaskMappingTest.java | 5 -- ...ClickHouseSinkTaskSchemalessProxyTest.java | 9 ++-- .../ClickHouseSinkTaskSchemalessTest.java | 9 ++-- .../sink/ClickHouseSinkTaskStringTest.java | 9 ---- .../connect/sink/ClickHouseSinkTaskTest.java | 4 -- ...ClickHouseSinkTaskWithSchemaProxyTest.java | 8 ++-- .../ClickHouseSinkTaskWithSchemaTest.java | 6 ++- .../connect/sink/db/ClickHouseWriterTest.java | 12 ++--- .../db/helper/ClickHouseHelperClientTest.java | 2 - .../kafka/connect/sink/dlq/InMemoryDLQ.java | 1 - .../sink/helper/ClickHouseTestHelpers.java | 14 ++++-- .../connect/sink/helper/SchemaTestData.java | 21 +++++++-- .../sink/helper/SchemalessTestData.java | 7 ++- .../connect/sink/kafa/RangeContainerTest.java | 4 +- .../sink/processing/ProcessingTest.java | 11 ++--- .../sink/provider/LocalProviderTest.java | 4 +- .../kafka/connect/sink/util/MaskTest.java | 8 ---- src/test/java/transforms/KeyToValueTest.java | 1 - 56 files changed, 217 insertions(+), 250 deletions(-) delete mode 100644 src/main/java/com/clickhouse/kafka/connect/sink/state/provider/RedisStateProvider.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 13c67330..bcf112e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ # 1.2.5 +* Remove redis state provide since we are using KeeperMap for state storage * Remove unused avro property from `build.gradle.kts` * Trim schemaless data to only pass the fields that are in the table * Allow bypassing the schema validation diff --git a/build.gradle.kts b/build.gradle.kts index 3c0798d6..153bf242 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -7,9 +7,9 @@ */ import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar +import org.gradle.api.tasks.testing.logging.TestExceptionFormat import java.time.LocalDateTime import java.time.format.DateTimeFormatter -import org.gradle.api.tasks.testing.logging.TestExceptionFormat val defaultJdkVersion = 17 java { @@ -72,7 +72,7 @@ dependencies { implementation("com.clickhouse:clickhouse-http-client:${project.extra["clickHouseDriverVersion"]}") implementation("com.clickhouse:clickhouse-data:${project.extra["clickHouseDriverVersion"]}") implementation("com.clickhouse:client-v2:${project.extra["clickHouseDriverVersion"]}") - implementation("io.lettuce:lettuce-core:6.5.0.RELEASE") + implementation("io.projectreactor:reactor-core:3.7.0") implementation("com.google.code.gson:gson:2.11.0") // https://mvnrepository.com/artifact/org.apache.httpcomponents.client5/httpclient5 implementation("org.apache.httpcomponents.client5:httpclient5:5.3.1") @@ -148,6 +148,9 @@ tasks.create("integrationTest", Test::class.java) { systemProperties = System.getProperties() as Map } +tasks.withType { + options.compilerArgs.addAll(listOf("-Xlint:unchecked", "-Xlint:deprecation")) +} tasks.withType { maxParallelForks = (Runtime.getRuntime().availableProcessors() / 2).takeIf { it > 0 } ?: 1 diff --git a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/ClickHouseCloudTest.java b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/ClickHouseCloudTest.java index cab9c1a6..b8cd51c5 100644 --- a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/ClickHouseCloudTest.java +++ b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/ClickHouseCloudTest.java @@ -9,7 +9,12 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; import static org.junit.Assert.assertTrue; diff --git a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConnectorIntegrationTest.java b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConnectorIntegrationTest.java index 30bd0cae..45ff105c 100644 --- a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConnectorIntegrationTest.java +++ b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConnectorIntegrationTest.java @@ -1,14 +1,17 @@ package com.clickhouse.kafka.connect.sink; -import com.clickhouse.client.*; +import com.clickhouse.client.ClickHouseProtocol; import com.clickhouse.client.config.ClickHouseProxyType; import com.clickhouse.kafka.connect.ClickHouseSinkConnector; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers; +import com.clickhouse.kafka.connect.sink.helper.ConfluentPlatform; import eu.rekawek.toxiproxy.Proxy; import eu.rekawek.toxiproxy.ToxiproxyClient; -import org.junit.jupiter.api.*; -import com.clickhouse.kafka.connect.sink.helper.ConfluentPlatform; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.clickhouse.ClickHouseContainer; @@ -19,10 +22,13 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.*; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import static com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI.*; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertTrue; public class ClickHouseSinkConnectorIntegrationTest { private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSinkConnectorIntegrationTest.class); diff --git a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/ExactlyOnceTest.java b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/ExactlyOnceTest.java index 48f97524..13a1e015 100644 --- a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/ExactlyOnceTest.java +++ b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/ExactlyOnceTest.java @@ -3,22 +3,28 @@ import com.clickhouse.client.ClickHouseProtocol; import com.clickhouse.client.api.query.Records; import com.clickhouse.client.config.ClickHouseProxyType; -import com.clickhouse.data.ClickHouseRecord; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; import com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI; import com.clickhouse.kafka.connect.sink.helper.ConfluentPlatform; -import org.junit.jupiter.api.*; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.Network; -import java.io.*; +import java.io.File; +import java.io.IOException; import java.net.URISyntaxException; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.*; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; -import static com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI.*; +import static com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI.createReplicatedMergeTreeTable; +import static com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI.dropTable; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseAPI.java b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseAPI.java index de805d1d..eea694a1 100644 --- a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseAPI.java +++ b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseAPI.java @@ -1,9 +1,11 @@ package com.clickhouse.kafka.connect.sink.helper; -import com.clickhouse.client.*; -import com.clickhouse.client.api.query.GenericRecord; +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.ClickHouseNodeSelector; +import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.client.api.query.Records; -import com.clickhouse.data.ClickHouseRecord; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,7 +18,11 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; -import java.util.*; +import java.util.Arrays; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; diff --git a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java index 8f196e01..d3b5012f 100644 --- a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java +++ b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java @@ -1,6 +1,10 @@ package com.clickhouse.kafka.connect.sink.helper; -import com.clickhouse.client.*; +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.ClickHouseNodeSelector; +import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.data.ClickHouseRecord; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; import org.slf4j.Logger; diff --git a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/ConfluentPlatform.java b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/ConfluentPlatform.java index e6aa5f39..61d734fe 100644 --- a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/ConfluentPlatform.java +++ b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/ConfluentPlatform.java @@ -1,8 +1,6 @@ package com.clickhouse.kafka.connect.sink.helper; - - import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -21,9 +19,11 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; import java.util.regex.Pattern; -import java.util.stream.Stream; public class ConfluentPlatform { diff --git a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java index cbc1e02e..3199d08d 100644 --- a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java +++ b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java @@ -1,15 +1,25 @@ package com.clickhouse.kafka.connect.sink.helper; import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.connect.data.*; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.sink.SinkRecord; import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Date; -import java.util.*; +import java.util.List; +import java.util.Map; +import java.util.UUID; import java.util.stream.LongStream; public class SchemaTestData { diff --git a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/SchemalessTestData.java b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/SchemalessTestData.java index f2e6c23f..1859ff7e 100644 --- a/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/SchemalessTestData.java +++ b/src/integrationTest/java/com/clickhouse/kafka/connect/sink/helper/SchemalessTestData.java @@ -4,7 +4,12 @@ import org.apache.kafka.connect.sink.SinkRecord; import java.math.BigDecimal; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.stream.LongStream; public class SchemalessTestData { diff --git a/src/main/java/com/clickhouse/kafka/connect/ClickHouseSinkConnector.java b/src/main/java/com/clickhouse/kafka/connect/ClickHouseSinkConnector.java index f35332c1..359aa71a 100644 --- a/src/main/java/com/clickhouse/kafka/connect/ClickHouseSinkConnector.java +++ b/src/main/java/com/clickhouse/kafka/connect/ClickHouseSinkConnector.java @@ -7,13 +7,11 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; - import org.apache.kafka.connect.sink.SinkConnectorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java index 094fa929..552be1c0 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java @@ -62,12 +62,6 @@ public class ClickHouseSinkConfig { public static final Integer tableRefreshIntervalDefault = 0; public static final Boolean exactlyOnceDefault = Boolean.FALSE; public static final Boolean customInsertFormatDefault = Boolean.FALSE; - public enum StateStores { - NONE, - IN_MEMORY, - REDIS, - KEEPER_MAP - } private final String hostname; private final int port; diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTask.java b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTask.java index a4ad45dd..e1cf603e 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTask.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTask.java @@ -1,35 +1,18 @@ package com.clickhouse.kafka.connect.sink; -import com.clickhouse.client.*; -import com.clickhouse.kafka.connect.ClickHouseSinkConnector; -import com.clickhouse.kafka.connect.sink.data.Record; -import com.clickhouse.kafka.connect.sink.db.ClickHouseWriter; -import com.clickhouse.kafka.connect.sink.db.DBWriter; -import com.clickhouse.kafka.connect.sink.db.InMemoryDBWriter; import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter; -import com.clickhouse.kafka.connect.sink.kafka.RangeContainer; -import com.clickhouse.kafka.connect.sink.processing.Processing; -import com.clickhouse.kafka.connect.sink.state.State; -import com.clickhouse.kafka.connect.sink.state.StateProvider; -import com.clickhouse.kafka.connect.sink.state.StateRecord; -import com.clickhouse.kafka.connect.sink.state.provider.InMemoryState; -import com.clickhouse.kafka.connect.sink.state.provider.RedisStateProvider; import com.clickhouse.kafka.connect.util.Utils; -import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.stream.Collectors; +import java.util.Collection; +import java.util.Map; public class ClickHouseSinkTask extends SinkTask { diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java b/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java index 478f12c5..fb94e764 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java @@ -9,9 +9,9 @@ import com.clickhouse.kafka.connect.sink.state.StateProvider; import com.clickhouse.kafka.connect.sink.state.provider.InMemoryState; import com.clickhouse.kafka.connect.sink.state.provider.KeeperStateProvider; +import com.clickhouse.kafka.connect.util.jmx.ExecutionTimer; import com.clickhouse.kafka.connect.util.jmx.MBeanServerUtils; import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics; -import com.clickhouse.kafka.connect.util.jmx.ExecutionTimer; import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,10 +20,10 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Timer; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import java.util.Timer; public class ProxySinkTask { diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/data/Record.java b/src/main/java/com/clickhouse/kafka/connect/sink/data/Record.java index 83bc7e68..231a7652 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/data/Record.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/data/Record.java @@ -2,8 +2,8 @@ import com.clickhouse.kafka.connect.sink.data.convert.EmptyRecordConvertor; import com.clickhouse.kafka.connect.sink.data.convert.RecordConvertor; -import com.clickhouse.kafka.connect.sink.data.convert.SchemalessRecordConvertor; import com.clickhouse.kafka.connect.sink.data.convert.SchemaRecordConvertor; +import com.clickhouse.kafka.connect.sink.data.convert.SchemalessRecordConvertor; import com.clickhouse.kafka.connect.sink.data.convert.StringRecordConvertor; import com.clickhouse.kafka.connect.sink.kafka.OffsetContainer; import lombok.Getter; @@ -12,8 +12,6 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/data/StructToJsonMap.java b/src/main/java/com/clickhouse/kafka/connect/sink/data/StructToJsonMap.java index 9dc51df0..02876fc5 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/data/StructToJsonMap.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/data/StructToJsonMap.java @@ -1,6 +1,12 @@ package com.clickhouse.kafka.connect.sink.data; -import org.apache.kafka.connect.data.*; +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -9,7 +15,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; public class StructToJsonMap { diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/EmptyRecordConvertor.java b/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/EmptyRecordConvertor.java index 5093066f..4958e9ac 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/EmptyRecordConvertor.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/EmptyRecordConvertor.java @@ -1,17 +1,13 @@ package com.clickhouse.kafka.connect.sink.data.convert; -import com.clickhouse.kafka.connect.sink.data.Data; import com.clickhouse.kafka.connect.sink.data.Record; import com.clickhouse.kafka.connect.sink.data.SchemaType; import com.clickhouse.kafka.connect.sink.kafka.OffsetContainer; import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.sink.SinkRecord; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; public class EmptyRecordConvertor extends RecordConvertor { @Override diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/RecordConvertor.java b/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/RecordConvertor.java index 54a12804..5db9da4b 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/RecordConvertor.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/RecordConvertor.java @@ -1,10 +1,10 @@ package com.clickhouse.kafka.connect.sink.data.convert; -import java.util.regex.Pattern; -import org.apache.kafka.connect.data.Schema; import com.clickhouse.kafka.connect.sink.data.Record; import org.apache.kafka.connect.sink.SinkRecord; +import java.util.regex.Pattern; + public abstract class RecordConvertor { public Record convert(SinkRecord sinkRecord, boolean splitDBTopic, String dbTopicSeparatorChar, String configurationDatabase) { String database = configurationDatabase; diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/SchemaRecordConvertor.java b/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/SchemaRecordConvertor.java index ddfc1d89..106d8960 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/SchemaRecordConvertor.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/SchemaRecordConvertor.java @@ -7,8 +7,6 @@ import com.clickhouse.kafka.connect.sink.kafka.OffsetContainer; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Map; diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/SchemalessRecordConvertor.java b/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/SchemalessRecordConvertor.java index ce6163cd..17017c88 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/SchemalessRecordConvertor.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/SchemalessRecordConvertor.java @@ -7,8 +7,6 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.sink.SinkRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java index 67578f04..aa42ad6f 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java @@ -1,6 +1,13 @@ package com.clickhouse.kafka.connect.sink.db; -import com.clickhouse.client.*; +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseConfig; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseNodeSelector; +import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseRequest; +import com.clickhouse.client.ClickHouseResponse; +import com.clickhouse.client.ClickHouseResponseSummary; import com.clickhouse.client.api.Client; import com.clickhouse.client.api.insert.InsertResponse; import com.clickhouse.client.api.insert.InsertSettings; @@ -18,40 +25,40 @@ import com.clickhouse.kafka.connect.sink.db.mapping.Table; import com.clickhouse.kafka.connect.sink.db.mapping.Type; import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter; - import com.clickhouse.kafka.connect.util.QueryIdentifier; import com.clickhouse.kafka.connect.util.Utils; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.math.BigDecimal; -import java.math.BigInteger; +import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoField; -import java.util.*; -import java.util.concurrent.TimeUnit; - -import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.errors.DataException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.google.common.collect.Streams; -import reactor.util.function.Tuples; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; public class ClickHouseWriter implements DBWriter { diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/DBWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/DBWriter.java index 832c6bb6..78be693d 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/DBWriter.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/DBWriter.java @@ -7,7 +7,6 @@ import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.concurrent.ExecutionException; public interface DBWriter { diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/InMemoryDBWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/InMemoryDBWriter.java index 80972a4c..4c30dec7 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/InMemoryDBWriter.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/InMemoryDBWriter.java @@ -5,7 +5,6 @@ import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter; import com.clickhouse.kafka.connect.util.QueryIdentifier; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/TableMappingRefresher.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/TableMappingRefresher.java index 501714b4..6ba49ab4 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/TableMappingRefresher.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/TableMappingRefresher.java @@ -2,6 +2,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.util.TimerTask; public class TableMappingRefresher extends TimerTask { diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java index b9e5011a..13aa47bc 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java @@ -1,6 +1,11 @@ package com.clickhouse.kafka.connect.sink.db.helper; -import com.clickhouse.client.*; +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseNodeSelector; +import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.client.api.Client; import com.clickhouse.client.api.enums.ProxyType; import com.clickhouse.client.api.query.GenericRecord; diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java index 4e23b361..af485f29 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java @@ -12,7 +12,12 @@ import reactor.util.function.Tuple3; import reactor.util.function.Tuples; -import java.util.*; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Table.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Table.java index cab68272..2e56a04e 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Table.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Table.java @@ -7,7 +7,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java b/src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java index 1fe2c463..70d0c323 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java @@ -1,7 +1,6 @@ package com.clickhouse.kafka.connect.sink.processing; import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig; -import com.clickhouse.kafka.connect.sink.ClickHouseSinkTask; import com.clickhouse.kafka.connect.sink.data.Record; import com.clickhouse.kafka.connect.sink.db.DBWriter; import com.clickhouse.kafka.connect.sink.dlq.DuplicateException; @@ -13,8 +12,6 @@ import com.clickhouse.kafka.connect.sink.state.StateRecord; import com.clickhouse.kafka.connect.util.QueryIdentifier; import com.clickhouse.kafka.connect.util.Utils; -import org.apache.kafka.connect.sink.SinkRecord; -import org.apache.kafka.connect.sink.SinkTaskContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/state/provider/InMemoryState.java b/src/main/java/com/clickhouse/kafka/connect/sink/state/provider/InMemoryState.java index db1d1dbd..3b855a0a 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/state/provider/InMemoryState.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/state/provider/InMemoryState.java @@ -1,6 +1,5 @@ package com.clickhouse.kafka.connect.sink.state.provider; -import com.clickhouse.kafka.connect.sink.kafka.RangeContainer; import com.clickhouse.kafka.connect.sink.state.State; import com.clickhouse.kafka.connect.sink.state.StateProvider; import com.clickhouse.kafka.connect.sink.state.StateRecord; diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/state/provider/KeeperStateProvider.java b/src/main/java/com/clickhouse/kafka/connect/sink/state/provider/KeeperStateProvider.java index 41a4b715..48fd01c7 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/state/provider/KeeperStateProvider.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/state/provider/KeeperStateProvider.java @@ -1,7 +1,11 @@ package com.clickhouse.kafka.connect.sink.state.provider; -import com.clickhouse.client.*; -import com.clickhouse.client.api.query.Records; +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseNodeSelector; +import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.data.ClickHouseFormat; import com.clickhouse.data.ClickHouseRecord; import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig; diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/state/provider/RedisStateProvider.java b/src/main/java/com/clickhouse/kafka/connect/sink/state/provider/RedisStateProvider.java deleted file mode 100644 index c23601b6..00000000 --- a/src/main/java/com/clickhouse/kafka/connect/sink/state/provider/RedisStateProvider.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.clickhouse.kafka.connect.sink.state.provider; - -import com.clickhouse.kafka.connect.sink.state.State; -import com.clickhouse.kafka.connect.sink.state.StateProvider; -import com.clickhouse.kafka.connect.sink.state.StateRecord; -import io.lettuce.core.*; -import io.lettuce.core.api.StatefulRedisConnection; -import io.lettuce.core.api.sync.RedisCommands; - -import java.util.Optional; - -public class RedisStateProvider implements StateProvider { - - private RedisClient redisClient = null; - private RedisCommands syncCommands = null; - public RedisStateProvider(String host, int port, Optional password) { - String url = null; - if (password.isPresent()) - url = String.format("redis://%s@%s:%d/0", password.get(), host, port); - else - url = String.format("redis://%s:%d/0", host, port); - - redisClient = RedisClient.create(url); - StatefulRedisConnection connection = redisClient.connect(); - syncCommands = connection.sync(); - } - - @Override - public StateRecord getStateRecord(String topic, int partition) { - String value = syncCommands.get(String.format("%s-%d", topic, partition)); - - String [] values = value.split("-"); - - long maxOffset = Long.valueOf(values[0]).longValue(); - long minOffset = Long.valueOf(values[1]).longValue(); - State state = State.valueOf(values[2]); - return new StateRecord(topic, partition, maxOffset, minOffset, state); - - } - @Override - public void setStateRecord(StateRecord stateRecord) { - String key = String.format("%s-%d", stateRecord.getTopic(), stateRecord.getPartition()); - String value = String.format("%d-%d-%s", stateRecord.getMaxOffset(), stateRecord.getMinOffset(), stateRecord.getState().toString()); - syncCommands.set(key,value); - } - -} diff --git a/src/main/java/com/clickhouse/kafka/connect/transforms/ExtractTopic.java b/src/main/java/com/clickhouse/kafka/connect/transforms/ExtractTopic.java index c65e4242..78fbbb45 100644 --- a/src/main/java/com/clickhouse/kafka/connect/transforms/ExtractTopic.java +++ b/src/main/java/com/clickhouse/kafka/connect/transforms/ExtractTopic.java @@ -18,11 +18,6 @@ package com.clickhouse.kafka.connect.transforms; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Optional; - import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Field; @@ -32,6 +27,11 @@ import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.transforms.Transformation; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; + public abstract class ExtractTopic> implements Transformation { private static final List SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM = Arrays.asList( diff --git a/src/main/java/com/clickhouse/kafka/connect/transforms/ExtractTopicConfig.java b/src/main/java/com/clickhouse/kafka/connect/transforms/ExtractTopicConfig.java index fcbe003c..88a90023 100644 --- a/src/main/java/com/clickhouse/kafka/connect/transforms/ExtractTopicConfig.java +++ b/src/main/java/com/clickhouse/kafka/connect/transforms/ExtractTopicConfig.java @@ -16,12 +16,12 @@ package com.clickhouse.kafka.connect.transforms; -import java.util.Map; -import java.util.Optional; - import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import java.util.Map; +import java.util.Optional; + class ExtractTopicConfig extends AbstractConfig { public static final String FIELD_NAME_CONFIG = "field.name"; private static final String FIELD_NAME_DOC = diff --git a/src/main/java/com/clickhouse/kafka/connect/transforms/KeyToValue.java b/src/main/java/com/clickhouse/kafka/connect/transforms/KeyToValue.java index 2675271b..595f4a02 100644 --- a/src/main/java/com/clickhouse/kafka/connect/transforms/KeyToValue.java +++ b/src/main/java/com/clickhouse/kafka/connect/transforms/KeyToValue.java @@ -1,12 +1,12 @@ package com.clickhouse.kafka.connect.transforms; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.transforms.Transformation; -import org.apache.kafka.common.config.AbstractConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/clickhouse/kafka/connect/util/Utils.java b/src/main/java/com/clickhouse/kafka/connect/util/Utils.java index c180eca7..233848d9 100644 --- a/src/main/java/com/clickhouse/kafka/connect/util/Utils.java +++ b/src/main/java/com/clickhouse/kafka/connect/util/Utils.java @@ -1,7 +1,6 @@ package com.clickhouse.kafka.connect.util; import com.clickhouse.client.ClickHouseException; -import com.clickhouse.kafka.connect.sink.data.Data; import com.clickhouse.kafka.connect.sink.data.Record; import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter; import org.apache.kafka.connect.errors.DataException; @@ -13,13 +12,10 @@ import java.io.IOException; import java.net.SocketTimeoutException; import java.net.UnknownHostException; -import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; public class Utils { diff --git a/src/main/java/com/clickhouse/kafka/connect/util/jmx/MBeanServerUtils.java b/src/main/java/com/clickhouse/kafka/connect/util/jmx/MBeanServerUtils.java index 8d72037c..f4a9fefb 100644 --- a/src/main/java/com/clickhouse/kafka/connect/util/jmx/MBeanServerUtils.java +++ b/src/main/java/com/clickhouse/kafka/connect/util/jmx/MBeanServerUtils.java @@ -1,12 +1,12 @@ package com.clickhouse.kafka.connect.util.jmx; -import java.lang.management.ManagementFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.management.InstanceAlreadyExistsException; import javax.management.MBeanServer; import javax.management.ObjectName; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.lang.management.ManagementFactory; public final class MBeanServerUtils { private static final Logger LOGGER = LoggerFactory.getLogger(MBeanServerUtils.class); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseBase.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseBase.java index 970b3490..88f0a32d 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseBase.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseBase.java @@ -6,11 +6,9 @@ import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.client.ClickHouseResponseSummary; import com.clickhouse.client.config.ClickHouseClientOption; -import com.clickhouse.data.ClickHouseRecord; import com.clickhouse.kafka.connect.ClickHouseSinkConnector; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers; -import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.slf4j.Logger; @@ -18,7 +16,6 @@ import org.testcontainers.clickhouse.ClickHouseContainer; import java.io.IOException; -import java.util.Collection; import java.util.HashMap; import java.util.Map; diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkJdbcPropertiesTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkJdbcPropertiesTest.java index e2b10069..29bf24a4 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkJdbcPropertiesTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkJdbcPropertiesTest.java @@ -1,18 +1,13 @@ package com.clickhouse.kafka.connect.sink; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import com.clickhouse.client.ClickHouseClient; -import com.clickhouse.client.ClickHouseException; -import com.clickhouse.client.ClickHouseProtocol; -import com.clickhouse.client.ClickHouseResponse; -import com.clickhouse.client.ClickHouseResponseSummary; -import com.clickhouse.kafka.connect.ClickHouseSinkConnector; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.Test; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -20,12 +15,8 @@ import java.util.List; import java.util.Map; import java.util.stream.LongStream; -import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.connect.sink.SinkRecord; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.testcontainers.clickhouse.ClickHouseContainer; + +import static org.junit.jupiter.api.Assertions.assertEquals; public class ClickHouseSinkJdbcPropertiesTest extends ClickHouseBase { public Collection createPrimitiveTypes(String topic, int partition) { diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskMappingTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskMappingTest.java index f834382e..b6e06c87 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskMappingTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskMappingTest.java @@ -1,18 +1,13 @@ package com.clickhouse.kafka.connect.sink; -import com.clickhouse.kafka.connect.ClickHouseSinkConnector; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers; import com.clickhouse.kafka.connect.sink.helper.SchemaTestData; import com.clickhouse.kafka.connect.sink.helper.SchemalessTestData; import org.apache.kafka.connect.sink.SinkRecord; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.testcontainers.clickhouse.ClickHouseContainer; import java.util.Collection; -import java.util.HashMap; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessProxyTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessProxyTest.java index 5c7b6d20..07bb0433 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessProxyTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessProxyTest.java @@ -1,6 +1,6 @@ package com.clickhouse.kafka.connect.sink; -import com.clickhouse.client.*; +import com.clickhouse.client.ClickHouseProtocol; import com.clickhouse.client.config.ClickHouseProxyType; import com.clickhouse.kafka.connect.ClickHouseSinkConnector; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; @@ -9,13 +9,16 @@ import eu.rekawek.toxiproxy.Proxy; import eu.rekawek.toxiproxy.ToxiproxyClient; import org.apache.kafka.connect.sink.SinkRecord; -import org.junit.jupiter.api.*; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.testcontainers.clickhouse.ClickHouseContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.ToxiproxyContainer; import java.io.IOException; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import static org.junit.jupiter.api.Assertions.*; diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessTest.java index 14d1961c..53299897 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessTest.java @@ -4,14 +4,13 @@ import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers; import com.clickhouse.kafka.connect.sink.helper.SchemalessTestData; import org.apache.kafka.connect.sink.SinkRecord; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.util.*; +import java.util.Collection; +import java.util.List; +import java.util.Map; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; public class ClickHouseSinkTaskSchemalessTest extends ClickHouseBase { diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskStringTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskStringTest.java index 55e15174..057e56d6 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskStringTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskStringTest.java @@ -1,12 +1,6 @@ package com.clickhouse.kafka.connect.sink; -import com.clickhouse.client.ClickHouseClient; -import com.clickhouse.client.ClickHouseException; -import com.clickhouse.client.ClickHouseProtocol; -import com.clickhouse.client.ClickHouseResponse; -import com.clickhouse.client.ClickHouseResponseSummary; import com.clickhouse.client.api.query.Records; -import com.clickhouse.kafka.connect.ClickHouseSinkConnector; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; import com.clickhouse.kafka.connect.sink.dlq.InMemoryDLQ; import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers; @@ -14,10 +8,7 @@ import com.google.gson.reflect.TypeToken; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.sink.SinkRecord; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.testcontainers.clickhouse.ClickHouseContainer; import java.util.ArrayList; import java.util.Arrays; diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskTest.java index f354d202..55b9ec2f 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskTest.java @@ -7,15 +7,11 @@ import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.client.ClickHouseResponseSummary; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; -import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.sink.SinkRecord; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.testcontainers.clickhouse.ClickHouseContainer; import java.io.PrintWriter; import java.io.StringWriter; diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java index 199b6965..72b79499 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java @@ -1,6 +1,6 @@ package com.clickhouse.kafka.connect.sink; -import com.clickhouse.client.*; +import com.clickhouse.client.ClickHouseProtocol; import com.clickhouse.client.config.ClickHouseProxyType; import com.clickhouse.kafka.connect.ClickHouseSinkConnector; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; @@ -22,11 +22,13 @@ import org.testcontainers.containers.ToxiproxyContainer; import java.io.IOException; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.stream.LongStream; import static org.junit.jupiter.api.Assertions.*; -import static org.junit.jupiter.api.Assertions.assertFalse; @ExtendWith(FromVersionConditionExtension.class) public class ClickHouseSinkTaskWithSchemaProxyTest extends ClickHouseBase { diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java index c696e233..8dd8ca01 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java @@ -3,8 +3,8 @@ import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers; import com.clickhouse.kafka.connect.sink.helper.SchemaTestData; -import com.clickhouse.kafka.connect.sink.junit.extension.SinceClickHouseVersion; import com.clickhouse.kafka.connect.sink.junit.extension.FromVersionConditionExtension; +import com.clickhouse.kafka.connect.sink.junit.extension.SinceClickHouseVersion; import com.clickhouse.kafka.connect.util.Utils; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; @@ -18,7 +18,9 @@ import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils; import java.math.BigDecimal; -import java.util.*; +import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriterTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriterTest.java index 6b1e299a..7aeba4b9 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriterTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriterTest.java @@ -12,31 +12,25 @@ import com.clickhouse.kafka.connect.sink.db.mapping.Type; import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers; import com.clickhouse.kafka.connect.sink.junit.extension.FromVersionConditionExtension; +import com.clickhouse.kafka.connect.util.Utils; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; -import org.junit.Ignore; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.clickhouse.kafka.connect.util.Utils; - -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import static com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers.newDescriptor; +import static org.junit.jupiter.api.Assertions.*; @ExtendWith(FromVersionConditionExtension.class) public class ClickHouseWriterTest extends ClickHouseBase { diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClientTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClientTest.java index 5c59844e..c101b895 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClientTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClientTest.java @@ -5,7 +5,6 @@ import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers; import com.clickhouse.kafka.connect.sink.junit.extension.FromVersionConditionExtension; import com.clickhouse.kafka.connect.sink.junit.extension.SinceClickHouseVersion; -import com.clickhouse.kafka.connect.util.Utils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -13,7 +12,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; import java.util.List; import java.util.Map; diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/dlq/InMemoryDLQ.java b/src/test/java/com/clickhouse/kafka/connect/sink/dlq/InMemoryDLQ.java index fa444e63..f1bbcb36 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/dlq/InMemoryDLQ.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/dlq/InMemoryDLQ.java @@ -1,6 +1,5 @@ package com.clickhouse.kafka.connect.sink.dlq; -import com.clickhouse.kafka.connect.sink.data.Record; import org.apache.kafka.connect.sink.SinkRecord; import java.util.ArrayList; diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java b/src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java index 9a490b0a..23eba246 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java @@ -1,6 +1,10 @@ package com.clickhouse.kafka.connect.sink.helper; -import com.clickhouse.client.*; +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseResponse; +import com.clickhouse.client.ClickHouseResponseSummary; import com.clickhouse.client.api.metrics.OperationMetrics; import com.clickhouse.client.api.query.QueryResponse; import com.clickhouse.client.api.query.QuerySettings; @@ -22,10 +26,14 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.Serializable; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; public class ClickHouseTestHelpers { private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseTestHelpers.class); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java b/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java index 17553306..6201e6fa 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java @@ -1,15 +1,30 @@ package com.clickhouse.kafka.connect.sink.helper; import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.connect.data.*; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.sink.SinkRecord; import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils; import java.math.BigDecimal; -import java.time.*; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.LongStream; diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemalessTestData.java b/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemalessTestData.java index 75595474..7c06ef4a 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemalessTestData.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemalessTestData.java @@ -4,7 +4,12 @@ import org.apache.kafka.connect.sink.SinkRecord; import java.math.BigDecimal; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.stream.LongStream; public class SchemalessTestData { diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/kafa/RangeContainerTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/kafa/RangeContainerTest.java index 879f0a56..53c58f8f 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/kafa/RangeContainerTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/kafa/RangeContainerTest.java @@ -1,13 +1,13 @@ package com.clickhouse.kafka.connect.sink.kafa; -import static org.junit.jupiter.api.Assertions.assertEquals; - import com.clickhouse.kafka.connect.sink.kafka.RangeContainer; import com.clickhouse.kafka.connect.sink.kafka.RangeState; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; + public class RangeContainerTest { private String topic = "test"; diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/processing/ProcessingTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/processing/ProcessingTest.java index bd9fbe47..12bfd826 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/processing/ProcessingTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/processing/ProcessingTest.java @@ -1,17 +1,11 @@ package com.clickhouse.kafka.connect.sink.processing; -import com.clickhouse.client.ClickHouseConfig; -import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import com.clickhouse.kafka.connect.sink.ClickHouseSinkTask; +import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig; import com.clickhouse.kafka.connect.sink.data.Data; import com.clickhouse.kafka.connect.sink.data.Record; import com.clickhouse.kafka.connect.sink.data.SchemaType; import com.clickhouse.kafka.connect.sink.db.DBWriter; import com.clickhouse.kafka.connect.sink.db.InMemoryDBWriter; -import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter; import com.clickhouse.kafka.connect.sink.dlq.InMemoryDLQ; import com.clickhouse.kafka.connect.sink.state.State; import com.clickhouse.kafka.connect.sink.state.StateProvider; @@ -31,6 +25,9 @@ import java.util.concurrent.ExecutionException; import java.util.stream.LongStream; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + public class ProcessingTest { diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/provider/LocalProviderTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/provider/LocalProviderTest.java index 32502be9..764aefec 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/provider/LocalProviderTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/provider/LocalProviderTest.java @@ -1,8 +1,6 @@ package com.clickhouse.kafka.connect.sink.provider; -import static org.junit.jupiter.api.Assertions.assertEquals; - import com.clickhouse.kafka.connect.sink.state.State; import com.clickhouse.kafka.connect.sink.state.StateProvider; import com.clickhouse.kafka.connect.sink.state.StateRecord; @@ -10,6 +8,8 @@ import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; + public class LocalProviderTest { diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/util/MaskTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/util/MaskTest.java index 79d5bf05..9faa9b19 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/util/MaskTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/util/MaskTest.java @@ -1,17 +1,9 @@ package com.clickhouse.kafka.connect.sink.util; -import com.clickhouse.kafka.connect.sink.data.Record; -import com.clickhouse.kafka.connect.sink.db.DBWriter; -import com.clickhouse.kafka.connect.sink.db.InMemoryDBWriter; -import com.clickhouse.kafka.connect.sink.processing.Processing; -import com.clickhouse.kafka.connect.sink.state.StateProvider; -import com.clickhouse.kafka.connect.sink.state.provider.InMemoryState; import com.clickhouse.kafka.connect.util.Mask; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; -import java.util.List; - import static org.junit.jupiter.api.Assertions.assertEquals; public class MaskTest { diff --git a/src/test/java/transforms/KeyToValueTest.java b/src/test/java/transforms/KeyToValueTest.java index c0a4182b..03da15a4 100644 --- a/src/test/java/transforms/KeyToValueTest.java +++ b/src/test/java/transforms/KeyToValueTest.java @@ -1,7 +1,6 @@ package transforms; import com.clickhouse.kafka.connect.transforms.KeyToValue; -import org.apache.kafka.common.record.Record; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct;