Skip to content

Commit

Permalink
GH-2942: EmbeddedKafka usage improvements in Kafka Streams binder
Browse files Browse the repository at this point in the history
* Currently, EmbeddedKafka is initialized as part of the class initialization.
  This is preventing individual JUnit tests from being executed from an IDE (IntelliJ, for example).
  If we move this initialization to the JUnit `BeforeAll` method, then that seems to be working.

Resolves #2942
  • Loading branch information
sobychacko committed May 1, 2024
1 parent 1d24158 commit d534e84
Show file tree
Hide file tree
Showing 26 changed files with 136 additions and 64 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,12 +59,13 @@
@EmbeddedKafka(topics = "foo-2")
class EventTypeRoutingWithInferredSerdeTests {

private static final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker();

private static Consumer<Integer, Integer> consumer;

private static EmbeddedKafkaBroker embeddedKafka;

@BeforeAll
public static void setUp() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group-1", "false",
embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
Expand Down Expand Up @@ -63,9 +64,15 @@
class InteractiveQueryServiceMultiStateStoreTests {

private static final String STORE_1_NAME = "store1";

private static final String STORE_2_NAME = "store2";

private static final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker();
private static EmbeddedKafkaBroker embeddedKafka;

@BeforeAll
public static void setUp() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
}

@Test
void stateStoreAvailableOnProperAppWhenAppServerPropertySet() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.util.function.Consumer;

import org.apache.kafka.streams.kstream.KStream;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import org.springframework.boot.WebApplicationType;
Expand All @@ -40,7 +41,12 @@
@EmbeddedKafka
class KafkaStreamsBinderEnvironmentPostProcessorTests {

private static final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker();
private static EmbeddedKafkaBroker embeddedKafka;

@BeforeAll
public static void setUp() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
}

@Test
void defaultIneligibleFunctionIsSet() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,14 +59,15 @@
@EmbeddedKafka(topics = "foo-2")
class KafkaStreamsEventTypeRoutingTests {

private static final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker();

private static Consumer<Integer, Foo> consumer;

private static CountDownLatch LATCH = new CountDownLatch(3);
private static final CountDownLatch LATCH = new CountDownLatch(3);

private static EmbeddedKafkaBroker embeddedKafka;

@BeforeAll
public static void setUp() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group-1", "false",
embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,16 +55,17 @@
@EmbeddedKafka(topics = {"fooFuncanotherFooFunc-out-0", "bar"})
class KafkaStreamsFunctionCompositionTests {

private static final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker();

private static Consumer<String, String> consumer;

private static final CountDownLatch countDownLatch1 = new CountDownLatch(1);
private static final CountDownLatch countDownLatch2 = new CountDownLatch(1);
private static final CountDownLatch countDownLatch3 = new CountDownLatch(2);

private static EmbeddedKafkaBroker embeddedKafka;

@BeforeAll
public static void setUp() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("fn-composition-group", "false",
embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,13 @@
@EmbeddedKafka(topics = "counts-id")
class KafkaStreamsInteractiveQueryIntegrationTests {

private static final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker();

private static Consumer<String, String> consumer;

private static EmbeddedKafkaBroker embeddedKafka;

@BeforeAll
public static void setUp() throws Exception {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group-id",
"false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,14 +56,15 @@
@EmbeddedKafka(topics = {"coffee", "electronics"})
class MultipleFunctionsInSameAppTests {

private static final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker();

private static Consumer<String, String> consumer;

private static CountDownLatch countDownLatch = new CountDownLatch(2);
private static final CountDownLatch countDownLatch = new CountDownLatch(2);

private static EmbeddedKafkaBroker embeddedKafka;

@BeforeAll
public static void setUp() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("purchase-groups", "false",
embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,16 @@
*
* @author Chris Bono
*/
@SuppressWarnings({ "rawtypes", "NewClassNamingConvention", "unchecked" })
@SuppressWarnings({ "rawtypes", "unchecked" })
class SerdeResolverUtilsTests {

@Nested
class ResolveForType {

private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withPropertyValues("spring.cloud.function.ineligible-definitions: sendToDlqAndContinue");

private Serde<?> fallback = mock(Serde.class);
private final Serde<?> fallback = mock(Serde.class);

@Test
void returnsFallbackSerdeForWildcard() {
Expand Down Expand Up @@ -368,7 +368,7 @@ void returnsProperlyOrderedSerdesForComplexGenericTypes() {
}

static class GenericEventSerde<T> implements Serde<GenericEvent<? extends T>> {
private String name;
private final String name;

GenericEventSerde(String name) {
this.name = name;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,10 +55,11 @@
@EmbeddedKafka
class KafkaStreamsBinderBootstrapTest {

private static final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker();
private static EmbeddedKafkaBroker embeddedKafka;

@BeforeEach
public void before() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,12 +41,13 @@
@EmbeddedKafka
class KafkaStreamsBinderJaasInitTests {

private static final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker();
private static EmbeddedKafkaBroker embeddedKafka;

private static String JAVA_LOGIN_CONFIG_PARAM_VALUE;

@BeforeAll
public static void beforeAll() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
JAVA_LOGIN_CONFIG_PARAM_VALUE = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,12 +55,13 @@
@EmbeddedKafka(topics = {"counts", "foo", "bar"})
class KafkaStreamsBinderWordCountBranchesFunctionTests {

private static final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker();
private static EmbeddedKafkaBroker embeddedKafka;

private static Consumer<String, String> consumer;

@BeforeAll
public static void setUp() throws Exception {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("groupx", "false",
embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -76,14 +76,15 @@
@EmbeddedKafka(topics = {"counts", "counts-1", "counts-2", "counts-5", "counts-6"})
class KafkaStreamsBinderWordCountFunctionTests {

private static final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker();
private static EmbeddedKafkaBroker embeddedKafka;

private static Consumer<String, String> consumer;

private final static CountDownLatch LATCH = new CountDownLatch(1);

@BeforeAll
public static void setUp() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group", "false",
embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import org.springframework.boot.SpringApplication;
Expand All @@ -47,7 +48,12 @@
@EmbeddedKafka
class KafkaStreamsFunctionStateStoreTests {

private static final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker();
private static EmbeddedKafkaBroker embeddedKafka;

@BeforeAll
public static void setUp() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
}

@Test
void kafkaStreamsFuncionWithMultipleStateStores() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.NoSuchBeanDefinitionException;
Expand Down Expand Up @@ -55,11 +56,17 @@
@EmbeddedKafka
class KafkaStreamsRetryTests {

private static final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker();

private final static CountDownLatch LATCH1 = new CountDownLatch(2);

private final static CountDownLatch LATCH2 = new CountDownLatch(4);

private static EmbeddedKafkaBroker embeddedKafka;

@BeforeAll
public static void setUp() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
}

@Test
void retryTemplatePerBindingOnKStream() throws Exception {
SpringApplication app = new SpringApplication(RetryTemplatePerConsumerBindingApp.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import org.springframework.boot.SpringApplication;
Expand Down Expand Up @@ -55,7 +56,12 @@
@EmbeddedKafka(topics = { "topic1", "topic2" })
class SerdesProvidedAsBeansTests {

private static final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker();
private static EmbeddedKafkaBroker embeddedKafka;

@BeforeAll
public static void setUp() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
}

@Test
void simpleSerdeBeansAreResolvedProperly() throws Exception {
Expand Down
Loading

0 comments on commit d534e84

Please sign in to comment.