Commit

Merge pull request #256 from ClickHouse/logging-additional-values
Logging updates
Paultagoras authored Nov 22, 2023
2 parents cd2cbe0 + 5841290 commit 59a006e
Showing 9 changed files with 82 additions and 52 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 1.0.6 2023-11-22
* Added additional logging to help debug future issues

## 1.0.5 2023-11-12
* Added 'zkPath' and 'zkDatabase' properties to customize exactly-once state storage

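For reference, the 1.0.5 properties above are plain connector settings. A minimal sketch with made-up values (the paths and database name below, and the pairing with 'exactlyOnce', are assumptions rather than documented defaults):

import java.util.Map;

public class SinkConfigSketch {
    public static void main(String[] args) {
        // Property names come from the changelog entry above; every value here is illustrative.
        Map<String, String> connectorProps = Map.of(
                "exactlyOnce", "true",                 // assumed companion setting for state storage
                "zkPath", "/clickhouse/connect_state", // hypothetical state path
                "zkDatabase", "connect_state");        // hypothetical database holding the state
        connectorProps.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}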
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
v1.0.5
v1.0.6
@@ -65,7 +65,7 @@ public void put(Collection<SinkRecord> records) {
} catch (Exception e) {
LOGGER.trace("Passing the exception to the exception handler.");
boolean errorTolerance = clickHouseSinkConfig != null && clickHouseSinkConfig.getErrorsTolerance();
Utils.handleException(e, errorTolerance);
Utils.handleException(e, errorTolerance, records);
if (errorTolerance && errorReporter != null) {
LOGGER.debug("Sending records to DLQ.");
records.forEach(r -> Utils.sendTODlq(errorReporter, r, e));
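For context, the tolerance branch above follows the stock Kafka Connect sink pattern: on failure, either rethrow and fail the task, or report each record to the dead-letter queue and keep consuming. A self-contained sketch against the plain Connect API; TolerantPut and insertAll are illustrative stand-ins, not the connector's actual classes:

import java.util.Collection;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;

// Sketch of tolerance-aware error handling; insertAll stands in for the real write path.
public class TolerantPut {
    private final boolean errorsTolerance;            // mirrors errors.tolerance=all
    private final ErrantRecordReporter errorReporter; // Connect's DLQ hook (may be null on old runtimes)

    public TolerantPut(boolean errorsTolerance, ErrantRecordReporter errorReporter) {
        this.errorsTolerance = errorsTolerance;
        this.errorReporter = errorReporter;
    }

    public void put(Collection<SinkRecord> records) {
        try {
            insertAll(records);
        } catch (Exception e) {
            if (errorsTolerance && errorReporter != null) {
                records.forEach(r -> errorReporter.report(r, e)); // route to DLQ, keep consuming
            } else if (!errorsTolerance) {
                throw new ConnectException("Insert failed", e);   // fail the task
            }
        }
    }

    private void insertAll(Collection<SinkRecord> records) throws Exception {
        // write the batch to the database
    }
}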
@@ -16,9 +16,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.Timer;
@@ -70,7 +72,7 @@ public void stop() {
MBeanServerUtils.unregisterMBean(getMBeanNAme());
}

public void put(final Collection<SinkRecord> records) {
public void put(final Collection<SinkRecord> records) throws IOException, ExecutionException, InterruptedException {
if (records.isEmpty()) {
LOGGER.trace("No records sent to SinkTask");
return;
@@ -151,48 +151,39 @@ public ClickHouseNode getServer() {
return chc.getServer();
}

-    public void doInsert(List<Record> records) {
+    public void doInsert(List<Record> records) throws IOException, ExecutionException, InterruptedException {
         doInsert(records, null);
     }

     @Override
-    public void doInsert(List<Record> records, ErrorReporter errorReporter) {
+    public void doInsert(List<Record> records, ErrorReporter errorReporter) throws IOException, ExecutionException, InterruptedException {
         if (records.isEmpty())
             return;

-        try {
-            Record first = records.get(0);
-            String topic = first.getTopic();
-            Table table = this.mapping.get(Utils.getTableName(topic, csc.getTopicToTableMap()));
-            LOGGER.debug("Actual Min Offset: {} Max Offset: {} Partition: {}",
-                    first.getRecordOffsetContainer().getOffset(),
-                    records.get(records.size() - 1).getRecordOffsetContainer().getOffset(),
-                    first.getRecordOffsetContainer().getPartition());
-            LOGGER.debug("Table: {}", table);
-
-            switch (first.getSchemaType()) {
-                case SCHEMA:
-                    if (table.hasDefaults()) {
-                        LOGGER.debug("Default value present, switching to JSON insert instead.");
-                        doInsertJson(records);
-                    } else {
-                        doInsertRawBinary(records);
-                    }
-                    break;
-                case SCHEMA_LESS:
-                    doInsertJson(records);
-                    break;
-                case STRING_SCHEMA:
-                    doInsertString(records);
-                    break;
-            }
-        } catch (Exception e) {
-            LOGGER.trace("Passing the exception to the exception handler.");
-            Utils.handleException(e, csc.getErrorsTolerance());
-            if (csc.getErrorsTolerance() && errorReporter != null) {
-                LOGGER.debug("Sending records to DLQ.");
-                records.forEach(r -> Utils.sendTODlq(errorReporter, r, e));
-            }
-        }
+        Record first = records.get(0);
+        String topic = first.getTopic();
+        Table table = this.mapping.get(Utils.getTableName(topic, csc.getTopicToTableMap()));
+        LOGGER.debug("Actual Min Offset: {} Max Offset: {} Partition: {}",
+                first.getRecordOffsetContainer().getOffset(),
+                records.get(records.size() - 1).getRecordOffsetContainer().getOffset(),
+                first.getRecordOffsetContainer().getPartition());
+        LOGGER.debug("Table: {}", table);
+
+        switch (first.getSchemaType()) {
+            case SCHEMA:
+                if (table.hasDefaults()) {
+                    LOGGER.debug("Default value present, switching to JSON insert instead.");
+                    doInsertJson(records);
+                } else {
+                    doInsertRawBinary(records);
+                }
+                break;
+            case SCHEMA_LESS:
+                doInsertJson(records);
+                break;
+            case STRING_SCHEMA:
+                doInsertString(records);
+                break;
+        }
     }

@@ -4,14 +4,16 @@
import com.clickhouse.kafka.connect.sink.data.Record;
import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public interface DBWriter {

public boolean start(ClickHouseSinkConfig csc);
public void stop();
public void doInsert(List<Record> records);
public void doInsert(List<Record> records, ErrorReporter errorReporter);
public void doInsert(List<Record> records) throws IOException, ExecutionException, InterruptedException;
public void doInsert(List<Record> records, ErrorReporter errorReporter) throws IOException, ExecutionException, InterruptedException;
public long recordsInserted();
}
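To make the widened contract concrete, here is a minimal in-memory implementation. This is a sketch only (the repo's own test double, InMemoryDBWriter, may differ), and the repo package names in the imports are assumptions:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig; // assumed package
import com.clickhouse.kafka.connect.sink.data.Record;
import com.clickhouse.kafka.connect.sink.db.DBWriter;          // assumed package
import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter;

// Minimal DBWriter that just accumulates records; useful for tests.
public class InMemoryWriter implements DBWriter {
    private final List<Record> inserted = new ArrayList<>();

    @Override
    public boolean start(ClickHouseSinkConfig csc) { return true; }

    @Override
    public void stop() { inserted.clear(); }

    @Override
    public void doInsert(List<Record> records) throws IOException, ExecutionException, InterruptedException {
        doInsert(records, null);
    }

    @Override
    public void doInsert(List<Record> records, ErrorReporter errorReporter)
            throws IOException, ExecutionException, InterruptedException {
        inserted.addAll(records); // no real I/O here, so the declared exceptions never fire
    }

    @Override
    public long recordsInserted() { return inserted.size(); }
}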
@@ -15,8 +15,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class Processing {
@@ -42,7 +44,7 @@ public Processing(StateProvider stateProvider, DBWriter dbWriter, ErrorReporter
*
* @param records
*/
private void doInsert(List<Record> records) {
private void doInsert(List<Record> records) throws IOException, ExecutionException, InterruptedException {
dbWriter.doInsert(records, errorReporter);
}

@@ -77,7 +79,7 @@ private List<List<Record>> splitRecordsByOffset(List<Record> records, long offse
}


public void doLogic(List<Record> records) {
public void doLogic(List<Record> records) throws IOException, ExecutionException, InterruptedException {
List<Record> trimmedRecords;
Record record = records.get(0);

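The splitRecordsByOffset context line above partitions a batch around a previously stored offset. A generic sketch of that idea, assuming records arrive sorted by ascending offset; the actual method returns List<List<Record>> and its exact semantics are not shown in this diff:

import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;

// Sketch: split a batch into "already processed" (offset <= stored) and "new" (offset > stored).
public class OffsetSplit {
    public static <R> Map<Boolean, List<R>> splitByOffset(List<R> records,
                                                          long storedOffset,
                                                          ToLongFunction<R> offsetOf) {
        return records.stream()
                .collect(Collectors.partitioningBy(r -> offsetOf.applyAsLong(r) > storedOffset));
    }

    public static void main(String[] args) {
        List<Long> offsets = List.of(1L, 2L, 3L, 4L, 5L);
        Map<Boolean, List<Long>> parts = splitByOffset(offsets, 3, Long::longValue);
        System.out.println("new: " + parts.get(true) + ", seen: " + parts.get(false)); // new: [4, 5], seen: [1, 2, 3]
    }
}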
30 changes: 29 additions & 1 deletion src/main/java/com/clickhouse/kafka/connect/util/Utils.java
@@ -11,7 +11,10 @@

import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class Utils {

@@ -48,8 +51,17 @@ public static Exception getRootCause (Exception e, Boolean prioritizeClickHouseE
* @param e Exception to check
*/

public static void handleException(Exception e, boolean errorsTolerance) {
public static void handleException(Exception e, boolean errorsTolerance, Collection<SinkRecord> records) {
LOGGER.warn("Deciding how to handle exception: {}", e.getLocalizedMessage());
if (records != null && !records.isEmpty()) {
LOGGER.warn("Number of total records: {}", records.size());
Map<String, List<SinkRecord>> dataRecords = records.stream().collect(Collectors.groupingBy((r) -> r.topic() + "-" + r.kafkaPartition()));
for (String topicAndPartition : dataRecords.keySet()) {
LOGGER.warn("Number of records in [{}] : {}", topicAndPartition, dataRecords.get(topicAndPartition).size());
List<SinkRecord> recordsByTopicAndPartition = dataRecords.get(topicAndPartition);
LOGGER.warn("Exception context: topic: [{}], partition: [{}], offsets: [{}]", recordsByTopicAndPartition.get(0).topic(), recordsByTopicAndPartition.get(0).kafkaPartition(), getOffsets(records));
}
}

//Let's check if we have a ClickHouseException to reference the error code
//https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/ErrorCodes.cpp
@@ -122,4 +134,20 @@ public static String getTableName(String topicName, Map<String, String> topicToT
return escapeTopicName(tableName);
}


public static String getOffsets(Collection<SinkRecord> records) {
long minOffset = Long.MAX_VALUE;
long maxOffset = -1;

for (SinkRecord record : records) {
if (record.kafkaOffset() > maxOffset) {
maxOffset = record.kafkaOffset();
}
if (record.kafkaOffset() < minOffset) {
minOffset = record.kafkaOffset();
}
}

return String.format("minOffset: %d, maxOffset: %d", minOffset, maxOffset);
}
}
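The new logging above groups a failed batch by topic and partition and logs a per-group count plus an offset range; note that getOffsets is called with the whole collection, so the logged range spans every partition in the batch. A standalone sketch of the same grouping using plain Connect SinkRecords, with the min/max computed per group via summary statistics:

import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.connect.sink.SinkRecord;

public class OffsetContextSketch {
    public static void main(String[] args) {
        List<SinkRecord> records = List.of(
                new SinkRecord("t", 0, null, null, null, "a", 100),
                new SinkRecord("t", 0, null, null, null, "b", 101),
                new SinkRecord("t", 1, null, null, null, "c", 7));

        // Group by "topic-partition" and summarize offsets in one pass.
        Map<String, LongSummaryStatistics> byTopicPartition = records.stream()
                .collect(Collectors.groupingBy(
                        r -> r.topic() + "-" + r.kafkaPartition(),
                        Collectors.summarizingLong(SinkRecord::kafkaOffset)));

        byTopicPartition.forEach((tp, stats) ->
                System.out.printf("[%s] count=%d minOffset=%d maxOffset=%d%n",
                        tp, stats.getCount(), stats.getMin(), stats.getMax()));
    }
}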
@@ -20,9 +20,11 @@
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.LongStream;

public class ProcessingTest {
@@ -47,7 +49,7 @@ private List<Record> createRecords(String topic, int partition) {

@Test
@DisplayName("ProcessAllAtOnceNewTest")
public void ProcessAllAtOnceNewTest() {
public void ProcessAllAtOnceNewTest() throws IOException, ExecutionException, InterruptedException {
List<Record> records = createRecords("test", 1);
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
@@ -58,7 +60,7 @@ public void ProcessAllAtOnceNewTest() {

@Test
@DisplayName("ProcessSplitNewTest")
public void ProcessSplitNewTest() {
public void ProcessSplitNewTest() throws IOException, ExecutionException, InterruptedException {
List<Record> records = createRecords("test", 1);
int splitPoint = 11;
List<Record> recordsHead = records.subList(0, splitPoint);
@@ -75,7 +77,7 @@ public void ProcessSplitNewTest() {

@Test
@DisplayName("ProcessAllNewTwiceTest")
public void ProcessAllNewTwiceTest() {
public void ProcessAllNewTwiceTest() throws IOException, ExecutionException, InterruptedException {
List<Record> records = createRecords("test", 1);
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
@@ -88,7 +90,7 @@ public void ProcessAllNewTwiceTest() {

@Test
@DisplayName("ProcessAllNewFailedSetStateAfterProcessingTest")
public void ProcessAllNewFailedSetStateAfterProcessingTest() {
public void ProcessAllNewFailedSetStateAfterProcessingTest() throws IOException, ExecutionException, InterruptedException {
List<Record> records = createRecords("test", 1);
int splitPoint = 11;
List<Record> recordsHead = records.subList(0, splitPoint);
@@ -106,7 +108,7 @@ public void ProcessAllNewFailedSetStateAfterProcessingTest() {

@Test
@DisplayName("ProcessContainsBeforeProcessingTest")
public void ProcessContainsBeforeProcessingTest() {
public void ProcessContainsBeforeProcessingTest() throws IOException, ExecutionException, InterruptedException {
List<Record> records = createRecords("test", 1);
List<Record> containsRecords = records.subList(345,850);
StateProvider stateProvider = new InMemoryState();
@@ -123,7 +125,7 @@ public void ProcessContainsBeforeProcessingTest() {

@Test
@DisplayName("ProcessContainsAfterProcessingTest")
public void ProcessContainsAfterProcessingTest() {
public void ProcessContainsAfterProcessingTest() throws IOException, ExecutionException, InterruptedException {
List<Record> records = createRecords("test", 1);
List<Record> containsRecords = records.subList(345,850);
StateProvider stateProvider = new InMemoryState();
@@ -138,7 +140,7 @@ public void ProcessContainsAfterProcessingTest() {

@Test
@DisplayName("ProcessContainsAfterProcessingTest")
public void ProcessOverlappingBeforeProcessingTest() {
public void ProcessOverlappingBeforeProcessingTest() throws IOException, ExecutionException, InterruptedException {
List<Record> records = createRecords("test", 1);
List<Record> containsRecords = records.subList(345,850);
StateProvider stateProvider = new InMemoryState();
@@ -153,7 +155,7 @@ public void ProcessOverlappingBeforeProcessingTest() {

@Test
@DisplayName("ProcessSplitNewWithBeforeProcessingTest")
public void ProcessSplitNewWithBeforeProcessingTest() {
public void ProcessSplitNewWithBeforeProcessingTest() throws IOException, ExecutionException, InterruptedException {
List<Record> records = createRecords("test", 1);
int splitPoint = 11;
List<Record> recordsHead = records.subList(0, splitPoint);
@@ -173,7 +175,7 @@ public void ProcessSplitNewWithBeforeProcessingTest() {

@Test
@DisplayName("ProcessDeletedTopicBeforeProcessingTest")
public void ProcessDeletedTopicBeforeProcessingTest() {
public void ProcessDeletedTopicBeforeProcessingTest() throws IOException, ExecutionException, InterruptedException {
List<Record> records = createRecords("test", 1);
List<Record> containsRecords = records.subList(0,150);
StateProvider stateProvider = new InMemoryState();
@@ -190,7 +192,7 @@ public void ProcessDeletedTopicBeforeProcessingTest() {

@Test
@DisplayName("Processing with dlq test")
public void ProcessingWithDLQTest() {
public void ProcessingWithDLQTest() throws IOException, ExecutionException, InterruptedException {
InMemoryDLQ er = new InMemoryDLQ();
List<Record> records = createRecords("test", 1);
List<Record> containsRecords = records.subList(345,850);
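The tests above build fixtures with createRecords(topic, partition) and slice them with subList(345, 850), which implies a batch of at least a thousand sequential offsets. A hypothetical equivalent using the plain Connect SinkRecord type; the repo's own Record construction is not shown in this diff:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical fixture builder: 1000 schemaless records with sequential offsets.
public class RecordFixtures {
    static List<SinkRecord> createRecords(String topic, int partition) {
        return LongStream.range(0, 1000)
                .mapToObj(offset -> new SinkRecord(topic, partition,
                        null, null,                    // no key schema / key
                        null, Map.of("off", offset),   // schemaless value
                        offset))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<SinkRecord> records = createRecords("test", 1);
        System.out.println(records.size() + " records, last offset "
                + records.get(records.size() - 1).kafkaOffset());
    }
}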