Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18533: Remove KafkaConfig zookeeper related logic #18547

Merged
merged 18 commits into from
Jan 25, 2025
Merged
17 changes: 3 additions & 14 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs}
import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.server.util.Csv
Expand Down Expand Up @@ -188,14 +188,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
def valuesFromThisConfigWithPrefixOverride(prefix: String): util.Map[String, AnyRef] =
super.valuesWithPrefixOverride(prefix)

/** ********* Zookeeper Configuration ***********/
val zkConnect: String = getString(ZkConfigs.ZK_CONNECT_CONFIG)
val zkSessionTimeoutMs: Int = getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG)
val zkConnectionTimeoutMs: Int =
Option(getInt(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_CONFIG)).map(_.toInt).getOrElse(getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG))
val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG)
val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG)
Copy link
Member

@ijuma ijuma Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also remove them from ZkConfigs?


private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
def remoteLogManagerConfig = _remoteLogManagerConfig

Expand Down Expand Up @@ -231,9 +223,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val controllerPerformanceSamplePeriodMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS)
val controllerPerformanceAlwaysLogThresholdMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS)

def requiresZookeeper: Boolean = processRoles.isEmpty
def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty

private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map {
case "broker" => ProcessRole.BrokerRole
Expand Down Expand Up @@ -610,7 +599,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
.map { case (listenerName, protocolName) =>
ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)
}
if (usesSelfManagedQuorum && !originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)) {
if (!originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)) {
// Nothing was specified explicitly for listener.security.protocol.map, so we are using the default value,
// and we are using KRaft.
// Add PLAINTEXT mappings for controller listeners as long as there is no SSL or SASL_{PLAINTEXT,SSL} in use
Expand Down Expand Up @@ -734,7 +723,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])

val listenerNames = listeners.map(_.listenerName).toSet
if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) {
// validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located)
// validations for all broker setups (i.e. KRaft broker-only and KRaft co-located)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we remove KRaft?

validateAdvertisedBrokerListenersNonEmptyForBroker()
require(advertisedBrokerListenerNames.contains(interBrokerListenerName),
s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +
Expand Down
8 changes: 1 addition & 7 deletions core/src/main/scala/kafka/server/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,7 @@ object Server {
): KafkaMetricsContext = {
val contextLabels = new java.util.HashMap[String, Object]
contextLabels.put(ClusterIdLabel, clusterId)

if (config.usesSelfManagedQuorum) {
contextLabels.put(NodeIdLabel, config.nodeId.toString)
} else {
contextLabels.put(BrokerIdLabel, config.brokerId.toString)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove BrokerIdLabel and MockMetricsReporter.BROKERID

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed it :)

}

contextLabels.put(BrokerIdLabel, config.brokerId.toString)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this seems like a bug - we may want to add a unit test if nothing failed. We should be setting NodeIdLabel, not BrokerIdLabel. Right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ServerTest.scala will test this bug, I think we won't need to add a new test for it.

contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
new KafkaMetricsContext(MetricsPrefix, contextLabels)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1198,7 +1198,6 @@ class KafkaConfigTest {
defaults.setProperty(MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString)

val config = KafkaConfig.fromProps(defaults)
assertEquals(1234, config.zkConnectionTimeoutMs)
assertEquals(false, config.brokerIdGenerationEnable)
assertEquals(1, config.maxReservedBrokerId)
assertEquals(1, config.brokerId)
Expand Down
Loading