Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #549: Upgrade kafka-avro-serializer dependency #573

Merged
merged 8 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private fun convertAvroData(rawValue: Any?): Any? = when (rawValue) {
.mapValues { convertAvroData(it.value) }
is GenericFixed -> rawValue.bytes()
is ByteBuffer -> rawValue.array()
is GenericEnumSymbol, is CharSequence -> rawValue.toString()
is GenericEnumSymbol<*>, is CharSequence -> rawValue.toString()
else -> rawValue
}
fun IndexedRecord.toMap() = this.schema.fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.utils.Time
import org.junit.Test
import org.mockito.ArgumentMatchers
import org.mockito.Mockito
Expand All @@ -25,7 +24,7 @@ class KafkaErrorServiceTest {
val counter = AtomicInteger(0)
Mockito.`when`(producer.send(ArgumentMatchers.any<ProducerRecord<ByteArray, ByteArray>>())).then {
counter.incrementAndGet()
FutureRecordMetadata(null, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0, SystemTime())
FutureRecordMetadata(null, 0, RecordBatch.NO_TIMESTAMP, 0, 0, 0, SystemTime())
}
val dlqService = KafkaErrorService(producer, ErrorService.ErrorConfig(fail=false,dlqTopic = "dlqTopic"), { s, e -> })
dlqService.report(listOf(dlqData()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class KafkaEventSinkSuiteIT {
* 4.0.x | 1.0.x
* 4.1.x | 1.1.x
* 5.0.x | 2.0.x
* 7.4.x | 3.4.x (We are currently using 3.5.1 which is backward compatible)
*
* Please see also https://docs.confluent.io/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package integrations.kafka
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.containers.KafkaContainer.KAFKA_PORT
import org.testcontainers.containers.Network
import org.testcontainers.containers.SocatContainer
import java.util.stream.Stream
Expand All @@ -30,8 +29,9 @@ class SchemaRegistryContainer(version: String): GenericContainer<SchemaRegistryC
return withKafka(kafka.network, kafka.networkAliases.map { "PLAINTEXT://$it:9092" }.joinToString(","))
}

fun withKafka(network: Network, bootstrapServers: String): SchemaRegistryContainer {
fun withKafka(network: Network?, bootstrapServers: String): SchemaRegistryContainer {
withNetwork(network)
withExposedPorts(PORT)
withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", bootstrapServers)
return self()
Expand Down
2 changes: 1 addition & 1 deletion kafka-connect-neo4j/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<confluent.connect.plugin.version>0.11.1</confluent.connect.plugin.version>
<mvn.assembly.plugin.version>3.1.0</mvn.assembly.plugin.version>
<kafka.connect.utils.version>0.3.141</kafka.connect.utils.version>
<google.guava.version>27.0.1-jre</google.guava.version>
<google.guava.version>32.1.1-jre</google.guava.version>
</properties>

<repositories>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,16 @@ class Neo4jSourceTaskTest {
task.start(props)
val totalRecords = 10
insertRecords(totalRecords)

task.poll()
var exception: ConnectException? = null
Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
try {
task.poll()
false
} catch (e: ConnectException) {
exception = e
true
}
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
if (exception != null) throw exception as ConnectException
}
}
24 changes: 12 additions & 12 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@
<java.version>1.8</java.version>
<kotlin.version>1.6.10</kotlin.version>
<kotlin.coroutines.version>1.6.0</kotlin.coroutines.version>
<neo4j.version>4.4.3</neo4j.version>
<kafka.version>2.4.1</kafka.version>
<jackson.version>2.13.1</jackson.version>
<neo4j.version>4.4.25</neo4j.version>
<kafka.version>2.6.3</kafka.version>
<jackson.version>2.14.3</jackson.version>
<kotlin.compiler.incremental>true</kotlin.compiler.incremental>
<neo4j.java.driver.version>4.4.2</neo4j.java.driver.version>
ali-ince marked this conversation as resolved.
Show resolved Hide resolved
<testcontainers.version>1.15.1</testcontainers.version>
<avro.version>1.8.2</avro.version>
<testcontainers.version>1.18.3</testcontainers.version>
<avro.version>1.11.2</avro.version>
<mokito.version>3.3.0</mokito.version>
<junit.version>4.13.2</junit.version>
<kafka.avro.serializer.version>5.2.2</kafka.avro.serializer.version>
<kafka.avro.serializer.version>7.4.0</kafka.avro.serializer.version>
<junit-jupiter.version>5.7.1</junit-jupiter.version>
<hamcrest.version>1.3</hamcrest.version>
<neo4j.configuration-lifecycle.version>ad59084711</neo4j.configuration-lifecycle.version>
Expand Down Expand Up @@ -297,12 +297,12 @@
<scope>test</scope>
</dependency>

<!-- <dependency>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-simple</artifactId>-->
<!-- <version>1.7.30</version>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.neo4j.community</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class KafkaEventRouterEnterpriseTSE {
}
StreamsUtils.ignoreExceptions({
neo4j.withWaitStrategy(LogMessageWaitStrategy()
.withRegEx(".*\\[(${DB_NAME_NAMES.joinToString("|")}|neo4j)] \\[Source\\] Streams Source module initialised\n")
.withRegEx(".*\\[(${DB_NAME_NAMES.joinToString("|")}|neo4j)/\\w+\\] \\[Source\\] Streams Source module initialised\n")
.withTimes(DB_NAME_NAMES.size + 1)
.withStartupTimeout(Duration.ofMinutes(10)))
DB_NAME_NAMES.forEach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class KafkaEventRouterSuiteIT {
* 4.0.x | 1.0.x
* 4.1.x | 1.1.x
* 5.0.x | 2.0.x
* 7.4.x | 3.4.x (We are currently using 3.5.1 which is backward compatible)
*
* Please see also https://docs.confluent.io/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility
*/
Expand Down
2 changes: 1 addition & 1 deletion test-support/src/main/kotlin/streams/MavenUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ object MavenUtils {

val rt = Runtime.getRuntime()
val mvnw = if (System.getProperty("os.name").startsWith("Windows")) "./mvnw.cmd" else "./mvnw"
val commands = arrayOf(mvnw, "-pl", "!doc,!kafka-connect-neo4j", "-DbuildSubDirectory=containerPlugins") +
val commands = arrayOf(mvnw, "-pl", "!kafka-connect-neo4j", "-DbuildSubDirectory=containerPlugins") +
args.let { if (it.isNullOrEmpty()) arrayOf("package", "-Dmaven.test.skip") else it }
val proc = rt.exec(commands, null, File(path))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private class DatabasesWaitStrategy(private val auth: AuthToken): AbstractWaitSt
Unreliables.retryUntilSuccess(startupTimeout.seconds.toInt(), TimeUnit.SECONDS) {
rateLimiter.doWhenReady {
if (databases.isNotEmpty()) {
val databasesStatus = systemSession.beginTransaction()
val databasesStatus = systemSession.beginTransaction()
.use { tx -> tx.run("SHOW DATABASES").list().map { it.get("name").asString() to it.get("currentStatus").asString() }.toMap() }
val notOnline = databasesStatus.filterValues { it != "online" }
if (databasesStatus.size < databases.size || notOnline.isNotEmpty()) {
Expand All @@ -57,7 +57,7 @@ private class DatabasesWaitStrategy(private val auth: AuthToken): AbstractWaitSt
}

class Neo4jContainerExtension(dockerImage: String): Neo4jContainer<Neo4jContainerExtension>(dockerImage) {
constructor(): this("neo4j:4.1.1-enterprise")
constructor(): this("neo4j:4.4.23-enterprise")
private val logger = LoggerFactory.getLogger(Neo4jContainerExtension::class.java)
var driver: Driver? = null
var session: Session? = null
Expand Down Expand Up @@ -100,7 +100,7 @@ class Neo4jContainerExtension(dockerImage: String): Neo4jContainer<Neo4jContaine
}

fun withKafka(kafka: KafkaContainer): Neo4jContainerExtension {
return withKafka(kafka.network, kafka.networkAliases.map { "$it:9092" }.joinToString(","))
return withKafka(kafka.network!!, kafka.networkAliases.map { "$it:9092" }.joinToString(","))
}

fun withKafka(network: Network, bootstrapServers: String): Neo4jContainerExtension {
Expand Down