Skip to content

Commit

Permalink
update the DynamicBrokerConfig comment
Browse files Browse the repository at this point in the history
  • Loading branch information
m1a2st committed Jan 21, 2025
1 parent e84a37c commit 9391127
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 82 deletions.
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/server/ClientQuotaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ object QuotaTypes {
object ClientQuotaManager {
// Purge sensors after 1 hour of inactivity
val InactiveSensorExpirationTimeSeconds = 3600

val DefaultString = "<default>"
val DefaultClientIdQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(None, Some(DefaultClientIdEntity))
val DefaultUserQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), None)
val DefaultUserClientIdQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), Some(DefaultClientIdEntity))
Expand All @@ -76,13 +76,13 @@ object ClientQuotaManager {

case object DefaultUserEntity extends BaseUserEntity {
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_USER
override def name: String = KraftInternals.DEFAULT_STRING
override def name: String = DefaultString
override def toString: String = "default user"
}

case object DefaultClientIdEntity extends ClientQuotaEntity.ConfigEntity {
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID
override def name: String = KraftInternals.DEFAULT_STRING
override def name: String = DefaultString
override def toString: String = "default client-id"
}

Expand All @@ -93,7 +93,7 @@ object ClientQuotaManager {

def sanitizedUser: String = userEntity.map {
case entity: UserEntity => entity.sanitizedUser
case DefaultUserEntity => KraftInternals.DEFAULT_STRING
case DefaultUserEntity => DefaultString
}.getOrElse("")

def clientId: String = clientIdEntity.map(_.name).getOrElse("")
Expand Down Expand Up @@ -419,11 +419,11 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
lock.writeLock().lock()
try {
val userEntity = sanitizedUser.map {
case KraftInternals.DEFAULT_STRING => DefaultUserEntity
case DefaultString => DefaultUserEntity
case user => UserEntity(user)
}
val clientIdEntity = sanitizedClientId.map {
case KraftInternals.DEFAULT_STRING => DefaultClientIdEntity
case DefaultString => DefaultClientIdEntity
case _ => ClientIdEntity(clientId.getOrElse(throw new IllegalStateException("Client-id not provided")))
}
val quotaEntity = KafkaQuotaEntity(userEntity, clientIdEntity)
Expand Down
4 changes: 1 addition & 3 deletions core/src/main/scala/kafka/server/ConfigHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,7 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
class BrokerConfigHandler(private val brokerConfig: KafkaConfig,
private val quotaManagers: QuotaManagers) extends ConfigHandler with Logging {
def processConfigChanges(brokerId: String, properties: Properties): Unit = {
if (brokerId == KraftInternals.DEFAULT_STRING)
brokerConfig.dynamicConfig.updateDefaultConfig(properties)
else if (brokerConfig.brokerId == brokerId.trim.toInt) {
if (brokerConfig.brokerId == brokerId.trim.toInt) {
brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties)
}
val updatedDynamicBrokerConfigs = brokerConfig.dynamicConfig.currentDynamicBrokerConfigs
Expand Down
12 changes: 5 additions & 7 deletions core/src/main/scala/kafka/server/DynamicConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.net.{InetAddress, UnknownHostException}
import java.util.Properties
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.server.config.{QuotaConfig, KraftInternals}
import org.apache.kafka.server.config.QuotaConfig

import java.util
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -85,12 +85,10 @@ object DynamicConfig {
def validate(props: Properties): util.Map[String, AnyRef] = DynamicConfig.validate(ipConfigs, props, customPropsAllowed = false)

def isValidIpEntity(ip: String): Boolean = {
if (ip != KraftInternals.DEFAULT_STRING) {
try {
InetAddress.getByName(ip)
} catch {
case _: UnknownHostException => return false
}
try {
InetAddress.getByName(ip)
} catch {
case _: UnknownHostException => return false
}
true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kafka.server.metadata

import kafka.network.ConnectionQuotas
import kafka.server.ClientQuotaManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.common.metrics.Quota
Expand All @@ -26,7 +27,7 @@ import org.apache.kafka.common.utils.Sanitizer

import java.net.{InetAddress, UnknownHostException}
import org.apache.kafka.image.{ClientQuotaDelta, ClientQuotasDelta}
import org.apache.kafka.server.config.{QuotaConfig, KraftInternals}
import org.apache.kafka.server.config.{KraftInternals, QuotaConfig}

import scala.jdk.OptionConverters.RichOptionalDouble

Expand Down Expand Up @@ -147,13 +148,13 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag
// Convert entity into Options with sanitized values for QuotaManagers
val (sanitizedUser, sanitizedClientId) = quotaEntity match {
case UserEntity(user) => (Some(Sanitizer.sanitize(user)), None)
case DefaultUserEntity => (Some(KraftInternals.DEFAULT_STRING), None)
case DefaultUserEntity => (Some(ClientQuotaManager.DefaultString), None)
case ClientIdEntity(clientId) => (None, Some(Sanitizer.sanitize(clientId)))
case DefaultClientIdEntity => (None, Some(KraftInternals.DEFAULT_STRING))
case DefaultClientIdEntity => (None, Some(ClientQuotaManager.DefaultString))
case ExplicitUserExplicitClientIdEntity(user, clientId) => (Some(Sanitizer.sanitize(user)), Some(Sanitizer.sanitize(clientId)))
case ExplicitUserDefaultClientIdEntity(user) => (Some(Sanitizer.sanitize(user)), Some(KraftInternals.DEFAULT_STRING))
case DefaultUserExplicitClientIdEntity(clientId) => (Some(KraftInternals.DEFAULT_STRING), Some(Sanitizer.sanitize(clientId)))
case DefaultUserDefaultClientIdEntity => (Some(KraftInternals.DEFAULT_STRING), Some(KraftInternals.DEFAULT_STRING))
case ExplicitUserDefaultClientIdEntity(user) => (Some(Sanitizer.sanitize(user)), Some(ClientQuotaManager.DefaultString))
case DefaultUserExplicitClientIdEntity(clientId) => (Some(ClientQuotaManager.DefaultString), Some(Sanitizer.sanitize(clientId)))
case DefaultUserDefaultClientIdEntity => (Some(ClientQuotaManager.DefaultString), Some(ClientQuotaManager.DefaultString))
case IpEntity(_) | DefaultIpEntity => throw new IllegalStateException("Should not see IP quota entities here")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,7 @@ class DynamicConfigPublisher(
)
case BROKER =>
dynamicConfigHandlers.get(ConfigType.BROKER).foreach(nodeConfigHandler =>
if (resource.name().isEmpty) {
try {
// Apply changes to "cluster configs" (also known as default BROKER configs).
// These are stored in KRaft with an empty name field.
info("Updating cluster configuration : " +
toLoggableProps(resource, props).mkString(","))
nodeConfigHandler.processConfigChanges(KraftInternals.DEFAULT_STRING, props)
} catch {
case t: Throwable => faultHandler.handleFault("Error updating " +
s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
s"in $deltaName", t)
}
} else if (resource.name() == conf.nodeId.toString) {
if (resource.name() == conf.nodeId.toString) {
try {
// Apply changes to this node's dynamic configuration.
info(s"Updating node ${conf.nodeId} with new configuration : " +
Expand Down
40 changes: 20 additions & 20 deletions core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val client1 = UserClient("ANONYMOUS", "p1", None, Some("p1"))
val client2 = UserClient("ANONYMOUS", "p2", None, Some("p2"))
val randomClient = UserClient("ANONYMOUS", "random-client-id", None, None)
val defaultConfigClient = UserClient("", "", None, Some(KraftInternals.DEFAULT_STRING))
val defaultConfigClient = UserClient("", "", None, Some(ClientQuotaManager.DefaultString))
testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient)
}

Expand All @@ -98,7 +98,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val client1 = UserClient("User1", "p1", Some("User1"), None)
val client2 = UserClient("User2", "p2", Some("User2"), None)
val randomClient = UserClient("RandomUser", "random-client-id", None, None)
val defaultConfigClient = UserClient("", "", Some(KraftInternals.DEFAULT_STRING), None)
val defaultConfigClient = UserClient("", "", Some(ClientQuotaManager.DefaultString), None)
val config = new ClientQuotaManagerConfig()
testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient)
}
Expand All @@ -112,7 +112,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val client1 = UserClient("User1", "p1", Some("User1"), Some("p1"))
val client2 = UserClient("User2", "p2", Some("User2"), Some("p2"))
val randomClient = UserClient("RandomUser", "random-client-id", None, None)
val defaultConfigClient = UserClient("", "", Some(KraftInternals.DEFAULT_STRING), Some(KraftInternals.DEFAULT_STRING))
val defaultConfigClient = UserClient("", "", Some(ClientQuotaManager.DefaultString), Some(ClientQuotaManager.DefaultString))
val config = new ClientQuotaManagerConfig()
testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient)
}
Expand All @@ -125,7 +125,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val client1 = UserClient("User1", "p1", Some("User1"), None)
val client2 = UserClient("User2", "p2", Some("User2"), None)
val randomClient = UserClient("RandomUser", "random-client-id", None, None)
val defaultConfigClient = UserClient("", "", Some(KraftInternals.DEFAULT_STRING), None)
val defaultConfigClient = UserClient("", "", Some(ClientQuotaManager.DefaultString), None)
testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient)
}

Expand All @@ -137,7 +137,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val client1 = UserClient("User1", "p1", Some("User1"), Some("p1"))
val client2 = UserClient("User2", "p2", Some("User2"), Some("p2"))
val randomClient = UserClient("RandomUser", "random-client-id", None, None)
val defaultConfigClient = UserClient("", "", Some(KraftInternals.DEFAULT_STRING), Some(KraftInternals.DEFAULT_STRING))
val defaultConfigClient = UserClient("", "", Some(ClientQuotaManager.DefaultString), Some(ClientQuotaManager.DefaultString))
testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient)
}

Expand Down Expand Up @@ -168,7 +168,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
assertEquals(Double.MaxValue, clientQuotaManager.getMaxValueInQuotaWindow(userSession, "client1"), 0.01)

// Set default <user> quota config
clientQuotaManager.updateQuota(Some(KraftInternals.DEFAULT_STRING), None, None, Some(new Quota(10, true)))
clientQuotaManager.updateQuota(Some(ClientQuotaManager.DefaultString), None, None, Some(new Quota(10, true)))
assertEquals(10 * numFullQuotaWindows, clientQuotaManager.getMaxValueInQuotaWindow(userSession, "client1"), 0.01)
} finally {
clientQuotaManager.shutdown()
Expand All @@ -186,11 +186,11 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
checkQuota(clientQuotaManager, "userA", "client1", Long.MaxValue, 1000, expectThrottle = false)

// Set default <user> quota config
clientQuotaManager.updateQuota(Some(KraftInternals.DEFAULT_STRING), None, None, Some(new Quota(10, true)))
clientQuotaManager.updateQuota(Some(ClientQuotaManager.DefaultString), None, None, Some(new Quota(10, true)))
checkQuota(clientQuotaManager, "userA", "client1", 10, 1000, expectThrottle = true)

// Remove default <user> quota config, back to no quotas
clientQuotaManager.updateQuota(Some(KraftInternals.DEFAULT_STRING), None, None, None)
clientQuotaManager.updateQuota(Some(ClientQuotaManager.DefaultString), None, None, None)
checkQuota(clientQuotaManager, "userA", "client1", Long.MaxValue, 1000, expectThrottle = false)
} finally {
clientQuotaManager.shutdown()
Expand Down Expand Up @@ -241,14 +241,14 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
metrics, QuotaType.PRODUCE, time, "")

try {
clientQuotaManager.updateQuota(Some(KraftInternals.DEFAULT_STRING), None, None, Some(new Quota(1000, true)))
clientQuotaManager.updateQuota(None, Some(KraftInternals.DEFAULT_STRING), Some(KraftInternals.DEFAULT_STRING), Some(new Quota(2000, true)))
clientQuotaManager.updateQuota(Some(KraftInternals.DEFAULT_STRING), Some(KraftInternals.DEFAULT_STRING), Some(KraftInternals.DEFAULT_STRING), Some(new Quota(3000, true)))
clientQuotaManager.updateQuota(Some(ClientQuotaManager.DefaultString), None, None, Some(new Quota(1000, true)))
clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.DefaultString), Some(ClientQuotaManager.DefaultString), Some(new Quota(2000, true)))
clientQuotaManager.updateQuota(Some(ClientQuotaManager.DefaultString), Some(ClientQuotaManager.DefaultString), Some(ClientQuotaManager.DefaultString), Some(new Quota(3000, true)))
clientQuotaManager.updateQuota(Some("userA"), None, None, Some(new Quota(4000, true)))
clientQuotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), Some(new Quota(5000, true)))
clientQuotaManager.updateQuota(Some("userB"), None, None, Some(new Quota(6000, true)))
clientQuotaManager.updateQuota(Some("userB"), Some("client1"), Some("client1"), Some(new Quota(7000, true)))
clientQuotaManager.updateQuota(Some("userB"), Some(KraftInternals.DEFAULT_STRING), Some(KraftInternals.DEFAULT_STRING), Some(new Quota(8000, true)))
clientQuotaManager.updateQuota(Some("userB"), Some(ClientQuotaManager.DefaultString), Some(ClientQuotaManager.DefaultString), Some(new Quota(8000, true)))
clientQuotaManager.updateQuota(Some("userC"), None, None, Some(new Quota(10000, true)))
clientQuotaManager.updateQuota(None, Some("client1"), Some("client1"), Some(new Quota(9000, true)))

