KAFKA-18225: ClientQuotaCallback#updateClusterMetadata is unsupported by kraft #18196
base: trunk
Conversation
core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala (outdated review thread, resolved)
@@ -565,3 +531,97 @@ class KRaftMetadataCache(
  }
}

object KRaftMetadataCache {

  def toCluster(clusterId: String, image: MetadataImage): Cluster = {
This result differs from the existing KRaftMetadataCache#toCluster (the old one filters partitions based on the listener). If we decide to change this behavior, maybe we need to document it.
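For reference, a minimal sketch of the listener-aware filtering the old method performed. The inputs `nodesByListener` and `allPartitions` are hypothetical stand-ins for data derived from the MetadataImage, not actual Kafka helpers:

```scala
import java.util.Collections
import scala.jdk.CollectionConverters._
import org.apache.kafka.common.{Cluster, Node, PartitionInfo}

// Hypothetical sketch: keep only partitions whose leader is reachable on the requested
// listener, which is the filtering the pre-existing toCluster applied.
// nodesByListener and allPartitions stand in for data built from the MetadataImage.
def toClusterForListener(clusterId: String,
                         listener: String,
                         nodesByListener: Map[String, Seq[Node]],
                         allPartitions: Seq[PartitionInfo]): Cluster = {
  val nodes = nodesByListener.getOrElse(listener, Seq.empty)
  val reachableIds = nodes.map(_.id).toSet
  // Drop partitions whose leader is not exposed on this listener.
  val partitions = allPartitions.filter(p => p.leader != null && reachableIds.contains(p.leader.id))
  new Cluster(clusterId, nodes.asJava, partitions.asJava,
    Collections.emptySet[String](), Collections.emptySet[String]())
}
```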
@m1a2st please fix the conflicts
# Conflicts:
#	core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
Thanks for the reminder, @chia7712; resolved the conflict.
@m1a2st thanks for this fix!
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._

@Disabled("KAFKA-18213")
Nice to see this test get fixed. Could you please close KAFKA-18213 as a duplicate?
I moved it to L104; I didn't fix it.
I will take a look at this failing test.
@@ -48,6 +51,11 @@ class DynamicClientQuotaPublisher(
  ): Unit = {
    val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
    try {
      val clientQuotaCallback = conf.getConfiguredInstance(QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, classOf[ClientQuotaCallback])
We should use the callback held by quotaManagers rather than creating a new one. They are different instances, so this approach cannot update the callback that quotaManagers actually uses.
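A sketch of that direction, reusing the names (quotaManagers, delta, clusterId, newImage) from the surrounding diff; it mirrors the snippet that appears later in this thread:

```scala
// Sketch only: reuse the callback instance the quota managers already hold instead of
// instantiating a second one from the config, so updateClusterMetadata reaches the
// callback that is actually consulted for quota decisions.
quotaManagers.clientQuotaCallback().ifPresent { callback =>
  if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
    val cluster = KRaftMetadataCache.toCluster(clusterId, newImage)
    callback.updateClusterMetadata(cluster)
  }
}
```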
@@ -235,6 +235,13 @@ public Optional<Node> node(String listenerName) {
        }
        return Optional.of(new Node(id, endpoint.host(), endpoint.port(), rack.orElse(null), fenced));
    }

    public List<Node> nodes() {
        List<Node> nodes = new ArrayList<>();
We can leverage the existing method, for example:
`return listeners.keySet().stream().flatMap(l -> node(l).stream()).toList();`
@m1a2st thanks for this patch. I tested your patch locally; please also fix the following:
- `deleteTopic` needs to use an authorized admin.
- You must call `removeQuotaOverrides` after creating `group1_user2`. Otherwise, the `removeQuota` method of the custom callback will not be invoked. This is an interesting discrepancy: in ZooKeeper mode, `removeQuota` is called when altering SASL/SCRAM credentials, but in KRaft mode this behavior is absent. I'm uncertain whether this constitutes a breaking change. It appears to be unusual behavior, as it is triggered by the addition of users. Instead of implementing this peculiar behavior, I suggest updating the documentation of the callback to reflect the actual implementation.

cc @dajac and @cmccabe
@@ -179,13 +212,12 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
  }

  private def createTopic(topic: String, numPartitions: Int, leader: Int): Unit = {
    // TODO createTopic
    TestUtils.createTopicWithAdmin(createAdminClient(), topic, brokers, controllerServers, numPartitions)
You have to honor the `leader` parameter so that the partition leader is hosted by the correct broker; otherwise, this test will become flaky in the future.
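One possible shape for this, with the caveat that the `replicaAssignment` parameter of `TestUtils.createTopicWithAdmin` is an assumption; if the helper does not expose it, the same assignment can be passed through `Admin#createTopics` with `NewTopic(name, assignment)`:

```scala
// Hedged sketch: place the single replica of every partition on the requested broker so
// that broker is also the leader. The replicaAssignment parameter is an assumption about
// TestUtils.createTopicWithAdmin; adjust to whatever assignment hook the helper exposes.
private def createTopic(topic: String, numPartitions: Int, leader: Int): Unit = {
  val assignment = (0 until numPartitions).map(p => p -> Seq(leader)).toMap
  TestUtils.createTopicWithAdmin(createAdminClient(), topic, brokers, controllerServers,
    numPartitions = numPartitions, replicaAssignment = assignment)
}
```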
core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala (outdated review thread, resolved)
Thanks for the review, @chia7712; addressed all comments.
@m1a2st thanks for this patch.
@@ -337,6 +371,7 @@ object GroupedUserQuotaCallback {
  val DefaultProduceQuotaProp = "default.produce.quota"
  val DefaultFetchQuotaProp = "default.fetch.quota"
  val UnlimitedQuotaMetricTags = new util.HashMap[String, String]
  val updateClusterMetadataCalls = new AtomicInteger
Why add this counter?
quotaManagers.clientQuotaCallback().ifPresent(clientQuotaCallback => {
  if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
    val cluster = KRaftMetadataCache.toCluster(clusterId, newImage)
    clientQuotaCallback.updateClusterMetadata(cluster)
I don't think this line is required.
@@ -136,9 +148,11 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
    // Create large number of partitions on another broker, should result in throttling on first partition
    val largeTopic = "group1_largeTopic"
    createTopic(largeTopic, numPartitions = 99, leader = 0)
    user.removeThrottleMetrics()
This is unnecessary if we remove the redundant call to updateClusterMetadata.
    user.waitForQuotaUpdate(8000, 2500, defaultRequestQuota)
    user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)

    user.removeQuotaOverrides()
Could we call `removeQuotaOverrides` after creating the new user, for consistency?
@m1a2st could you please revise the docs of …
@m1a2st could you please fix the conflicts?
# Conflicts:
#	core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@m1a2st please fix the conflicts :(
# Conflicts:
#	core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@m1a2st please fix the conflicts
# Conflicts:
#	core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
  clientQuotaMetadataManager.update(clientQuotasDelta)
  quotaManagers.clientQuotaCallback().ifPresent(clientQuotaCallback => {
    if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
How about moving this callback trigger to an individual publisher? That would decouple the callback from the other publishers, and it would also avoid producing incorrect error messages.
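Roughly what such a dedicated publisher could look like. This is a sketch only: it reuses the class name and constructor arguments that show up later in this PR, and the MetadataPublisher method shapes are assumed from the existing publisher code, so it may not match the final patch:

```scala
import kafka.server.KafkaConfig
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.fault.FaultHandler

// Sketch of a dedicated publisher that only forwards topic/cluster changes to the
// quota callback, keeping that concern out of DynamicClientQuotaPublisher.
class DynamicTopicClusterQuotaPublisher(
  clusterId: String,
  conf: KafkaConfig,
  faultHandler: FaultHandler,
  nodeType: String,
  quotaManagers: QuotaManagers
) extends MetadataPublisher {

  override def name(): String = s"DynamicTopicClusterQuotaPublisher $nodeType id=${conf.nodeId}"

  override def onMetadataUpdate(delta: MetadataDelta, newImage: MetadataImage,
                                manifest: LoaderManifest): Unit = {
    try {
      quotaManagers.clientQuotaCallback().ifPresent { callback =>
        if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
          val cluster = KRaftMetadataCache.toCluster(clusterId, newImage)
          callback.updateClusterMetadata(cluster)
        }
      }
    } catch {
      case t: Throwable => faultHandler.handleFault(
        s"Uncaught exception while publishing topic/cluster quota metadata " +
          s"from image ${newImage.highestOffsetAndEpoch()}", t)
    }
  }
}
```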
# Conflicts:
#	server-common/src/main/java/org/apache/kafka/server/config/QuotaConfig.java
@m1a2st thanks for this patch
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.server.fault.FaultHandler

class DynamicTopicClusterQuotaPublisher (
Please add comments to this new publisher. We should emphasize that this approach is temporary and that there is a follow-up JIRA.
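For example, a class-level comment along these lines (wording illustrative only; the follow-up JIRA id is not given in this thread, so it is deliberately left out):

```scala
/**
 * Temporary publisher that forwards topic and cluster metadata changes to the
 * configured ClientQuotaCallback via updateClusterMetadata, so that custom quota
 * callbacks keep working under KRaft. The duplication between publishers is a
 * stopgap; see the follow-up JIRA referenced in the PR discussion before
 * extending this class.
 */
```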
    ))

    // Set up the DynamicTopicClusterQuotaPublisher. This will enable quotas for the cluster and topics.
    metadataPublishers.add(new DynamicTopicClusterQuotaPublisher(
Please add a test for the controller.
@@ -545,3 +511,96 @@ class KRaftMetadataCache(
  }
}

object KRaftMetadataCache {
#18632 is trying to remove references to KRaftMetadataCache, so maybe we can move the helpers to MetadataCache?
      clusterId,
      config,
      sharedServer.metadataPublishingFaultHandler,
      "broker",
Controller?
Jira: https://issues.apache.org/jira/browse/KAFKA-18225

We don't implement ClientQuotaCallback#updateClusterMetadata in KRaft mode. We will implement it in the 4.0 release so that the CustomQuotaCallbackTest test passes in KRaft mode.

Committer Checklist (excluded from commit message)
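As background for the description above, a minimal sketch of the kind of custom callback that depends on this hook. The interface shape (quotaMetricTags, quotaLimit, updateQuota, removeQuota, quotaResetRequired, updateClusterMetadata, close) is assumed from the public ClientQuotaCallback API; the class name and its bookkeeping are hypothetical:

```scala
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._
import org.apache.kafka.common.Cluster
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType}

// Illustrative-only callback: the one piece of real logic is updateClusterMetadata,
// which derives per-broker partition counts from the Cluster it is handed. This is the
// kind of state a custom callback (such as the test's GroupedUserQuotaCallback) can only
// maintain if the broker actually invokes updateClusterMetadata.
class PartitionCountingQuotaCallback extends ClientQuotaCallback {
  @volatile private var partitionsPerBroker: Map[Int, Int] = Map.empty

  override def configure(configs: JMap[String, _]): Unit = ()

  override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal,
                               clientId: String): JMap[String, String] =
    java.util.Collections.emptyMap[String, String]()

  // Returning null means "no quota limit" for the given tags.
  override def quotaLimit(quotaType: ClientQuotaType,
                          metricTags: JMap[String, String]): java.lang.Double = null

  override def updateQuota(quotaType: ClientQuotaType, entity: ClientQuotaEntity,
                           newValue: Double): Unit = ()

  override def removeQuota(quotaType: ClientQuotaType, entity: ClientQuotaEntity): Unit = ()

  override def quotaResetRequired(quotaType: ClientQuotaType): Boolean = false

  override def updateClusterMetadata(cluster: Cluster): Boolean = {
    // Count partitions led by each broker in the cluster snapshot we were given.
    partitionsPerBroker = cluster.topics().asScala.toSeq
      .flatMap(topic => cluster.partitionsForTopic(topic).asScala)
      .filter(_.leader() != null)
      .groupBy(_.leader().id())
      .map { case (brokerId, partitions) => brokerId -> partitions.size }
    true // true tells the broker that computed quota bounds may have changed
  }

  override def close(): Unit = ()
}
```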