Skip to content

Commit

Permalink
Merge pull request #413 from ClickHouse/adjust-datetime-handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Paultagoras authored Jul 18, 2024
2 parents 0eeaa9f + 19b1d27 commit 716baf6
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 15 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# 1.1.2
## 1.1.2
* Adding a "dateTimeFormats" configuration option to allow for custom date/time formatting with String schema values
* Adding ephemeral column support and adding an error message

## 1.1.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -43,6 +44,7 @@ public class ClickHouseSinkConfig {
public static final String ENABLE_DB_TOPIC_SPLIT = "enableDbTopicSplit";
public static final String DB_TOPIC_SPLIT_CHAR = "dbTopicSplitChar";
public static final String KEEPER_ON_CLUSTER = "keeperOnCluster";
public static final String DATE_TIME_FORMAT = "dateTimeFormats";

public static final int MILLI_IN_A_SEC = 1000;
private static final String databaseDefault = "default";
Expand Down Expand Up @@ -86,6 +88,7 @@ public enum StateStores {
private final boolean enableDbTopicSplit;
private final String dbTopicSplitChar;
private final String keeperOnCluster;
private final Map<String, DateTimeFormatter> dateTimeFormats;

public enum InsertFormats {
NONE,
Expand Down Expand Up @@ -245,6 +248,17 @@ public ClickHouseSinkConfig(Map<String, String> props) {
this.dbTopicSplitChar = props.getOrDefault(DB_TOPIC_SPLIT_CHAR, "");
this.keeperOnCluster = props.getOrDefault(KEEPER_ON_CLUSTER, "");
this.bypassRowBinary = Boolean.parseBoolean(props.getOrDefault("bypassRowBinary", "false"));
this.dateTimeFormats = new HashMap<>();
String dateTimeFormatsString = props.getOrDefault(DATE_TIME_FORMAT, "").trim();
if (!dateTimeFormatsString.isBlank()) {
String [] stringSplit = dateTimeFormatsString.split(";");
for (String topicToDateTimeFormat: stringSplit) {
String [] propSplit = topicToDateTimeFormat.trim().split("=");
if ( propSplit.length == 2 ) {
dateTimeFormats.put(propSplit[0].trim(), DateTimeFormatter.ofPattern(propSplit[1].trim()));
}
}
}

LOGGER.debug("ClickHouseSinkConfig: hostname: {}, port: {}, database: {}, username: {}, sslEnabled: {}, timeout: {}, retry: {}, exactlyOnce: {}",
hostname, port, database, username, sslEnabled, timeout, retry, exactlyOnce);
Expand Down Expand Up @@ -521,6 +535,15 @@ private static ConfigDef createConfigDef() {
++orderInGroup,
ConfigDef.Width.SHORT,
"Bypass RowBinary format.");
configDef.define(DATE_TIME_FORMAT,
ConfigDef.Type.LIST,
"",
ConfigDef.Importance.LOW,
"Date time formats for parsing date time fields (e.g. 'someDateField=yyyy-MM-dd HH:mm:ss.SSSSSSSSS'). default: ''",
group,
++orderInGroup,
ConfigDef.Width.SHORT,
"Date time formats.");
return configDef;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import com.google.gson.reflect.TypeToken;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoField;
import java.util.*;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -243,12 +246,14 @@ protected boolean validateDataSchema(Table table, Record record, boolean onlyFie
return validSchema;
}

protected void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data value, int precision) throws IOException {
protected void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data value, int precision, String columnName) throws IOException {
// TODO: develop more specific tests to have better coverage
if (value.getObject() == null) {
BinaryStreamUtils.writeNull(stream);
return;
}

LOGGER.trace("Writing date type: {}, value: {}, value class: {}", type, value.getObject(), value.getObject().getClass());
boolean unsupported = false;
switch (type) {
case Date:
Expand Down Expand Up @@ -288,7 +293,6 @@ protected void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data
} else if (value.getFieldType().equals(Schema.Type.STRING)) {
try {
ZonedDateTime zonedDateTime = ZonedDateTime.parse((String) value.getObject());
LOGGER.trace("Writing epoch seconds: {}", zonedDateTime.toInstant().getEpochSecond());
BinaryStreamUtils.writeUnsignedInt32(stream, zonedDateTime.toInstant().getEpochSecond());
} catch (Exception e) {
LOGGER.error("Error parsing date time string: {}", value.getObject());
Expand All @@ -302,18 +306,33 @@ protected void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data
if (value.getFieldType().equals(Schema.Type.INT64)) {
if (value.getObject().getClass().getName().endsWith(".Date")) {
Date date = (Date) value.getObject();
long time = date.getTime();
BinaryStreamUtils.writeInt64(stream, time);
BinaryStreamUtils.writeInt64(stream, date.getTime());
} else {
BinaryStreamUtils.writeInt64(stream, (Long) value.getObject());
}
} else if (value.getFieldType().equals(Schema.Type.STRING)) {
try {
ZonedDateTime zonedDateTime = ZonedDateTime.parse((String) value.getObject());
long seconds = zonedDateTime.toInstant().getEpochSecond();
long milliSeconds = zonedDateTime.toInstant().toEpochMilli();
long microSeconds = TimeUnit.MICROSECONDS.convert(seconds, TimeUnit.SECONDS) + zonedDateTime.get(ChronoField.MICRO_OF_SECOND);
long nanoSeconds = TimeUnit.NANOSECONDS.convert(seconds, TimeUnit.SECONDS) + zonedDateTime.getNano();
long seconds;
long milliSeconds;
long microSeconds;
long nanoSeconds;

if (!csc.getDateTimeFormats().isEmpty()) {
Map<String, DateTimeFormatter> formats = csc.getDateTimeFormats();
DateTimeFormatter formatter = formats.get(columnName);
LOGGER.trace("Using custom date time format: {}", formatter);
LocalDateTime localDateTime = LocalDateTime.from(formatter.parse((String) value.getObject()));
seconds = localDateTime.toInstant(ZoneOffset.UTC).getEpochSecond();
milliSeconds = localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
microSeconds = TimeUnit.MICROSECONDS.convert(seconds, TimeUnit.SECONDS) + localDateTime.get(ChronoField.MICRO_OF_SECOND);
nanoSeconds = TimeUnit.NANOSECONDS.convert(seconds, TimeUnit.SECONDS) + localDateTime.getNano();
} else {
ZonedDateTime zonedDateTime = ZonedDateTime.parse((String) value.getObject());
seconds = zonedDateTime.toInstant().getEpochSecond();
milliSeconds = zonedDateTime.toInstant().toEpochMilli();
microSeconds = TimeUnit.MICROSECONDS.convert(seconds, TimeUnit.SECONDS) + zonedDateTime.get(ChronoField.MICRO_OF_SECOND);
nanoSeconds = TimeUnit.NANOSECONDS.convert(seconds, TimeUnit.SECONDS) + zonedDateTime.getNano();
}

if (precision == 3) {
LOGGER.trace("Writing epoch milliseconds: {}", milliSeconds);
Expand All @@ -329,7 +348,7 @@ protected void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data
BinaryStreamUtils.writeInt64(stream, seconds);
}
} catch (Exception e) {
LOGGER.error("Error parsing date time string: {}", value.getObject());
LOGGER.error("Error parsing date time string: {}, exception: {}", value.getObject(), e.getMessage());
unsupported = true;
}
} else {
Expand All @@ -338,7 +357,7 @@ protected void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data
break;
}
if (unsupported) {
String msg = String.format("Not implemented conversion from %s to %s", value.getFieldType(), type);
String msg = String.format("(Potentially) Not implemented conversion from %s to %s", value.getFieldType(), type);
LOGGER.error(msg);
throw new DataException(msg);
}
Expand Down Expand Up @@ -372,7 +391,7 @@ protected void doWriteColValue(Column col, ClickHousePipedOutputStream stream, D
case Date32:
case DateTime:
case DateTime64:
doWriteDates(columnType, stream, value, col.getPrecision());
doWriteDates(columnType, stream, value, col.getPrecision(), col.getName());
break;
case Decimal:
if (value.getObject() == null) {
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/clickhouse/kafka/connect/util/Utils.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.clickhouse.kafka.connect.util;

import com.clickhouse.client.ClickHouseException;
import com.clickhouse.kafka.connect.sink.data.Data;
import com.clickhouse.kafka.connect.sink.data.Record;
import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter;
import org.apache.kafka.connect.errors.DataException;
Expand All @@ -12,7 +13,9 @@
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public static void setup() throws IOException {
database = String.format("kafka_connect_test_%s", System.currentTimeMillis());
}
if (isCloud) {
initialPing();
return;
}
db = new ClickHouseContainer(ClickHouseTestHelpers.CLICKHOUSE_DOCKER_IMAGE);
Expand Down Expand Up @@ -121,6 +122,45 @@ protected void createDatabase(String database, ClickHouseHelperClient chc) {

}

/**
 * Pings the ClickHouse server described by {@code createProps()} until it answers,
 * so the tests do not start against a server that is not responding yet.
 *
 * <p>Makes at most 11 ping attempts and sleeps 5 seconds after every failed one.
 * If the waiting thread is interrupted, the interrupt flag is restored and no
 * further attempts are made.
 *
 * @throws RuntimeException if no ping succeeded
 */
protected static void initialPing() {
    ClickHouseSinkConfig csc = new ClickHouseSinkConfig(new ClickHouseBase().createProps());

    String hostname = csc.getHostname();
    int port = csc.getPort();
    String username = csc.getUsername();
    String password = csc.getPassword();
    boolean sslEnabled = csc.isSslEnabled();
    int timeout = csc.getTimeout();

    ClickHouseHelperClient tmpChc = new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port, csc.getProxyType(), csc.getProxyHost(), csc.getProxyPort())
            .setDatabase("default")
            .setUsername(username)
            .setPassword(password)
            .sslEnable(sslEnabled)
            .setTimeout(timeout)
            .setRetry(csc.getRetry())
            .build();

    boolean ping;
    int retry = 0;
    do {
        LOGGER.info("Pinging ClickHouse server to warm up for tests...");
        ping = tmpChc.ping();
        if (!ping) {
            LOGGER.info("Unable to ping ClickHouse server, retrying... {}", retry);
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // Restore the flag so callers can observe the interrupt, and stop retrying:
                // with the flag set, every further sleep would throw immediately.
                Thread.currentThread().interrupt();
                LOGGER.warn("Interrupted while waiting to retry the ClickHouse ping", e);
                break;
            }
        }
    } while (!ping && retry++ < 10);

    if (!ping) {
        throw new RuntimeException("Unable to ping ClickHouse server...");
    }
}

protected static void dropDatabase(String database) {
ClickHouseSinkConfig csc = new ClickHouseSinkConfig(new ClickHouseBase().createProps());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.clickhouse.kafka.connect.util.Utils;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -18,6 +17,8 @@

import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.*;

Expand Down Expand Up @@ -266,6 +267,39 @@ public void supportZonedDatesStringTest() {
}


/**
 * Sends records whose {@code format_date} field is a string in the custom
 * {@code yyyy-MM-dd HH:mm:ss.SSSSSSSSS} format into a {@code DateTime64(9)} column,
 * then checks that every record was inserted and that exactly one of the sent
 * records contains the {@code format_date} value of the first returned row.
 */
@Test
public void supportFormattedDatesStringTest() {
    final Map<String, String> config = createProps();
    config.put(ClickHouseSinkConfig.DATE_TIME_FORMAT, "format_date=yyyy-MM-dd HH:mm:ss.SSSSSSSSS");
    final ClickHouseHelperClient client = createClient(config);

    final String table = createTopicName("support-formatted-dates-string-test");
    ClickHouseTestHelpers.dropTable(client, table);
    ClickHouseTestHelpers.createTable(client, table, "CREATE TABLE `%s` ( `off16` Int16, format_date DateTime64(9)) Engine = MergeTree ORDER BY off16");
    final Collection<SinkRecord> produced = SchemaTestData.createFormattedTimestampConversions(table, 1);

    final ClickHouseSinkTask task = new ClickHouseSinkTask();
    task.start(config);
    task.put(produced);
    task.stop();

    assertEquals(produced.size(), ClickHouseTestHelpers.countRows(client, table));
    final List<JSONObject> rows = ClickHouseTestHelpers.getAllRowsAsJson(client, table);

    // Count how many of the sent records carry the date stored in the first returned row.
    int matches = 0;
    for (SinkRecord sent : produced) {
        JSONObject firstRow = rows.get(0);
        String payload = sent.value().toString();
        if (payload.contains(firstRow.get("format_date").toString())) {
            matches++;
        }
    }

    assertTrue(matches > 0);
    assertEquals(1, matches);
}



@Test
public void withEmptyDataRecordsTest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,15 @@ public static ClickHouseResponseSummary dropTable(ClickHouseHelperClient chc, St
}

/**
 * Creates a table using an empty client-settings map.
 *
 * <p>Delegates to the four-argument overload. When {@code isCloud()} is true the
 * method then sleeps for two seconds before returning
 * (NOTE(review): presumably to let the new table become visible — confirm).
 * If that sleep is interrupted, the interrupt flag is restored and the summary
 * is still returned.
 *
 * @param chc              client used to run the statement
 * @param tableName        name of the table to create
 * @param createTableQuery the CREATE TABLE statement passed to the overload
 * @return the summary returned by the four-argument overload
 */
public static ClickHouseResponseSummary createTable(ClickHouseHelperClient chc, String tableName, String createTableQuery) {
    ClickHouseResponseSummary summary = createTable(chc, tableName, createTableQuery, new HashMap<>());
    if (isCloud()) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers instead of silently consuming it.
            Thread.currentThread().interrupt();
            LOGGER.error("Error while sleeping", e);
        }
    }
    return summary;
}

public static ClickHouseResponseSummary createTable(ClickHouseHelperClient chc, String tableName, String createTableQuery, Map<String, Serializable> clientSettings) {
Expand Down Expand Up @@ -122,6 +130,7 @@ public static List<JSONObject> getAllRowsAsJson(ClickHouseHelperClient chc, Stri

public static int countRows(ClickHouseHelperClient chc, String tableName) {
String queryCount = String.format("SELECT COUNT(*) FROM `%s`", tableName);

try (ClickHouseClient client = ClickHouseClient.builder()
.options(chc.getDefaultClientOptions())
.nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.math.BigDecimal;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.Date;
import java.util.stream.LongStream;
Expand Down Expand Up @@ -907,6 +908,40 @@ public static Collection<SinkRecord> createZonedTimestampConversions(String topi
return array;
}

/**
 * Convenience overload that generates {@code DEFAULT_TOTAL_RECORDS} records
 * by delegating to the three-argument variant.
 */
public static Collection<SinkRecord> createFormattedTimestampConversions(String topic, int partition) {
    final int recordCount = DEFAULT_TOTAL_RECORDS;
    return createFormattedTimestampConversions(topic, partition, recordCount);
}
/**
 * Builds {@code totalRecords} sink records whose value is a struct with an
 * {@code off16} INT16 field (the record index cast to {@code short}) and a
 * {@code format_date} STRING field holding the current local date-time
 * formatted as {@code yyyy-MM-dd HH:mm:ss.SSSSSSSSS}.
 *
 * @param topic        topic assigned to every record
 * @param partition    partition assigned to every record
 * @param totalRecords number of records to create; values below 1 yield an empty collection
 * @return the generated records, with offsets running from 0 to {@code totalRecords - 1}
 */
public static Collection<SinkRecord> createFormattedTimestampConversions(String topic, int partition, int totalRecords) {
    Schema schema = SchemaBuilder.struct()
            .field("off16", Schema.INT16_SCHEMA)
            .field("format_date", Schema.STRING_SCHEMA)
            .build();

    // The formatter is immutable, so build it once instead of once per record.
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");

    List<SinkRecord> records = new ArrayList<>();
    for (long offset = 0; offset < totalRecords; offset++) {
        Struct value = new Struct(schema)
                .put("off16", (short) offset)
                .put("format_date", LocalDateTime.now().format(formatter));

        records.add(new SinkRecord(
                topic,
                partition,
                null,
                null, schema,
                value,
                offset,
                System.currentTimeMillis(),
                TimestampType.CREATE_TIME
        ));
    }
    return records;
}



public static Collection<SinkRecord> createFixedStringData(String topic, int partition, int fixedSize) {
Expand Down

0 comments on commit 716baf6

Please sign in to comment.