Expand All @@ -266,14 +266,14 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
checkQuota(clientQuotaManager, "userE", "client1", 3000, 2500, expectThrottle = false)

// Remove default <user, client> quota config, revert to <user> default
clientQuotaManager.updateQuota(Some(KraftInternals.DEFAULT_STRING), Some(KraftInternals.DEFAULT_STRING), Some(KraftInternals.DEFAULT_STRING), None)
clientQuotaManager.updateQuota(Some(ClientQuotaManager.DefaultString), Some(ClientQuotaManager.DefaultString), Some(ClientQuotaManager.DefaultString), None)
checkQuota(clientQuotaManager, "userD", "client1", 1000, 0, expectThrottle = false) // Metrics tags changed, restart counter
checkQuota(clientQuotaManager, "userE", "client4", 1000, 1500, expectThrottle = true)
checkQuota(clientQuotaManager, "userF", "client4", 1000, 800, expectThrottle = false) // Default <user> quota shared across clients of user
checkQuota(clientQuotaManager, "userF", "client5", 1000, 800, expectThrottle = true)

// Remove default <user> quota config, revert to <client-id> default
clientQuotaManager.updateQuota(Some(KraftInternals.DEFAULT_STRING), None, None, None)
clientQuotaManager.updateQuota(Some(ClientQuotaManager.DefaultString), None, None, None)
checkQuota(clientQuotaManager, "userF", "client4", 2000, 0, expectThrottle = false) // Default <client-id> quota shared across client-id of all users
checkQuota(clientQuotaManager, "userF", "client5", 2000, 0, expectThrottle = false)
checkQuota(clientQuotaManager, "userF", "client5", 2000, 2500, expectThrottle = true)
Expand All @@ -290,7 +290,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
checkQuota(clientQuotaManager, "userA", "client6", 8000, 0, expectThrottle = true) // Throttled due to shared user quota
clientQuotaManager.updateQuota(Some("userA"), Some("client6"), Some("client6"), Some(new Quota(11000, true)))
checkQuota(clientQuotaManager, "userA", "client6", 11000, 8500, expectThrottle = false)
clientQuotaManager.updateQuota(Some("userA"), Some(KraftInternals.DEFAULT_STRING), Some(KraftInternals.DEFAULT_STRING), Some(new Quota(12000, true)))
clientQuotaManager.updateQuota(Some("userA"), Some(ClientQuotaManager.DefaultString), Some(ClientQuotaManager.DefaultString), Some(new Quota(12000, true)))
clientQuotaManager.updateQuota(Some("userA"), Some("client6"), Some("client6"), None)
checkQuota(clientQuotaManager, "userA", "client6", 12000, 4000, expectThrottle = true) // Throttled due to sum of new and earlier values

