Skip to content

Commit

Permalink
KAFKA-18533 Remove KafkaConfig zookeeper related logic (#18547)
Browse files Browse the repository at this point in the history
Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
m1a2st authored Jan 25, 2025
1 parent 43af241 commit c40e7a1
Show file tree
Hide file tree
Showing 11 changed files with 25 additions and 78 deletions.
17 changes: 3 additions & 14 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs}
import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.server.util.Csv
Expand Down Expand Up @@ -188,14 +188,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
def valuesFromThisConfigWithPrefixOverride(prefix: String): util.Map[String, AnyRef] =
super.valuesWithPrefixOverride(prefix)

/** ********* Zookeeper Configuration ***********/
val zkConnect: String = getString(ZkConfigs.ZK_CONNECT_CONFIG)
val zkSessionTimeoutMs: Int = getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG)
val zkConnectionTimeoutMs: Int =
Option(getInt(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_CONFIG)).map(_.toInt).getOrElse(getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG))
val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG)
val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG)

private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
def remoteLogManagerConfig = _remoteLogManagerConfig

Expand Down Expand Up @@ -231,9 +223,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val controllerPerformanceSamplePeriodMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS)
val controllerPerformanceAlwaysLogThresholdMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS)

def requiresZookeeper: Boolean = processRoles.isEmpty
def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty

private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map {
case "broker" => ProcessRole.BrokerRole
Expand Down Expand Up @@ -610,7 +599,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
.map { case (listenerName, protocolName) =>
ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)
}
if (usesSelfManagedQuorum && !originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)) {
if (!originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)) {
// Nothing was specified explicitly for listener.security.protocol.map, so we are using the default value,
// and we are using KRaft.
// Add PLAINTEXT mappings for controller listeners as long as there is no SSL or SASL_{PLAINTEXT,SSL} in use
Expand Down Expand Up @@ -734,7 +723,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])

val listenerNames = listeners.map(_.listenerName).toSet
if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) {
// validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located)
// validations for all broker setups (i.e. broker-only and co-located)
validateAdvertisedBrokerListenersNonEmptyForBroker()
require(advertisedBrokerListenerNames.contains(interBrokerListenerName),
s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +
Expand Down
9 changes: 1 addition & 8 deletions core/src/main/scala/kafka/server/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ trait Server {
object Server {
val MetricsPrefix: String = "kafka.server"
val ClusterIdLabel: String = "kafka.cluster.id"
val BrokerIdLabel: String = "kafka.broker.id"
val NodeIdLabel: String = "kafka.node.id"

def initializeMetrics(
Expand Down Expand Up @@ -69,13 +68,7 @@ object Server {
): KafkaMetricsContext = {
val contextLabels = new java.util.HashMap[String, Object]
contextLabels.put(ClusterIdLabel, clusterId)

if (config.usesSelfManagedQuorum) {
contextLabels.put(NodeIdLabel, config.nodeId.toString)
} else {
contextLabels.put(BrokerIdLabel, config.brokerId.toString)
}

contextLabels.put(NodeIdLabel, config.nodeId.toString)
contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
new KafkaMetricsContext(MetricsPrefix, contextLabels)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.{ServerConfigs, ZkConfigs}
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}

Expand Down Expand Up @@ -79,7 +79,6 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
import DescribeAuthorizedOperationsTest._

override val brokerCount = 1
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName)

var client: Admin = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs, ZkConfigs}
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils}
import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, assertFutureThrows}
import org.apache.logging.log4j.core.config.Configurator
Expand Down Expand Up @@ -4082,7 +4082,7 @@ object PlaintextAdminIntegrationTest {
new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"), OpType.SET)
))
alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy"), OpType.SET)))
alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181"), OpType.SET)))
alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "EXTERNAL://localhost:0,INTERNAL://localhost:0"), OpType.SET)))
var alterResult = admin.incrementalAlterConfigs(alterConfigs)

assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet)
Expand Down Expand Up @@ -4111,7 +4111,7 @@ object PlaintextAdminIntegrationTest {
new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"), OpType.SET)
))
alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip"), OpType.SET)))
alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181"), OpType.SET)))
alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "EXTERNAL://localhost:0,INTERNAL://localhost:0"), OpType.SET)))
alterResult = admin.incrementalAlterConfigs(alterConfigs, new AlterConfigsOptions().validateOnly(true))

assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package kafka.api
import kafka.security.JaasTestUtils
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
Expand All @@ -26,7 +25,6 @@ import scala.jdk.CollectionConverters._
class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
private val kafkaClientSaslMechanism = "PLAIN"
private val kafkaServerSaslMechanisms = List("GSSAPI", "PLAIN")
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@ package kafka.api
import kafka.security.JaasTestUtils
import kafka.utils.TestUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}

@Timeout(600)
class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup {
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs}
import org.apache.kafka.server.util.KafkaScheduler
Expand Down Expand Up @@ -253,7 +253,7 @@ class DynamicBrokerConfigTest {

val securityPropsWithoutListenerPrefix = Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12")
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, securityPropsWithoutListenerPrefix)
val nonDynamicProps = Map(ZkConfigs.ZK_CONNECT_CONFIG -> "somehost:2181")
val nonDynamicProps = Map(KRaftConfigs.NODE_ID_CONFIG -> "123")
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, nonDynamicProps)

// Test update of configs with invalid type
Expand Down
30 changes: 14 additions & 16 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ import org.junit.jupiter.api.function.Executable
import scala.jdk.CollectionConverters._

class KafkaConfigTest {

def createDefaultConfig(): Properties = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:5000")
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,CONTROLLER:SASL_SSL")
props
}

@Test
def testLogRetentionTimeHoursProvided(): Unit = {
Expand Down Expand Up @@ -547,9 +558,7 @@ class KafkaConfigTest {

@Test
def testListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
val props = new Properties()
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
val props = createDefaultConfig()

props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091,REPLICATION://localhost:9092")
props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "SSL")
Expand All @@ -558,9 +567,7 @@ class KafkaConfigTest {

@Test
def testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
val props = new Properties()
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
val props = createDefaultConfig()

props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091")
props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "REPLICATION")
Expand All @@ -569,9 +576,7 @@ class KafkaConfigTest {

@Test
def testInterBrokerListenerNameAndSecurityProtocolSet(): Unit = {
val props = new Properties()
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
val props = createDefaultConfig()

props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091")
props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "SSL")
Expand Down Expand Up @@ -794,11 +799,6 @@ class KafkaConfigTest {

KafkaConfig.configNames.foreach { name =>
name match {
case ZkConfigs.ZK_CONNECT_CONFIG => // ignore string
case ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG => //ignore string
case ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG => //ignore string
Expand Down Expand Up @@ -1181,7 +1181,6 @@ class KafkaConfigTest {
defaults.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092")
defaults.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
// For ZkConnectionTimeoutMs
defaults.setProperty(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG, "1234")
defaults.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "false")
defaults.setProperty(ServerConfigs.RESERVED_BROKER_MAX_ID_CONFIG, "1")
defaults.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
Expand All @@ -1198,7 +1197,6 @@ class KafkaConfigTest {
defaults.setProperty(MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString)

val config = KafkaConfig.fromProps(defaults)
assertEquals(1234, config.zkConnectionTimeoutMs)
assertEquals(false, config.brokerIdGenerationEnable)
assertEquals(1, config.maxReservedBrokerId)
assertEquals(1, config.brokerId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ object KafkaMetricsReporterTest {

MockMetricsReporter.JMXPREFIX.set(contextLabelOrNull("_namespace", metricsContext))
MockMetricsReporter.CLUSTERID.set(contextLabelOrNull("kafka.cluster.id", metricsContext))
MockMetricsReporter.BROKERID.set(contextLabelOrNull("kafka.broker.id", metricsContext))
MockMetricsReporter.NODEID.set(contextLabelOrNull("kafka.node.id", metricsContext))
}

Expand All @@ -58,7 +57,6 @@ object KafkaMetricsReporterTest {

object MockMetricsReporter {
val JMXPREFIX: AtomicReference[String] = new AtomicReference[String]
val BROKERID : AtomicReference[String] = new AtomicReference[String]
val NODEID : AtomicReference[String] = new AtomicReference[String]
val CLUSTERID : AtomicReference[String] = new AtomicReference[String]
}
Expand All @@ -84,7 +82,6 @@ class KafkaMetricsReporterTest extends QuorumTestHarness {
@ValueSource(strings = Array("kraft"))
def testMetricsContextNamespacePresent(quorum: String): Unit = {
assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID.get())
assertNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get())
assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get())
assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get())

Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/server/ServerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._
class ServerTest {

@Test
def testCreateSelfManagedKafkaMetricsContext(): Unit = {
def testCreateKafkaMetricsContext(): Unit = {
val nodeId = 0
val clusterId = Uuid.randomUuid().toString

Expand Down
Loading

0 comments on commit c40e7a1

Please sign in to comment.