Skip to content

Commit

Permalink
Merge pull request #404 from ClickHouse/adjust-nested-exception
Browse files Browse the repository at this point in the history
Tweak to return null instead of throw exception
  • Loading branch information
Paultagoras authored Jun 21, 2024
2 parents 7d09144 + 6f9799a commit c9d2a86
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 70 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## 1.1.1
* Bugfix to address string encoding issue
* Bugfix to address issue with nested types and flatten_nested setting conflict
* Bugfix to avoid storing keeper state in same column name if virtual topic is enabled

## 1.1.0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.clickhouse.kafka.connect.sink;

import com.clickhouse.client.config.ClickHouseProxyType;
import lombok.Getter;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
Expand All @@ -12,6 +13,7 @@
import java.util.List;
import java.util.Map;

@Getter
public class ClickHouseSinkConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSinkConfig.class);

Expand Down Expand Up @@ -74,7 +76,6 @@ public enum StateStores {
private final long tableRefreshInterval;
private final boolean suppressTableExistenceException;
private final boolean errorsTolerance;

private final Map<String, String> clickhouseSettings;
private final Map<String, String> topicToTableMap;
private final ClickHouseProxyType proxyType;
Expand Down Expand Up @@ -510,59 +511,4 @@ private static ConfigDef createConfigDef() {
);
return configDef;
}

