Skip to content

Commit

Permalink
KAFKA-18592: Cleanup ReplicaManager
Browse files Browse the repository at this point in the history
JIRA: KAFKA-18592
There are few methods in ReplicaManager unused now, we should remove
them.
  • Loading branch information
frankvicky committed Jan 19, 2025
1 parent bb3944c commit 5b59263
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 43 deletions.
18 changes: 0 additions & 18 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1636,24 +1636,6 @@ class Partition(val topicPartition: TopicPartition,
localLog.fetchOffsetSnapshot
}

def legacyFetchOffsetsForTimestamp(timestamp: Long,
maxNumOffsets: Int,
isFromConsumer: Boolean,
fetchOnlyFromLeader: Boolean): Seq[Long] = inReadLock(leaderIsrUpdateLock) {
val localLog = localLogWithEpochOrThrow(Optional.empty(), fetchOnlyFromLeader)
val allOffsets = localLog.legacyFetchOffsetsBefore(timestamp, maxNumOffsets)

if (!isFromConsumer) {
allOffsets
} else {
val hw = localLog.highWatermark
if (allOffsets.exists(_ > hw))
hw +: allOffsets.dropWhile(_ > hw)
else
allOffsets
}
}

def logStartOffset: Long = {
inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal.map(_.logStartOffset).getOrElse(-1)
Expand Down
26 changes: 1 addition & 25 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.purgatory.{DelayedOperationKey, DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
Expand Down Expand Up @@ -302,9 +302,6 @@ class ReplicaManager(val config: KafkaConfig,
new DelayedOperationPurgatory[DelayedDeleteRecords](
"DeleteRecords", config.brokerId,
config.deleteRecordsPurgatoryPurgeIntervalRequests))
val delayedElectLeaderPurgatory = delayedElectLeaderPurgatoryParam.getOrElse(
new DelayedOperationPurgatory[DelayedElectLeader](
"ElectLeader", config.brokerId))
val delayedRemoteFetchPurgatory = delayedRemoteFetchPurgatoryParam.getOrElse(
new DelayedOperationPurgatory[DelayedRemoteFetch](
"RemoteFetch", config.brokerId))
Expand Down Expand Up @@ -391,13 +388,6 @@ class ReplicaManager(val config: KafkaConfig,

def getLog(topicPartition: TopicPartition): Option[UnifiedLog] = logManager.getLog(topicPartition)

def hasDelayedElectionOperations: Boolean = delayedElectLeaderPurgatory.numDelayed != 0

def tryCompleteElection(key: DelayedOperationKey): Unit = {
val completed = delayedElectLeaderPurgatory.checkAndComplete(key)
debug("Request key %s unblocked %d ElectLeader.".format(key.keyLabel, completed))
}

def startup(): Unit = {
// start ISR expiration thread
// A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
Expand Down Expand Up @@ -755,10 +745,6 @@ class ReplicaManager(val config: KafkaConfig,
onlinePartition(topicPartition).flatMap(_.log)
}

def getLogDir(topicPartition: TopicPartition): Option[String] = {
localLog(topicPartition).map(_.parentDir)
}

def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions()

def addToActionQueue(action: Runnable): Unit = defaultActionQueue.add(action)
Expand Down Expand Up @@ -1617,15 +1603,6 @@ class ReplicaManager(val config: KafkaConfig,
partition.fetchOffsetForTimestamp(timestamp, isolationLevel, currentLeaderEpoch, fetchOnlyFromLeader, remoteLogManager)
}

def legacyFetchOffsetsForTimestamp(topicPartition: TopicPartition,
timestamp: Long,
maxNumOffsets: Int,
isFromConsumer: Boolean,
fetchOnlyFromLeader: Boolean): Seq[Long] = {
val partition = getPartitionOrException(topicPartition)
partition.legacyFetchOffsetsForTimestamp(timestamp, maxNumOffsets, isFromConsumer, fetchOnlyFromLeader)
}

/**
* Returns [[LogReadResult]] with error if a task for RemoteStorageFetchInfo could not be scheduled successfully
* else returns [[None]].
Expand Down Expand Up @@ -2663,7 +2640,6 @@ class ReplicaManager(val config: KafkaConfig,
delayedRemoteListOffsetsPurgatory.shutdown()
delayedProducePurgatory.shutdown()
delayedDeleteRecordsPurgatory.shutdown()
delayedElectLeaderPurgatory.shutdown()
delayedShareFetchPurgatory.shutdown()
if (checkpointHW)
checkpointHighWatermarks()
Expand Down

0 comments on commit 5b59263

Please sign in to comment.