diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index 789140c..cdd3ea8 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -23,12 +23,14 @@ import java.time.temporal.ChronoUnit; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.function.Function; public final class QuestDBSinkTask extends SinkTask { private static final char STRUCT_FIELD_SEPARATOR = '_'; private static final String PRIMITIVE_KEY_FALLBACK_NAME = "key"; private static final String PRIMITIVE_VALUE_FALLBACK_NAME = "value"; + private Function recordToTable; private static final Logger log = LoggerFactory.getLogger(QuestDBSinkTask.class); private Sender sender; private QuestDBSinkConnectorConfig config; @@ -83,6 +85,7 @@ public void start(Map map) { this.timestampUnits = config.getTimestampUnitsOrNull(); this.allowedLag = config.getAllowedLag(); this.nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos; + this.recordToTable = Templating.newTableTableFn(config.getTable()); } private Sender createRawSender() { @@ -247,8 +250,7 @@ private void closeSenderSilently() { private void handleSingleRecord(SinkRecord record) { assert timestampColumnValue == Long.MIN_VALUE; - String explicitTable = config.getTable(); - String tableName = explicitTable == null ? record.topic() : explicitTable; + CharSequence tableName = recordToTable.apply(record); sender.table(tableName); if (config.isIncludeKey()) { diff --git a/connector/src/main/java/io/questdb/kafka/Templating.java b/connector/src/main/java/io/questdb/kafka/Templating.java new file mode 100644 index 0000000..353c3e2 --- /dev/null +++ b/connector/src/main/java/io/questdb/kafka/Templating.java @@ -0,0 +1,77 @@ +package io.questdb.kafka; + +import io.questdb.std.str.StringSink; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +final class Templating { + private Templating() { + } + + static Function newTableTableFn(String template) { + if (template == null || template.isEmpty()) { + return SinkRecord::topic; + } + int currentPos = 0; + List> partials = null; + for (;;) { + int templateStartPos = template.indexOf("${", currentPos); + if (templateStartPos == -1) { + break; + } + int templateEndPos = template.indexOf('}', templateStartPos + 2); + if (templateEndPos == -1) { + throw new ConnectException("Unbalanced brackets in a table template, missing closing '}', table template: '" + template + "'"); + } + int nextTemplateStartPos = template.indexOf("${", templateStartPos + 1); + if (nextTemplateStartPos != -1 && nextTemplateStartPos < templateEndPos) { + throw new ConnectException("Nesting templates in a table name are not supported, table template: '" + template + "'"); + } + String templateName = template.substring(templateStartPos + 2, templateEndPos); + if (templateName.isEmpty()) { + throw new ConnectException("Empty template in table name, table template: '" + template + "'"); + } + if (partials == null) { + partials = new ArrayList<>(); + } + String literal = template.substring(currentPos, templateStartPos); + if (!literal.isEmpty()) { + partials.add(record -> literal); + } + switch (templateName) { + case "topic": { + partials.add(SinkRecord::topic); + break; + } + case "key": { + partials.add(record -> record.key() == null ? "null" : record.key().toString()); + break; + } + default: { + throw new ConnectException("Unknown template in table name, table template: '" + template + "'"); + } + } + currentPos = templateEndPos + 1; + } + if (partials == null) { + return record -> template; + } + String literal = template.substring(currentPos); + if (!literal.isEmpty()) { + partials.add(record -> literal); + } + List> finalPartials = partials; + StringSink sink = new StringSink(); + return record -> { + sink.clear(); + for (Function partial : finalPartials) { + sink.put(partial.apply(record)); + } + return sink; + }; + } +} diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index a67915a..84b39e8 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -160,6 +160,68 @@ public void testSmoke(boolean useHttp) { httpPort); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTableTemplateWithKey_withSchema(boolean useHttp) { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); + props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "${topic}.${key}"); + props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + Schema schema = SchemaBuilder.struct().name("com.example.Person") + .field("firstname", Schema.STRING_SCHEMA) + .field("lastname", Schema.STRING_SCHEMA) + .field("age", Schema.INT8_SCHEMA) + .build(); + + Struct john = new Struct(schema) + .put("firstname", "John") + .put("lastname", "Doe") + .put("age", (byte) 42); + + Struct jane = new Struct(schema) + .put("firstname", "Jane") + .put("lastname", "Doe") + .put("age", (byte) 41); + + connect.kafka().produce(topicName, "john", new String(converter.fromConnectData(topicName, schema, john))); + connect.kafka().produce(topicName, "jane", new String(converter.fromConnectData(topicName, schema, jane))); + + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n" + + "\"John\",\"Doe\",42\r\n", + "select firstname,lastname,age from " + topicName + "." + "john", + httpPort); + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n" + + "\"Jane\",\"Doe\",41\r\n", + "select firstname,lastname,age from " + topicName + "." + "jane", + httpPort); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTableTemplateWithKey_schemaless(boolean useHttp) { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); + props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "literal_${topic}_literal_${key}_literal"); + props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); + props.put("value.converter.schemas.enable", "false"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + connect.kafka().produce(topicName, "john", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); + connect.kafka().produce(topicName, "jane", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}"); + + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n" + + "\"John\",\"Doe\",42\r\n", + "select firstname,lastname,age from literal_" + topicName + "_literal_" + "john_literal", + httpPort); + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n" + + "\"Jane\",\"Doe\",41\r\n", + "select firstname,lastname,age from literal_" + topicName + "_literal_" + "jane_literal", + httpPort); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testDeadLetterQueue_wrongJson(boolean useHttp) { diff --git a/connector/src/test/java/io/questdb/kafka/TemplatingTest.java b/connector/src/test/java/io/questdb/kafka/TemplatingTest.java new file mode 100644 index 0000000..7b426b7 --- /dev/null +++ b/connector/src/test/java/io/questdb/kafka/TemplatingTest.java @@ -0,0 +1,105 @@ +package io.questdb.kafka; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.Assert; +import org.junit.Test; + +import java.util.function.Function; + +public class TemplatingTest { + + @Test + public void testPlainTableName() { + Function fn = Templating.newTableTableFn("table"); + SinkRecord record = newSinkRecord("topic", "key"); + assertTableName(fn, record, "table"); + } + + @Test + public void testEmptyTableName() { + Function fn = Templating.newTableTableFn(""); + SinkRecord record = newSinkRecord("topic", "key"); + assertTableName(fn, record, "topic"); + } + + @Test + public void testNullTableName() { + Function fn = Templating.newTableTableFn(null); + SinkRecord record = newSinkRecord("topic", "key"); + assertTableName(fn, record, "topic"); + } + + @Test + public void testSimpleTopicTemplate() { + Function fn = Templating.newTableTableFn("${topic}"); + SinkRecord record = newSinkRecord("mytopic", "key"); + assertTableName(fn, record, "mytopic"); + } + + @Test + public void testTopicWithKeyTemplates() { + Function fn = Templating.newTableTableFn("${topic}_${key}"); + SinkRecord record = newSinkRecord("mytopic", "mykey"); + assertTableName(fn, record, "mytopic_mykey"); + } + + @Test + public void testTopicWithNullKey() { + Function fn = Templating.newTableTableFn("${topic}_${key}"); + SinkRecord record = newSinkRecord("mytopic", null); + assertTableName(fn, record, "mytopic_null"); + } + + @Test + public void testMissingClosingBrackets() { + assertIllegalTemplate("${topic", "Unbalanced brackets in a table template, missing closing '}', table template: '${topic'"); + } + + @Test + public void testOverlappingTemplates() { + assertIllegalTemplate("${topic${key}", "Nesting templates in a table name are not supported, table template: '${topic${key}'"); + } + + @Test + public void testEmptyTemplate() { + assertIllegalTemplate("${}", "Empty template in table name, table template: '${}'"); + } + + @Test + public void testIllegalTemplate() { + assertIllegalTemplate("${unknown}", "Unknown template in table name, table template: '${unknown}'"); + } + + @Test + public void testSuffixLiteral() { + Function fn = Templating.newTableTableFn("${topic}_suffix"); + SinkRecord record = newSinkRecord("mytopic", "key"); + assertTableName(fn, record, "mytopic_suffix"); + } + + private static void assertIllegalTemplate(String template, String expectedMessage) { + try { + Templating.newTableTableFn(template); + Assert.fail(); + } catch (ConnectException e) { + Assert.assertEquals(expectedMessage, e.getMessage()); + } + } + + @Test + public void testTopicWithEmptyKey() { + Function fn = Templating.newTableTableFn("${topic}_${key}"); + SinkRecord record = newSinkRecord("mytopic", ""); + assertTableName(fn, record, "mytopic_"); + } + + private static void assertTableName(Function fn, SinkRecord record, String expectedTable) { + Assert.assertEquals(expectedTable, fn.apply(record).toString()); + } + + private static SinkRecord newSinkRecord(String topic, String key) { + return new SinkRecord(topic, 0, null, key, null, null, 0); + } + +}