KAFKA-18225: ClientQuotaCallback#updateClusterMetadata is unsupported by kraft #18196

Open

wants to merge 41 commits into base: trunk
Changes from 36 commits

Commits (41)
5fe6d1a
wip
m1a2st Dec 15, 2024
b3edea5
add clusterId to DynamicClientQuotaPublisher
m1a2st Dec 15, 2024
15d7dbb
Draft a version
m1a2st Dec 15, 2024
4123c70
revert test
m1a2st Dec 16, 2024
765d7f1
Merge branch 'trunk' into KAFKA-18225
m1a2st Dec 16, 2024
513b381
add new test
m1a2st Dec 16, 2024
74b1006
update the test
m1a2st Dec 17, 2024
d1cfe97
Merge branch 'trunk' into KAFKA-18225
m1a2st Dec 18, 2024
a4758e7
Merge branch 'trunk' into KAFKA-18225
m1a2st Dec 31, 2024
dee9a65
Merge branch 'trunk' into KAFKA-18225
m1a2st Jan 8, 2025
cd853b6
resolve conflict
m1a2st Jan 8, 2025
48a8b41
Merge branch 'trunk' into KAFKA-18225
m1a2st Jan 9, 2025
6bf41a7
addressed by comments
m1a2st Jan 9, 2025
3fe1d7c
addressed by comments
m1a2st Jan 11, 2025
ed50ea8
Merge branch 'trunk' into KAFKA-18225
m1a2st Jan 11, 2025
f87db91
fix var name
m1a2st Jan 12, 2025
57fad78
update delete topic
m1a2st Jan 13, 2025
f468e55
addressed by comment
m1a2st Jan 13, 2025
0c136b7
addressed by comment
m1a2st Jan 13, 2025
ac9f55b
addressed by comment
m1a2st Jan 13, 2025
909d8f9
addressed by comment
m1a2st Jan 13, 2025
5e88ee1
update when updateClusterMetadata success
m1a2st Jan 13, 2025
b3365f0
addressed by comment
m1a2st Jan 15, 2025
8e36e2f
Merge branch 'trunk' into KAFKA-18225
m1a2st Jan 16, 2025
977a603
update the doc
m1a2st Jan 16, 2025
de16d0e
addressed the comment
m1a2st Jan 16, 2025
9f04283
Merge branch 'trunk' into KAFKA-18225
m1a2st Jan 18, 2025
830c99c
Merge branch 'trunk' into KAFKA-18225
m1a2st Jan 19, 2025
3a3e409
fix the test
m1a2st Jan 19, 2025
e48ca88
Merge branch 'trunk' into KAFKA-18225
m1a2st Jan 20, 2025
6f679f1
fix the conflict
m1a2st Jan 20, 2025
be432e6
separator logic
m1a2st Jan 20, 2025
af4cee5
add license
m1a2st Jan 20, 2025
39f542e
revert unused change
m1a2st Jan 20, 2025
0bab1c7
revert unused change
m1a2st Jan 20, 2025
1ea0067
Merge branch 'trunk' into KAFKA-18225
m1a2st Jan 21, 2025
929e87a
wip
m1a2st Jan 24, 2025
101cbdc
addressed by comments
m1a2st Jan 24, 2025
9c28db2
add new test
m1a2st Jan 24, 2025
9514964
Merge branch 'trunk' into KAFKA-18225
m1a2st Jan 26, 2025
498ab89
resolve the conflict
m1a2st Jan 26, 2025
@@ -89,11 +89,11 @@ public interface ClientQuotaCallback extends Configurable {
boolean quotaResetRequired(ClientQuotaType quotaType);

/**
* Metadata update callback that is invoked whenever UpdateMetadata request is received from
* the controller. This is useful if quota computation takes partitions into account.
* Metadata update callback that is invoked whenever the topic or cluster metadata changes.
* This is useful if quota computation takes partitions into account.
* Topics that are being deleted will not be included in `cluster`.
*
* @param cluster Cluster metadata including partitions and their leaders if known
* @param cluster Cluster metadata, including topics and their partitions
* @return true if quotas have changed and metric configs may need to be updated
*/
boolean updateClusterMetadata(Cluster cluster);
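To make the revised contract concrete, here is a minimal sketch of a partition-aware callback of the kind this hook serves. It is not part of this PR: the class name `PartitionCountQuotaCallback`, the `quota-bucket` tag, and the 1 MB/s-per-partition policy are illustrative, and it assumes only the `ClientQuotaCallback` methods shown in the hunk above.

```scala
import java.util
import java.util.Collections

import org.apache.kafka.common.Cluster
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType}

class PartitionCountQuotaCallback extends ClientQuotaCallback {
  // Updated from updateClusterMetadata(); read by quotaLimit().
  @volatile private var partitionCount = 0

  override def configure(configs: util.Map[String, _]): Unit = ()

  // Put every client in a single bucket so one dynamic limit applies to all of them.
  override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal,
                               clientId: String): util.Map[String, String] =
    Collections.singletonMap("quota-bucket", "default")

  // Illustrative policy: 1 MB/s per partition known to the cluster metadata.
  override def quotaLimit(quotaType: ClientQuotaType,
                          metricTags: util.Map[String, String]): java.lang.Double =
    java.lang.Double.valueOf(1024.0 * 1024.0 * math.max(partitionCount, 1))

  override def updateQuota(quotaType: ClientQuotaType, quotaEntity: ClientQuotaEntity,
                           newValue: Double): Unit = ()

  override def removeQuota(quotaType: ClientQuotaType, quotaEntity: ClientQuotaEntity): Unit = ()

  override def quotaResetRequired(quotaType: ClientQuotaType): Boolean = false

  // Invoked with the Cluster rebuilt from the metadata image; returning true asks the
  // broker to refresh the quota metric configs.
  override def updateClusterMetadata(cluster: Cluster): Boolean = {
    val newCount = cluster.topics().stream()
      .mapToInt(topic => cluster.partitionsForTopic(topic).size())
      .sum()
    val changed = newCount != partitionCount
    partitionCount = newCount
    changed
  }

  override def close(): Unit = ()
}
```

A `true` return from `updateClusterMetadata` is what prompts the broker to call `updateQuotaMetricConfigs()` on its quota managers, which is the wiring the new `DynamicTopicClusterQuotaPublisher` below adds for KRaft.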
10 changes: 9 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
@@ -507,7 +507,15 @@ class BrokerServer(
config,
sharedServer.metadataPublishingFaultHandler,
"broker",
clientQuotaMetadataManager),
clientQuotaMetadataManager,
),
new DynamicTopicClusterQuotaPublisher(
clusterId,
config,
sharedServer.metadataPublishingFaultHandler,
"broker",
quotaManagers,
),
new ScramPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
14 changes: 12 additions & 2 deletions core/src/main/scala/kafka/server/ControllerServer.scala
@@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers

import scala.collection.immutable
import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.network.ListenerName
@@ -332,7 +332,17 @@ class ControllerServer(
config,
sharedServer.metadataPublishingFaultHandler,
"controller",
clientQuotaMetadataManager))
clientQuotaMetadataManager
))

// Set up the DynamicTopicClusterQuotaPublisher. This will enable quotas for the cluster and topics.
metadataPublishers.add(new DynamicTopicClusterQuotaPublisher(
[Review comment from Member]: Please add test for controller.

clusterId,
config,
sharedServer.metadataPublishingFaultHandler,
"broker",
[Review comment from Contributor]: Controller?

quotaManagers,
))

// Set up the SCRAM publisher.
metadataPublishers.add(new ScramPublisher(
@@ -71,6 +71,7 @@ class BrokerMetadataPublisher(
shareCoordinator: Option[ShareCoordinator],
var dynamicConfigPublisher: DynamicConfigPublisher,
dynamicClientQuotaPublisher: DynamicClientQuotaPublisher,
dynamicTopicClusterQuotaPublisher: DynamicTopicClusterQuotaPublisher,
scramPublisher: ScramPublisher,
delegationTokenPublisher: DelegationTokenPublisher,
aclPublisher: AclPublisher,
@@ -199,6 +200,9 @@
// Apply client quotas delta.
dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage)

// Apply topic or cluster quotas delta.
dynamicTopicClusterQuotaPublisher.onMetadataUpdate(delta, newImage)

// Apply SCRAM delta.
scramPublisher.onMetadataUpdate(delta, newImage)

@@ -0,0 +1,64 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package kafka.server.metadata

import kafka.server.KafkaConfig
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.server.fault.FaultHandler

class DynamicTopicClusterQuotaPublisher (
[Review comment from Member]: Please add comments to this new publisher. We should emphasize this approach is temporary and there is a follow-up jira.

clusterId: String,
conf: KafkaConfig,
faultHandler: FaultHandler,
nodeType: String,
quotaManagers: QuotaManagers
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
logIdent = s"[${name()}] "

override def name(): String = s"DynamicTopicClusterQuotaPublisher $nodeType id=${conf.nodeId}"

override def onMetadataUpdate(
delta: MetadataDelta,
newImage: MetadataImage,
manifest: LoaderManifest
): Unit = {
onMetadataUpdate(delta, newImage)
}

def onMetadataUpdate(
delta: MetadataDelta,
newImage: MetadataImage,
): Unit = {
val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
try {
quotaManagers.clientQuotaCallback().ifPresent(clientQuotaCallback => {
if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
val cluster = KRaftMetadataCache.toCluster(clusterId, newImage)
if (clientQuotaCallback.updateClusterMetadata(cluster)) {
quotaManagers.fetch.updateQuotaMetricConfigs()
quotaManagers.produce.updateQuotaMetricConfigs()
quotaManagers.request.updateQuotaMetricConfigs()
quotaManagers.controllerMutation.updateQuotaMetricConfigs()
}
}
})
} catch {
case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
s"publishing dynamic topic or cluster changes from $deltaName", t)
}
}
}
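Responding to the review request for test coverage, the following is a rough sketch of a unit test for this publisher, not the PR's actual test. It assumes Mockito and JUnit 5 are on the test classpath, that `QuotaManagers` and `KafkaConfig` can be mocked, and that `QuotaManagers#clientQuotaCallback` exposes a `java.util.Optional[ClientQuotaCallback]` as the code above uses; the test class name, method name, and topic/cluster-id values are made up.

```scala
package kafka.server.metadata

import java.util.Optional

import kafka.server.KafkaConfig
import kafka.server.QuotaFactory.QuotaManagers
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.metadata.TopicRecord
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.server.fault.MockFaultHandler
import org.apache.kafka.server.quota.ClientQuotaCallback
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, verify, when}

class DynamicTopicClusterQuotaPublisherTest {

  @Test
  def testCallbackSeesTopicDelta(): Unit = {
    // Stub a callback that reports "no quota change" so the quota managers stay untouched.
    val callback = mock(classOf[ClientQuotaCallback])
    when(callback.updateClusterMetadata(any())).thenReturn(false)

    val quotaManagers = mock(classOf[QuotaManagers])
    when(quotaManagers.clientQuotaCallback()).thenReturn(Optional.of(callback))

    val publisher = new DynamicTopicClusterQuotaPublisher(
      "test-cluster-id",
      mock(classOf[KafkaConfig]),
      new MockFaultHandler("testDynamicTopicClusterQuotaPublisher"),
      "broker",
      quotaManagers)

    // Replay a topic record so the delta carries a non-null topicsDelta.
    val delta = new MetadataDelta.Builder().setImage(MetadataImage.EMPTY).build()
    delta.replay(new TopicRecord().setName("foo").setTopicId(Uuid.randomUuid()))
    val image = delta.apply(MetadataProvenance.EMPTY)

    publisher.onMetadataUpdate(delta, image)

    // The publisher should rebuild the Cluster view and hand it to the callback.
    verify(callback).updateClusterMetadata(any())
  }
}
```

A controller-side variant of the same test would construct the publisher with the `"controller"` node type, which is also what the review comment on `ControllerServer` is pointing at.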
133 changes: 96 additions & 37 deletions core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -18,6 +18,7 @@
package kafka.server.metadata

import kafka.server.MetadataCache
import kafka.server.metadata.KRaftMetadataCache.{getOfflineReplicas, getRandomAliveBroker}
import kafka.utils.Logging
import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common._
@@ -206,28 +207,7 @@ class KRaftMetadataCache(
}
}
}

private def getOfflineReplicas(image: MetadataImage,
partition: PartitionRegistration,
listenerName: ListenerName): util.List[Integer] = {
val offlineReplicas = new util.ArrayList[Integer](0)
for (brokerId <- partition.replicas) {
Option(image.cluster().broker(brokerId)) match {
case None => offlineReplicas.add(brokerId)
case Some(broker) => if (isReplicaOffline(partition, listenerName, broker)) {
offlineReplicas.add(brokerId)
}
}
}
offlineReplicas
}

private def isReplicaOffline(partition: PartitionRegistration, listenerName: ListenerName, broker: BrokerRegistration) =
broker.fenced() || !broker.listeners().containsKey(listenerName.value()) || isReplicaInOfflineDir(broker, partition)

private def isReplicaInOfflineDir(broker: BrokerRegistration, partition: PartitionRegistration): Boolean =
!broker.hasOnlineDir(partition.directory(broker.id()))


/**
* Get the endpoint matching the provided listener if the broker is alive. Note that listeners can
* be added dynamically, so a broker with a missing listener could be a transient error.
@@ -361,12 +341,7 @@ class KRaftMetadataCache(
Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) == 1
}

override def getAliveBrokers(): Iterable[BrokerMetadata] = getAliveBrokers(_currentImage)

private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata] = {
image.cluster().brokers().values().asScala.filterNot(_.fenced()).
map(b => new BrokerMetadata(b.id, b.rack))
}
override def getAliveBrokers(): Iterable[BrokerMetadata] = KRaftMetadataCache.getAliveBrokers(_currentImage)

override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): Option[Node] = {
Option(_currentImage.cluster().broker(brokerId)).filterNot(_.fenced()).
@@ -443,15 +418,6 @@ class KRaftMetadataCache(
getRandomAliveBroker(_currentImage)
}

private def getRandomAliveBroker(image: MetadataImage): Option[Int] = {
val aliveBrokers = getAliveBrokers(image).toList
if (aliveBrokers.isEmpty) {
None
} else {
Some(aliveBrokers(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id)
}
}

def getAliveBrokerEpoch(brokerId: Int): Option[Long] = {
Option(_currentImage.cluster().broker(brokerId)).filterNot(_.fenced()).
map(brokerRegistration => brokerRegistration.epoch())
@@ -545,3 +511,96 @@ class KRaftMetadataCache(
}
}

object KRaftMetadataCache {
[Review comment from Member]: #18632 is trying to remove references to KRaftMetadataCache, so maybe we can move the helpers to MetadataCache?


def toCluster(clusterId: String, image: MetadataImage): Cluster = {
[Review comment from @TaiJuWu (Contributor), Dec 15, 2024]: This result differs from the old KRaftMetadataCache#toCluster (which filtered partitions based on the listener). If we decide to change this one, maybe we need to document it.

val brokerToNodes = new util.HashMap[Integer, java.util.List[Node]]
image.cluster().brokers()
.values().stream()
.filter(broker => !broker.fenced())
.forEach { broker => brokerToNodes.put(broker.id(), broker.nodes()) }

def getNodes(id: Int): java.util.List[Node] = brokerToNodes.get(id)

val partitionInfos = new util.ArrayList[PartitionInfo]
val internalTopics = new util.HashSet[String]

def toArray(replicas: Array[Int]): Array[Node] = {
util.Arrays.stream(replicas)
.mapToObj(replica => getNodes(replica))
.flatMap(replica => replica.stream()).toArray(size => new Array[Node](size))
}

val topicImages = image.topics().topicsByName().values()
if (topicImages != null) {
topicImages.forEach { topic =>
topic.partitions().forEach { (key, value) =>
val partitionId = key
val partition = value
val nodes = getNodes(partition.leader)
if (nodes != null) {
nodes.forEach(node => {
partitionInfos.add(new PartitionInfo(topic.name(),
partitionId,
node,
toArray(partition.replicas),
toArray(partition.isr),
getOfflineReplicas(image, partition).stream()
.map(replica => getNodes(replica))
.flatMap(replica => replica.stream()).toArray(size => new Array[Node](size))))
})
if (Topic.isInternal(topic.name())) {
internalTopics.add(topic.name())
}
}
}
}
}

val controllerNode = getNodes(getRandomAliveBroker(image).getOrElse(-1)) match {
case null => Node.noNode()
case nodes => nodes.get(0)
}
// Note: the constructor of Cluster does not allow us to reference unregistered nodes.
// So, for example, if partition foo-0 has replicas [1, 2] but broker 2 is not
// registered, we pass its replicas as [1, -1]. This doesn't make a lot of sense, but
// we are duplicating the behavior of ZkMetadataCache, for now.
new Cluster(clusterId, brokerToNodes.values().stream().flatMap(n => n.stream()).collect(util.stream.Collectors.toList()),
partitionInfos, Collections.emptySet(), internalTopics, controllerNode)
}

private def getOfflineReplicas(image: MetadataImage,
partition: PartitionRegistration,
listenerName: ListenerName = null): util.List[Integer] = {
val offlineReplicas = new util.ArrayList[Integer](0)
for (brokerId <- partition.replicas) {
Option(image.cluster().broker(brokerId)) match {
case None => offlineReplicas.add(brokerId)
case Some(broker) => if (listenerName == null || isReplicaOffline(partition, listenerName, broker)) {
offlineReplicas.add(brokerId)
}
}
}
offlineReplicas
}

private def isReplicaOffline(partition: PartitionRegistration, listenerName: ListenerName, broker: BrokerRegistration) =
broker.fenced() || !broker.listeners().containsKey(listenerName.value()) || isReplicaInOfflineDir(broker, partition)

private def isReplicaInOfflineDir(broker: BrokerRegistration, partition: PartitionRegistration): Boolean =
!broker.hasOnlineDir(partition.directory(broker.id()))

private def getRandomAliveBroker(image: MetadataImage): Option[Int] = {
val aliveBrokers = getAliveBrokers(image).toList
if (aliveBrokers.isEmpty) {
None
} else {
Some(aliveBrokers(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id)
}
}

private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata] = {
image.cluster().brokers().values().asScala.filterNot(_.fenced()).
map(b => new BrokerMetadata(b.id, b.rack))
}
}