Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18592: Cleanup ReplicaManager #18621

Merged
merged 1 commit into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import kafka.server.AddPartitionsToTxnManager;
import kafka.server.AlterPartitionManager;
import kafka.server.DelayedDeleteRecords;
import kafka.server.DelayedElectLeader;
import kafka.server.DelayedFetch;
import kafka.server.DelayedProduce;
import kafka.server.DelayedRemoteFetch;
Expand Down Expand Up @@ -66,7 +65,6 @@ public class ReplicaManagerBuilder {
private Optional<DelayedOperationPurgatory<DelayedProduce>> delayedProducePurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedFetch>> delayedFetchPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> delayedDeleteRecordsPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedElectLeader>> delayedElectLeaderPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedRemoteFetch>> delayedRemoteFetchPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedRemoteListOffsets>> delayedRemoteListOffsetsPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedShareFetch>> delayedShareFetchPurgatory = Optional.empty();
Expand Down Expand Up @@ -130,36 +128,11 @@ public ReplicaManagerBuilder setBrokerTopicStats(BrokerTopicStats brokerTopicSta
return this;
}

public ReplicaManagerBuilder setIsShuttingDown(AtomicBoolean isShuttingDown) {
this.isShuttingDown = isShuttingDown;
return this;
}

public ReplicaManagerBuilder setDelayedProducePurgatory(DelayedOperationPurgatory<DelayedProduce> delayedProducePurgatory) {
this.delayedProducePurgatory = Optional.of(delayedProducePurgatory);
return this;
}

public ReplicaManagerBuilder setDelayedFetchPurgatory(DelayedOperationPurgatory<DelayedFetch> delayedFetchPurgatory) {
this.delayedFetchPurgatory = Optional.of(delayedFetchPurgatory);
return this;
}

public ReplicaManagerBuilder setDelayedRemoteFetchPurgatory(DelayedOperationPurgatory<DelayedRemoteFetch> delayedRemoteFetchPurgatory) {
this.delayedRemoteFetchPurgatory = Optional.of(delayedRemoteFetchPurgatory);
return this;
}

public ReplicaManagerBuilder setDelayedDeleteRecordsPurgatory(DelayedOperationPurgatory<DelayedDeleteRecords> delayedDeleteRecordsPurgatory) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why you can't remove the field(s) associated with this setter (and others) as well? As far as I can see it is only used as an argument for creating a ReplicaManager and you could just leave the instantiation of the purgatory in the constructor of ReplicaManager. I am happy for this to be done in a follow-up as long as it has already been accounted for and it isn't just a miss.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. These fields are something we miss and also need to be clean.
I will file a jira for this one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.delayedDeleteRecordsPurgatory = Optional.of(delayedDeleteRecordsPurgatory);
return this;
}

public ReplicaManagerBuilder setDelayedElectLeaderPurgatoryParam(DelayedOperationPurgatory<DelayedElectLeader> delayedElectLeaderPurgatory) {
this.delayedElectLeaderPurgatory = Optional.of(delayedElectLeaderPurgatory);
return this;
}

