Skip to content

Commit

Permalink
code review
Browse files Browse the repository at this point in the history
  • Loading branch information
kevin-wu24 committed Jan 23, 2025
1 parent faf6216 commit 90e8fe5
Show file tree
Hide file tree
Showing 22 changed files with 215 additions and 195 deletions.
4 changes: 0 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2040,12 +2040,8 @@ project(':raft') {
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':clients')
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':core')
testImplementation project(':server')
testImplementation project(':metadata')
testImplementation libs.jacksonDatabindYaml
testImplementation libs.junitJupiter
testImplementation libs.metrics
testImplementation libs.mockitoCore
testImplementation libs.jqwik
testImplementation testLog4j2Libs
Expand Down
4 changes: 0 additions & 4 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -463,13 +463,10 @@
</subpackage>

<subpackage name="raft">
<allow pkg="com.yammer.metrics" />
<allow pkg="kafka.raft" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.controller.metrics" />
<allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.feature" />
Expand All @@ -480,7 +477,6 @@
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.common.serialization" />
<allow pkg="org.apache.kafka.server.fault"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ import org.apache.kafka.raft.ExternalKRaftMetrics
import org.apache.kafka.server.metrics.BrokerServerMetrics

class DefaultExternalKRaftMetrics(
val brokerServerMetrics: BrokerServerMetrics,
val controllerMetadataMetrics: ControllerMetadataMetrics
val brokerServerMetricsOpt: Option[BrokerServerMetrics],
val controllerMetadataMetricsOpt: Option[ControllerMetadataMetrics]
) extends ExternalKRaftMetrics {
val brokerServerMetricsOpt: Option[BrokerServerMetrics] = Option(brokerServerMetrics)
val controllerMetadataMetricsOpt: Option[ControllerMetadataMetrics] = Option(controllerMetadataMetrics)

override def setIgnoredStaticVoters(): Unit = {
brokerServerMetricsOpt.foreach(metrics => metrics.setIgnoredStaticVoters())
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ class KafkaRaftManager[T](
topicId: Uuid,
time: Time,
metrics: Metrics,
externalKRaftMetrics: ExternalKRaftMetrics,
threadNamePrefixOpt: Option[String],
val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]],
bootstrapServers: JCollection[InetSocketAddress],
localListeners: Endpoints,
fatalFaultHandler: FaultHandler,
externalKRaftMetrics: ExternalKRaftMetrics
fatalFaultHandler: FaultHandler
) extends RaftManager[T] with Logging {

val apiVersions = new ApiVersions()
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ class SharedServer(
controllerServerMetrics = new ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
}

val externalKRaftMetrics = new DefaultExternalKRaftMetrics(brokerMetrics, controllerServerMetrics)
val externalKRaftMetrics = new DefaultExternalKRaftMetrics(Option(brokerMetrics), Option(controllerServerMetrics))

val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](
clusterId,
Expand All @@ -288,12 +288,12 @@ class SharedServer(
KafkaRaftServer.MetadataTopicId,
time,
metrics,
externalKRaftMetrics,
Some(s"kafka-${sharedServerConfig.nodeId}-raft"), // No dash expected at the end
controllerQuorumVotersFuture,
bootstrapServers,
listenerEndpoints,
raftManagerFaultHandler,
externalKRaftMetrics
raftManagerFaultHandler
)
raftManager = _raftManager
_raftManager.startup()
Expand Down
12 changes: 2 additions & 10 deletions core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package kafka.tools

import com.yammer.metrics.core.MetricsRegistry

import java.net.InetSocketAddress
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit}
Expand All @@ -37,19 +35,16 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
import org.apache.kafka.raft.errors.NotLeaderException
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, QuorumConfig, RaftClient}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.config.KRaftConfigs
import org.apache.kafka.server.fault.ProcessTerminatingFaultHandler
import org.apache.kafka.server.metrics.BrokerServerMetrics
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils, ShutdownableThread}
import org.apache.kafka.snapshot.SnapshotReader

import java.util.Optional
import scala.jdk.CollectionConverters._

