diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java index 8a193fc..ea0d689 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java @@ -6,6 +6,7 @@ import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.connect.errors.ConnectException; import java.util.Arrays; @@ -153,8 +154,8 @@ public String getUsername() { return getString(USERNAME); } - public String getToken() { - return getString(TOKEN); + public Password getToken() { + return getPassword(TOKEN); } public boolean isTls() { diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index d116f38..b72f013 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -9,12 +9,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Date; -import org.apache.kafka.connect.data.Decimal; -import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.data.Time; -import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.data.*; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.sink.SinkRecord; @@ -22,12 +17,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.TimeUnit; public final class QuestDBSinkTask extends SinkTask { @@ -91,10 +81,10 @@ private Sender createSender() { } if (config.getToken() != null) { String username = config.getUsername(); - if (username == null || username.equals("")) { + if (username == null || username.isEmpty()) { throw new ConnectException("Username cannot be empty when using ILP authentication"); } - builder.enableAuth(username).authToken(config.getToken()); + builder.enableAuth(username).authToken(config.getToken().value()); } Sender rawSender = builder.build(); String symbolColumns = config.getSymbolColumns(); diff --git a/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java new file mode 100644 index 0000000..712f2f2 --- /dev/null +++ b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java @@ -0,0 +1,77 @@ +package io.questdb.kafka; + +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.runtime.AbstractStatus; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.storage.StringConverter; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.awaitility.Awaitility; +import org.testcontainers.containers.GenericContainer; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.fail; + +public final class ConnectTestUtils { + public static final long CONNECTOR_START_TIMEOUT_MS = SECONDS.toMillis(60); + public static final String CONNECTOR_NAME = "questdb-sink-connector"; + private static final AtomicInteger ID_GEN = new AtomicInteger(0); + + private ConnectTestUtils() { + } + + static void assertConnectorTaskRunningEventually(EmbeddedConnectCluster connect) { + assertConnectorTaskStateEventually(connect, AbstractStatus.State.RUNNING); + } + + static void assertConnectorTaskFailedEventually(EmbeddedConnectCluster connect) { + assertConnectorTaskStateEventually(connect, AbstractStatus.State.FAILED); + } + + static void assertConnectorTaskStateEventually(EmbeddedConnectCluster connect, AbstractStatus.State expectedState) { + Awaitility.await().atMost(CONNECTOR_START_TIMEOUT_MS, MILLISECONDS).untilAsserted(() -> assertConnectorTaskState(connect, CONNECTOR_NAME, expectedState)); + } + + static Map baseConnectorProps(GenericContainer questDBContainer, String topicName) { + Map props = new HashMap<>(); + props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, QuestDBSinkConnector.class.getName()); + props.put("topics", topicName); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); + props.put("host", questDBContainer.getHost() + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT)); + return props; + } + + static void assertConnectorTaskState(EmbeddedConnectCluster connect, String connectorName, AbstractStatus.State expectedState) { + ConnectorStateInfo info = connect.connectorStatus(connectorName); + if (info == null) { + fail("Connector " + connectorName + " not found"); + } + List taskStates = info.tasks(); + if (taskStates.size() == 0) { + fail("No tasks found for connector " + connectorName); + } + for (ConnectorStateInfo.TaskState taskState : taskStates) { + if (!Objects.equals(taskState.state(), expectedState.toString())) { + fail("Task " + taskState.id() + " for connector " + connectorName + " is in state " + taskState.state() + " but expected " + expectedState); + } + } + } + + static String newTopicName() { + return "topic" + ID_GEN.getAndIncrement(); + } + + static String newTableName() { + return "table" + ID_GEN.getAndIncrement(); + } +} diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java new file mode 100644 index 0000000..c1f42b5 --- /dev/null +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java @@ -0,0 +1,89 @@ +package io.questdb.kafka; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.ConverterConfig; +import org.apache.kafka.connect.storage.ConverterType; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.FixedHostPortGenericContainer; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.MountableFile; + +import java.util.Map; + +import static java.util.Collections.singletonMap; + +@Testcontainers +public class QuestDBSinkConnectorEmbeddedAuthTest { + private EmbeddedConnectCluster connect; + private Converter converter; + private String topicName; + + // must match the user in authDb.txt + private static final String TEST_USER_TOKEN = "UvuVb1USHGRRT08gEnwN2zGZrvM4MsLQ5brgF6SVkAw="; + private static final String TEST_USER_NAME = "testUser1"; + + @Container + private static GenericContainer questDBContainer = newQuestDbConnector(); + + private static GenericContainer newQuestDbConnector() { + FixedHostPortGenericContainer container = new FixedHostPortGenericContainer<>("questdb/questdb:7.3"); + container.addExposedPort(QuestDBUtils.QUESTDB_HTTP_PORT); + container.addExposedPort(QuestDBUtils.QUESTDB_ILP_PORT); + container.setWaitStrategy(new LogMessageWaitStrategy().withRegEx(".*server-main enjoy.*")); + container.withCopyFileToContainer(MountableFile.forClasspathResource("/authDb.txt"), "/var/lib/questdb/conf/authDb.txt"); + container.withEnv("QDB_LINE_TCP_AUTH_DB_PATH", "conf/authDb.txt"); + return container.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb"))); + } + + @BeforeEach + public void setUp() { + topicName = ConnectTestUtils.newTopicName(); + JsonConverter jsonConverter = new JsonConverter(); + jsonConverter.configure(singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName())); + converter = jsonConverter; + + connect = new EmbeddedConnectCluster.Builder() + .name("questdb-connect-cluster") + .build(); + + connect.start(); + } + + @Test + public void testSmoke() { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + props.put(QuestDBSinkConnectorConfig.USERNAME, TEST_USER_NAME); + props.put(QuestDBSinkConnectorConfig.TOKEN, TEST_USER_TOKEN); + + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + Schema schema = SchemaBuilder.struct().name("com.example.Person") + .field("firstname", Schema.STRING_SCHEMA) + .field("lastname", Schema.STRING_SCHEMA) + .field("age", Schema.INT8_SCHEMA) + .build(); + + Struct struct = new Struct(schema) + .put("firstname", "John") + .put("lastname", "Doe") + .put("age", (byte) 42); + + connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); + + QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + + "\"John\",\"Doe\",42\r\n", + "select firstname,lastname,age from " + topicName); + } +} diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index 06a289c..c1f7d1c 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -4,25 +4,15 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.connect.data.Date; -import org.apache.kafka.connect.data.Decimal; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.data.Time; -import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.data.*; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.runtime.AbstractStatus; -import org.apache.kafka.connect.runtime.Connect; -import org.apache.kafka.connect.runtime.ConnectorConfig; -import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.ConverterConfig; import org.apache.kafka.connect.storage.ConverterType; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -38,16 +28,12 @@ import java.math.BigDecimal; import java.util.Calendar; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.TimeZone; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.singletonMap; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; import static org.hamcrest.CoreMatchers.containsString; @@ -56,13 +42,8 @@ @Testcontainers public final class QuestDBSinkConnectorEmbeddedTest { - private static final String CONNECTOR_NAME = "questdb-sink-connector"; - private static final long CONNECTOR_START_TIMEOUT_MS = SECONDS.toMillis(60); - private EmbeddedConnectCluster connect; private Converter converter; - - private static final AtomicInteger ID_GEN = new AtomicInteger(0); private String topicName; @Container @@ -92,7 +73,7 @@ private static GenericContainer newQuestDbConnector(Integer httpPort, Integer @BeforeEach public void setUp() { - topicName = newTopicName(); + topicName = ConnectTestUtils.newTopicName(); JsonConverter jsonConverter = new JsonConverter(); jsonConverter.configure(singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName())); converter = jsonConverter; @@ -109,22 +90,12 @@ public void tearDown() { connect.stop(); } - private Map baseConnectorProps(String topicName) { - Map props = new HashMap<>(); - props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, QuestDBSinkConnector.class.getName()); - props.put("topics", topicName); - props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); - props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); - props.put("host", questDBContainer.getHost() + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT)); - return props; - } - @Test public void testSmoke() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") .field("firstname", Schema.STRING_SCHEMA) .field("lastname", Schema.STRING_SCHEMA) @@ -146,13 +117,13 @@ public void testSmoke() { @Test public void testDeadLetterQueue_wrongJson() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put("value.converter.schemas.enable", "false"); props.put("errors.deadletterqueue.topic.name", "dlq"); props.put("errors.deadletterqueue.topic.replication.factor", "1"); props.put("errors.tolerance", "all"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); connect.kafka().produce(topicName, "key", "{\"not valid json}"); connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); @@ -170,10 +141,10 @@ public void testDeadLetterQueue_wrongJson() { @Test public void testSymbol() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "firstname,lastname"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") .field("firstname", Schema.STRING_SCHEMA) .field("lastname", Schema.STRING_SCHEMA) @@ -195,12 +166,12 @@ public void testSymbol() { @Test public void testRetrying_badDataStopsTheConnectorEventually() throws Exception { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.RETRY_BACKOFF_MS, "1000"); props.put(QuestDBSinkConnectorConfig.MAX_RETRIES, "5"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); // creates a record with 'age' as long connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); @@ -214,7 +185,7 @@ public void testRetrying_badDataStopsTheConnectorEventually() throws Exception { connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":\"str\"}"); try { - assertConnectorTaskState(CONNECTOR_NAME, AbstractStatus.State.FAILED); + ConnectTestUtils.assertConnectorTaskState(connect, ConnectTestUtils.CONNECTOR_NAME, AbstractStatus.State.FAILED); return; // ok, the connector already failed, good, we are done } catch (AssertionError e) { // not yet, maybe next-time @@ -227,12 +198,12 @@ public void testRetrying_badDataStopsTheConnectorEventually() throws Exception { @Test public void testRetrying_recoversFromInfrastructureIssues() throws Exception { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.RETRY_BACKOFF_MS, "1000"); props.put(QuestDBSinkConnectorConfig.MAX_RETRIES, "40"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); connect.kafka().produce(topicName, "key1", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); @@ -268,14 +239,14 @@ public void testRetrying_recoversFromInfrastructureIssues() throws Exception { @Test public void testEmptyCollection_wontFailTheConnector() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); // filter out all message props.put("transforms", "drop"); props.put("transforms.drop.type", "org.apache.kafka.connect.transforms.Filter"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") .field("firstname", Schema.STRING_SCHEMA) .field("lastname", Schema.STRING_SCHEMA) @@ -292,17 +263,17 @@ public void testEmptyCollection_wontFailTheConnector() { int durationMs = 10_000; long deadline = System.currentTimeMillis() + durationMs; while (System.currentTimeMillis() < deadline) { - assertConnectorTaskState(CONNECTOR_NAME, AbstractStatus.State.RUNNING); + ConnectTestUtils.assertConnectorTaskState(connect, ConnectTestUtils.CONNECTOR_NAME, AbstractStatus.State.RUNNING); } } @Test public void testSymbol_withAllOtherILPTypes() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "firstname"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") .field("firstname", Schema.STRING_SCHEMA) .field("lastname", Schema.STRING_SCHEMA) @@ -350,10 +321,10 @@ public void testSymbol_withAllOtherILPTypes() { @Test public void testUpfrontTable_withSymbols() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "firstname,lastname"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") .field("firstname", Schema.STRING_SCHEMA) .field("lastname", Schema.STRING_SCHEMA) @@ -380,11 +351,11 @@ public void testUpfrontTable_withSymbols() { @Test public void testTimestampUnitResolution_auto() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); java.util.Date birth = new Calendar.Builder() .setTimeZone(TimeZone.getTimeZone("UTC")) @@ -439,12 +410,12 @@ private void testTimestampUnitResolution0(String mode) { throw new IllegalArgumentException("Unknown mode: " + mode); } connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth"); props.put(QuestDBSinkConnectorConfig.TIMESTAMP_UNITS_CONFIG, mode); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); long birthMillis = new Calendar.Builder() .setTimeZone(TimeZone.getTimeZone("UTC")) @@ -465,12 +436,12 @@ private void testTimestampUnitResolution0(String mode) { @Test public void testKafkaNativeTimestampsAndExplicitDesignatedFieldTimestampMutuallyExclusive() { - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, "true"); try { - connect.configureConnector(CONNECTOR_NAME, props); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); fail("Expected ConnectException"); } catch (ConnectException e) { assertThat(e.getMessage(), containsString("timestamp.field.name with timestamp.kafka.native")); @@ -480,12 +451,12 @@ public void testKafkaNativeTimestampsAndExplicitDesignatedFieldTimestampMutually @Test public void testKafkaNativeTimestamp() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, "true"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); QuestDBUtils.assertSql(questDBContainer, "{\"ddl\":\"OK\"}\n", @@ -513,7 +484,7 @@ public void testKafkaNativeTimestamp() { @Test public void testTimestampSMT_parseTimestamp_schemaLess() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born"); props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); @@ -530,8 +501,8 @@ public void testTimestampSMT_parseTimestamp_schemaLess() { props.put("transforms.Timestamp-death.target.type", "Timestamp"); props.put("transforms.Timestamp-death.format", timestampFormat); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); QuestDBUtils.assertSql(questDBContainer, "{\"ddl\":\"OK\"}\n", @@ -555,7 +526,7 @@ public void testTimestampSMT_parseTimestamp_schemaLess() { @Test public void testTimestampSMT_parseTimestamp_withSchema() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born"); props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); @@ -570,8 +541,8 @@ public void testTimestampSMT_parseTimestamp_withSchema() { props.put("transforms.Timestamp-death.target.type", "Timestamp"); props.put("transforms.Timestamp-death.format", timestampFormat); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") .field("firstname", Schema.STRING_SCHEMA) @@ -597,9 +568,9 @@ public void testTimestampSMT_parseTimestamp_withSchema() { @Test public void testUpfrontTable() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") .field("firstname", Schema.STRING_SCHEMA) .field("lastname", Schema.STRING_SCHEMA) @@ -626,11 +597,11 @@ public void testUpfrontTable() { @Test public void testDesignatedTimestamp_noSchema_unixEpochMillis() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); connect.kafka().produce(topicName, "foo", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"birth\":433774466123}"); @@ -642,7 +613,7 @@ public void testDesignatedTimestamp_noSchema_unixEpochMillis() { @Test public void testDesignatedTimestamp_noSchema_dateTransform_fromStringToTimestamp() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put("value.converter.schemas.enable", "false"); props.put("transforms", "convert_birth"); props.put("transforms.convert_birth.type", "org.apache.kafka.connect.transforms.TimestampConverter$Value"); @@ -650,8 +621,8 @@ public void testDesignatedTimestamp_noSchema_dateTransform_fromStringToTimestamp props.put("transforms.convert_birth.field", "birth"); props.put("transforms.convert_birth.format", "yyyy-MM-dd'T'HH:mm:ss.SSSX"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); connect.kafka().produce(topicName, "foo", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"birth\":\"1989-09-23T10:25:33.107Z\"}"); @@ -663,10 +634,10 @@ public void testDesignatedTimestamp_noSchema_dateTransform_fromStringToTimestamp @Test public void testDesignatedTimestamp_withSchema() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") .field("firstname", Schema.STRING_SCHEMA) .field("lastname", Schema.STRING_SCHEMA) @@ -695,11 +666,11 @@ public void testDesignatedTimestamp_withSchema() { @Test public void testDoNotIncludeKey() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth"); props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") .field("firstname", Schema.STRING_SCHEMA) .field("lastname", Schema.STRING_SCHEMA) @@ -728,10 +699,10 @@ public void testDoNotIncludeKey() { @Test public void testJsonNoSchema() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put("value.converter.schemas.enable", "false"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" @@ -742,11 +713,11 @@ public void testJsonNoSchema() { @Test public void testJsonNoSchema_mixedFlotingAndIntTypes() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DOUBLE_COLUMNS_CONFIG, "age"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); connect.kafka().produce(topicName, "key", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":42.5}"); @@ -760,21 +731,21 @@ public void testJsonNoSchema_mixedFlotingAndIntTypes() { @Test public void testJsonNoSchema_ArrayNotSupported() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put("value.converter.schemas.enable", "false"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"array\":[1,2,3]}"); - assertConnectorTaskFailedEventually(); + ConnectTestUtils.assertConnectorTaskFailedEventually(connect); } @Test public void testPrimitiveKey() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") .field("firstname", Schema.STRING_SCHEMA) .field("lastname", Schema.STRING_SCHEMA) @@ -796,15 +767,15 @@ public void testPrimitiveKey() { @Test public void testParsingStringTimestamp() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born"); props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); props.put(QuestDBSinkConnectorConfig.TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss.SSSUUU z"); props.put(QuestDBSinkConnectorConfig.TIMESTAMP_STRING_FIELDS, "born,death"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); QuestDBUtils.assertSql(questDBContainer, "{\"ddl\":\"OK\"}\n", @@ -828,14 +799,14 @@ public void testParsingStringTimestamp() { @Test public void testCustomPrefixWithPrimitiveKeyAndValues() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(QuestDBSinkConnectorConfig.KEY_PREFIX_CONFIG, "col_key"); props.put(QuestDBSinkConnectorConfig.VALUE_PREFIX_CONFIG, "col_value"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); connect.kafka().produce(topicName, "foo", "bar"); @@ -847,10 +818,10 @@ public void testCustomPrefixWithPrimitiveKeyAndValues() { @Test public void testSkipUnsupportedType_Bytes() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put(QuestDBSinkConnectorConfig.SKIP_UNSUPPORTED_TYPES_CONFIG, "true"); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") .field("firstname", Schema.STRING_SCHEMA) .field("lastname", Schema.STRING_SCHEMA) @@ -872,12 +843,12 @@ public void testSkipUnsupportedType_Bytes() { @Test public void testDefaultPrefixWithPrimitiveKeyAndValues() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); connect.kafka().produce(topicName, "foo", "bar"); @@ -889,11 +860,11 @@ public void testDefaultPrefixWithPrimitiveKeyAndValues() { @Test public void testStructKey() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); //overrider the convertor from String to Json props.put(KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") .field("firstname", Schema.STRING_SCHEMA) .field("lastname", Schema.STRING_SCHEMA) @@ -914,13 +885,13 @@ public void testStructKey() { @Test public void testStructKeyWithNoPrefix() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); //overrider the convertor from String to Json props.put(KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(QuestDBSinkConnectorConfig.KEY_PREFIX_CONFIG, ""); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") .field("firstname", Schema.STRING_SCHEMA) .field("lastname", Schema.STRING_SCHEMA) @@ -941,12 +912,12 @@ public void testStructKeyWithNoPrefix() { @Test public void testStructKeyAndPrimitiveValue() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); //overrider the convertor from String to Json props.put(KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") .field("firstname", Schema.STRING_SCHEMA) .field("lastname", Schema.STRING_SCHEMA) @@ -967,12 +938,12 @@ public void testStructKeyAndPrimitiveValue() { @Test public void testExplicitTableName() { - String tableName = newTableName(); + String tableName = ConnectTestUtils.newTableName(); connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, tableName); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") .field("firstname", Schema.STRING_SCHEMA) .field("lastname", Schema.STRING_SCHEMA) @@ -994,10 +965,10 @@ public void testExplicitTableName() { @Test public void testLogicalTypes() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, topicName); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") .field("firstname", Schema.STRING_SCHEMA) .field("lastname", Schema.STRING_SCHEMA) @@ -1043,10 +1014,10 @@ public void testLogicalTypes() { @Test public void testDecimalTypeNotSupported() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, topicName); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") .field("firstname", Schema.STRING_SCHEMA) .field("lastname", Schema.STRING_SCHEMA) @@ -1062,16 +1033,16 @@ public void testDecimalTypeNotSupported() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - assertConnectorTaskFailedEventually(); + ConnectTestUtils.assertConnectorTaskFailedEventually(connect); } @Test public void testNestedStructInValue() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, topicName); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema nameSchema = SchemaBuilder.struct().name("com.example.Name") .field("firstname", Schema.STRING_SCHEMA) @@ -1099,10 +1070,10 @@ public void testNestedStructInValue() { @Test public void testMultiLevelNestedStructInValue() { connect.kafka().createTopic(topicName, 1); - Map props = baseConnectorProps(topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, topicName); - connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorTaskRunningEventually(); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema nameSchema = SchemaBuilder.struct().name("com.example.Name") .field("firstname", Schema.STRING_SCHEMA) @@ -1137,40 +1108,4 @@ public void testMultiLevelNestedStructInValue() { + "\"John\",\"Doe\",\"Jane\",\"Doe\"\r\n", "select partner1_name_firstname, partner1_name_lastname, partner2_name_firstname, partner2_name_lastname from " + topicName); } - - private void assertConnectorTaskRunningEventually() { - assertConnectorTaskStateEventually(AbstractStatus.State.RUNNING); - } - - private void assertConnectorTaskFailedEventually() { - assertConnectorTaskStateEventually(AbstractStatus.State.FAILED); - } - - private void assertConnectorTaskStateEventually(AbstractStatus.State expectedState) { - Awaitility.await().atMost(CONNECTOR_START_TIMEOUT_MS, MILLISECONDS).untilAsserted(() -> assertConnectorTaskState(CONNECTOR_NAME, expectedState)); - } - - private void assertConnectorTaskState(String connectorName, AbstractStatus.State expectedState) { - ConnectorStateInfo info = connect.connectorStatus(connectorName); - if (info == null) { - fail("Connector " + connectorName + " not found"); - } - List taskStates = info.tasks(); - if (taskStates.size() == 0) { - fail("No tasks found for connector " + connectorName); - } - for (ConnectorStateInfo.TaskState taskState : taskStates) { - if (!Objects.equals(taskState.state(), expectedState.toString())) { - fail("Task " + taskState.id() + " for connector " + connectorName + " is in state " + taskState.state() + " but expected " + expectedState); - } - } - } - - private static String newTopicName() { - return "topic" + ID_GEN.getAndIncrement(); - } - - private static String newTableName() { - return "table" + ID_GEN.getAndIncrement(); - } } diff --git a/connector/src/test/resources/authDb.txt b/connector/src/test/resources/authDb.txt new file mode 100644 index 0000000..74452e2 --- /dev/null +++ b/connector/src/test/resources/authDb.txt @@ -0,0 +1,19 @@ +# Test auth db file, format is +# [key/user id] [key type] {key details} ... +# Only elliptic curve (for curve P-256) are supported (key type ec-p-256-sha256), the key details for such a key are the base64url encoded x and y points that determine the public key as defined in the JSON web token standard (RFC 7519) +# +# The auth db file needs to be put somewhere in the questdbn server root and referenced in the line.tcp.auth.db.path setting of server conf, like: +# line.tcp.auth.db.path=conf/authDb.txt +# +# Below is an elliptic curve (for curve P-256) JSON Web Key +#{ +# "kty": "EC", +# "d": "5UjEMuA0Pj5pjK8a-fa24dyIf-Es5mYny3oE_Wmus48", +# "crv": "P-256", +# "kid": "testUser1", +# "x": "fLKYEaoEb9lrn3nkwLDA-M_xnuFOdSt9y0Z7_vWSHLU", +# "y": "Dt5tbS1dEDMSYfym3fgMv0B99szno-dFc1rYF9t0aac" +#} +# For this kind of key the "d" parameter is used to generate the secret key. The "x" and "y" parameters are used to generate the public key +testUser1 ec-p-256-sha256 AKfkxOBlqBN8uDfTxu2Oo6iNsOPBnXkEH4gt44tBJKCY AL7WVjoH-IfeX_CXo5G1xXKp_PqHUrdo3xeRyDuWNbBX +