From 09430d6cb644968093a1594ba188abfa1561e155 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Wed, 5 Jun 2024 15:44:23 +0200 Subject: [PATCH] Scram Credentials list separated by semicolon instead of comma --- .../test/container/KafkaNativeContainerIT.java | 8 ++++---- .../kafka/server/EmbeddedKafkaBroker.java | 8 ++++---- .../com/ozangunalp/kafka/server/ServerConfig.java | 13 ++++++++++--- .../java/com/ozangunalp/kafka/server/Startup.java | 2 +- .../java/com/ozangunalp/kafka/server/Storage.java | 4 +--- 5 files changed, 20 insertions(+), 15 deletions(-) diff --git a/kafka-native-test-container/src/test/java/com/ozangunalp/kafka/test/container/KafkaNativeContainerIT.java b/kafka-native-test-container/src/test/java/com/ozangunalp/kafka/test/container/KafkaNativeContainerIT.java index c6b84a2..38a316a 100644 --- a/kafka-native-test-container/src/test/java/com/ozangunalp/kafka/test/container/KafkaNativeContainerIT.java +++ b/kafka-native-test-container/src/test/java/com/ozangunalp/kafka/test/container/KafkaNativeContainerIT.java @@ -132,7 +132,7 @@ void testMinimumMetadataVersion() { @Test void testSaslScramContainer() { try (var container = createKafkaNativeContainer() - .withEnv("SERVER_SCRAM_CREDENTIALS", "SCRAM-SHA-512=[name=client\\,password=client-secret]") + .withEnv("SERVER_SCRAM_CREDENTIALS", "SCRAM-SHA-512=[name=client,password=client-secret]") .withServerProperties(MountableFile.forClasspathResource("sasl_scram_plaintext.properties")) .withAdvertisedListeners(c -> String.format("SASL_PLAINTEXT://%s:%s", c.getHost(), c.getExposedKafkaPort()))) { container.start(); @@ -146,7 +146,7 @@ void testSaslScramContainer() { @Test void testSaslScramContainerNotSupported() { try (var container = createKafkaNativeContainer() - .withEnv("SERVER_SCRAM_CREDENTIALS", "SCRAM-SHA-512=[name=client\\,password=client-secret]") + .withEnv("SERVER_SCRAM_CREDENTIALS", "SCRAM-SHA-512=[name=client,password=client-secret]") .withServerProperties(MountableFile.forClasspathResource("metadata_version_3.3.properties")) .withStartupTimeout(Duration.ofSeconds(10)) .withAdvertisedListeners(c -> String.format("SASL_PLAINTEXT://%s:%s", c.getHost(), c.getExposedKafkaPort()))) { @@ -172,7 +172,7 @@ void testSaslScramContainerCluster() throws ExecutionException, InterruptedExcep var common = Map.of( "SERVER_CLUSTER_ID", clusterId, "SERVER_SCRAM_CREDENTIALS", - "SCRAM-SHA-512=[name=client\\,password=client-secret],SCRAM-SHA-512=[name=broker\\,password=broker-secret]", + "SCRAM-SHA-512=[name=client,password=client-secret];SCRAM-SHA-512=[name=broker,password=broker-secret]", "KAFKA_CONTROLLER_QUORUM_VOTERS", quorumVotes); common.forEach(b1::addEnv); @@ -301,7 +301,7 @@ void testZookeeperContainerWithScram() { try (var container = createKafkaNativeContainer() .withNetwork(Network.SHARED) .withArgs("-Dkafka.zookeeper.connect=zookeeper:2181") - .withEnv("SERVER_SCRAM_CREDENTIALS", "SCRAM-SHA-512=[name=client\\,password=client-secret]") + .withEnv("SERVER_SCRAM_CREDENTIALS", "SCRAM-SHA-512=[name=client,password=client-secret]") .withServerProperties(MountableFile.forClasspathResource("sasl_scram_plaintext.properties")) .withAdvertisedListeners(c -> String.format("SASL_PLAINTEXT://%s:%s", c.getHost(), c.getExposedKafkaPort()))) { container.start(); diff --git a/kafka-server/src/main/java/com/ozangunalp/kafka/server/EmbeddedKafkaBroker.java b/kafka-server/src/main/java/com/ozangunalp/kafka/server/EmbeddedKafkaBroker.java index fec15bc..81bd149 100644 --- a/kafka-server/src/main/java/com/ozangunalp/kafka/server/EmbeddedKafkaBroker.java +++ b/kafka-server/src/main/java/com/ozangunalp/kafka/server/EmbeddedKafkaBroker.java @@ -1,6 +1,6 @@ package com.ozangunalp.kafka.server; -import static kafka.zk.KafkaZkClient.*; +import static kafka.zk.KafkaZkClient.createZkClient; import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT; import java.io.Closeable; @@ -11,9 +11,6 @@ import java.util.function.Consumer; import java.util.stream.Collectors; -import kafka.server.KafkaServer; -import kafka.server.Server; -import kafka.zk.AdminZkClient; import org.apache.kafka.clients.admin.ScramMechanism; import org.apache.kafka.common.Endpoint; import org.apache.kafka.common.Uuid; @@ -29,6 +26,9 @@ import kafka.cluster.EndPoint; import kafka.server.KafkaConfig; import kafka.server.KafkaRaftServer; +import kafka.server.KafkaServer; +import kafka.server.Server; +import kafka.zk.AdminZkClient; import scala.Option; import scala.jdk.javaapi.StreamConverters; diff --git a/kafka-server/src/main/java/com/ozangunalp/kafka/server/ServerConfig.java b/kafka-server/src/main/java/com/ozangunalp/kafka/server/ServerConfig.java index 4999cb6..4af9cbe 100644 --- a/kafka-server/src/main/java/com/ozangunalp/kafka/server/ServerConfig.java +++ b/kafka-server/src/main/java/com/ozangunalp/kafka/server/ServerConfig.java @@ -1,6 +1,8 @@ package com.ozangunalp.kafka.server; import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Optional; @@ -32,16 +34,21 @@ public interface ServerConfig { boolean autoConfigure(); /** - * List of scram credentials. + * List of scram credentials, separated by semicolon. *
* Format of the scram string must be in one of the following forms: * * SCRAM-SHA-256=[user=alice,password=alice-secret] * SCRAM-SHA-512=[user=alice,iterations=8192,salt="N3E=",saltedpassword="YCE="] * </plaintext> - + * * @return list of scram credentials */ - Optional<List<String>> scramCredentials(); + Optional<String> scramCredentials(); + + default List<String> scramCredentialsList() { + return scramCredentials().map(s -> Arrays.stream(s.split(";")).toList()) + .orElse(Collections.emptyList()); + } } diff --git a/kafka-server/src/main/java/com/ozangunalp/kafka/server/Startup.java b/kafka-server/src/main/java/com/ozangunalp/kafka/server/Startup.java index 18bb076..789ef19 100644 --- a/kafka-server/src/main/java/com/ozangunalp/kafka/server/Startup.java +++ b/kafka-server/src/main/java/com/ozangunalp/kafka/server/Startup.java @@ -28,13 +28,13 @@ void startup(@Observes StartupEvent event) { .withInternalPort(config.internalPort()) .withKafkaHost(config.host().orElse("")) .withAutoConfigure(config.autoConfigure()) + .withScramCredentials(config.scramCredentialsList()) .withConfig(properties -> { properties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, Reporter.class.getName()); config.propertiesFile().ifPresent(Unchecked.consumer(file -> properties.putAll(Utils.loadProps(file.toFile().getAbsolutePath())))); }); config.clusterId().ifPresent(id -> broker.withClusterId(id)); - config.scramCredentials().ifPresent(credentials -> broker.withScramCredentials(credentials)); broker.start(); } diff --git a/kafka-server/src/main/java/com/ozangunalp/kafka/server/Storage.java b/kafka-server/src/main/java/com/ozangunalp/kafka/server/Storage.java index 573a22b..55491aa 100644 --- a/kafka-server/src/main/java/com/ozangunalp/kafka/server/Storage.java +++ b/kafka-server/src/main/java/com/ozangunalp/kafka/server/Storage.java @@ -15,14 +15,12 @@ import org.apache.kafka.common.metadata.UserScramCredentialRecord; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.properties.MetaProperties; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.MetadataVersion; import org.jboss.logging.Logger; import kafka.server.KafkaConfig; - -import org.apache.kafka.metadata.properties.MetaProperties; - import kafka.tools.StorageTool; import scala.collection.immutable.Seq;