/**
Expand Down Expand Up @@ -107,15 +102,12 @@ class TestRaftServer(
topicId,
time,
metrics,
new DefaultExternalKRaftMetrics(None, None),
Some(threadNamePrefix),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
endpoints,
new ProcessTerminatingFaultHandler.Builder().build(),
new DefaultExternalKRaftMetrics(
new BrokerServerMetrics(metrics),
new ControllerMetadataMetrics(Optional.of(new MetricsRegistry()))
)
new ProcessTerminatingFaultHandler.Builder().build()
)

workloadGenerator = new RaftWorkloadGenerator(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.raft

import com.yammer.metrics.core.MetricsRegistry
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
import org.apache.kafka.server.metrics.BrokerServerMetrics
import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
import org.junit.jupiter.api.Test

import java.util.Optional

final class DefaultExternalKRaftMetricsTest {
@Test
def testDefaultExternalKRaftMetrics(): Unit = {
val brokerServerMetrics = new BrokerServerMetrics(new Metrics())
val controllerMetadataMetrics = new ControllerMetadataMetrics(Optional.of(new MetricsRegistry()))
var metrics = new DefaultExternalKRaftMetrics(
Option(brokerServerMetrics),
Option(controllerMetadataMetrics)
)

assertFalse(brokerServerMetrics.ignoredStaticVoters())
assertFalse(controllerMetadataMetrics.ignoredStaticVoters())

metrics.setIgnoredStaticVoters()

assertTrue(brokerServerMetrics.ignoredStaticVoters())
assertTrue(controllerMetadataMetrics.ignoredStaticVoters())

metrics.setIgnoredStaticVoters()

assertTrue(brokerServerMetrics.ignoredStaticVoters())
assertTrue(controllerMetadataMetrics.ignoredStaticVoters())

metrics = new DefaultExternalKRaftMetrics(None, None)
metrics.setIgnoredStaticVoters()
}
}
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ class RaftManagerTest {
topicId,
Time.SYSTEM,
new Metrics(Time.SYSTEM),
new DefaultExternalKRaftMetrics(None, None),
Option.empty,
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
endpoints,
mock(classOf[FaultHandler]),
new DefaultExternalKRaftMetrics(null, null)
mock(classOf[FaultHandler])
)
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
"KafkaController", "MetadataErrorCount");
private static final MetricName UNCLEAN_LEADER_ELECTIONS_PER_SEC = getMetricName(
"ControllerStats", "UncleanLeaderElectionsPerSec");

private static final MetricName IGNORED_STATIC_VOTERS = getMetricName(
"KafkaController", "IgnoredStaticVoters");

Expand All @@ -68,10 +67,8 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
private final AtomicInteger preferredReplicaImbalanceCount = new AtomicInteger(0);
private final AtomicInteger metadataErrorCount = new AtomicInteger(0);
private Optional<Meter> uncleanLeaderElectionMeter = Optional.empty();

private final AtomicBoolean ignoredStaticVoters = new AtomicBoolean(false);


/**
* Create a new ControllerMetadataMetrics object.
*
Expand Down Expand Up @@ -219,6 +216,7 @@ public void updateUncleanLeaderElection(int count) {
public void setIgnoredStaticVoters() {
ignoredStaticVoters.set(true);
}

public boolean ignoredStaticVoters() {
return ignoredStaticVoters.get();
}
Expand Down
4 changes: 0 additions & 4 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,6 @@ private void onBecomeLeader(long currentTimeMs) {

resetConnections();
kafkaRaftMetrics.maybeUpdateElectionLatency(currentTimeMs);
kafkaRaftMetrics.addLeaderMetrics();
}

private void flushLeaderLog(LeaderState<T> state, long currentTimeMs) {
Expand Down Expand Up @@ -729,21 +728,18 @@ private void transitionToProspective(long currentTimeMs) {
private void transitionToUnattached(int epoch, OptionalInt leaderId) {
quorum.transitionToUnattached(epoch, leaderId);
maybeFireLeaderChange();
kafkaRaftMetrics.removeLeaderMetrics();
resetConnections();
}

private void transitionToResigned(List<ReplicaKey> preferredSuccessors) {
fetchPurgatory.completeAllExceptionally(
Errors.NOT_LEADER_OR_FOLLOWER.exception("Not handling request since this node is resigning"));
quorum.transitionToResigned(preferredSuccessors);
kafkaRaftMetrics.removeLeaderMetrics();
resetConnections();
}

private void onBecomeFollower(long currentTimeMs) {
kafkaRaftMetrics.maybeUpdateElectionLatency(currentTimeMs);
kafkaRaftMetrics.removeLeaderMetrics();

resetConnections();

Expand Down
12 changes: 7 additions & 5 deletions raft/src/main/java/org/apache/kafka/raft/LeaderState.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,7 @@ public void resetAddVoterHandlerState(
.complete(RaftUtil.addVoterResponse(error, message))
);
addVoterHandlerState = state;
kafkaRaftMetrics.updateUncommittedVoterChange(
addVoterHandlerState.isPresent() || removeVoterHandlerState.isPresent()
);
updateUncommittedVoterChangeMetric();
}

public Optional<RemoveVoterHandlerState> removeVoterHandlerState() {
Expand All @@ -244,6 +242,10 @@ public void resetRemoveVoterHandlerState(
.complete(RaftUtil.removeVoterResponse(error, message))
);
removeVoterHandlerState = state;
updateUncommittedVoterChangeMetric();
}

private void updateUncommittedVoterChangeMetric() {
kafkaRaftMetrics.updateUncommittedVoterChange(
addVoterHandlerState.isPresent() || removeVoterHandlerState.isPresent()
);
Expand Down Expand Up @@ -689,7 +691,6 @@ private void updateVoterAndObserverStates(VoterSet lastVoterSet) {
// Remove the voter from the previous data structures
oldVoterStates.remove(voterNode.voterKey().id());
observerStates.remove(voterNode.voterKey());
kafkaRaftMetrics.updateNumObservers(observerStates.size());

// Make sure that the replica key in the replica state matches the voter's
state.setReplicaKey(voterNode.voterKey());
Expand All @@ -704,8 +705,8 @@ private void updateVoterAndObserverStates(VoterSet lastVoterSet) {
for (ReplicaState replicaStateEntry : oldVoterStates.values()) {
replicaStateEntry.clearListeners();
observerStates.putIfAbsent(replicaStateEntry.replicaKey, replicaStateEntry);
kafkaRaftMetrics.updateNumObservers(observerStates.size());
}
kafkaRaftMetrics.updateNumObservers(observerStates.size());
}

public static class ReplicaState implements Comparable<ReplicaState> {
Expand Down Expand Up @@ -879,6 +880,7 @@ public String name() {
public void close() {
resetAddVoterHandlerState(Errors.NOT_LEADER_OR_FOLLOWER, null, Optional.empty());
resetRemoveVoterHandlerState(Errors.NOT_LEADER_OR_FOLLOWER, null, Optional.empty());
kafkaRaftMetrics.removeLeaderMetrics();

accumulator.close();
}
Expand Down
5 changes: 3 additions & 2 deletions raft/src/main/java/org/apache/kafka/raft/QuorumState.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine;

import org.apache.kafka.raft.internals.KafkaRaftMetrics;

import org.slf4j.Logger;

import java.io.IOException;
Expand Down Expand Up @@ -96,7 +96,6 @@ public class QuorumState {
private final int electionTimeoutMs;
private final int fetchTimeoutMs;
private final LogContext logContext;

private final KafkaRaftMetrics kafkaRaftMetrics;

private volatile EpochState state;
Expand Down Expand Up @@ -731,6 +730,8 @@ public <T> LeaderState<T> transitionToLeader(long epochStartOffset, BatchAccumul
logContext,
kafkaRaftMetrics
);
kafkaRaftMetrics.addLeaderMetrics();

durableTransitionTo(state);
return state;
}
Expand Down
2 changes: 1 addition & 1 deletion raft/src/main/java/org/apache/kafka/raft/VoterSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public Set<VoterNode> voterNodes() {
}

/**
* Returns size of the voter set.
* Returns the number of voters in the voter set.
*/
public int size() {
return voters.size();
Expand Down
Loading

0 comments on commit 90e8fe5

Please sign in to comment.