Skip to content

Commit

Permalink
Scram Credentials list separated by semicolon instead of comma
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Jun 6, 2024
1 parent a9abdd7 commit 09430d6
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ void testMinimumMetadataVersion() {
@Test
void testSaslScramContainer() {
try (var container = createKafkaNativeContainer()
.withEnv("SERVER_SCRAM_CREDENTIALS", "SCRAM-SHA-512=[name=client\\,password=client-secret]")
.withEnv("SERVER_SCRAM_CREDENTIALS", "SCRAM-SHA-512=[name=client,password=client-secret]")
.withServerProperties(MountableFile.forClasspathResource("sasl_scram_plaintext.properties"))
.withAdvertisedListeners(c -> String.format("SASL_PLAINTEXT://%s:%s", c.getHost(), c.getExposedKafkaPort()))) {
container.start();
Expand All @@ -146,7 +146,7 @@ void testSaslScramContainer() {
@Test
void testSaslScramContainerNotSupported() {
try (var container = createKafkaNativeContainer()
.withEnv("SERVER_SCRAM_CREDENTIALS", "SCRAM-SHA-512=[name=client\\,password=client-secret]")
.withEnv("SERVER_SCRAM_CREDENTIALS", "SCRAM-SHA-512=[name=client,password=client-secret]")
.withServerProperties(MountableFile.forClasspathResource("metadata_version_3.3.properties"))
.withStartupTimeout(Duration.ofSeconds(10))
.withAdvertisedListeners(c -> String.format("SASL_PLAINTEXT://%s:%s", c.getHost(), c.getExposedKafkaPort()))) {
Expand All @@ -172,7 +172,7 @@ void testSaslScramContainerCluster() throws ExecutionException, InterruptedExcep
var common = Map.of(
"SERVER_CLUSTER_ID", clusterId,
"SERVER_SCRAM_CREDENTIALS",
"SCRAM-SHA-512=[name=client\\,password=client-secret],SCRAM-SHA-512=[name=broker\\,password=broker-secret]",
"SCRAM-SHA-512=[name=client,password=client-secret];SCRAM-SHA-512=[name=broker,password=broker-secret]",
"KAFKA_CONTROLLER_QUORUM_VOTERS", quorumVotes);

common.forEach(b1::addEnv);
Expand Down Expand Up @@ -301,7 +301,7 @@ void testZookeeperContainerWithScram() {
try (var container = createKafkaNativeContainer()
.withNetwork(Network.SHARED)
.withArgs("-Dkafka.zookeeper.connect=zookeeper:2181")
.withEnv("SERVER_SCRAM_CREDENTIALS", "SCRAM-SHA-512=[name=client\\,password=client-secret]")
.withEnv("SERVER_SCRAM_CREDENTIALS", "SCRAM-SHA-512=[name=client,password=client-secret]")
.withServerProperties(MountableFile.forClasspathResource("sasl_scram_plaintext.properties"))
.withAdvertisedListeners(c -> String.format("SASL_PLAINTEXT://%s:%s", c.getHost(), c.getExposedKafkaPort()))) {
container.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.ozangunalp.kafka.server;

import static kafka.zk.KafkaZkClient.*;
import static kafka.zk.KafkaZkClient.createZkClient;
import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT;

import java.io.Closeable;
Expand All @@ -11,9 +11,6 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import kafka.server.KafkaServer;
import kafka.server.Server;
import kafka.zk.AdminZkClient;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
Expand All @@ -29,6 +26,9 @@
import kafka.cluster.EndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
import kafka.server.KafkaServer;
import kafka.server.Server;
import kafka.zk.AdminZkClient;
import scala.Option;
import scala.jdk.javaapi.StreamConverters;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.ozangunalp.kafka.server;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

Expand Down Expand Up @@ -32,16 +34,21 @@ public interface ServerConfig {
boolean autoConfigure();

/**
* List of scram credentials.
* List of scram credentials, separated by semicolon.
* <br/>
* Format of the scram string must be in one of the following forms:
* <plaintext>
* SCRAM-SHA-256=[user=alice,password=alice-secret]
* SCRAM-SHA-512=[user=alice,iterations=8192,salt="N3E=",saltedpassword="YCE="]
* </plaintext>
*
* @return list of scram credentials
*/
Optional<List<String>> scramCredentials();
Optional<String> scramCredentials();

default List<String> scramCredentialsList() {
return scramCredentials().map(s -> Arrays.stream(s.split(";")).toList())
.orElse(Collections.emptyList());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ void startup(@Observes StartupEvent event) {
.withInternalPort(config.internalPort())
.withKafkaHost(config.host().orElse(""))
.withAutoConfigure(config.autoConfigure())
.withScramCredentials(config.scramCredentialsList())
.withConfig(properties -> {
properties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, Reporter.class.getName());
config.propertiesFile().ifPresent(Unchecked.consumer(file ->
properties.putAll(Utils.loadProps(file.toFile().getAbsolutePath()))));
});
config.clusterId().ifPresent(id -> broker.withClusterId(id));
config.scramCredentials().ifPresent(credentials -> broker.withScramCredentials(credentials));
broker.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.jboss.logging.Logger;

import kafka.server.KafkaConfig;

import org.apache.kafka.metadata.properties.MetaProperties;

import kafka.tools.StorageTool;
import scala.collection.immutable.Seq;

Expand Down

0 comments on commit 09430d6

Please sign in to comment.