
Commit b4510d1

fix: records with data type mismatches go to DLQ instead of failing the connector (#27)
* fix: records with data type mismatches go to DLQ instead of failing the connector. Fixes #26. This is a first implementation; it could be optimized further, but bad data should be rare and it already does the job as it is. One potential issue: `inflightSinkRecords` keeps references to all in-flight data, which can in theory roughly double memory consumption, but the original SinkRecord objects are needed so they can be sent to the DLQ.
1 parent 59e21ae commit b4510d1
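
The DLQ routing relies on the standard Kafka Connect error-handling properties, as exercised by the new test in this commit. A minimal sketch of the relevant connector configuration; the topic name `dlq`, replication factor, and tolerance setting mirror the test, while the headers option is an optional extra that is not part of this commit:

    errors.tolerance=all
    errors.deadletterqueue.topic.name=dlq
    errors.deadletterqueue.topic.replication.factor=1
    # optional (not in this commit): attach error context as headers on DLQ records
    errors.deadletterqueue.context.headers.enable=true

With these set, records that fail with a data type mismatch are reported to the DLQ topic while the remaining records are still written to QuestDB.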

File tree

3 files changed: +135 -7 lines changed


.github/workflows/ci.yml

Lines changed: 6 additions & 1 deletion
@@ -18,4 +18,9 @@ jobs:
           distribution: 'temurin'
           cache: maven
       - name: Build with Maven
-        run: mvn -B package --file pom.xml
+        run: mvn -B package --file pom.xml
+      - name: Archive connector artifact
+        uses: actions/upload-artifact@v4
+        with:
+          name: connector-snapshot
+          path: connector/target/kafka-questdb-connector-*-bin.zip

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 53 additions & 5 deletions
@@ -4,6 +4,7 @@
 import io.questdb.cutlass.http.client.HttpClientException;
 import io.questdb.cutlass.line.LineSenderException;
 import io.questdb.std.NumericException;
+import io.questdb.std.ObjList;
 import io.questdb.std.datetime.DateFormat;
 import io.questdb.std.datetime.microtime.Timestamps;
 import io.questdb.std.datetime.millitime.DateFormatUtils;
@@ -15,6 +16,7 @@
 import org.apache.kafka.connect.data.*;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.slf4j.Logger;
@@ -48,6 +50,8 @@ public final class QuestDBSinkTask extends SinkTask {
     private long nextFlushNanos;
     private int pendingRows;
     private final FlushConfig flushConfig = new FlushConfig();
+    private final ObjList<SinkRecord> inflightSinkRecords = new ObjList<>();
+    private ErrantRecordReporter reporter;

     @Override
     public String version() {
@@ -86,6 +90,12 @@ public void start(Map<String, String> map) {
         this.allowedLag = config.getAllowedLag();
         this.nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
         this.recordToTable = Templating.newTableTableFn(config.getTable());
+        try {
+            reporter = context.errantRecordReporter();
+        } catch (NoSuchMethodError | NoClassDefFoundError e) {
+            // Kafka older than 2.6
+            reporter = null;
+        }
     }

     private Sender createRawSender() {
@@ -159,6 +169,9 @@ public void put(Collection<SinkRecord> collection) {
             sender = createSender();
         }
         for (SinkRecord record : collection) {
+            if (httpTransport) {
+                inflightSinkRecords.add(record);
+            }
             handleSingleRecord(record);
         }

@@ -208,22 +221,27 @@ public void put(Collection<SinkRecord> collection) {
     private void flushAndResetCounters() {
         log.debug("Flushing data to QuestDB");
         try {
-            sender.flush();
+            if (sender != null) {
+                sender.flush();
+            }
             nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
             pendingRows = 0;
         } catch (LineSenderException | HttpClientException e) {
             onSenderException(e);
+        } finally {
+            inflightSinkRecords.clear();
         }
     }

     private void onSenderException(Exception e) {
         if (httpTransport) {
-            closeSenderSilently();
-            nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
-            pendingRows = 0;
-            throw new ConnectException("Failed to send data to QuestDB", e);
+            onHttpSenderException(e);
+        } else {
+            onTcpSenderException(e);
         }
+    }

+    private void onTcpSenderException(Exception e) {
         batchesSinceLastError = 0;
         if (--remainingRetries > 0) {
             closeSenderSilently();
@@ -235,6 +253,36 @@ private void onSenderException(Exception e) {
         }
     }

+    private void onHttpSenderException(Exception e) {
+        closeSenderSilently();
+        if (
+                (reporter != null && e.getMessage() != null) // hack to detect data parsing errors
+                && (e.getMessage().contains("error in line") || e.getMessage().contains("failed to parse line protocol"))
+        ) {
+            // ok, we have a parsing error, let's try to send records one by one to find the problematic record
+            // and we will report it to the error handler. the rest of the records will make it to QuestDB
+            sender = createSender();
+            for (int i = 0; i < inflightSinkRecords.size(); i++) {
+                SinkRecord sinkRecord = inflightSinkRecords.get(i);
+                try {
+                    handleSingleRecord(sinkRecord);
+                    sender.flush();
+                } catch (Exception ex) {
+                    context.errantRecordReporter().report(sinkRecord, ex);
+                    closeSenderSilently();
+                    sender = createSender();
+                }
+            }
+            nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
+            pendingRows = 0;
+        } else {
+            // ok, this is not a parsing error, let's just close the sender and rethrow the exception
+            nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
+            pendingRows = 0;
+            throw new ConnectException("Failed to send data to QuestDB", e);
+        }
+    }
+
     private void closeSenderSilently() {
         if (sender != null) {
             try {

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 76 additions & 1 deletion
@@ -52,7 +52,7 @@
 public final class QuestDBSinkConnectorEmbeddedTest {
     private static int httpPort = -1;
     private static int ilpPort = -1;
-    private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:7.4.0";
+    private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:8.1.1";
     private static final boolean DUMP_QUESTDB_CONTAINER_LOGS = true;

     private EmbeddedConnectCluster connect;
@@ -248,6 +248,81 @@ public void testDeadLetterQueue_wrongJson(boolean useHttp) {
         Assertions.assertEquals("{\"not valid json}", new String(dqlRecord.value()));
     }

+    @Test
+    public void testDeadLetterQueue_badColumnType() {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
+        props.put("value.converter.schemas.enable", "false");
+        props.put("errors.deadletterqueue.topic.name", "dlq");
+        props.put("errors.deadletterqueue.topic.replication.factor", "1");
+        props.put("errors.tolerance", "all");
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        QuestDBUtils.assertSql(
+                "{\"ddl\":\"OK\"}",
+                "create table " + topicName + " (firstname string, lastname string, age int, id uuid, ts timestamp) timestamp(ts) partition by day wal",
+                httpPort,
+                QuestDBUtils.Endpoint.EXEC);
+
+        String goodRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
+        String goodRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d042\"}";
+        String goodRecordC = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d043\"}";
+        String badRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}";
+        String badRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":\"not a number\",\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
+
+        // interleave good and bad records
+        connect.kafka().produce(topicName, "key", goodRecordA);
+        connect.kafka().produce(topicName, "key", badRecordA);
+        connect.kafka().produce(topicName, "key", goodRecordB);
+        connect.kafka().produce(topicName, "key", badRecordB);
+        connect.kafka().produce(topicName, "key", goodRecordC);
+
+        ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(2, 120_000, "dlq");
+        Assertions.assertEquals(2, fetchedRecords.count());
+        Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
+        Assertions.assertEquals(badRecordA, new String(iterator.next().value()));
+        Assertions.assertEquals(badRecordB, new String(iterator.next().value()));
+
+        QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"id\"\r\n"
+                        + "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d041\r\n"
+                        + "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d042\r\n"
+                        + "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d043\r\n",
+                "select firstname,lastname,age, id from " + topicName,
+                httpPort);
+
+    }
+
+    @Test
+    public void testbadColumnType_noDLQ() {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
+        props.put("value.converter.schemas.enable", "false");
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        QuestDBUtils.assertSql(
+                "{\"ddl\":\"OK\"}",
+                "create table " + topicName + " (firstname string, lastname string, age int, id uuid, ts timestamp) timestamp(ts) partition by day wal",
+                httpPort,
+                QuestDBUtils.Endpoint.EXEC);
+
+        String goodRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
+        String goodRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d042\"}";
+        String goodRecordC = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d043\"}";
+        String badRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}";
+        String badRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":\"not a number\",\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
+
+        // interleave good and bad records
+        connect.kafka().produce(topicName, "key", goodRecordA);
+        connect.kafka().produce(topicName, "key", badRecordA);
+        connect.kafka().produce(topicName, "key", goodRecordB);
+        connect.kafka().produce(topicName, "key", badRecordB);
+        connect.kafka().produce(topicName, "key", goodRecordC);
+
+        ConnectTestUtils.assertConnectorTaskStateEventually(connect, AbstractStatus.State.FAILED);
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testSymbol(boolean useHttp) {