Expand All @@ -304,7 +304,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val clientQuotaManager = new ClientQuotaManager(config, metrics, QuotaType.PRODUCE, time, "")
val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Produce", ""))
try {
clientQuotaManager.updateQuota(None, Some(KraftInternals.DEFAULT_STRING), Some(KraftInternals.DEFAULT_STRING),
clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.DefaultString), Some(ClientQuotaManager.DefaultString),
Some(new Quota(500, true)))

// We have 10 second windows. Make sure that there is no quota violation
Expand Down Expand Up @@ -352,7 +352,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
def testExpireThrottleTimeSensor(): Unit = {
val clientQuotaManager = new ClientQuotaManager(config, metrics, QuotaType.PRODUCE, time, "")
try {
clientQuotaManager.updateQuota(None, Some(KraftInternals.DEFAULT_STRING), Some(KraftInternals.DEFAULT_STRING),
clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.DefaultString), Some(ClientQuotaManager.DefaultString),
Some(new Quota(500, true)))

maybeRecord(clientQuotaManager, "ANONYMOUS", "client1", 100)
Expand All @@ -374,7 +374,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
def testExpireQuotaSensors(): Unit = {
val clientQuotaManager = new ClientQuotaManager(config, metrics, QuotaType.PRODUCE, time, "")
try {
clientQuotaManager.updateQuota(None, Some(KraftInternals.DEFAULT_STRING), Some(KraftInternals.DEFAULT_STRING),
clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.DefaultString), Some(ClientQuotaManager.DefaultString),
Some(new Quota(500, true)))

maybeRecord(clientQuotaManager, "ANONYMOUS", "client1", 100)
Expand All @@ -401,7 +401,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val clientQuotaManager = new ClientQuotaManager(config, metrics, QuotaType.PRODUCE, time, "")
val clientId = "client@#$%"
try {
clientQuotaManager.updateQuota(None, Some(KraftInternals.DEFAULT_STRING), Some(KraftInternals.DEFAULT_STRING),
clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.DefaultString), Some(ClientQuotaManager.DefaultString),
Some(new Quota(500, true)))

maybeRecord(clientQuotaManager, "ANONYMOUS", clientId, 100)
Expand All @@ -421,6 +421,6 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
// The class under test expects only sanitized client configs. We pass both the default value (which should not be
// sanitized to ensure it remains unique) and non-default values, so we need to take care in generating the sanitized
// client ID
def sanitizedConfigClientId = configClientId.map(x => if (x == KraftInternals.DEFAULT_STRING) KraftInternals.DEFAULT_STRING else Sanitizer.sanitize(x))
def sanitizedConfigClientId = configClientId.map(x => if (x == ClientQuotaManager.DefaultString) ClientQuotaManager.DefaultString else Sanitizer.sanitize(x))
}
}
Loading

0 comments on commit 9391127

Please sign in to comment.