
KAFKA-18555: Avoid casting MetadataCache to KRaftMetadataCache #18632

Open

Wants to merge 14 commits into base: trunk
Conversation

@FrankYang0529 (Member) commented Jan 20, 2025

ZkMetadataCache was removed in https://issues.apache.org/jira/browse/KAFKA-18373. We should pull the relevant methods up so that no cast from MetadataCache to KRaftMetadataCache is required.

Add the following methods to MetadataCache:

  • getAliveBrokerEpoch
  • isBrokerFenced
  • isBrokerShuttingDown
  • describeTopicResponse
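A minimal Java sketch of what the pull-up looks like. All names and signatures here are assumptions for illustration only; the real Kafka MetadataCache is a Scala trait with different signatures and many more methods.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, heavily simplified version of the pulled-up interface.
interface MetadataCache {
    Optional<Long> getAliveBrokerEpoch(int brokerId);
    boolean isBrokerFenced(int brokerId);
    boolean isBrokerShuttingDown(int brokerId);
    List<String> describeTopicResponse(List<String> topics);
}

// Stand-in for KRaftMetadataCache implementing the pulled-up methods.
class KRaftMetadataCache implements MetadataCache {
    private final Map<Integer, Long> aliveBrokerEpochs;

    KRaftMetadataCache(Map<Integer, Long> aliveBrokerEpochs) {
        this.aliveBrokerEpochs = aliveBrokerEpochs;
    }

    @Override
    public Optional<Long> getAliveBrokerEpoch(int brokerId) {
        return Optional.ofNullable(aliveBrokerEpochs.get(brokerId));
    }

    @Override
    public boolean isBrokerFenced(int brokerId) {
        // Simplified: treat any broker we do not know about as fenced.
        return !aliveBrokerEpochs.containsKey(brokerId);
    }

    @Override
    public boolean isBrokerShuttingDown(int brokerId) {
        return false; // simplified: shutdown state is not modeled here
    }

    @Override
    public List<String> describeTopicResponse(List<String> topics) {
        return topics; // simplified: echo the requested topics
    }
}

public class PullUpSketch {
    public static void main(String[] args) {
        // Callers hold the interface type; no cast to KRaftMetadataCache.
        MetadataCache cache = new KRaftMetadataCache(Map.of(1, 5L));
        System.out.println(cache.getAliveBrokerEpoch(1)); // Optional[5]
        System.out.println(cache.isBrokerFenced(2));      // true
    }
}
```

The point of the refactor is exactly this: once the methods live on the interface, call sites can be typed against MetadataCache alone.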

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions bot added the core Kafka Broker label Jan 20, 2025
@FrankYang0529 (Member Author)

@ijuma The only remaining cast is in DescribeTopicPartitionsRequestHandler. It uses KRaftMetadataCache#getTopicMetadataForDescribeTopicResponse. Do you think we should add it to MetadataCache, or refactor it so the related logic moves into DescribeTopicPartitionsRequestHandler? Or should we keep the current implementation? Thanks.

  val describeTopicPartitionsRequestHandler : DescribeTopicPartitionsRequestHandler = new DescribeTopicPartitionsRequestHandler(
    metadataCache.asInstanceOf[KRaftMetadataCache], authHelper, config)
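For illustration, a hypothetical Java sketch of why the pull-up removes this asInstanceOf: once the method the handler needs is declared on the interface, the handler can accept the interface type directly. None of these names or signatures are Kafka's actual API.

```java
import java.util.List;

// Hypothetical interface carrying the method the handler needs.
interface MetadataCache {
    List<String> describeTopicResponse(List<String> topics);
}

class DescribeTopicPartitionsRequestHandler {
    // Interface type: the construction site no longer needs a downcast.
    private final MetadataCache metadataCache;

    DescribeTopicPartitionsRequestHandler(MetadataCache metadataCache) {
        this.metadataCache = metadataCache;
    }

    List<String> handle(List<String> topics) {
        return metadataCache.describeTopicResponse(topics);
    }
}

public class NoCastSketch {
    public static void main(String[] args) {
        MetadataCache cache = topics -> topics; // trivial stand-in implementation
        DescribeTopicPartitionsRequestHandler handler =
            new DescribeTopicPartitionsRequestHandler(cache);
        System.out.println(handler.handle(List.of("topic-a"))); // [topic-a]
    }
}
```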

@ijuma (Member) commented Jan 20, 2025

It's ok to pull that method up to MetadataCache, but we should take the chance to make it more concise. Instead of metadataCache.getTopicMetadataForDescribeTopicResponse, we can simply call it metadataCache.describeTopicResponse.

@FrankYang0529 FrankYang0529 requested a review from ijuma January 21, 2025 03:27
@ijuma (Member) left a comment


Thanks, left a few comments.

Some(new DescribeTopicPartitionsRequestHandler(kRaftMetadataCache, authHelper, config))
case _ => None
}
val describeTopicPartitionsRequestHandler : DescribeTopicPartitionsRequestHandler = new DescribeTopicPartitionsRequestHandler(
Member

This is an existing issue, but since you're touching this file, remove the : DescribeTopicPartitionsRequestHandler (it's redundant).

@@ -272,7 +272,7 @@ class KRaftMetadataCache(
* @param maximumNumberOfPartitions The max number of partitions to return.
* @param ignoreTopicsWithExceptions Whether ignore the topics with exception.
*/
Member

Move this scaladoc to the interface.

Member

@FrankYang0529 Could you please remove those comments, as you have moved them to the interface?

- KRaftMetadataCache metadataCache = new KRaftMetadataCache(0, () -> KRaftVersion.KRAFT_VERSION_1);
- updateKraftMetadataCache(metadataCache, records);
+ MetadataCache metadataCache = new KRaftMetadataCache(0, () -> KRaftVersion.KRAFT_VERSION_1);
+ updateKraftMetadataCache((KRaftMetadataCache) metadataCache, records);
Member

Both of these changes seem to make things worse.

- KRaftMetadataCache metadataCache = new KRaftMetadataCache(0, () -> KRaftVersion.KRAFT_VERSION_1);
- updateKraftMetadataCache(metadataCache, records);
+ MetadataCache metadataCache = new KRaftMetadataCache(0, () -> KRaftVersion.KRAFT_VERSION_1);
+ updateKraftMetadataCache((KRaftMetadataCache) metadataCache, records);
Member

Same as the comment above.

@@ -348,6 +348,7 @@ class PartitionLockTest extends Logging {
val controllerEpoch = 0
val replicas = (0 to numReplicaFetchers).map(i => Integer.valueOf(brokerId + i)).toList.asJava
val isr = replicas
replicas.forEach(replicaId => when(metadataCache.getAliveBrokerEpoch(replicaId)).thenReturn(Some(1L)))
Member

Why do we need this extra code?

Member Author

We use mock(classOf[MetadataCache]) for metadataCache, so it never entered the KRaftMetadataCache branch. In this PR we remove the match block, so we have to add the related mock.

val metadataCache: MetadataCache = mock(classOf[MetadataCache])

metadataCache match {
  case kRaftMetadataCache: KRaftMetadataCache =>
    val cachedBrokerEpoch = kRaftMetadataCache.getAliveBrokerEpoch(brokerId)
    // Fence the update if it provides a stale broker epoch.
    if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {
      throw new NotLeaderOrFollowerException(s"Received stale fetch state update. broker epoch=$brokerEpoch " +
        s"vs expected=${cachedBrokerEpoch.get}")
    }
  case _ =>
}
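The situation can be sketched as follows: once the type match is removed, the epoch check always runs, so any test double for MetadataCache must stub getAliveBrokerEpoch. This is a hypothetical Java simplification of the fencing logic above, with a hand-rolled stub standing in for Mockito's mock; names and signatures are assumptions.

```java
import java.util.Optional;

// Hypothetical single-method slice of the interface, so it can be a lambda.
interface MetadataCache {
    Optional<Long> getAliveBrokerEpoch(int brokerId);
}

public class FenceCheck {
    // Simplified version of the fencing logic: reject updates carrying a
    // broker epoch older than the one the metadata cache knows about.
    static void validateBrokerEpoch(MetadataCache cache, int brokerId, long brokerEpoch) {
        Optional<Long> cached = cache.getAliveBrokerEpoch(brokerId);
        // Fence the update if it provides a stale broker epoch.
        if (brokerEpoch != -1 && cached.filter(e -> e > brokerEpoch).isPresent()) {
            throw new IllegalStateException("Received stale fetch state update. broker epoch="
                + brokerEpoch + " vs expected=" + cached.get());
        }
    }

    public static void main(String[] args) {
        // Stand-in for mock(classOf[MetadataCache]): without this stubbed
        // return value, the check cannot run against a bare mock.
        MetadataCache stub = id -> Optional.of(10L);
        validateBrokerEpoch(stub, 1, 10L);    // current epoch: passes
        try {
            validateBrokerEpoch(stub, 1, 9L); // stale epoch: fenced
        } catch (IllegalStateException e) {
            System.out.println("fenced: " + e.getMessage());
        }
    }
}
```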

@@ -186,6 +186,7 @@ class PartitionTest extends AbstractPartitionTest {
val leaderEpoch = 10
val logStartOffset = 0L
val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true)
addBrokerEpochToMockMetadataCache(metadataCache, List(remoteReplicaId))
Member

Hmm, why are these changes needed?

Member Author

Same as above. We use mock(classOf[MetadataCache]) in AbstractPartitionTest.

val metadataCache: MetadataCache = mock(classOf[MetadataCache])

@@ -793,6 +793,7 @@ class ReplicaManagerTest {

try {
val brokerList = Seq[Integer](0, 1).asJava
when(replicaManager.metadataCache.getAliveBrokerEpoch(1)).thenReturn(Some(brokerEpoch))
Member

Why is this change needed?

Member Author

Same as above. In ReplicaManagerTest#setupReplicaManagerWithMockedPurgatories, it uses mock(classOf[MetadataCache]).

val metadataCache: MetadataCache = mock(classOf[MetadataCache])

Member

Could you please move the mock to line#3154? We should mock the method after creating the mocked metadataCache

isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)

case _ => true
// In KRaft mode, only a replica which meets all of the following requirements is allowed to join the ISR.
Member

Maybe we can remove "In KRaft mode"?

@chia7712 (Member) left a comment

@FrankYang0529 overall LGTM except for one trivial comment

@@ -272,7 +272,7 @@ class KRaftMetadataCache(
* @param maximumNumberOfPartitions The max number of partitions to return.
* @param ignoreTopicsWithExceptions Whether ignore the topics with exception.
*/
Member

@FrankYang0529 Could you please remove those comments as you have moved them to the interface

Labels: core Kafka Broker

3 participants