KAFKA-18555: Avoid casting MetadataCache to KRaftMetadataCache #18632

Merged: 14 commits, Jan 25, 2025
core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java
@@ -20,7 +20,7 @@
 import kafka.network.RequestChannel;
 import kafka.server.AuthHelper;
 import kafka.server.KafkaConfig;
-import kafka.server.metadata.KRaftMetadataCache;
+import kafka.server.MetadataCache;

 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InvalidRequestException;
@@ -44,12 +44,12 @@
 import static org.apache.kafka.common.resource.ResourceType.TOPIC;

 public class DescribeTopicPartitionsRequestHandler {
-    KRaftMetadataCache metadataCache;
+    MetadataCache metadataCache;
     AuthHelper authHelper;
     KafkaConfig config;

     public DescribeTopicPartitionsRequestHandler(
-        KRaftMetadataCache metadataCache,
+        MetadataCache metadataCache,
         AuthHelper authHelper,
         KafkaConfig config
     ) {
@@ -104,7 +104,7 @@ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(
             return isAuthorized;
         });

-        DescribeTopicPartitionsResponseData response = metadataCache.getTopicMetadataForDescribeTopicResponse(
+        DescribeTopicPartitionsResponseData response = metadataCache.describeTopicResponse(
             CollectionConverters.asScala(authorizedTopicsStream.iterator()),
             abstractRequest.context().listenerName,
             (String topicName) -> topicName.equals(cursorTopicName) ? cursor.partitionIndex() : 0,
37 changes: 16 additions & 21 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -1050,28 +1050,23 @@
   }

   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
-    metadataCache match {
-      // In KRaft mode, only a replica which meets all of the following requirements is allowed to join the ISR.
-      // 1. It is not fenced.
-      // 2. It is not in controlled shutdown.
-      // 3. Its metadata cached broker epoch matches its Fetch request broker epoch. Or the Fetch
-      //    request broker epoch is -1 which bypasses the epoch verification.
-      case kRaftMetadataCache: KRaftMetadataCache =>
-        val mayBeReplica = getReplica(followerReplicaId)
-        // The topic is already deleted and we don't have any replica information. In this case, we can return false
-        // so as to avoid NPE
-        if (mayBeReplica.isEmpty) {
-          warn(s"The replica state of replica ID:[$followerReplicaId] doesn't exist in the leader node. It might because the topic is already deleted.")
-          return false
-        }
-        val storedBrokerEpoch = mayBeReplica.get.stateSnapshot.brokerEpoch
-        val cachedBrokerEpoch = kRaftMetadataCache.getAliveBrokerEpoch(followerReplicaId)
-        !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
-          !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId) &&
-          isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)
-
-      case _ => true
+    // A replica which meets all of the following requirements is allowed to join the ISR.
+    // 1. It is not fenced.
+    // 2. It is not in controlled shutdown.
+    // 3. Its metadata cached broker epoch matches its Fetch request broker epoch. Or the Fetch
+    //    request broker epoch is -1 which bypasses the epoch verification.
+    val mayBeReplica = getReplica(followerReplicaId)
+    // The topic is already deleted and we don't have any replica information. In this case, we can return false
+    // so as to avoid NPE
+    if (mayBeReplica.isEmpty) {
+      warn(s"The replica state of replica ID:[$followerReplicaId] doesn't exist in the leader node. It might because the topic is already deleted.")
+      return false
+    }
+    val storedBrokerEpoch = mayBeReplica.get.stateSnapshot.brokerEpoch
+    val cachedBrokerEpoch = metadataCache.getAliveBrokerEpoch(followerReplicaId)
+    !metadataCache.isBrokerFenced(followerReplicaId) &&
+      !metadataCache.isBrokerShuttingDown(followerReplicaId) &&
+      isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)
   }

   private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long], cachedBrokerEpoch: Option[Long]): Boolean = {
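
Aside: the eligibility rule above now depends only on the MetadataCache trait. A minimal standalone sketch of the same predicate (hypothetical helper names, not part of this PR; fetchBrokerEpoch models the epoch carried by the follower's Fetch request):

    import kafka.server.MetadataCache

    // Sketch only: the same three ISR checks expressed as a pure predicate over
    // the MetadataCache trait, which is all the cast removal requires.
    def isrEligible(cache: MetadataCache, followerId: Int, fetchBrokerEpoch: Option[Long]): Boolean = {
      val cachedEpoch = cache.getAliveBrokerEpoch(followerId)  // None when the broker is not alive
      val epochOk = fetchBrokerEpoch.contains(-1L) ||          // -1 bypasses epoch verification
        (fetchBrokerEpoch.isDefined && fetchBrokerEpoch == cachedEpoch)
      !cache.isBrokerFenced(followerId) &&
        !cache.isBrokerShuttingDown(followerId) &&
        epochOk
    }
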
15 changes: 5 additions & 10 deletions core/src/main/scala/kafka/cluster/Replica.scala
@@ -19,7 +19,6 @@ package kafka.cluster

 import kafka.log.UnifiedLog
 import kafka.server.MetadataCache
-import kafka.server.metadata.KRaftMetadataCache
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
@@ -113,15 +112,11 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition, val metadat
     brokerEpoch: Long
   ): Unit = {
     replicaState.updateAndGet { currentReplicaState =>
-      metadataCache match {
-        case kRaftMetadataCache: KRaftMetadataCache =>
-          val cachedBrokerEpoch = kRaftMetadataCache.getAliveBrokerEpoch(brokerId)
-          // Fence the update if it provides a stale broker epoch.
-          if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {
-            throw new NotLeaderOrFollowerException(s"Received stale fetch state update. broker epoch=$brokerEpoch " +
-              s"vs expected=${cachedBrokerEpoch.get}")
-          }
-        case _ =>
+      val cachedBrokerEpoch = metadataCache.getAliveBrokerEpoch(brokerId)
+      // Fence the update if it provides a stale broker epoch.
+      if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {
+        throw new NotLeaderOrFollowerException(s"Received stale fetch state update. broker epoch=$brokerEpoch " +
+          s"vs expected=${currentReplicaState.brokerEpoch.get}")
       }

       val lastCaughtUpTime = if (followerFetchOffsetMetadata.messageOffset >= leaderEndOffset) {
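
The guard above fences fetch-state updates that carry a stale broker epoch. A tiny worked example of just that comparison (hypothetical helper, not PR code):

    // An update is fenced when its broker epoch is older than the cached one,
    // and an epoch of -1 bypasses the verification entirely.
    def isStaleFetchEpoch(fetchEpoch: Long, cachedEpoch: Option[Long]): Boolean =
      fetchEpoch != -1 && cachedEpoch.exists(_ > fetchEpoch)

    assert(isStaleFetchEpoch(3L, Some(5L)))    // cached epoch 5 > fetch epoch 3: fenced
    assert(!isStaleFetchEpoch(-1L, Some(5L)))  // -1 bypasses the check
    assert(!isStaleFetchEpoch(5L, Some(5L)))   // matching epochs pass
    assert(!isStaleFetchEpoch(3L, None))       // nothing cached, nothing to compare
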
32 changes: 11 additions & 21 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -21,7 +21,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinat
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
 import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
-import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
+import kafka.server.metadata.ConfigRepository
 import kafka.server.share.SharePartitionManager
 import kafka.utils.Logging
 import org.apache.kafka.admin.AdminUtils
@@ -113,11 +113,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
   val aclApis = new AclApis(authHelper, authorizer, requestHelper, "broker", config)
   val configManager = new ConfigAdminManager(brokerId, config, configRepository)
-  val describeTopicPartitionsRequestHandler : Option[DescribeTopicPartitionsRequestHandler] = metadataCache match {
-    case kRaftMetadataCache: KRaftMetadataCache =>
-      Some(new DescribeTopicPartitionsRequestHandler(kRaftMetadataCache, authHelper, config))
-    case _ => None
-  }
+  val describeTopicPartitionsRequestHandler = new DescribeTopicPartitionsRequestHandler(
+    metadataCache, authHelper, config)

   def close(): Unit = {
     aclApis.close()
@@ -967,21 +964,14 @@
   }

   def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = {
-    describeTopicPartitionsRequestHandler match {
-      case Some(handler) => {
-        val response = handler.handleDescribeTopicPartitionsRequest(request)
-        trace("Sending topic partitions metadata %s for correlation id %d to client %s".format(response.topics().asScala.mkString(","),
-          request.header.correlationId, request.header.clientId))
-
-        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-          response.setThrottleTimeMs(requestThrottleMs)
-          new DescribeTopicPartitionsResponse(response)
-        })
-      }
-      case None => {
-        requestHelper.sendMaybeThrottle(request, request.body[DescribeTopicPartitionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-      }
-    }
+    val response = describeTopicPartitionsRequestHandler.handleDescribeTopicPartitionsRequest(request)
+    trace("Sending topic partitions metadata %s for correlation id %d to client %s".format(response.topics().asScala.mkString(","),
+      request.header.correlationId, request.header.clientId))
+
+    requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+      response.setThrottleTimeMs(requestThrottleMs)
+      new DescribeTopicPartitionsResponse(response)
+    })
   }

   /**
32 changes: 31 additions & 1 deletion core/src/main/scala/kafka/server/MetadataCache.scala
@@ -19,7 +19,7 @@ package kafka.server

 import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
 import org.apache.kafka.admin.BrokerMetadata
-import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData, DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData, MetadataResponseData}
+import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData, DescribeTopicPartitionsResponseData, DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData, MetadataResponseData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderAndIsr
@@ -56,6 +56,12 @@ trait MetadataCache extends ConfigRepository {

   def getAliveBrokers(): Iterable[BrokerMetadata]

+  def getAliveBrokerEpoch(brokerId: Int): Option[Long]
+
+  def isBrokerFenced(brokerId: Int): Boolean
+
+  def isBrokerShuttingDown(brokerId: Int): Boolean
+
   def getTopicId(topicName: String): Uuid

   def getTopicName(topicId: Uuid): Option[String]
@@ -105,6 +111,30 @@
   def describeClientQuotas(request: DescribeClientQuotasRequestData): DescribeClientQuotasResponseData

   def describeScramCredentials(request: DescribeUserScramCredentialsRequestData): DescribeUserScramCredentialsResponseData
+
+  /**
+   * Get the topic metadata for the given topics.
+   *
+   * The quota is used to limit the number of partitions to return. The NextTopicPartition field points to the first
+   * partition can't be returned due the limit.
+   * If a topic can't return any partition due to quota limit reached, this topic will not be included in the response.
+   *
+   * Note, the topics should be sorted in alphabetical order. The topics in the DescribeTopicPartitionsResponseData
+   * will also be sorted in alphabetical order.
+   *
+   * @param topics The iterator of topics and their corresponding first partition id to fetch.
+   * @param listenerName The listener name.
+   * @param topicPartitionStartIndex The start partition index for the first topic
+   * @param maximumNumberOfPartitions The max number of partitions to return.
+   * @param ignoreTopicsWithExceptions Whether ignore the topics with exception.
+   */
+  def describeTopicResponse(
+    topics: Iterator[String],
+    listenerName: ListenerName,
+    topicPartitionStartIndex: String => Int,
+    maximumNumberOfPartitions: Int,
+    ignoreTopicsWithExceptions: Boolean
+  ): DescribeTopicPartitionsResponseData
 }

 object MetadataCache {
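
With the method declared on the trait, callers such as DescribeTopicPartitionsRequestHandler no longer need the KRaft-specific type. A minimal usage sketch (the topic names, listener, and partition quota below are illustrative assumptions, not values from the PR):

    import kafka.server.MetadataCache
    import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData
    import org.apache.kafka.common.network.ListenerName

    // Sketch: describe two topics through the trait method.
    def describeSketch(cache: MetadataCache): DescribeTopicPartitionsResponseData =
      cache.describeTopicResponse(
        Iterator("topic-a", "topic-b"),  // must already be in alphabetical order
        new ListenerName("PLAINTEXT"),   // listener whose endpoints the response reports
        _ => 0,                          // fetch each topic starting at partition 0
        1000,                            // maximumNumberOfPartitions quota
        false                            // don't ignore topics that hit exceptions
      )
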
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -272,7 +272,7 @@ class KRaftMetadataCache(
    * @param maximumNumberOfPartitions The max number of partitions to return.
    * @param ignoreTopicsWithExceptions Whether ignore the topics with exception.
    */
[Review comment — Member] Move this scaladoc to the interface.

[Review comment — Member] @FrankYang0529 Could you please remove those comments as you have moved them to the interface.
-  def getTopicMetadataForDescribeTopicResponse(
+  override def describeTopicResponse(
     topics: Iterator[String],
     listenerName: ListenerName,
     topicPartitionStartIndex: String => Int,
@@ -353,11 +353,11 @@
     Option(_currentImage.cluster.broker(brokerId)).count(!_.fenced()) == 1
   }

-  def isBrokerFenced(brokerId: Int): Boolean = {
+  override def isBrokerFenced(brokerId: Int): Boolean = {
     Option(_currentImage.cluster.broker(brokerId)).count(_.fenced) == 1
   }

-  def isBrokerShuttingDown(brokerId: Int): Boolean = {
+  override def isBrokerShuttingDown(brokerId: Int): Boolean = {
     Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) == 1
   }

@@ -452,7 +452,7 @@
     }
   }

-  def getAliveBrokerEpoch(brokerId: Int): Option[Long] = {
+  override def getAliveBrokerEpoch(brokerId: Int): Option[Long] = {
     Option(_currentImage.cluster().broker(brokerId)).filterNot(_.fenced()).
       map(brokerRegistration => brokerRegistration.epoch())
   }
core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -348,6 +348,7 @@ class PartitionLockTest extends Logging {
     val controllerEpoch = 0
     val replicas = (0 to numReplicaFetchers).map(i => Integer.valueOf(brokerId + i)).toList.asJava
     val isr = replicas
+    replicas.forEach(replicaId => when(metadataCache.getAliveBrokerEpoch(replicaId)).thenReturn(Some(1L)))
[Review comment — Member] Why do we need this extra code?

[Review comment — Member (Author)] We use mock(classOf[MetadataCache]) for metadataCache, so the test never actually entered the KRaftMetadataCache branch below. Since this PR removes the match block, the corresponding mock now has to be added.

    val metadataCache: MetadataCache = mock(classOf[MetadataCache])

    metadataCache match {
      case kRaftMetadataCache: KRaftMetadataCache =>
        val cachedBrokerEpoch = kRaftMetadataCache.getAliveBrokerEpoch(brokerId)
        // Fence the update if it provides a stale broker epoch.
        if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {
          throw new NotLeaderOrFollowerException(s"Received stale fetch state update. broker epoch=$brokerEpoch " +
            s"vs expected=${cachedBrokerEpoch.get}")
        }
      case _ =>
    }
     assertTrue(partition.makeLeader(new LeaderAndIsrRequest.PartitionState()
       .setControllerEpoch(controllerEpoch)
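
As the thread above notes, once the cast is gone every mocked MetadataCache exercises the trait methods directly, so the new trait methods need explicit stubs. A sketch of the stubbing pattern (broker id and epoch are made-up values; assumes the Mockito APIs this test already uses):

    import kafka.server.MetadataCache
    import org.mockito.Mockito.{mock, when}

    // With the match/cast gone, Partition and Replica always call the trait, so a
    // mocked MetadataCache needs explicit answers for the broker-state methods.
    // Unstubbed Option-returning calls yield null, which the production code does
    // not expect; the boolean methods default to false and are stubbed for clarity.
    val metadataCache: MetadataCache = mock(classOf[MetadataCache])
    when(metadataCache.getAliveBrokerEpoch(1)).thenReturn(Some(1L))  // broker 1 alive, epoch 1
    when(metadataCache.isBrokerFenced(1)).thenReturn(false)
    when(metadataCache.isBrokerShuttingDown(1)).thenReturn(false)
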