Skip to content

Commit

Permalink
Add tests to DockerKafkaEnvironment (#440)
Browse files Browse the repository at this point in the history
... and:
* added a public `start()` and `close()` method for when not using with JUnit.
* added simple test for `DockerKafkaEnvironment`
* removed a deprecated method.
* deprecated `SchemaRegistryContainer.withKafka` as its not used, not needed and the `KafkaContainer` parameter is deprecated in the next version of TestContainers.
* fix a few warnings.

Co-authored-by: Andy Coates <[email protected]>
  • Loading branch information
big-andy-coates and Andy Coates authored Dec 16, 2024
1 parent b4bf7ec commit 368807a
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.specmesh.kafka.schema.SchemaRegistryContainer;
import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -80,7 +81,8 @@ public final class DockerKafkaEnvironment
BeforeAllCallback,
BeforeEachCallback,
AfterEachCallback,
AfterAllCallback {
AfterAllCallback,
Closeable {

/** The port to use when connecting to Kafka from inside the Docker network */
public static final int KAFKA_DOCKER_NETWORK_PORT = 9092;
Expand All @@ -99,7 +101,7 @@ public final class DockerKafkaEnvironment
private KafkaContainer kafkaBroker;
private SchemaRegistryContainer schemaRegistry;
private boolean invokedStatically = false;
private final AtomicInteger setUpCount = new AtomicInteger(1);
private final AtomicInteger setUpCount = new AtomicInteger(0);

/**
* @return returns a {@link Builder} instance to allow customisation of the environment.
Expand Down Expand Up @@ -128,13 +130,12 @@ private DockerKafkaEnvironment(
this.aclBindings = Set.copyOf(requireNonNull(aclBindings, "aclBindings"));
this.adminUser = requireNonNull(adminUser, "credentials");
this.explicitNetwork = requireNonNull(explicitNetwork, "explicitNetwork");
tearDown();
}

@Override
public void beforeAll(final ExtensionContext context) {
invokedStatically = true;
setUp();
start();
}

@Override
Expand All @@ -143,7 +144,7 @@ public void beforeEach(final ExtensionContext context) {
return;
}

setUp();
start();
}

@Override
Expand All @@ -152,12 +153,12 @@ public void afterEach(final ExtensionContext context) {
return;
}

tearDown();
close();
}

@Override
public void afterAll(final ExtensionContext context) {
tearDown();
close();
}

/**
Expand All @@ -182,16 +183,6 @@ public String dockerNetworkKafkaBootstrapServers() {
+ KAFKA_DOCKER_NETWORK_PORT;
}

/**
* @return bootstrap servers for connecting to Schema Registry from outside the Docker network,
* i.e. from test code
*/
@SuppressWarnings("deprecation")
@Override
public String schemeRegistryServer() {
return schemaRegistryServer();
}

/**
* @return bootstrap servers for connecting to Schema Registry from outside the Docker network,
* i.e. from test code
Expand Down Expand Up @@ -223,6 +214,18 @@ public Admin adminClient() {
return AdminClient.create(properties);
}

@Override
public CachedSchemaRegistryClient srClient() {
return new CachedSchemaRegistryClient(
schemaRegistryServer(),
5,
List.of(
new ProtobufSchemaProvider(),
new AvroSchemaProvider(),
new JsonSchemaProvider()),
Map.of());
}

/**
* @return the Docker network Kafka and SR are running on, allowing additional containers to use
* the same network, if needed.
Expand All @@ -234,7 +237,8 @@ public Network network() {
return network;
}

private void setUp() {
/** Start the contains */
public void start() {
if (setUpCount.incrementAndGet() != 1) {
return;
}
Expand Down Expand Up @@ -272,7 +276,8 @@ private void setUp() {
installAcls();
}

private void tearDown() {
@Override
public void close() {
if (setUpCount.decrementAndGet() != 0) {
return;
}
Expand Down Expand Up @@ -323,6 +328,14 @@ public String testNetworkSchemeRegistryServer() {
return dockerNetworkSchemaRegistryServer();
}

public String kafkaLogs() {
return kafkaBroker == null ? "Kafka Broker not started" : kafkaBroker.getLogs();
}

public String schemaRegistryLogs() {
return schemaRegistry == null ? "Schema Registry not started" : schemaRegistry.getLogs();
}

/** Builder of {@link DockerKafkaEnvironment}. */
public static final class Builder {

Expand All @@ -338,17 +351,22 @@ public static final class Builder {

private int startUpAttempts = DEFAULT_CONTAINER_STARTUP_ATTEMPTS;
private Duration startUpTimeout = DEFAULT_CONTAINER_STARTUP_TIMEOUT;
private DockerImageName kafkaDockerImage =
DockerImageName.parse(DEFAULT_KAFKA_DOCKER_IMAGE);
private final Map<String, String> kafkaEnv = new HashMap<>(DEFAULT_KAFKA_ENV);
private Optional<DockerImageName> srImage =
Optional.of(DockerImageName.parse(DEFAULT_SCHEMA_REG_IMAGE));
private DockerImageName kafkaDockerImage;
private final Map<String, String> kafkaEnv = new HashMap<>();
private Optional<DockerImageName> srImage;
private final Map<String, String> srEnv = new HashMap<>();
private final Map<String, String> userPasswords = new LinkedHashMap<>();
private boolean enableAcls = false;
private final Set<AclBinding> aclBindings = new HashSet<>();
private Optional<Network> explicitNetwork = Optional.empty();

public Builder() {
withKafkaEnv(DEFAULT_KAFKA_ENV)
.withKafkaImage(DEFAULT_KAFKA_DOCKER_IMAGE)
.withSchemaRegistryImage(DEFAULT_SCHEMA_REG_IMAGE);
}

@SuppressWarnings("unused")
public Builder withNetwork(final Network network) {
this.explicitNetwork = Optional.of(network);
return this;
Expand Down Expand Up @@ -398,6 +416,7 @@ public Builder withKafkaImage(final String imageName) {
* @param value the environment value.
* @return self.
*/
@SuppressWarnings("UnusedReturnValue")
public Builder withKafkaEnv(final String key, final String value) {
return withKafkaEnv(Map.of(key, value));
}
Expand Down Expand Up @@ -429,6 +448,7 @@ public Builder withoutSchemaRegistry() {
* @param imageName the Docker image name.
* @return self.
*/
@SuppressWarnings("UnusedReturnValue")
public Builder withSchemaRegistryImage(final String imageName) {
this.srImage = Optional.of(DockerImageName.parse(imageName));
return this;
Expand All @@ -441,6 +461,7 @@ public Builder withSchemaRegistryImage(final String imageName) {
* @param value the environment value.
* @return self.
*/
@SuppressWarnings("UnusedReturnValue")
public Builder withSchemaRegistryEnv(final String key, final String value) {
return withSchemaRegistryEnv(Map.of(key, value));
}
Expand Down Expand Up @@ -567,12 +588,11 @@ private void maybeEnableSasl() {

Clients.clientSaslAuthProperties(adminUser.get().userName, adminUser.get().password)
.forEach(
(key, value) -> {
withSchemaRegistryEnv(
"SCHEMA_REGISTRY_KAFKASTORE_"
+ key.toUpperCase().replaceAll("\\.", "_"),
value.toString());
});
(key, value) ->
withSchemaRegistryEnv(
"SCHEMA_REGISTRY_KAFKASTORE_"
+ key.toUpperCase().replaceAll("\\.", "_"),
value.toString()));
}

private String buildJaasConfig(final Credentials adminUser) {
Expand All @@ -597,15 +617,4 @@ private static class Credentials {
this.password = requireNonNull(password, "password");
}
}

public CachedSchemaRegistryClient srClient() {
return new CachedSchemaRegistryClient(
schemaRegistryServer(),
5,
List.of(
new ProtobufSchemaProvider(),
new AvroSchemaProvider(),
new JsonSchemaProvider()),
Map.of());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,6 @@ public interface KafkaEnvironment extends Extension {
*/
String kafkaBootstrapServers();

/**
* @return Connection string for connecting to Schema Registry.
* @deprecated use {@link #schemaRegistryServer}
*/
@Deprecated
String schemeRegistryServer();

/**
* @return Connection string for connecting to Schema Registry.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ public SchemaRegistryContainer withNetworkAliases(final String... aliases) {
*
* @param kafka kafka container
* @return self.
* @deprecated will be removed in future version.
*/
@Deprecated
public SchemaRegistryContainer withKafka(final KafkaContainer kafka) {
withNetwork(kafka.getNetwork());
withEnv(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2023 SpecMesh Contributors (https://github.com/specmesh)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.specmesh.kafka;

import java.time.Duration;
import java.util.function.Consumer;
import org.junit.jupiter.api.Test;

class DockerKafkaEnvironmentTest {

private static final int STARTUP_ATTEMPTS = 1;
private static final Duration STARTUP_DURATION = Duration.ofSeconds(15);

@Test
void shouldWorkWithoutSasl() {
assertStarts(builder -> {});
}

@Test
void shouldWorkWithSasl() {
assertStarts(
builder -> builder.withSaslAuthentication("admin", "admin-secret").withKafkaAcls());
}

private void assertStarts(final Consumer<DockerKafkaEnvironment.Builder> customizer) {
final DockerKafkaEnvironment.Builder builder =
DockerKafkaEnvironment.builder()
.withContainerStartUpAttempts(STARTUP_ATTEMPTS)
.withContainerStartUpTimeout(STARTUP_DURATION);

customizer.accept(builder);

try (DockerKafkaEnvironment kafkaEnvironment = builder.build()) {
try {
// When:
kafkaEnvironment.start();

// Then: did not throw
} catch (Exception e) {
System.out.println("----------Kafka Logs---------------");
System.out.println(kafkaEnvironment.kafkaLogs());
System.out.println("----------Schema Registry Logs---------------");
System.out.println(kafkaEnvironment.schemaRegistryLogs());
throw e;
}
}
}
}

0 comments on commit 368807a

Please sign in to comment.