Having trouble with creation of KafkaConsumer using schema registry #23376
-
I'm following the Using Apache Kafka with Schema Registry guide and trying to create a This is with Quarkus 2.7.0.Final. One thing to note is that this setup works perfectly fine prior to me trying to switch to use avro/schema registry. The app without that stuff can be found at https://github.com/quarkusio/quarkus-super-heroes/tree/main/rest-fights I have my test resource class as follows: import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import io.quarkus.test.common.DevServicesContext;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager.TestInjector.AnnotatedAndMatchesType;
import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaDeserializer;
public class KafkaConsumerResource implements QuarkusTestResourceLifecycleManager, DevServicesContext.ContextAware {
private static final AtomicReference<Map<String, String>> DEV_SERVICES_PROPERTIES = new AtomicReference<>(new HashMap<>());
private KafkaConsumer<String, io.quarkus.sample.superheroes.fight.schema.Fight> fightConsumer;
@Override
public Map<String, String> start() {
this.fightConsumer = new KafkaConsumer<>(consumerConfig());
this.fightConsumer.subscribe(List.of("fights"));
return Map.of();
}
@Override
public void stop() {
if (this.fightConsumer != null) {
this.fightConsumer.unsubscribe();
this.fightConsumer.close();
}
}
private Map<String, Object> consumerConfig() {
return Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DEV_SERVICES_PROPERTIES.get().get("kafka.bootstrap.servers"),
ConsumerConfig.GROUP_ID_CONFIG, "fights",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName(),
SerdeConfig.AUTO_REGISTER_ARTIFACT, "true",
SerdeConfig.REGISTRY_URL, DEV_SERVICES_PROPERTIES.get().get("mp.messaging.connector.smallrye-kafka.apicurio.registry.url")
);
}
@Override
public void inject(TestInjector testInjector) {
testInjector.injectIntoFields(
this.fightConsumer,
new AnnotatedAndMatchesType(InjectKafkaConsumer.class, KafkaConsumer.class)
);
}
@Override
public void setIntegrationTestContext(DevServicesContext context) {
DEV_SERVICES_PROPERTIES.getAndSet(context.devServicesProperties());
}
} My test class is an integration test run by the @QuarkusIntegrationTest
@QuarkusTestResource(HeroesVillainsWiremockServerResource.class)
@QuarkusTestResource(value = KafkaConsumerResource.class, restrictToAnnotatedClass = true)
@TestMethodOrder(OrderAnnotation.class)
public class FightResourceIT {
@InjectKafkaConsumer
KafkaConsumer<String, io.quarkus.sample.superheroes.fight.schema.Fight> fightsConsumer;
} When the test goes to run it blows up - the test class fails to instantiate: java.lang.RuntimeException: java.util.concurrent.CompletionException: java.lang.RuntimeException: Unable to start Quarkus test resource io.quarkus.sample.superheroes.fight.KafkaConsumerResource@fb93ea7
at io.quarkus.test.junit.QuarkusIntegrationTestExtension.throwBootFailureException(QuarkusIntegrationTestExtension.java:265)
at io.quarkus.test.junit.QuarkusIntegrationTestExtension.beforeEach(QuarkusIntegrationTestExtension.java:70)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeEachCallbacks$2(TestMethodTestDescriptor.java:163)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeMethodsOrCallbacksUntilExceptionOccurs$6(TestMethodTestDescriptor.java:199)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeMethodsOrCallbacksUntilExceptionOccurs(TestMethodTestDescriptor.java:199)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeEachCallbacks(TestMethodTestDescriptor.java:162)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:129)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: Unable to start Quarkus test resource io.quarkus.sample.superheroes.fight.KafkaConsumerResource@fb93ea7
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1739)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: Unable to start Quarkus test resource io.quarkus.sample.superheroes.fight.KafkaConsumerResource@fb93ea7
at io.quarkus.test.common.TestResourceManager$TestResourceEntryRunnable.run(TestResourceManager.java:451)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1736)
... 4 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
at io.quarkus.sample.superheroes.fight.KafkaConsumerResource.start(KafkaConsumerResource.java:33)
at io.quarkus.test.common.TestResourceManager$TestResourceEntryRunnable.run(TestResourceManager.java:446)
... 5 more
Caused by: java.lang.IllegalStateException: java.util.NoSuchElementException
at io.apicurio.registry.serde.AbstractSchemaResolver.configure(AbstractSchemaResolver.java:86)
at io.apicurio.registry.serde.DefaultSchemaResolver.configure(DefaultSchemaResolver.java:56)
at io.apicurio.registry.serde.SchemaResolverConfigurer.configure(SchemaResolverConfigurer.java:75)
at io.apicurio.registry.serde.AbstractKafkaSerDe.configure(AbstractKafkaSerDe.java:68)
at io.apicurio.registry.serde.AbstractKafkaDeserializer.configure(AbstractKafkaDeserializer.java:62)
at io.apicurio.registry.serde.avro.AvroKafkaDeserializer.configure(AvroKafkaDeserializer.java:68)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:716)
... 9 more
Caused by: java.util.NoSuchElementException
at java.base/java.util.ServiceLoader$2.next(ServiceLoader.java:1308)
at java.base/java.util.ServiceLoader$2.next(ServiceLoader.java:1296)
at java.base/java.util.ServiceLoader$3.next(ServiceLoader.java:1394)
at io.apicurio.registry.rest.client.RegistryClientFactory.resolveProviderInstance(RegistryClientFactory.java:105)
at io.apicurio.registry.rest.client.RegistryClientFactory.create(RegistryClientFactory.java:77)
at io.apicurio.registry.rest.client.RegistryClientFactory.create(RegistryClientFactory.java:71)
at io.apicurio.registry.serde.AbstractSchemaResolver.configure(AbstractSchemaResolver.java:82)
... 15 more |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 12 replies
-
/cc @cescoffier, @ozangunalp |
Beta Was this translation helpful? Give feedback.
-
/cc @carlesarnal |
Beta Was this translation helpful? Give feedback.
/cc @carlesarnal