
Commit 913a224

fix: DLQ for client-side validation failures (#29)

When an individual message fails client-side validation, the connector now sends that message to the Dead Letter Queue (DLQ), if one is configured, instead of failing the whole task.

1 parent af36753 commit 913a224
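For context: the DLQ path only engages when Kafka Connect's standard error-handling properties are set on the connector. The new tests below configure them like this (a minimal sketch; the topic name "dlq" is arbitrary):

    props.put("errors.deadletterqueue.topic.name", "dlq");
    props.put("errors.deadletterqueue.topic.replication.factor", "1");
    props.put("errors.tolerance", "all");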

File tree

5 files changed: +197 -13

connector/src/main/java/io/questdb/kafka/BufferingSender.java (+18)
connector/src/main/java/io/questdb/kafka/InvalidDataException.java (+14, new file)
connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java (+42 -11)
connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java (+122 -1)
pom.xml (+1 -1)

connector/src/main/java/io/questdb/kafka/BufferingSender.java

+18
@@ -97,6 +97,24 @@ public Sender boolColumn(CharSequence name, boolean value) {
         return this;
     }
 
+    @Override
+    public void cancelRow() {
+        symbolColumnNames.clear();
+        symbolColumnValues.clear();
+        stringNames.clear();
+        stringValues.clear();
+        longNames.clear();
+        longValues.clear();
+        doubleNames.clear();
+        doubleValues.clear();
+        boolNames.clear();
+        boolValues.clear();
+        timestampNames.clear();
+        timestampValues.clear();
+
+        sender.cancelRow();
+    }
+
     @Override
     public Sender timestampColumn(CharSequence name, long value, ChronoUnit unit) {
         if (symbolColumns.contains(name)) {
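The override clears every per-row buffer before delegating, so a half-built row is discarded in both the buffering wrapper and the underlying sender. A minimal usage sketch of the cancelRow() contract, assuming a QuestDB client Sender built via Sender.fromConfig (the connection string and the rawValue input are hypothetical):

    import io.questdb.client.Sender;

    public final class CancelRowExample {
        public static void main(String[] args) {
            String rawValue = "not-a-number"; // hypothetical bad input
            try (Sender sender = Sender.fromConfig("http::addr=localhost:9000;")) {
                sender.table("metrics");
                sender.symbol("host", "db1");
                try {
                    // parsing may throw while the row is already partially buffered
                    sender.doubleColumn("load", Double.parseDouble(rawValue));
                    sender.atNow();
                } catch (NumberFormatException e) {
                    // discard the half-built row; the sender stays usable for the next row
                    sender.cancelRow();
                }
            }
        }
    }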
connector/src/main/java/io/questdb/kafka/InvalidDataException.java

+14 (new file)
@@ -0,0 +1,14 @@
+package io.questdb.kafka;
+
+import io.questdb.std.NumericException;
+import org.apache.kafka.connect.errors.ConnectException;
+
+public final class InvalidDataException extends ConnectException {
+    public InvalidDataException(String message) {
+        super(message);
+    }
+
+    public InvalidDataException(String message, Throwable e) {
+        super(message, e);
+    }
+}
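InvalidDataException extends ConnectException, so any existing error handling that catches generic Connect errors keeps working; the connector catches it specifically to distinguish client-side data problems (DLQ candidates) from infrastructure failures.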

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

+42 -11
@@ -172,7 +172,22 @@ public void put(Collection<SinkRecord> collection) {
             if (httpTransport) {
                 inflightSinkRecords.add(record);
             }
-            handleSingleRecord(record);
+            try {
+                handleSingleRecord(record);
+            } catch (InvalidDataException ex) {
+                // data format error generated on the client side
+
+                if (httpTransport && reporter != null) {
+                    // we have a DLQ configured, let's report this single record
+
+                    // remove the last item from in-flight records
+                    inflightSinkRecords.setPos(inflightSinkRecords.size() - 1);
+                    context.errantRecordReporter().report(record, ex);
+                } else {
+                    // no DLQ, let's fail the connector
+                    throw ex;
+                }
+            }
         }
 
         if (httpTransport) {
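Note the in-flight bookkeeping: a record reported to the DLQ is also truncated off inflightSinkRecords via setPos, so a later flush failure cannot re-send or re-report a record that was already handed over to the DLQ.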
@@ -257,7 +272,7 @@ private void onTcpSenderException(Exception e) {
     private void onHttpSenderException(Exception e) {
         closeSenderSilently();
         if (
-                (reporter != null && e.getMessage() != null) // hack to detect data parsing errors
+                (reporter != null && e.getMessage() != null) // hack to detect data parsing errors originating on the server side
                 && (e.getMessage().contains("error in line") || e.getMessage().contains("failed to parse line protocol"))
         ) {
             // ok, we have a parsing error, let's try to send records one by one to find the problematic record
@@ -300,16 +315,32 @@ private void handleSingleRecord(SinkRecord record) {
         assert timestampColumnValue == Long.MIN_VALUE;
 
         CharSequence tableName = recordToTable.apply(record);
-        sender.table(tableName);
+        if (tableName == null || tableName.equals("")) {
+            throw new InvalidDataException("Table name cannot be empty");
+        }
 
-        if (config.isIncludeKey()) {
-            handleObject(config.getKeyPrefix(), record.keySchema(), record.key(), PRIMITIVE_KEY_FALLBACK_NAME);
+        try {
+            sender.table(tableName);
+            if (config.isIncludeKey()) {
+                handleObject(config.getKeyPrefix(), record.keySchema(), record.key(), PRIMITIVE_KEY_FALLBACK_NAME);
+            }
+            handleObject(config.getValuePrefix(), record.valueSchema(), record.value(), PRIMITIVE_VALUE_FALLBACK_NAME);
+        } catch (InvalidDataException ex) {
+            if (httpTransport) {
+                sender.cancelRow();
+            }
+            throw ex;
+        } catch (LineSenderException ex) {
+            if (httpTransport) {
+                sender.cancelRow();
+            }
+            throw new InvalidDataException("object contains invalid data", ex);
         }
-        handleObject(config.getValuePrefix(), record.valueSchema(), record.value(), PRIMITIVE_VALUE_FALLBACK_NAME);
 
         if (kafkaTimestampsEnabled) {
             timestampColumnValue = TimeUnit.MILLISECONDS.toNanos(record.timestamp());
         }
+
         if (timestampColumnValue == Long.MIN_VALUE) {
             sender.atNow();
         } else {
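Both catch branches call cancelRow() so the partially serialized row never reaches the HTTP send buffer. A LineSenderException thrown by the ILP client during serialization (for example, for an invalid column name) is wrapped in InvalidDataException, which lets the put() loop above route the record to the DLQ instead of failing the task.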
@@ -338,7 +369,7 @@ private void handleMap(String name, Map<?, ?> value, String fallbackName) {
         for (Map.Entry<?, ?> entry : value.entrySet()) {
             Object mapKey = entry.getKey();
             if (!(mapKey instanceof String)) {
-                throw new ConnectException("Map keys must be strings");
+                throw new InvalidDataException("Map keys must be strings");
             }
             String mapKeyName = (String) mapKey;
             String entryName = name.isEmpty() ? mapKeyName : name + STRUCT_FIELD_SEPARATOR + mapKeyName;
@@ -365,7 +396,7 @@ private void handleObject(String name, Schema schema, Object value, String fallbackName) {
         if (isDesignatedColumnName(name, fallbackName)) {
             assert timestampColumnValue == Long.MIN_VALUE;
             if (value == null) {
-                throw new ConnectException("Timestamp column value cannot be null");
+                throw new InvalidDataException("Timestamp column value cannot be null");
             }
             timestampColumnValue = resolveDesignatedTimestampColumnValue(value, schema);
             return;
@@ -393,7 +424,7 @@ private long resolveDesignatedTimestampColumnValue(Object value, Schema schema) {
             return parseToMicros((String) value) * 1000;
         }
         if (!(value instanceof Long)) {
-            throw new ConnectException("Unsupported timestamp column type: " + value.getClass());
+            throw new InvalidDataException("Unsupported timestamp column type: " + value.getClass());
        }
         long longValue = (Long) value;
         TimeUnit inputUnit;
@@ -453,7 +484,7 @@ private long parseToMicros(String timestamp) {
         try {
             return dataFormat.parse(timestamp, DateFormatUtils.EN_LOCALE);
         } catch (NumericException e) {
-            throw new ConnectException("Cannot parse timestamp: " + timestamp + " with the configured format '" + config.getTimestampFormat() + "' use '"
+            throw new InvalidDataException("Cannot parse timestamp: " + timestamp + " with the configured format '" + config.getTimestampFormat() + "' use '"
                     + QuestDBSinkConnectorConfig.TIMESTAMP_FORMAT + "' to configure the right timestamp format. " +
                     "See https://questdb.io/docs/reference/function/date-time/#date-and-timestamp-format for timestamp parser documentation. ", e);
         }
@@ -513,7 +544,7 @@ private void onUnsupportedType(String name, Object type) {
         if (config.isSkipUnsupportedTypes()) {
             log.debug("Skipping unsupported type: {}, name: {}", type, name);
         } else {
-            throw new ConnectException("Unsupported type: " + type + ", name: " + name);
+            throw new InvalidDataException("Unsupported type: " + type + ", name: " + name);
         }
     }

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

+122 -1
@@ -52,7 +52,7 @@
 public final class QuestDBSinkConnectorEmbeddedTest {
     private static int httpPort = -1;
     private static int ilpPort = -1;
-    private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:8.1.1";
+    private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:8.2.0";
     private static final boolean DUMP_QUESTDB_CONTAINER_LOGS = true;
 
     private EmbeddedConnectCluster connect;
@@ -248,6 +248,127 @@ public void testDeadLetterQueue_wrongJson(boolean useHttp) {
         Assertions.assertEquals("{\"not valid json}", new String(dqlRecord.value()));
     }
 
+    @Test
+    public void testDeadLetterQueue_invalidTableName() {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
+        props.put("errors.deadletterqueue.topic.name", "dlq");
+        props.put("errors.deadletterqueue.topic.replication.factor", "1");
+        props.put("errors.tolerance", "all");
+        props.put("value.converter.schemas.enable", "false");
+        props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "${key}");
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        // we send this with an invalid key - it contains commas
+        String badObjectString = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":88}";
+
+        connect.kafka().produce(topicName, topicName, "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
+        connect.kafka().produce(topicName, "k,e,y", badObjectString);
+        connect.kafka().produce(topicName, topicName, "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");
+
+        QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n"
+                        + "\"John\",\"Doe\",42\r\n"
+                        + "\"Jane\",\"Doe\",41\r\n",
+                "select firstname,lastname,age from " + topicName,
+                httpPort);
+
+        ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 120_000, "dlq");
+        Assertions.assertEquals(1, fetchedRecords.count());
+        Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
+        Assertions.assertEquals(badObjectString, new String(iterator.next().value()));
+    }
+
+    @Test
+    public void testDeadLetterQueue_invalidColumnName() {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
+        props.put("errors.deadletterqueue.topic.name", "dlq");
+        props.put("errors.deadletterqueue.topic.replication.factor", "1");
+        props.put("errors.tolerance", "all");
+        props.put("value.converter.schemas.enable", "false");
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        // invalid column name - it contains stars
+        String badObjectString = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"a*g*e\":88}";
+
+        connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
+        connect.kafka().produce(topicName, "key", badObjectString);
+        connect.kafka().produce(topicName, "key", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");
+
+        QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n"
+                        + "\"John\",\"Doe\",42\r\n"
+                        + "\"Jane\",\"Doe\",41\r\n",
+                "select firstname,lastname,age from " + topicName,
+                httpPort);
+
+        ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 120_000, "dlq");
+        Assertions.assertEquals(1, fetchedRecords.count());
+        Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
+        Assertions.assertEquals(badObjectString, new String(iterator.next().value()));
+    }
+
+    @Test
+    public void testDeadLetterQueue_unsupportedType() {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
+        props.put("errors.deadletterqueue.topic.name", "dlq");
+        props.put("errors.deadletterqueue.topic.replication.factor", "1");
+        props.put("errors.tolerance", "all");
+        props.put("value.converter.schemas.enable", "false");
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        // contains an array - not supported
+        String badObjectString = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":[1, 2, 3]}";
+
+        connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
+        connect.kafka().produce(topicName, "key", badObjectString);
+        connect.kafka().produce(topicName, "key", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");
+
+        QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n"
+                        + "\"John\",\"Doe\",42\r\n"
+                        + "\"Jane\",\"Doe\",41\r\n",
+                "select firstname,lastname,age from " + topicName,
+                httpPort);
+
+        ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 120_000, "dlq");
+        Assertions.assertEquals(1, fetchedRecords.count());
+        Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
+        Assertions.assertEquals(badObjectString, new String(iterator.next().value()));
+    }
+
+    @Test
+    public void testDeadLetterQueue_emptyTable() {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
+        props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "${key}");
+        props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
+        props.put("value.converter.schemas.enable", "false");
+        props.put("errors.deadletterqueue.topic.name", "dlq");
+        props.put("errors.deadletterqueue.topic.replication.factor", "1");
+        props.put("errors.tolerance", "all");
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        connect.kafka().produce(topicName, "tab", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
+        String emptyRecordValue = "{\"firstname\":\"empty\",\"lastname\":\"\",\"age\":-41}";
+        connect.kafka().produce(topicName, "", emptyRecordValue);
+        connect.kafka().produce(topicName, "tab", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");
+
+        QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n"
+                        + "\"John\",\"Doe\",42\r\n"
+                        + "\"Jane\",\"Doe\",41\r\n",
+                "select firstname,lastname,age from tab",
+                httpPort);
+
+        ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 120_000, "dlq");
+        Assertions.assertEquals(1, fetchedRecords.count());
+        Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
+        Assertions.assertEquals(emptyRecordValue, new String(iterator.next().value()));
+    }
+
     @Test
     public void testDeadLetterQueue_badColumnType() {
         connect.kafka().createTopic(topicName, 1);
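All four new tests follow the same pattern: produce a valid record, then one that fails client-side validation (invalid table name, invalid column name, unsupported array value, or empty table name), then another valid record, and assert that the two good rows land in QuestDB while the bad record arrives unchanged on the dlq topic.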

pom.xml

+1 -1
@@ -84,7 +84,7 @@
         <dependency>
             <groupId>org.questdb</groupId>
             <artifactId>questdb</artifactId>
-            <version>7.4.0</version>
+            <version>8.2.0</version>
         </dependency>
         <dependency>
             <groupId>org.junit.jupiter</groupId>
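The questdb dependency bump from 7.4.0 to 8.2.0 matches the Docker image used in the tests; it is presumably also what brings Sender.cancelRow() into the client API the connector compiles against.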