public ReplicaManagerBuilder setThreadNamePrefix(String threadNamePrefix) {
this.threadNamePrefix = Optional.of(threadNamePrefix);
return this;
Expand All @@ -170,11 +143,6 @@ public ReplicaManagerBuilder setBrokerEpoch(long brokerEpoch) {
return this;
}

public ReplicaManagerBuilder setAddPartitionsToTransactionManager(AddPartitionsToTxnManager addPartitionsToTxnManager) {
this.addPartitionsToTxnManager = Optional.of(addPartitionsToTxnManager);
return this;
}

public ReplicaManagerBuilder setDirectoryEventHandler(DirectoryEventHandler directoryEventHandler) {
this.directoryEventHandler = directoryEventHandler;
return this;
Expand Down Expand Up @@ -206,7 +174,6 @@ public ReplicaManager build() {
OptionConverters.toScala(delayedProducePurgatory),
OptionConverters.toScala(delayedFetchPurgatory),
OptionConverters.toScala(delayedDeleteRecordsPurgatory),
OptionConverters.toScala(delayedElectLeaderPurgatory),
OptionConverters.toScala(delayedRemoteFetchPurgatory),
OptionConverters.toScala(delayedRemoteListOffsetsPurgatory),
OptionConverters.toScala(delayedShareFetchPurgatory),
Expand Down
18 changes: 0 additions & 18 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1635,24 +1635,6 @@ class Partition(val topicPartition: TopicPartition,
localLog.fetchOffsetSnapshot
}

def legacyFetchOffsetsForTimestamp(timestamp: Long,
maxNumOffsets: Int,
isFromConsumer: Boolean,
fetchOnlyFromLeader: Boolean): Seq[Long] = inReadLock(leaderIsrUpdateLock) {
val localLog = localLogWithEpochOrThrow(Optional.empty(), fetchOnlyFromLeader)
val allOffsets = localLog.legacyFetchOffsetsBefore(timestamp, maxNumOffsets)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it can be removed from LocalLog too (in a follow-up).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right.
If this PR gets merged, this method would only be invoked in tests.
I will file a JIRA for this one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


if (!isFromConsumer) {
allOffsets
} else {
val hw = localLog.highWatermark
if (allOffsets.exists(_ > hw))
hw +: allOffsets.dropWhile(_ > hw)
else
allOffsets
}
}

def logStartOffset: Long = {
inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal.map(_.logStartOffset).getOrElse(-1)
Expand Down
27 changes: 1 addition & 26 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.purgatory.{DelayedOperationKey, DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
Expand Down Expand Up @@ -274,7 +274,6 @@ class ReplicaManager(val config: KafkaConfig,
delayedProducePurgatoryParam: Option[DelayedOperationPurgatory[DelayedProduce]] = None,
delayedFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedFetch]] = None,
delayedDeleteRecordsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None,
delayedElectLeaderPurgatoryParam: Option[DelayedOperationPurgatory[DelayedElectLeader]] = None,
delayedRemoteFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
delayedRemoteListOffsetsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteListOffsets]] = None,
delayedShareFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedShareFetch]] = None,
Expand All @@ -298,9 +297,6 @@ class ReplicaManager(val config: KafkaConfig,
new DelayedOperationPurgatory[DelayedDeleteRecords](
"DeleteRecords", config.brokerId,
config.deleteRecordsPurgatoryPurgeIntervalRequests))
val delayedElectLeaderPurgatory = delayedElectLeaderPurgatoryParam.getOrElse(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove delayedElectLeaderPurgatoryParam also

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this also remove the following metrics.

  1. kafka.server:type=DelayedOperationPurgatory,delayedOperation=ElectLeader,name=PurgatorySize
  2. kafka.server:type=DelayedOperationPurgatory,delayedOperation=ElectLeader,name=NumDelayedOperations

please add them into zk2kraft.html

Copy link
Member

@ijuma ijuma Jan 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, we still allow elect leader to be called. Why are these metrics no longer relevant?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The zk broker waits for the metadata update before returning a response to the ElectLeader request, whereas the Kraft broker does not. Consequently, the Kraft broker does not need delayedOperation metrics for ElectLeader requests. Instead, kraft broker has ForwardingManager metrics to monitor the forwarded requests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation.

new DelayedOperationPurgatory[DelayedElectLeader](
"ElectLeader", config.brokerId))
val delayedRemoteFetchPurgatory = delayedRemoteFetchPurgatoryParam.getOrElse(
new DelayedOperationPurgatory[DelayedRemoteFetch](
"RemoteFetch", config.brokerId))
Expand Down Expand Up @@ -387,13 +383,6 @@ class ReplicaManager(val config: KafkaConfig,

def getLog(topicPartition: TopicPartition): Option[UnifiedLog] = logManager.getLog(topicPartition)

def hasDelayedElectionOperations: Boolean = delayedElectLeaderPurgatory.numDelayed != 0

def tryCompleteElection(key: DelayedOperationKey): Unit = {
val completed = delayedElectLeaderPurgatory.checkAndComplete(key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we remove the purgatory too?

debug("Request key %s unblocked %d ElectLeader.".format(key.keyLabel, completed))
}

def startup(): Unit = {
// start ISR expiration thread
// A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
Expand Down Expand Up @@ -628,10 +617,6 @@ class ReplicaManager(val config: KafkaConfig,
onlinePartition(topicPartition).flatMap(_.log)
}

def getLogDir(topicPartition: TopicPartition): Option[String] = {
localLog(topicPartition).map(_.parentDir)
}

def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions()

def addToActionQueue(action: Runnable): Unit = defaultActionQueue.add(action)
Expand Down Expand Up @@ -1490,15 +1475,6 @@ class ReplicaManager(val config: KafkaConfig,
partition.fetchOffsetForTimestamp(timestamp, isolationLevel, currentLeaderEpoch, fetchOnlyFromLeader, remoteLogManager)
}

def legacyFetchOffsetsForTimestamp(topicPartition: TopicPartition,
timestamp: Long,
maxNumOffsets: Int,
isFromConsumer: Boolean,
fetchOnlyFromLeader: Boolean): Seq[Long] = {
val partition = getPartitionOrException(topicPartition)
partition.legacyFetchOffsetsForTimestamp(timestamp, maxNumOffsets, isFromConsumer, fetchOnlyFromLeader)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we remove it from partition also?

}

/**
* Returns [[LogReadResult]] with error if a task for RemoteStorageFetchInfo could not be scheduled successfully
* else returns [[None]].
Expand Down Expand Up @@ -2525,7 +2501,6 @@ class ReplicaManager(val config: KafkaConfig,
delayedRemoteListOffsetsPurgatory.shutdown()
delayedProducePurgatory.shutdown()
delayedDeleteRecordsPurgatory.shutdown()
delayedElectLeaderPurgatory.shutdown()
delayedShareFetchPurgatory.shutdown()
if (checkpointHW)
checkpointHighWatermarks()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ object AbstractCoordinatorConcurrencyTest {
delayedProducePurgatoryParam = Some(producePurgatory),
delayedFetchPurgatoryParam = Some(delayedFetchPurgatoryParam),
delayedDeleteRecordsPurgatoryParam = Some(delayedDeleteRecordsPurgatoryParam),
delayedElectLeaderPurgatoryParam = Some(delayedElectLeaderPurgatoryParam),
delayedRemoteFetchPurgatoryParam = Some(delayedRemoteFetchPurgatoryParam),
delayedRemoteListOffsetsPurgatoryParam = Some(delayedRemoteListOffsetsPurgatoryParam),
threadNamePrefix = Option(this.getClass.getName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2723,8 +2723,6 @@ class ReplicaManagerTest {
"Fetch", timer, 0, false)
val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
"DeleteRecords", timer, 0, false)
val mockElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader](
"ElectLeader", timer, 0, false)
val mockRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch](
"RemoteFetch", timer, 0, false)
val mockRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets](
Expand Down Expand Up @@ -2754,7 +2752,6 @@ class ReplicaManagerTest {
delayedProducePurgatoryParam = Some(mockProducePurgatory),
delayedFetchPurgatoryParam = Some(mockFetchPurgatory),
delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
delayedElectLeaderPurgatoryParam = Some(mockElectLeaderPurgatory),
delayedRemoteFetchPurgatoryParam = Some(mockRemoteFetchPurgatory),
delayedRemoteListOffsetsPurgatoryParam = Some(mockRemoteListOffsetsPurgatory),
delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory),
Expand Down Expand Up @@ -3150,8 +3147,6 @@ class ReplicaManagerTest {
"Fetch", timer, 0, false)
val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
"DeleteRecords", timer, 0, false)
val mockDelayedElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader](
"DelayedElectLeader", timer, 0, false)
val mockDelayedRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch](
"DelayedRemoteFetch", timer, 0, false)
val mockDelayedRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets](
Expand Down Expand Up @@ -3188,7 +3183,6 @@ class ReplicaManagerTest {
delayedProducePurgatoryParam = Some(mockProducePurgatory),
delayedFetchPurgatoryParam = Some(mockFetchPurgatory),
delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
delayedElectLeaderPurgatoryParam = Some(mockDelayedElectLeaderPurgatory),
delayedRemoteFetchPurgatoryParam = Some(mockDelayedRemoteFetchPurgatory),
delayedRemoteListOffsetsPurgatoryParam = Some(mockDelayedRemoteListOffsetsPurgatory),
delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory),
Expand Down
9 changes: 9 additions & 0 deletions docs/zk2kraft.html
Original file line number Diff line number Diff line change
Expand Up @@ -188,5 +188,14 @@ <h5 class="anchor-heading">Removal metrics</h5>
In Kraft mode, Zookeeper is not used, so the metrics is removed.
</p>
</li>
<li>
<p>
Remove the metrics for leader election purgatory.
</p>
<ul>
<li><code>kafka.server:type=DelayedOperationPurgatory,delayedOperation=ElectLeader,name=PurgatorySize</code></li>
<li><code>kafka.server:type=DelayedOperationPurgatory,delayedOperation=ElectLeader,name=NumDelayedOperations</code></li>
</ul>
</li>
</ul>
</div>
Loading