Skip to content

Commit 45e4b7c

Browse files
authored
feat: support for templating in table names (#24)
This changeset introduces a simple templating language in the target `table` configuration. Previously, there were two options: 1. Unset Table Configuration (=default): The connector used the same QuestDB table as the topic name from which the message originated. When the connector was set to listen on multiple topics, it ingested into multiple QuestDB tables. 2. Explicit Table Configuration: The connector ingested into the one configured table, regardless of the topic from which the messages originated. This change allows the use of templates in table configurations. For example, setting table to `${topic}` will cause ingestion into the table named after the topic from which the message originated. This behavior mirrors the scenario where the table configuration is not set. However, it also supports more advanced scenarios, such as `${topic}_${key}`, where key is a string representation of the message key. Supported Placeholders: 1. `${topic}` 2. `${key}` - string representation of the message key or 'null' More placeholders may be added in the future. Caveats: 1. The key is intended for use with simple values and not with complex objects such as structs or arrays. 2. Using `${key}` can result in a large number of tables. QuestDB might require tuning when handling thousands of tables.
1 parent 35fb9bd commit 45e4b7c

File tree

4 files changed

+248
-2
lines changed

4 files changed

+248
-2
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@
2323
import java.time.temporal.ChronoUnit;
2424
import java.util.*;
2525
import java.util.concurrent.TimeUnit;
26+
import java.util.function.Function;
2627

2728
public final class QuestDBSinkTask extends SinkTask {
2829
private static final char STRUCT_FIELD_SEPARATOR = '_';
2930
private static final String PRIMITIVE_KEY_FALLBACK_NAME = "key";
3031
private static final String PRIMITIVE_VALUE_FALLBACK_NAME = "value";
3132

33+
private Function<SinkRecord, ? extends CharSequence> recordToTable;
3234
private static final Logger log = LoggerFactory.getLogger(QuestDBSinkTask.class);
3335
private Sender sender;
3436
private QuestDBSinkConnectorConfig config;
@@ -83,6 +85,7 @@ public void start(Map<String, String> map) {
8385
this.timestampUnits = config.getTimestampUnitsOrNull();
8486
this.allowedLag = config.getAllowedLag();
8587
this.nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
88+
this.recordToTable = Templating.newTableTableFn(config.getTable());
8689
}
8790

8891
private Sender createRawSender() {
@@ -247,8 +250,7 @@ private void closeSenderSilently() {
247250
private void handleSingleRecord(SinkRecord record) {
248251
assert timestampColumnValue == Long.MIN_VALUE;
249252

250-
String explicitTable = config.getTable();
251-
String tableName = explicitTable == null ? record.topic() : explicitTable;
253+
CharSequence tableName = recordToTable.apply(record);
252254
sender.table(tableName);
253255

254256
if (config.isIncludeKey()) {
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package io.questdb.kafka;
2+
3+
import io.questdb.std.str.StringSink;
4+
import org.apache.kafka.connect.errors.ConnectException;
5+
import org.apache.kafka.connect.sink.SinkRecord;
6+
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
import java.util.function.Function;
10+
11+
final class Templating {
12+
private Templating() {
13+
}
14+
15+
static Function<SinkRecord, ? extends CharSequence> newTableTableFn(String template) {
16+
if (template == null || template.isEmpty()) {
17+
return SinkRecord::topic;
18+
}
19+
int currentPos = 0;
20+
List<Function<SinkRecord, String>> partials = null;
21+
for (;;) {
22+
int templateStartPos = template.indexOf("${", currentPos);
23+
if (templateStartPos == -1) {
24+
break;
25+
}
26+
int templateEndPos = template.indexOf('}', templateStartPos + 2);
27+
if (templateEndPos == -1) {
28+
throw new ConnectException("Unbalanced brackets in a table template, missing closing '}', table template: '" + template + "'");
29+
}
30+
int nextTemplateStartPos = template.indexOf("${", templateStartPos + 1);
31+
if (nextTemplateStartPos != -1 && nextTemplateStartPos < templateEndPos) {
32+
throw new ConnectException("Nesting templates in a table name are not supported, table template: '" + template + "'");
33+
}
34+
String templateName = template.substring(templateStartPos + 2, templateEndPos);
35+
if (templateName.isEmpty()) {
36+
throw new ConnectException("Empty template in table name, table template: '" + template + "'");
37+
}
38+
if (partials == null) {
39+
partials = new ArrayList<>();
40+
}
41+
String literal = template.substring(currentPos, templateStartPos);
42+
if (!literal.isEmpty()) {
43+
partials.add(record -> literal);
44+
}
45+
switch (templateName) {
46+
case "topic": {
47+
partials.add(SinkRecord::topic);
48+
break;
49+
}
50+
case "key": {
51+
partials.add(record -> record.key() == null ? "null" : record.key().toString());
52+
break;
53+
}
54+
default: {
55+
throw new ConnectException("Unknown template in table name, table template: '" + template + "'");
56+
}
57+
}
58+
currentPos = templateEndPos + 1;
59+
}
60+
if (partials == null) {
61+
return record -> template;
62+
}
63+
String literal = template.substring(currentPos);
64+
if (!literal.isEmpty()) {
65+
partials.add(record -> literal);
66+
}
67+
List<Function<SinkRecord, String>> finalPartials = partials;
68+
StringSink sink = new StringSink();
69+
return record -> {
70+
sink.clear();
71+
for (Function<SinkRecord, String> partial : finalPartials) {
72+
sink.put(partial.apply(record));
73+
}
74+
return sink;
75+
};
76+
}
77+
}

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,68 @@ public void testSmoke(boolean useHttp) {
160160
httpPort);
161161
}
162162

163+
@ParameterizedTest
164+
@ValueSource(booleans = {true, false})
165+
public void testTableTemplateWithKey_withSchema(boolean useHttp) {
166+
connect.kafka().createTopic(topicName, 1);
167+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
168+
props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "${topic}.${key}");
169+
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
170+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
171+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
172+
Schema schema = SchemaBuilder.struct().name("com.example.Person")
173+
.field("firstname", Schema.STRING_SCHEMA)
174+
.field("lastname", Schema.STRING_SCHEMA)
175+
.field("age", Schema.INT8_SCHEMA)
176+
.build();
177+
178+
Struct john = new Struct(schema)
179+
.put("firstname", "John")
180+
.put("lastname", "Doe")
181+
.put("age", (byte) 42);
182+
183+
Struct jane = new Struct(schema)
184+
.put("firstname", "Jane")
185+
.put("lastname", "Doe")
186+
.put("age", (byte) 41);
187+
188+
connect.kafka().produce(topicName, "john", new String(converter.fromConnectData(topicName, schema, john)));
189+
connect.kafka().produce(topicName, "jane", new String(converter.fromConnectData(topicName, schema, jane)));
190+
191+
QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
192+
+ "\"John\",\"Doe\",42\r\n",
193+
"select firstname,lastname,age from " + topicName + "." + "john",
194+
httpPort);
195+
QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
196+
+ "\"Jane\",\"Doe\",41\r\n",
197+
"select firstname,lastname,age from " + topicName + "." + "jane",
198+
httpPort);
199+
}
200+
201+
@ParameterizedTest
202+
@ValueSource(booleans = {true, false})
203+
public void testTableTemplateWithKey_schemaless(boolean useHttp) {
204+
connect.kafka().createTopic(topicName, 1);
205+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
206+
props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "literal_${topic}_literal_${key}_literal");
207+
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
208+
props.put("value.converter.schemas.enable", "false");
209+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
210+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
211+
212+
connect.kafka().produce(topicName, "john", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
213+
connect.kafka().produce(topicName, "jane", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");
214+
215+
QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
216+
+ "\"John\",\"Doe\",42\r\n",
217+
"select firstname,lastname,age from literal_" + topicName + "_literal_" + "john_literal",
218+
httpPort);
219+
QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
220+
+ "\"Jane\",\"Doe\",41\r\n",
221+
"select firstname,lastname,age from literal_" + topicName + "_literal_" + "jane_literal",
222+
httpPort);
223+
}
224+
163225
@ParameterizedTest
164226
@ValueSource(booleans = {true, false})
165227
public void testDeadLetterQueue_wrongJson(boolean useHttp) {
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package io.questdb.kafka;
2+
3+
import org.apache.kafka.connect.errors.ConnectException;
4+
import org.apache.kafka.connect.sink.SinkRecord;
5+
import org.junit.Assert;
6+
import org.junit.Test;
7+
8+
import java.util.function.Function;
9+
10+
public class TemplatingTest {
11+
12+
@Test
13+
public void testPlainTableName() {
14+
Function<SinkRecord, ? extends CharSequence> fn = Templating.newTableTableFn("table");
15+
SinkRecord record = newSinkRecord("topic", "key");
16+
assertTableName(fn, record, "table");
17+
}
18+
19+
@Test
20+
public void testEmptyTableName() {
21+
Function<SinkRecord, ? extends CharSequence> fn = Templating.newTableTableFn("");
22+
SinkRecord record = newSinkRecord("topic", "key");
23+
assertTableName(fn, record, "topic");
24+
}
25+
26+
@Test
27+
public void testNullTableName() {
28+
Function<SinkRecord, ? extends CharSequence> fn = Templating.newTableTableFn(null);
29+
SinkRecord record = newSinkRecord("topic", "key");
30+
assertTableName(fn, record, "topic");
31+
}
32+
33+
@Test
34+
public void testSimpleTopicTemplate() {
35+
Function<SinkRecord, ? extends CharSequence> fn = Templating.newTableTableFn("${topic}");
36+
SinkRecord record = newSinkRecord("mytopic", "key");
37+
assertTableName(fn, record, "mytopic");
38+
}
39+
40+
@Test
41+
public void testTopicWithKeyTemplates() {
42+
Function<SinkRecord, ? extends CharSequence> fn = Templating.newTableTableFn("${topic}_${key}");
43+
SinkRecord record = newSinkRecord("mytopic", "mykey");
44+
assertTableName(fn, record, "mytopic_mykey");
45+
}
46+
47+
@Test
48+
public void testTopicWithNullKey() {
49+
Function<SinkRecord, ? extends CharSequence> fn = Templating.newTableTableFn("${topic}_${key}");
50+
SinkRecord record = newSinkRecord("mytopic", null);
51+
assertTableName(fn, record, "mytopic_null");
52+
}
53+
54+
@Test
55+
public void testMissingClosingBrackets() {
56+
assertIllegalTemplate("${topic", "Unbalanced brackets in a table template, missing closing '}', table template: '${topic'");
57+
}
58+
59+
@Test
60+
public void testOverlappingTemplates() {
61+
assertIllegalTemplate("${topic${key}", "Nesting templates in a table name are not supported, table template: '${topic${key}'");
62+
}
63+
64+
@Test
65+
public void testEmptyTemplate() {
66+
assertIllegalTemplate("${}", "Empty template in table name, table template: '${}'");
67+
}
68+
69+
@Test
70+
public void testIllegalTemplate() {
71+
assertIllegalTemplate("${unknown}", "Unknown template in table name, table template: '${unknown}'");
72+
}
73+
74+
@Test
75+
public void testSuffixLiteral() {
76+
Function<SinkRecord, ? extends CharSequence> fn = Templating.newTableTableFn("${topic}_suffix");
77+
SinkRecord record = newSinkRecord("mytopic", "key");
78+
assertTableName(fn, record, "mytopic_suffix");
79+
}
80+
81+
private static void assertIllegalTemplate(String template, String expectedMessage) {
82+
try {
83+
Templating.newTableTableFn(template);
84+
Assert.fail();
85+
} catch (ConnectException e) {
86+
Assert.assertEquals(expectedMessage, e.getMessage());
87+
}
88+
}
89+
90+
@Test
91+
public void testTopicWithEmptyKey() {
92+
Function<SinkRecord, ? extends CharSequence> fn = Templating.newTableTableFn("${topic}_${key}");
93+
SinkRecord record = newSinkRecord("mytopic", "");
94+
assertTableName(fn, record, "mytopic_");
95+
}
96+
97+
private static void assertTableName(Function<SinkRecord, ? extends CharSequence> fn, SinkRecord record, String expectedTable) {
98+
Assert.assertEquals(expectedTable, fn.apply(record).toString());
99+
}
100+
101+
private static SinkRecord newSinkRecord(String topic, String key) {
102+
return new SinkRecord(topic, 0, null, key, null, null, 0);
103+
}
104+
105+
}

0 commit comments

Comments
 (0)