public String getHostname() {
return hostname;
}
public int getPort() {
return port;
}
public String getDatabase() {
return database;
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
public boolean isSslEnabled() {
return sslEnabled;
}
public String getJdbcConnectionProperties() {
return jdbcConnectionProperties;
}
public int getTimeout() {
return timeout;
}
public int getRetry() { return retry; }
public long getTableRefreshInterval() {
return tableRefreshInterval;
}
public boolean getExactlyOnce() { return exactlyOnce; }
public boolean getSuppressTableExistenceException() {
return suppressTableExistenceException;
}
public Map<String, String> getClickhouseSettings() {return clickhouseSettings;}
public Map<String, String> getTopicToTableMap() {return topicToTableMap;}
public boolean getErrorsTolerance() { return errorsTolerance; }
public InsertFormats getInsertFormat() { return insertFormat; }
public ClickHouseProxyType getProxyType() {
return proxyType;
}
public String getProxyHost() {
return proxyHost;
}
public int getProxyPort() {
return proxyPort;
}
public String getZkPath() {
return zkPath;
}
public String getZkDatabase() {
return zkDatabase;
}
public boolean getEnableDbTopicSplit() { return enableDbTopicSplit; }
public String getDbTopicSplitChar() { return dbTopicSplitChar; }
public String getKeeperOnCluster() { return keeperOnCluster; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void put(Collection<SinkRecord> records) {
this.proxySinkTask.put(records);
} catch (Exception e) {
LOGGER.trace("Passing the exception to the exception handler.");
boolean errorTolerance = clickHouseSinkConfig != null && clickHouseSinkConfig.getErrorsTolerance();
boolean errorTolerance = clickHouseSinkConfig != null && clickHouseSinkConfig.isErrorsTolerance();
Utils.handleException(e, errorTolerance, records);
if (errorTolerance && errorReporter != null) {
LOGGER.warn("Sending [{}] records to DLQ for exception: {}", records.size(), e.getLocalizedMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public class ProxySinkTask {

public ProxySinkTask(final ClickHouseSinkConfig clickHouseSinkConfig, final ErrorReporter errorReporter) {
this.clickHouseSinkConfig = clickHouseSinkConfig;
LOGGER.info("Enable ExactlyOnce? {}", clickHouseSinkConfig.getExactlyOnce());
if ( clickHouseSinkConfig.getExactlyOnce() ) {
LOGGER.info("Enable ExactlyOnce? {}", clickHouseSinkConfig.isExactlyOnce());
if ( clickHouseSinkConfig.isExactlyOnce() ) {
this.stateProvider = new KeeperStateProvider(clickHouseSinkConfig);
} else {
this.stateProvider = new InMemoryState();
Expand Down Expand Up @@ -87,7 +87,7 @@ public void put(final Collection<SinkRecord> records) throws IOException, Execut

Map<String, List<Record>> dataRecords = records.stream()
.map(v -> Record.convert(v,
clickHouseSinkConfig.getEnableDbTopicSplit(),
clickHouseSinkConfig.isEnableDbTopicSplit(),
clickHouseSinkConfig.getDbTopicSplitChar(),
clickHouseSinkConfig.getDatabase() ))
.collect(Collectors.groupingBy(Record::getTopicAndPartition));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ protected void doInsertRawBinary(List<Record> records, Table table, QueryIdentif

protected void doInsertJson(List<Record> records, Table table, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException {
//https://devqa.io/how-to-convert-java-map-to-json/
boolean enableDbTopicSplit = csc.getEnableDbTopicSplit();
boolean enableDbTopicSplit = csc.isEnableDbTopicSplit();
String dbTopicSplitChar = csc.getDbTopicSplitChar();
LOGGER.trace("enableDbTopicSplit: {}", enableDbTopicSplit);
Gson gson = new Gson();
Expand Down Expand Up @@ -864,7 +864,7 @@ private Table getTable(String topic) {
String tableName = Utils.getTableName(topic, csc.getTopicToTableMap());
Table table = this.mapping.get(tableName);
if (table == null) {
if (csc.getSuppressTableExistenceException()) {
if (csc.isSuppressTableExistenceException()) {
LOGGER.warn("Table [{}] does not exist, but error was suppressed.", tableName);
} else {
//TODO to pick the correct exception here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.clickhouse.kafka.connect.util.Utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,6 +30,7 @@ public class ClickHouseHelperClient {
private final String hostname;
private final int port;
private final String username;
@Getter
private final String database;
private final String password;
private final boolean sslEnabled;
Expand Down Expand Up @@ -57,9 +59,6 @@ public ClickHouseHelperClient(ClickHouseClientBuilder builder) {
this.server = create();
}

public String getDatabase() {
return database;
}
public Map<ClickHouseOption, Serializable> getDefaultClientOptions() {
Map<ClickHouseOption, Serializable> options = new HashMap<>();
options.put(ClickHouseClientOption.PRODUCT_NAME, "clickhouse-kafka-connect/"+ClickHouseClientOption.class.getPackage().getImplementationVersion());
Expand Down Expand Up @@ -218,6 +217,11 @@ public Table describeTable(String tableName) {
}

Column column = Column.extractColumn(fieldDescriptor);
//If we run into a rare column we can't handle, just ignore the table and warn the user
if (column == null) {
LOGGER.warn("Unable to handle column: {}", fieldDescriptor.getName());
return null;
}
table.addColumn(column);
}
return table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,9 @@ public static Column extractColumn(String name, String valueType, boolean isNull
.tupleFields(new ArrayList<>())
.build();
} else if (valueType.startsWith("Nested")) {
throw new IllegalArgumentException("DESCRIBE TABLE is never supposed to return Nested type. It should always yield its Array fields directly.");
LOGGER.warn("DESCRIBE TABLE is never supposed to return Nested type - it should always yield its Array fields directly. " +
"This is likely caused by a different table in the same database using 'flatten_nested=0', so we'll ignore that table...");
return null;//This is a special case where we don't want to create a column
} else if (valueType.startsWith("Variant")) {
String rawVariantTypes = valueType.substring("Variant".length() + 1, valueType.length() - 1);
List<Tuple2<Column, String>> variantTypes = splitUnlessInBrackets(rawVariantTypes, ',').stream().map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void doLogic(List<Record> records) throws IOException, ExecutionException
String database = record.getDatabase();
String topic = record.getRecordOffsetContainer().getTopic();

if (this.clickHouseSinkConfig != null && clickHouseSinkConfig.getEnableDbTopicSplit()) {
if (this.clickHouseSinkConfig != null && clickHouseSinkConfig.isEnableDbTopicSplit()) {
topic = database + clickHouseSinkConfig.getDbTopicSplitChar() + topic;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.clickhouse.kafka.connect.sink.db.helper;

import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.kafka.connect.sink.ClickHouseBase;
import com.clickhouse.kafka.connect.sink.db.mapping.Table;
import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers;
import com.clickhouse.kafka.connect.sink.junit.extension.FromVersionConditionExtension;
import com.clickhouse.kafka.connect.sink.junit.extension.SinceClickHouseVersion;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

@ExtendWith(FromVersionConditionExtension.class)
public class ClickHouseHelperClientTest extends ClickHouseBase {
private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseHelperClientTest.class);
ClickHouseHelperClient chc = null;

@BeforeEach
public void setUp() {
LOGGER.info("Setting up...");
Map<String, String> props = createProps();
chc = createClient(props);
}

@Test
public void ping() {
Assertions.assertTrue(chc.ping());
}

@Test
public void describeNestedFlattenedTable() {
String topic = createTopicName("nested_flattened_table_test");
ClickHouseTestHelpers.createTable(chc, topic,
"CREATE TABLE %s ( `num` String, " +
"`nested` Nested (innerInt Int32, innerString String)) " +
"Engine = MergeTree ORDER BY num");

try {
Table table = chc.describeTable(topic);
Assertions.assertEquals(3, table.getRootColumnsList().size());
} finally {
ClickHouseTestHelpers.dropTable(chc, topic);
}
}

@Test
@SinceClickHouseVersion("24.1")
public void describeNestedUnFlattenedTable() {
String nestedTopic = createTopicName("nested_unflattened_table_test");
String normalTopic = createTopicName("normal_unflattened_table_test");
ClickHouseTestHelpers.query(chc, "CREATE USER IF NOT EXISTS unflatten IDENTIFIED BY '123FOURfive^&*91011' SETTINGS flatten_nested=0");
ClickHouseTestHelpers.query(chc, "GRANT CURRENT GRANTS ON *.* TO unflatten");

Map<String, String> props = createProps();
props.put("username", "unflatten");
props.put("password", "123FOURfive^&*91011");
chc = createClient(props);

ClickHouseTestHelpers.createTable(chc, nestedTopic,
"CREATE TABLE %s ( `num` String, " +
"`nested` Nested (innerInt Int32, innerString String)) " +
"Engine = MergeTree ORDER BY num");
ClickHouseTestHelpers.createTable(chc, normalTopic,
"CREATE TABLE %s ( `num` String ) " +
"Engine = MergeTree ORDER BY num");

try {
Table nestedTable = chc.describeTable(nestedTopic);
Assertions.assertNull(nestedTable);

Table normalTable = chc.describeTable(normalTopic);
Assertions.assertEquals(1, normalTable.getRootColumnsList().size());
} finally {
ClickHouseTestHelpers.dropTable(chc, nestedTopic);
ClickHouseTestHelpers.dropTable(chc, normalTopic);
ClickHouseTestHelpers.query(chc, "DROP USER IF EXISTS unflatten");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,41 @@ public static boolean isCloud() {
LOGGER.info("Version: {}", version);
return version != null && version.equalsIgnoreCase("cloud");
}
public static ClickHouseResponseSummary dropTable(ClickHouseHelperClient chc, String tableName) {
String dropTable = String.format("DROP TABLE IF EXISTS `%s`", tableName);

public static ClickHouseResponseSummary query(ClickHouseHelperClient chc, String query) {
try (ClickHouseClient client = ClickHouseClient.builder()
.options(chc.getDefaultClientOptions())
.nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
.build();
ClickHouseResponse response = client.read(chc.getServer())
.query(dropTable)
.query(query)
.executeAndWait()) {
return response.getSummary();
} catch (ClickHouseException e) {
throw new RuntimeException(e);
}
}

public static ClickHouseResponseSummary query(ClickHouseHelperClient chc, String query, ClickHouseFormat format) {
try (ClickHouseClient client = ClickHouseClient.builder()
.options(chc.getDefaultClientOptions())
.nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
.build();
ClickHouseResponse response = client.read(chc.getServer())
.query(query)
.format(format)
.executeAndWait()) {
return response.getSummary();
} catch (ClickHouseException e) {
throw new RuntimeException(e);
}
}

public static ClickHouseResponseSummary dropTable(ClickHouseHelperClient chc, String tableName) {
String dropTable = String.format("DROP TABLE IF EXISTS `%s`", tableName);
return query(chc, dropTable);
}

public static ClickHouseResponseSummary createTable(ClickHouseHelperClient chc, String tableName, String createTableQuery) {
return createTable(chc, tableName, createTableQuery, new HashMap<>());
}
Expand Down

0 comments on commit c9d2a86

Please sign in to comment.