From c53b5a143eefc887e4efe135974189ece701a456 Mon Sep 17 00:00:00 2001 From: mroiter-larus Date: Tue, 25 Jul 2023 13:45:24 +0200 Subject: [PATCH 1/8] Issue #549: Upgrade kafka-avro-serializer dependency --- .../src/main/kotlin/streams/extensions/CommonExtensions.kt | 2 +- .../streams/service/sink/errors/KafkaErrorServiceTest.kt | 2 +- kafka-connect-neo4j/pom.xml | 2 +- pom.xml | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/common/src/main/kotlin/streams/extensions/CommonExtensions.kt b/common/src/main/kotlin/streams/extensions/CommonExtensions.kt index 010a662d..46e7ccb2 100644 --- a/common/src/main/kotlin/streams/extensions/CommonExtensions.kt +++ b/common/src/main/kotlin/streams/extensions/CommonExtensions.kt @@ -74,7 +74,7 @@ private fun convertAvroData(rawValue: Any?): Any? = when (rawValue) { .mapValues { convertAvroData(it.value) } is GenericFixed -> rawValue.bytes() is ByteBuffer -> rawValue.array() - is GenericEnumSymbol, is CharSequence -> rawValue.toString() + is GenericEnumSymbol<*>, is CharSequence -> rawValue.toString() else -> rawValue } fun IndexedRecord.toMap() = this.schema.fields diff --git a/common/src/test/kotlin/streams/service/sink/errors/KafkaErrorServiceTest.kt b/common/src/test/kotlin/streams/service/sink/errors/KafkaErrorServiceTest.kt index 38b78b10..bda9b230 100644 --- a/common/src/test/kotlin/streams/service/sink/errors/KafkaErrorServiceTest.kt +++ b/common/src/test/kotlin/streams/service/sink/errors/KafkaErrorServiceTest.kt @@ -25,7 +25,7 @@ class KafkaErrorServiceTest { val counter = AtomicInteger(0) Mockito.`when`(producer.send(ArgumentMatchers.any>())).then { counter.incrementAndGet() - FutureRecordMetadata(null, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0, SystemTime()) + FutureRecordMetadata(null, 0, RecordBatch.NO_TIMESTAMP, 0, 0, SystemTime()) } val dlqService = KafkaErrorService(producer, ErrorService.ErrorConfig(fail=false,dlqTopic = "dlqTopic"), { s, e -> }) dlqService.report(listOf(dlqData())) diff --git a/kafka-connect-neo4j/pom.xml b/kafka-connect-neo4j/pom.xml index 7547cee5..33e8662b 100644 --- a/kafka-connect-neo4j/pom.xml +++ b/kafka-connect-neo4j/pom.xml @@ -22,7 +22,7 @@ 0.11.1 3.1.0 0.3.141 - 27.0.1-jre + 32.1.1-jre diff --git a/pom.xml b/pom.xml index fd6d910e..035598d0 100644 --- a/pom.xml +++ b/pom.xml @@ -55,15 +55,15 @@ 1.6.10 1.6.0 4.4.3 - 2.4.1 + 3.5.1 2.13.1 true 4.4.2 1.15.1 - 1.8.2 + 1.11.2 3.3.0 4.13.2 - 5.2.2 + 7.4.0 5.7.1 1.3 ad59084711 From 6ec4a00b5ff4ffba5569ace179cd1fd68a6f6482 Mon Sep 17 00:00:00 2001 From: Andrea Santurbano Date: Thu, 27 Jul 2023 15:42:56 +0200 Subject: [PATCH 2/8] fixed flaky test --- .../kafka/connect/source/Neo4jSourceTaskTest.kt | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/source/Neo4jSourceTaskTest.kt b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/source/Neo4jSourceTaskTest.kt index 0bffeccd..7fa3b4aa 100644 --- a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/source/Neo4jSourceTaskTest.kt +++ b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/source/Neo4jSourceTaskTest.kt @@ -267,7 +267,16 @@ class Neo4jSourceTaskTest { task.start(props) val totalRecords = 10 insertRecords(totalRecords) - - task.poll() + var exception: ConnectException? = null + Assert.assertEventually(ThrowingSupplier { + try { + task.poll() + false + } catch (e: ConnectException) { + exception = e + true + } + }, Matchers.equalTo(true), 30, TimeUnit.SECONDS) + if (exception != null) throw exception as ConnectException } } \ No newline at end of file From ba8504b805ebf1635c2fb2894778af38ea50f039 Mon Sep 17 00:00:00 2001 From: Andrea Santurbano Date: Thu, 27 Jul 2023 16:50:26 +0200 Subject: [PATCH 3/8] fixed KafkaEventRouterEnterpriseTSE --- pom.xml | 2 +- .../streams/integrations/KafkaEventRouterEnterpriseTSE.kt | 2 +- test-support/src/main/kotlin/streams/MavenUtils.kt | 2 +- .../src/main/kotlin/streams/Neo4jContainerExtension.kt | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index 035598d0..89f01cc8 100644 --- a/pom.xml +++ b/pom.xml @@ -59,7 +59,7 @@ 2.13.1 true 4.4.2 - 1.15.1 + 1.18.3 1.11.2 3.3.0 4.13.2 diff --git a/producer/src/test/kotlin/streams/integrations/KafkaEventRouterEnterpriseTSE.kt b/producer/src/test/kotlin/streams/integrations/KafkaEventRouterEnterpriseTSE.kt index 6b4f1a83..b36c11ed 100644 --- a/producer/src/test/kotlin/streams/integrations/KafkaEventRouterEnterpriseTSE.kt +++ b/producer/src/test/kotlin/streams/integrations/KafkaEventRouterEnterpriseTSE.kt @@ -65,7 +65,7 @@ class KafkaEventRouterEnterpriseTSE { } StreamsUtils.ignoreExceptions({ neo4j.withWaitStrategy(LogMessageWaitStrategy() - .withRegEx(".*\\[(${DB_NAME_NAMES.joinToString("|")}|neo4j)] \\[Source\\] Streams Source module initialised\n") + .withRegEx(".*\\[(${DB_NAME_NAMES.joinToString("|")}|neo4j)/\\w+\\] \\[Source\\] Streams Source module initialised\n") .withTimes(DB_NAME_NAMES.size + 1) .withStartupTimeout(Duration.ofMinutes(10))) DB_NAME_NAMES.forEach { diff --git a/test-support/src/main/kotlin/streams/MavenUtils.kt b/test-support/src/main/kotlin/streams/MavenUtils.kt index 10ad6869..e337ba56 100644 --- a/test-support/src/main/kotlin/streams/MavenUtils.kt +++ b/test-support/src/main/kotlin/streams/MavenUtils.kt @@ -11,7 +11,7 @@ object MavenUtils { val rt = Runtime.getRuntime() val mvnw = if (System.getProperty("os.name").startsWith("Windows")) "./mvnw.cmd" else "./mvnw" - val commands = arrayOf(mvnw, "-pl", "!doc,!kafka-connect-neo4j", "-DbuildSubDirectory=containerPlugins") + + val commands = arrayOf(mvnw, "-pl", "!kafka-connect-neo4j", "-DbuildSubDirectory=containerPlugins") + args.let { if (it.isNullOrEmpty()) arrayOf("package", "-Dmaven.test.skip") else it } val proc = rt.exec(commands, null, File(path)) diff --git a/test-support/src/main/kotlin/streams/Neo4jContainerExtension.kt b/test-support/src/main/kotlin/streams/Neo4jContainerExtension.kt index 19c807ce..737ff006 100644 --- a/test-support/src/main/kotlin/streams/Neo4jContainerExtension.kt +++ b/test-support/src/main/kotlin/streams/Neo4jContainerExtension.kt @@ -40,7 +40,7 @@ private class DatabasesWaitStrategy(private val auth: AuthToken): AbstractWaitSt Unreliables.retryUntilSuccess(startupTimeout.seconds.toInt(), TimeUnit.SECONDS) { rateLimiter.doWhenReady { if (databases.isNotEmpty()) { - val databasesStatus = systemSession.beginTransaction() + val databasesStatus = systemSession.beginTransaction() .use { tx -> tx.run("SHOW DATABASES").list().map { it.get("name").asString() to it.get("currentStatus").asString() }.toMap() } val notOnline = databasesStatus.filterValues { it != "online" } if (databasesStatus.size < databases.size || notOnline.isNotEmpty()) { @@ -57,7 +57,7 @@ private class DatabasesWaitStrategy(private val auth: AuthToken): AbstractWaitSt } class Neo4jContainerExtension(dockerImage: String): Neo4jContainer(dockerImage) { - constructor(): this("neo4j:4.1.1-enterprise") + constructor(): this("neo4j:4.4.23-enterprise") private val logger = LoggerFactory.getLogger(Neo4jContainerExtension::class.java) var driver: Driver? = null var session: Session? = null @@ -100,7 +100,7 @@ class Neo4jContainerExtension(dockerImage: String): Neo4jContainer Date: Thu, 27 Jul 2023 17:19:27 +0200 Subject: [PATCH 4/8] Fixed SchemaRegistryContainer --- .../test/kotlin/integrations/kafka/KafkaEventSinkSuiteIT.kt | 1 + .../test/kotlin/integrations/kafka/SchemaRegistryContainer.kt | 4 ++-- .../kotlin/streams/integrations/KafkaEventRouterSuiteIT.kt | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkSuiteIT.kt b/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkSuiteIT.kt index 8e1d84bc..d5b523f8 100644 --- a/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkSuiteIT.kt +++ b/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkSuiteIT.kt @@ -35,6 +35,7 @@ class KafkaEventSinkSuiteIT { * 4.0.x | 1.0.x * 4.1.x | 1.1.x * 5.0.x | 2.0.x + * 7.4.X | 3.4.x (We are currently using 3.5.1 which is backward compatible) * * Please see also https://docs.confluent.io/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility */ diff --git a/consumer/src/test/kotlin/integrations/kafka/SchemaRegistryContainer.kt b/consumer/src/test/kotlin/integrations/kafka/SchemaRegistryContainer.kt index 5a559b79..6e64ca1a 100644 --- a/consumer/src/test/kotlin/integrations/kafka/SchemaRegistryContainer.kt +++ b/consumer/src/test/kotlin/integrations/kafka/SchemaRegistryContainer.kt @@ -3,7 +3,6 @@ package integrations.kafka import org.testcontainers.containers.GenericContainer import org.testcontainers.containers.wait.strategy.Wait import org.testcontainers.containers.KafkaContainer -import org.testcontainers.containers.KafkaContainer.KAFKA_PORT import org.testcontainers.containers.Network import org.testcontainers.containers.SocatContainer import java.util.stream.Stream @@ -30,8 +29,9 @@ class SchemaRegistryContainer(version: String): GenericContainer Date: Wed, 30 Aug 2023 09:56:00 +0200 Subject: [PATCH 5/8] Ali feedback --- pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 89f01cc8..ad9780da 100644 --- a/pom.xml +++ b/pom.xml @@ -54,9 +54,9 @@ 1.8 1.6.10 1.6.0 - 4.4.3 - 3.5.1 - 2.13.1 + 4.4.25 + 2.6.3 + 2.15.3 true 4.4.2 1.18.3 From 580bd10571134dc6e8bca3da6c4b356906bfa2ff Mon Sep 17 00:00:00 2001 From: Andrea Santurbano Date: Wed, 30 Aug 2023 09:59:13 +0200 Subject: [PATCH 6/8] jackson 2.15.3 doesn't exist, downgrade to 2.15.2 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index ad9780da..5706b585 100644 --- a/pom.xml +++ b/pom.xml @@ -56,7 +56,7 @@ 1.6.0 4.4.25 2.6.3 - 2.15.3 + 2.15.2 true 4.4.2 1.18.3 From 5341c530837c84e1185e8c7b2f8489843b57696a Mon Sep 17 00:00:00 2001 From: Andrea Santurbano Date: Wed, 30 Aug 2023 12:45:35 +0200 Subject: [PATCH 7/8] updated api --- .../streams/service/sink/errors/KafkaErrorServiceTest.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/common/src/test/kotlin/streams/service/sink/errors/KafkaErrorServiceTest.kt b/common/src/test/kotlin/streams/service/sink/errors/KafkaErrorServiceTest.kt index bda9b230..cab6e95d 100644 --- a/common/src/test/kotlin/streams/service/sink/errors/KafkaErrorServiceTest.kt +++ b/common/src/test/kotlin/streams/service/sink/errors/KafkaErrorServiceTest.kt @@ -6,7 +6,6 @@ import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.internals.FutureRecordMetadata import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.utils.SystemTime -import org.apache.kafka.common.utils.Time import org.junit.Test import org.mockito.ArgumentMatchers import org.mockito.Mockito @@ -25,7 +24,7 @@ class KafkaErrorServiceTest { val counter = AtomicInteger(0) Mockito.`when`(producer.send(ArgumentMatchers.any>())).then { counter.incrementAndGet() - FutureRecordMetadata(null, 0, RecordBatch.NO_TIMESTAMP, 0, 0, SystemTime()) + FutureRecordMetadata(null, 0, RecordBatch.NO_TIMESTAMP, 0, 0, 0, SystemTime()) } val dlqService = KafkaErrorService(producer, ErrorService.ErrorConfig(fail=false,dlqTopic = "dlqTopic"), { s, e -> }) dlqService.report(listOf(dlqData())) From bf1d0660a8d4a2e6130966098b58fb827d7a935b Mon Sep 17 00:00:00 2001 From: Andrea Santurbano Date: Thu, 31 Aug 2023 21:03:07 +0200 Subject: [PATCH 8/8] downgrade to jackson 2.14.3 --- pom.xml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pom.xml b/pom.xml index 5706b585..10be1f94 100644 --- a/pom.xml +++ b/pom.xml @@ -56,7 +56,7 @@ 1.6.0 4.4.25 2.6.3 - 2.15.2 + 2.14.3 true 4.4.2 1.18.3 @@ -297,12 +297,12 @@ test - - - - - - + + org.slf4j + slf4j-simple + 1.7.30 + test + org.neo4j.community