From 4ee853f5becc9f3b257b53774b1f3ec8945a9c68 Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Thu, 23 May 2024 18:12:51 +0200
Subject: [PATCH 1/4] feat: connector uses auto-flush configuration from
 client config string

---
 .../io/questdb/kafka/ClientConfUtils.java     | 130 ++++++++++++------
 .../java/io/questdb/kafka/FlushConfig.java    |  13 ++
 .../io/questdb/kafka/QuestDBSinkTask.java     |  61 +++++---
 .../io/questdb/kafka/ClientConfUtilsTest.java |  74 +++++++---
 .../questdb/kafka/QuestDBSinkConnectorIT.java |   2 +-
 .../java/io/questdb/kafka/ExactlyOnceIT.java  |   2 +-
 6 files changed, 202 insertions(+), 80 deletions(-)
 create mode 100644 connector/src/main/java/io/questdb/kafka/FlushConfig.java

diff --git a/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java b/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java
index 6374cf1..65f0f1d 100644
--- a/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java
+++ b/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java
@@ -2,65 +2,117 @@
 import io.questdb.client.impl.ConfStringParser;
 import io.questdb.std.Chars;
+import io.questdb.std.Misc;
+import io.questdb.std.Numbers;
+import io.questdb.std.NumericException;
 import io.questdb.std.str.StringSink;
+import org.apache.kafka.common.config.ConfigException;
 
 final class ClientConfUtils {
     private ClientConfUtils() {
     }
 
-    static boolean patchConfStr(String confStr, StringSink sink) {
-        int pos = ConfStringParser.of(confStr, sink);
+
+    static boolean patchConfStr(String confStr, StringSink sink, FlushConfig flushConfig) {
+        flushConfig.reset();
+
+        sink.clear();
+        StringSink tmpSink = Misc.getThreadLocalSink();
+        int pos = ConfStringParser.of(confStr, tmpSink);
         if (pos < 0) {
-            sink.clear();
             sink.put(confStr);
             return false;
         }
-        boolean isHttpTransport = Chars.equals(sink, "http") || Chars.equals(sink, "https");
-        boolean intervalFlushSetExplicitly = false;
-        boolean flushesDisabled = false;
-        boolean parseError = false;
-        boolean hasAtLeastOneParam = false;
+        boolean isHttpTransport = Chars.equals(tmpSink, "http") || Chars.equals(tmpSink, "https");
+        if (!isHttpTransport) {
+            sink.put(confStr);
+            // no patching for TCP transport
+            return false;
+        }
+        sink.put(tmpSink).put("::");
 
-        // disable interval based flushes
-        // unless they are explicitly set or auto_flush is entirely off
-        // why? the connector has its own mechanism to flush data in a timely manner
+        boolean hasAtLeastOneParam = false;
         while (ConfStringParser.hasNext(confStr, pos)) {
             hasAtLeastOneParam = true;
-            pos = ConfStringParser.nextKey(confStr, pos, sink);
+            pos = ConfStringParser.nextKey(confStr, pos, tmpSink);
             if (pos < 0) {
-                parseError = true;
-                break;
+                sink.clear();
+                sink.put(confStr);
+                return true;
             }
-            if (Chars.equals(sink, "auto_flush_interval")) {
-                intervalFlushSetExplicitly = true;
-                pos = ConfStringParser.value(confStr, pos, sink);
-            } else if (Chars.equals(sink, "auto_flush")) {
-                pos = ConfStringParser.value(confStr, pos, sink);
-                flushesDisabled = Chars.equals(sink, "off");
+            if (Chars.equals(tmpSink, "auto_flush_interval")) {
+                pos = ConfStringParser.value(confStr, pos, tmpSink);
+                if (pos < 0) {
+                    sink.clear();
+                    sink.put(confStr);
+                    // invalid config, let the real client parser fail
+                    return true;
+                }
+                if (Chars.equals(tmpSink, "off")) {
+                    throw new ConfigException("QuestDB Kafka connector cannot have auto_flush_interval disabled");
+                }
+                try {
+                    flushConfig.autoFlushNanos = Numbers.parseLong(tmpSink);
+                } catch (NumericException e) {
+                    throw new ConfigException("Invalid auto_flush_interval value [auto_flush_interval=" + tmpSink + ']');
+                }
+            } else if (Chars.equals(tmpSink, "auto_flush_rows")) {
+                pos = ConfStringParser.value(confStr, pos, tmpSink);
+                if (pos < 0) {
+                    sink.clear();
+                    sink.put(confStr);
+                    return true;
+                }
+                if (Chars.equals(tmpSink, "off")) {
+                    throw new ConfigException("QuestDB Kafka connector cannot have auto_flush_rows disabled");
+                } else {
+                    try {
+                        flushConfig.autoFlushRows = Numbers.parseInt(tmpSink);
+                    } catch (NumericException e) {
+                        throw new ConfigException("Invalid auto_flush_rows value [auto_flush_rows=" + tmpSink + ']');
+                    }
+                }
+            } else if (Chars.equals(tmpSink, "auto_flush")) {
+                pos = ConfStringParser.value(confStr, pos, tmpSink);
+                if (pos < 0) {
+                    sink.clear();
+                    sink.put(confStr);
+                    return true;
+                }
+                if (Chars.equals(tmpSink, "off")) {
+                    throw new ConfigException("QuestDB Kafka connector cannot have auto_flush disabled");
+                } else if (!Chars.equals(tmpSink, "on")) {
+                    throw new ConfigException("Unknown auto_flush value [auto_flush=" + tmpSink + ']');
+                }
             } else {
-                pos = ConfStringParser.value(confStr, pos, sink); // skip other values
-            }
-            if (pos < 0) {
-                parseError = true;
-                break;
+                // copy other params
+                sink.put(tmpSink).put('=');
+                pos = ConfStringParser.value(confStr, pos, tmpSink);
+                if (pos < 0) {
+                    sink.clear();
+                    sink.put(confStr);
+                    return true;
+                }
+                for (int i = 0; i < tmpSink.length(); i++) {
+                    char ch = tmpSink.charAt(i);
+                    sink.put(ch);
+                    // re-escape semicolon
+                    if (ch == ';') {
+                        sink.put(';');
+                    }
+                }
+                sink.put(';');
             }
         }
-        sink.clear();
-        sink.put(confStr);
-        if (!parseError // we don't want to mess with the config if there was a parse error
-                && isHttpTransport // we only want to patch http transport
-                && !flushesDisabled // if auto-flush is disabled we don't need to do anything
-                && !intervalFlushSetExplicitly // if auto_flush_interval is set explicitly we don't want to override it
-                && hasAtLeastOneParam // no parameter is also an error since at least address should be set. we let client throw exception in this case
-        ) {
-            // if everything is ok, we set auto_flush_interval to max value
-            // this will effectively disable interval based flushes
-            // and the connector will flush data only when it is told to do so by Connector
-            // or if a row count limit is reached
-            sink.put("auto_flush_interval=").put(Integer.MAX_VALUE).put(';');
+        if (!hasAtLeastOneParam) {
+            // this is invalid, let the real client parser fail
+            sink.clear();
+            sink.put(confStr);
+            return true;
         }
+        sink.put("auto_flush=off;");
 
-        return isHttpTransport;
+        return true;
     }
 }
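In practice the patched method rewrites an HTTP config string as sketched below (the invocation is hypothetical, but the input and expected output mirror the test cases added later in this patch; at this point in the series autoFlushNanos still holds the raw parsed value, and the millis-to-nanos conversion only arrives in 3/4):

    StringSink sink = new StringSink();
    FlushConfig flushConfig = new FlushConfig();

    // flush-tuning keys are captured into flushConfig and stripped from the string;
    // other keys are copied through (with semicolons re-escaped) and auto_flush=off
    // is appended so the client itself never auto-flushes
    boolean isHttp = ClientConfUtils.patchConfStr(
            "https::addr=localhost:9000;foo=bar;auto_flush_interval=100;auto_flush_rows=42;",
            sink, flushConfig);

    // isHttp == true
    // sink.toString()           -> "https::addr=localhost:9000;foo=bar;auto_flush=off;"
    // flushConfig.autoFlushRows -> 42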
diff --git a/connector/src/main/java/io/questdb/kafka/FlushConfig.java b/connector/src/main/java/io/questdb/kafka/FlushConfig.java
new file mode 100644
index 0000000..0670a81
--- /dev/null
+++ b/connector/src/main/java/io/questdb/kafka/FlushConfig.java
@@ -0,0 +1,13 @@
+package io.questdb.kafka;
+
+import java.util.concurrent.TimeUnit;
+
+class FlushConfig {
+    int autoFlushRows;
+    long autoFlushNanos;
+
+    void reset() {
+        autoFlushRows = 75_000;
+        autoFlushNanos = TimeUnit.SECONDS.toNanos(1);
+    }
+}
diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
index d8c51ae..8a7dffa 100644
--- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
+++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -28,6 +28,7 @@ public final class QuestDBSinkTask extends SinkTask {
     private static final char STRUCT_FIELD_SEPARATOR = '_';
     private static final String PRIMITIVE_KEY_FALLBACK_NAME = "key";
     private static final String PRIMITIVE_VALUE_FALLBACK_NAME = "value";
+    private static final long FLUSH_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);
     private static final Logger log = LoggerFactory.getLogger(QuestDBSinkTask.class);
 
     private Sender sender;
@@ -43,6 +44,10 @@ public final class QuestDBSinkTask extends SinkTask {
     private boolean kafkaTimestampsEnabled;
     private boolean httpTransport;
     private int allowedLag;
+    private long nextFlushNanos;
+    private int pendingRows;
+    private final int maxPendingRows = 75_000;
+    private FlushConfig flushConfig = new FlushConfig();
 
     @Override
     public String version() {
@@ -79,6 +84,7 @@ public void start(Map<String, String> map) {
         this.kafkaTimestampsEnabled = config.isDesignatedTimestampKafkaNative();
         this.timestampUnits = config.getTimestampUnitsOrNull();
         this.allowedLag = config.getAllowedLag();
+        this.nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS;
     }
 
     private Sender createRawSender() {
@@ -91,7 +97,7 @@ private Sender createRawSender() {
         if (confStr != null && !confStr.isEmpty()) {
             log.debug("Using client configuration string");
             StringSink sink = new StringSink();
-            httpTransport = ClientConfUtils.patchConfStr(confStr, sink);
+            httpTransport = ClientConfUtils.patchConfStr(confStr, sink, flushConfig);
             return Sender.fromConfig(sink);
         }
         log.debug("Using legacy client configuration");
@@ -132,11 +138,7 @@ public void put(Collection<SinkRecord> collection) {
             // We do not want locally buffered row to be stuck in the buffer for too long. It increases
             // latency between the time the record is produced and the time it is visible in QuestDB.
             // If the local buffer is empty then flushing is a cheap no-op.
-            try {
-                sender.flush();
-            } catch (LineSenderException | HttpClientException e) {
-                onSenderException(e);
-            }
+            flushAndResetCounters();
         } else {
             log.debug("Received empty collection, nothing to do");
         }
@@ -156,7 +158,27 @@ public void put(Collection<SinkRecord> collection) {
             handleSingleRecord(record);
         }
 
-        if (!httpTransport) {
+        if (httpTransport) {
+            if (pendingRows >= maxPendingRows) {
+                log.debug("Flushing data to QuestDB due to auto_flush_rows limit [pending-rows={}, max-pending-rows={}]",
+                        pendingRows, maxPendingRows);
+                flushAndResetCounters();
+            } else {
+                long remainingNanos = nextFlushNanos - System.nanoTime();
+                long remainingMs = TimeUnit.NANOSECONDS.toMillis(remainingNanos);
+                if (remainingMs <= 0) {
+                    log.debug("Flushing data to QuestDB due to auto_flush_interval timeout");
+                    flushAndResetCounters();
+                } if (allowedLag == 0) {
+                    log.debug("Flushing data to QuestDB due to zero allowed lag");
+                    flushAndResetCounters();
+                } else {
+                    log.debug("Flushing data to QuestDB in {} ms", remainingMs);
+                    long maxWaitTime = Math.min(remainingMs, allowedLag);
+                    context.timeout(maxWaitTime);
+                }
+            }
+        } else {
             log.debug("Sending {} records", collection.size());
             sender.flush();
             log.debug("Successfully sent {} records", collection.size());
@@ -177,18 +199,24 @@ public void put(Collection<SinkRecord> collection) {
         } catch (LineSenderException | HttpClientException e) {
             onSenderException(e);
         }
+    }
 
-        if (httpTransport) {
-            // we successfully added some rows to the local buffer.
-            // let's set a timeout so Kafka Connect will call us again in time even if there are
-            // no new records to send. this gives us a chance to flush the buffer.
-            context.timeout(allowedLag);
+    private void flushAndResetCounters() {
+        log.debug("Flushing data to QuestDB");
+        try {
+            sender.flush();
+            nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS;
+            pendingRows = 0;
+        } catch (LineSenderException | HttpClientException e) {
+            onSenderException(e);
         }
     }
 
     private void onSenderException(Exception e) {
         if (httpTransport) {
             closeSenderSilently();
+            nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS;
+            pendingRows = 0;
             throw new ConnectException("Failed to send data to QuestDB", e);
         }
@@ -239,6 +267,7 @@ private void handleSingleRecord(SinkRecord record) {
                 timestampColumnValue = Long.MIN_VALUE;
             }
         }
+        pendingRows++;
     }
 
     private void handleStruct(String parentName, Struct value, Schema schema) {
@@ -470,13 +499,7 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) {
     @Override
     public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
         if (httpTransport) {
-            try {
-                log.debug("Flushing data to QuestDB");
-                sender.flush();
-            } catch (LineSenderException | HttpClientException e) {
-                onSenderException(e);
-                throw new ConnectException("Failed to flush data to QuestDB", e);
-            }
+            flushAndResetCounters();
         }
         // TCP transport flushes after each batch so no need to flush here
     }
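The scheduling branch above bounds how long a partially filled buffer can sit idle. A worked example with illustrative numbers (allowed.lag is the connector's own option, in milliseconds; the 75_000-row cap is the hard-coded stop-gap that 3/4 replaces with the configured value):

    // suppose pendingRows = 10 (< 75_000), 400 ms remain until nextFlushNanos,
    // and allowedLag = 250:
    long remainingMs = TimeUnit.NANOSECONDS.toMillis(nextFlushNanos - System.nanoTime()); // ~400
    long maxWaitTime = Math.min(remainingMs, allowedLag);                                 // 250
    context.timeout(maxWaitTime);
    // Kafka Connect calls the task again within ~250 ms even if no new records
    // arrive, so the buffer is flushed no later than roughly the flush deadline.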
diff --git a/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java b/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java
index 6cfd5d4..8cfbcdd 100644
--- a/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java
+++ b/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java
@@ -1,55 +1,89 @@
 package io.questdb.kafka;
 
-import io.questdb.std.Chars;
 import io.questdb.std.str.StringSink;
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.concurrent.TimeUnit;
+
 import static org.junit.jupiter.api.Assertions.*;
 
 public class ClientConfUtilsTest {
+    private static final int DEFAULT_MAX_PENDING_ROWS = 75_000;
+    private static final long DEFAULT_FLUSH_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);
 
     @Test
     public void testHttpTransportIsResolved() {
         StringSink sink = new StringSink();
-        assertTrue(ClientConfUtils.patchConfStr("http::addr=localhost:9000;", sink));
-        assertTrue(ClientConfUtils.patchConfStr("https::addr=localhost:9000;", sink));
-        assertTrue(ClientConfUtils.patchConfStr("https::addr=localhost:9000;", sink));
-        assertFalse(ClientConfUtils.patchConfStr("tcp::addr=localhost:9000;", sink));
-        assertFalse(ClientConfUtils.patchConfStr("tcps::addr=localhost:9000;", sink));
+        FlushConfig flushConfig = new FlushConfig();
+        assertTrue(ClientConfUtils.patchConfStr("http::addr=localhost:9000;", sink, flushConfig));
+        assertTrue(ClientConfUtils.patchConfStr("https::addr=localhost:9000;", sink, flushConfig));
+        assertTrue(ClientConfUtils.patchConfStr("https::addr=localhost:9000;", sink, flushConfig));
+        assertFalse(ClientConfUtils.patchConfStr("tcp::addr=localhost:9000;", sink, flushConfig));
+        assertFalse(ClientConfUtils.patchConfStr("tcps::addr=localhost:9000;", sink, flushConfig));
     }
 
     @Test
-    public void testHttpTransportTimeBasedFlushesDisabledByDefault() {
-        assertConfStringIsPatched("http::addr=localhost:9000;");
-        assertConfStringIsPatched("https::addr=localhost:9000;foo=bar;");
-        assertConfStringIsPatched("https::addr=localhost:9000;auto_flush_rows=1;");
-        assertConfStringIsPatched("https::addr=localhost:9000;auto_flush=on;");
-
-        assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar;auto_flush_interval=100;");
-        assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar;auto_flush=off;");
-        assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar");
+    public void testClientConfPatching() {
+        assertConfStringIsPatched("http::addr=localhost:9000;", "http::addr=localhost:9000;auto_flush=off;", DEFAULT_MAX_PENDING_ROWS, DEFAULT_FLUSH_INTERVAL_NANOS);
+        assertConfStringIsPatched("https::addr=localhost:9000;foo=bar;", "https::addr=localhost:9000;foo=bar;auto_flush=off;", DEFAULT_MAX_PENDING_ROWS, DEFAULT_FLUSH_INTERVAL_NANOS);
+        assertConfStringIsPatched("https::addr=localhost:9000;auto_flush_rows=1;", "https::addr=localhost:9000;auto_flush=off;", 1, DEFAULT_FLUSH_INTERVAL_NANOS);
+        assertConfStringIsPatched("https::addr=localhost:9000;auto_flush=on;", "https::addr=localhost:9000;auto_flush=off;", DEFAULT_MAX_PENDING_ROWS, DEFAULT_FLUSH_INTERVAL_NANOS);
+        assertConfStringIsPatched("https::addr=localhost:9000;foo=bar;auto_flush_interval=100;", "https::addr=localhost:9000;foo=bar;auto_flush=off;", DEFAULT_MAX_PENDING_ROWS, TimeUnit.MILLISECONDS.toNanos(100));
+        assertConfStringIsPatched("https::addr=localhost:9000;foo=bar;auto_flush_interval=100;auto_flush_rows=42;", "https::addr=localhost:9000;foo=bar;auto_flush=off;", 42, TimeUnit.MILLISECONDS.toNanos(100));
+
+        // with escaped semi-colon
+        assertConfStringIsPatched("https::addr=localhost:9000;foo=b;;ar;auto_flush_interval=100;auto_flush_rows=42;", "https::addr=localhost:9000;foo=b;;ar;auto_flush=off;", 42, TimeUnit.MILLISECONDS.toNanos(100));
+
+        assertConfStringIsNotPatched("https::addr=localhost:9000;auto_flush_interval=");
+        assertConfStringIsNotPatched("https::addr=localhost:9000;auto_flush_rows=");
+        assertConfStringIsNotPatched("https::addr=localhost:9000;auto_flush=");
+        assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar"); // missing trailing semicolon
+        assertConfStringIsNotPatched("https::addr=");
         assertConfStringIsNotPatched("https::addr");
assertConfStringIsNotPatched("https"); + assertConfStringIsNotPatched("http!"); assertConfStringIsNotPatched("tcp::addr=localhost:9000;"); assertConfStringIsNotPatched("tcps::addr=localhost:9000;foo=bar;"); assertConfStringIsNotPatched("tcps::addr=localhost:9000;auto_flush_rows=1;"); assertConfStringIsNotPatched("tcps::addr=localhost:9000;auto_flush=on;"); assertConfStringIsNotPatched("unknown::addr=localhost:9000;auto_flush=on;"); + + assertConfStringPatchingThrowsConfigException("https::addr=localhost:9000;foo=bar;auto_flush=foo;", "Unknown auto_flush value [auto_flush=foo]"); + assertConfStringPatchingThrowsConfigException("https::addr=localhost:9000;foo=bar;auto_flush_interval=foo;", "Invalid auto_flush_interval value [auto_flush_interval=foo]"); + assertConfStringPatchingThrowsConfigException("https::addr=localhost:9000;foo=bar;auto_flush_rows=foo;", "Invalid auto_flush_rows value [auto_flush_rows=foo]"); + assertConfStringPatchingThrowsConfigException("https::addr=localhost:9000;foo=bar;auto_flush=off;", "QuestDB Kafka connector cannot have auto_flush disabled"); + assertConfStringPatchingThrowsConfigException("https::addr=localhost:9000;foo=bar;auto_flush_interval=off;", "QuestDB Kafka connector cannot have auto_flush_interval disabled"); + assertConfStringPatchingThrowsConfigException("https::addr=localhost:9000;foo=bar;auto_flush_rows=off;", "QuestDB Kafka connector cannot have auto_flush_rows disabled"); } - private static void assertConfStringIsPatched(String confStr) { + private static void assertConfStringIsPatched(String confStr, String expectedPatchedConfStr, long expectedMaxPendingRows, long expectedFlushNanos) { StringSink sink = new StringSink(); - ClientConfUtils.patchConfStr(confStr, sink); + FlushConfig flushConfig = new FlushConfig(); + ClientConfUtils.patchConfStr(confStr, sink, flushConfig); - String expected = confStr + "auto_flush_interval=" + Integer.MAX_VALUE + ";"; - assertTrue(Chars.equals(expected, sink), "Conf string = " + confStr + ", expected = " + expected + ", actual = " + sink); + Assert.assertEquals(expectedPatchedConfStr, sink.toString()); } private static void assertConfStringIsNotPatched(String confStr) { StringSink sink = new StringSink(); - ClientConfUtils.patchConfStr(confStr, sink); + FlushConfig flushConfig = new FlushConfig(); + ClientConfUtils.patchConfStr(confStr, sink, flushConfig); assertEquals(confStr, sink.toString()); } + private static void assertConfStringPatchingThrowsConfigException(String confStr, String expectedMsg) { + StringSink sink = new StringSink(); + FlushConfig flushConfig = new FlushConfig(); + try { + ClientConfUtils.patchConfStr(confStr, sink, flushConfig); + Assert.fail("Expected ConfigException"); + } catch (ConfigException e) { + assertEquals(expectedMsg, e.getMessage()); + } + } + } \ No newline at end of file diff --git a/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java b/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java index f4b67c7..9343297 100644 --- a/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java +++ b/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java @@ -66,7 +66,7 @@ public class QuestDBSinkConnectorIT { .withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1") .withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1") .withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1") - .withEnv("QDB_CLIENT_CONF", "http::addr=questdb;auto_flush=off;") // 
+            .withEnv("QDB_CLIENT_CONF", "http::addr=questdb;")
             .withNetwork(network)
             .withExposedPorts(8083)
             .withCopyFileToContainer(MountableFile.forHostPath(connectorJarResolver.getJarPath()), "/usr/share/java/kafka/questdb-connector.jar")
diff --git a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java
index bf325de..14b601a 100644
--- a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java
+++ b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java
@@ -297,7 +297,7 @@ private static void startKillingRandomContainers(CyclicBarrier barrier) {
     }
 
     private static void startConnector() throws IOException, InterruptedException, URISyntaxException {
-        String confString = "http::addr=questdb:9000;auto_flush_rows=10000;retry_timeout=60000;";
+        String confString = "http::addr=questdb:9000;retry_timeout=60000;";
 
         String payload = "{\"name\":\"my-connector\",\"config\":{" +
                 "\"tasks.max\":\"4\"," +

From 7b387b63fd843d9a7923bc08cf86b2a37f13fa53 Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Fri, 24 May 2024 12:50:58 +0200
Subject: [PATCH 2/4] =?UTF-8?q?=F0=9F=A4=A6=E2=80=8D?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
index 1bc47f9..4ea4b49 100644
--- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
+++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -169,7 +169,7 @@ public void put(Collection<SinkRecord> collection) {
                 if (remainingMs <= 0) {
                     log.debug("Flushing data to QuestDB due to auto_flush_interval timeout");
                     flushAndResetCounters();
-                } if (allowedLag == 0) {
+                } else if (allowedLag == 0) {
                     log.debug("Flushing data to QuestDB due to zero allowed lag");
                     flushAndResetCounters();
                 } else {

From 4cb82b6421e6748c114964464ccdaf79770c0cd9 Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Fri, 24 May 2024 13:14:07 +0200
Subject: [PATCH 3/4] debug value removed

---
 .../io/questdb/kafka/ClientConfUtils.java     |  4 +++-
 .../java/io/questdb/kafka/FlushConfig.java    |  2 +-
 .../io/questdb/kafka/QuestDBSinkTask.java     | 19 ++++++++++---------
 .../io/questdb/kafka/ClientConfUtilsTest.java |  2 ++
 .../io/questdb/kafka/ConnectTestUtils.java    |  8 ++++----
 5 files changed, 20 insertions(+), 15 deletions(-)

diff --git a/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java b/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java
index 65f0f1d..17846f7 100644
--- a/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java
+++ b/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java
@@ -8,6 +8,8 @@
 import io.questdb.std.str.StringSink;
 import org.apache.kafka.common.config.ConfigException;
 
+import java.util.concurrent.TimeUnit;
+
 final class ClientConfUtils {
     private ClientConfUtils() {
     }
@@ -53,7 +55,7 @@ static boolean patchConfStr(String confStr, StringSink sink, FlushConfig flushCo
                     throw new ConfigException("QuestDB Kafka connector cannot have auto_flush_interval disabled");
                 }
                 try {
-                    flushConfig.autoFlushNanos = Numbers.parseLong(tmpSink);
+                    flushConfig.autoFlushNanos = TimeUnit.MILLISECONDS.toNanos(Numbers.parseLong(tmpSink));
                 } catch (NumericException e) {
ConfigException("Invalid auto_flush_interval value [auto_flush_interval=" + tmpSink + ']'); } diff --git a/connector/src/main/java/io/questdb/kafka/FlushConfig.java b/connector/src/main/java/io/questdb/kafka/FlushConfig.java index 0670a81..190469d 100644 --- a/connector/src/main/java/io/questdb/kafka/FlushConfig.java +++ b/connector/src/main/java/io/questdb/kafka/FlushConfig.java @@ -2,7 +2,7 @@ import java.util.concurrent.TimeUnit; -class FlushConfig { +final class FlushConfig { int autoFlushRows; long autoFlushNanos; diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index 4ea4b49..789140c 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -28,7 +28,6 @@ public final class QuestDBSinkTask extends SinkTask { private static final char STRUCT_FIELD_SEPARATOR = '_'; private static final String PRIMITIVE_KEY_FALLBACK_NAME = "key"; private static final String PRIMITIVE_VALUE_FALLBACK_NAME = "value"; - private static final long FLUSH_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1); private static final Logger log = LoggerFactory.getLogger(QuestDBSinkTask.class); private Sender sender; @@ -46,8 +45,7 @@ public final class QuestDBSinkTask extends SinkTask { private int allowedLag; private long nextFlushNanos; private int pendingRows; - private final int maxPendingRows = 75_000; - private FlushConfig flushConfig = new FlushConfig(); + private final FlushConfig flushConfig = new FlushConfig(); @Override public String version() { @@ -84,7 +82,7 @@ public void start(Map map) { this.kafkaTimestampsEnabled = config.isDesignatedTimestampKafkaNative(); this.timestampUnits = config.getTimestampUnitsOrNull(); this.allowedLag = config.getAllowedLag(); - this.nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS; + this.nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos; } private Sender createRawSender() { @@ -98,9 +96,12 @@ private Sender createRawSender() { log.debug("Using client configuration string"); StringSink sink = new StringSink(); httpTransport = ClientConfUtils.patchConfStr(confStr, sink, flushConfig); + if (!httpTransport) { + log.info("Using TCP transport, consider using HTTP transport for improved fault tolerance and error handling"); + } return Sender.fromConfig(sink); } - log.debug("Using legacy client configuration"); + log.warn("Configuration options 'host', 'tsl', 'token' and 'username' are deprecated and will be removed in the future. Use 'client.conf.string' instead. 
+        log.warn("Configuration options 'host', 'tls', 'token' and 'username' are deprecated and will be removed in the future. Use 'client.conf.string' instead. See: https://questdb.io/docs/third-party-tools/kafka/questdb-kafka/#configuration-options");
         Sender.LineSenderBuilder builder = Sender.builder(Sender.Transport.TCP).address(config.getHost());
         if (config.isTls()) {
             builder.enableTls();
@@ -159,9 +160,9 @@ public void put(Collection<SinkRecord> collection) {
         }
 
         if (httpTransport) {
-            if (pendingRows >= maxPendingRows) {
+            if (pendingRows >= flushConfig.autoFlushRows) {
                 log.debug("Flushing data to QuestDB due to auto_flush_rows limit [pending-rows={}, max-pending-rows={}]",
-                        pendingRows, maxPendingRows);
+                        pendingRows, flushConfig.autoFlushRows);
                 flushAndResetCounters();
             } else {
                 long remainingNanos = nextFlushNanos - System.nanoTime();
@@ -205,7 +206,7 @@ private void flushAndResetCounters() {
         log.debug("Flushing data to QuestDB");
         try {
             sender.flush();
-            nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS;
+            nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
             pendingRows = 0;
         } catch (LineSenderException | HttpClientException e) {
             onSenderException(e);
@@ -215,7 +216,7 @@ private void onSenderException(Exception e) {
         if (httpTransport) {
             closeSenderSilently();
-            nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS;
+            nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
             pendingRows = 0;
             throw new ConnectException("Failed to send data to QuestDB", e);
         }
diff --git a/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java b/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java
index 8cfbcdd..8a122d5 100644
--- a/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java
+++ b/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java
@@ -65,6 +65,8 @@ private static void assertConfStringIsPatched(String confStr, String expectedPat
         ClientConfUtils.patchConfStr(confStr, sink, flushConfig);
 
         Assert.assertEquals(expectedPatchedConfStr, sink.toString());
+        Assert.assertEquals(expectedMaxPendingRows, flushConfig.autoFlushRows);
+        Assert.assertEquals(expectedFlushNanos, flushConfig.autoFlushNanos);
     }
 
     private static void assertConfStringIsNotPatched(String confStr) {
diff --git a/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java
index 047a2c2..663c339 100644
--- a/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java
+++ b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java
@@ -52,13 +52,13 @@ static Map<String, String> baseConnectorProps(GenericContainer<?> questDBContain
         props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
 
         String confString;
-        if (!useHttp) {
-            String ilpIUrl = host + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT);
-            props.put("host", ilpIUrl);
-        } else {
+        if (useHttp) {
             int port = questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT);
             confString = "http::addr=" + host + ":" + port + ";";
             props.put("client.conf.string", confString);
+        } else {
+            String ilpIUrl = host + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT);
+            props.put("host", ilpIUrl);
         }
         return props;
     }

From 13acd8b2dac0eea921c9b1e42232267a8fcab7ff Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Fri, 24 May 2024 14:05:13 +0200
Subject: [PATCH 4/4] remove a stop-gap config option

this is no longer needed
---
 integration-tests/debezium/src/test/java/kafka/DebeziumIT.java | 3 +--
 .../src/test/java/io/questdb/kafka/ExactlyOnceIT.java          | 1 -
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java b/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java
index 327a3a5..63029d6 100644
--- a/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java
+++ b/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java
@@ -63,8 +63,7 @@ public class DebeziumIT {
             .dependsOn(kafkaContainer)
             .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("debezium")))
             .withEnv("CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE", "true")
-            .withEnv("CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE", "true")
-            .withEnv("OFFSET_FLUSH_INTERVAL_MS", "1000");
+            .withEnv("CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE", "true");
 
     @Container
     private final GenericContainer<?> questDBContainer = new GenericContainer<>("questdb/questdb:7.4.0")
diff --git a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java
index 14b601a..1598f63 100644
--- a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java
+++ b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java
@@ -171,7 +171,6 @@ private static GenericContainer<?> newConnectContainer(int id) {
         return new GenericContainer<>(CONNECT_CONTAINER_IMAGE)
                 .withEnv("CONNECT_BOOTSTRAP_SERVERS", "kafka0:9092")
                 .withEnv("CONNECT_GROUP_ID", "test")
-                .withEnv("CONNECT_OFFSET_FLUSH_INTERVAL_MS", "5000")
                 .withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "connect-storage-topic")
                 .withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "connect-config-topic")
                 .withEnv("CONNECT_STATUS_STORAGE_TOPIC", "connect-status-topic")
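Taken together, the series moves flush tuning into the client configuration string while leaving the connector in charge of when flushes actually happen. A hedged sketch of the two configuration styles the connector now distinguishes (property keys as in ConnectTestUtils above; addresses and flush values are illustrative):

    Map<String, String> props = new HashMap<>();

    // legacy TCP style, now answered with the deprecation warning added in 3/4:
    props.put("host", "localhost:9009");

    // preferred HTTP style: auto_flush_rows/auto_flush_interval are honoured by the
    // connector's own flush loop, and the string handed to the client is rewritten
    // to end with auto_flush=off:
    props.put("client.conf.string",
            "http::addr=localhost:9000;auto_flush_rows=10000;auto_flush_interval=500;");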