Skip to content

Commit

Permalink
KAFKA-18597: Add unit tests for additional impacted metrics in LogCle…
Browse files Browse the repository at this point in the history
…anerTest

- Added testMaxBufferUtilizationPercentMetric to validate maximum buffer utilization metric.
- Added testMaxCleanTimeMetric to verify the maximum clean time metric.
- Added testMaxCompactionDelayMetrics to ensure correctness of maximum compaction delay metric.
  • Loading branch information
LoganZhuZzz committed Jan 24, 2025
1 parent 495df20 commit 3d4f8e0
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 47 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,8 @@ object LogCleaner {
// Visible for test.
private[log] val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent"
private val MaxCleanTimeMetricName = "max-clean-time-secs"
private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
private[log] val MaxCleanTimeMetricName = "max-clean-time-secs"
private[log] val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
private val DeadThreadCountMetricName = "DeadThreadCount"
// package private for testing
private[log] val MetricNames = Set(
Expand Down
170 changes: 125 additions & 45 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package kafka.log

import kafka.log.LogCleaner.MaxBufferUtilizationPercentMetricName
import kafka.log.LogCleaner.{MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, Logging, Pool, TestUtils}
import org.apache.kafka.common.TopicPartition
Expand Down Expand Up @@ -2047,35 +2047,59 @@ class LogCleanerTest extends Logging {
}

@Test
def testMaxOverCleanerThreads(): Unit = {
val logCleaner = new LogCleaner(new CleanerConfig(true),
def testMaxBufferUtilizationPercentMetric(): Unit = {
val logCleaner = new LogCleaner(
new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time)
time = time
)

def assertMaxBufferUtilizationPercent(expected: Int): Unit = {
val gauge = logCleaner.metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
() => (logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt)
assertEquals(expected, gauge.value())
}

try {
// No CleanerThreads
assertMaxBufferUtilizationPercent(0)

val cleaners = logCleaner.cleaners

val cleaners = logCleaner.cleaners
val cleaner1 = new logCleaner.CleanerThread(1)
cleaner1.lastStats = new CleanerStats(time)
cleaner1.lastStats.bufferUtilization = 0.75
cleaners += cleaner1

val cleaner1 = new logCleaner.CleanerThread(1)
cleaner1.lastStats = new CleanerStats(time)
cleaner1.lastStats.bufferUtilization = 0.75
cleaners += cleaner1
val cleaner2 = new logCleaner.CleanerThread(2)
cleaner2.lastStats = new CleanerStats(time)
cleaner2.lastStats.bufferUtilization = 0.85
cleaners += cleaner2

val cleaner2 = new logCleaner.CleanerThread(2)
cleaner2.lastStats = new CleanerStats(time)
cleaner2.lastStats.bufferUtilization = 0.85
cleaners += cleaner2
val cleaner3 = new logCleaner.CleanerThread(3)
cleaner3.lastStats = new CleanerStats(time)
cleaner3.lastStats.bufferUtilization = 0.65
cleaners += cleaner3

val cleaner3 = new logCleaner.CleanerThread(3)
cleaner3.lastStats = new CleanerStats(time)
cleaner3.lastStats.bufferUtilization = 0.65
cleaners += cleaner3
// expect the gauge value to reflect the maximum bufferUtilization
assertMaxBufferUtilizationPercent(85)

assertEquals(0.85, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization))
// Update bufferUtilization and verify the gauge value updates
cleaner1.lastStats.bufferUtilization = 0.9
assertMaxBufferUtilizationPercent(90)

// All CleanerThreads have the same bufferUtilization
cleaners.foreach(_.lastStats.bufferUtilization = 0.5)
assertMaxBufferUtilizationPercent(50)
} finally {
logCleaner.shutdown()
}
}

@Test
def testMaxBufferUtilizationPercentMetric(): Unit = {
def testMaxCleanTimeMetric(): Unit = {
val logCleaner = new LogCleaner(
new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
Expand All @@ -2084,42 +2108,98 @@ class LogCleanerTest extends Logging {
time = time
)

def assertMaxBufferUtilizationPercent(expected: Int): Unit = {
val gauge = logCleaner.metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
() => (logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt)
def assertMaxCleanTime(expected: Int): Unit = {
val gauge = logCleaner.metricsGroup.newGauge(MaxCleanTimeMetricName,
() => logCleaner.maxOverCleanerThreads(_.lastStats.elapsedSecs).toInt)
assertEquals(expected, gauge.value())
}

try {
// No CleanerThreads
assertMaxCleanTime(0)

val cleaners = logCleaner.cleaners

val cleaner1 = new logCleaner.CleanerThread(1)
cleaner1.lastStats = new CleanerStats(time)
cleaner1.lastStats.endTime = cleaner1.lastStats.startTime + 1_000L
cleaners += cleaner1

val cleaner2 = new logCleaner.CleanerThread(2)
cleaner2.lastStats = new CleanerStats(time)
cleaner2.lastStats.endTime = cleaner2.lastStats.startTime + 2_000L
cleaners += cleaner2

val cleaner3 = new logCleaner.CleanerThread(3)
cleaner3.lastStats = new CleanerStats(time)
cleaner3.lastStats.endTime = cleaner3.lastStats.startTime + 3_000L
cleaners += cleaner3

// expect the gauge value to reflect the maximum cleanTime
assertMaxCleanTime(3)

// Update cleanTime and verify the gauge value updates
cleaner1.lastStats.endTime = cleaner1.lastStats.startTime + 4_000L
assertMaxCleanTime(4)

// All CleanerThreads have the same cleanTime
cleaners.foreach(cleaner => cleaner.lastStats.endTime = cleaner.lastStats.startTime + 1_500L)
assertMaxCleanTime(1)
} finally {
logCleaner.shutdown()
}
}

@Test
def testMaxCompactionDelayMetrics(): Unit = {
val logCleaner = new LogCleaner(
new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time
)

def assertMaxCompactionDelay(expected: Int): Unit = {
val gauge = logCleaner.metricsGroup.newGauge(MaxCompactionDelayMetricsName,
() => (logCleaner.maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000).toInt)
assertEquals(expected, gauge.value())
}

// No CleanerThreads
assertMaxBufferUtilizationPercent(0)
try {
// No CleanerThreads
assertMaxCompactionDelay(0)

val cleaners = logCleaner.cleaners
val cleaners = logCleaner.cleaners

val cleaner1 = new logCleaner.CleanerThread(1)
cleaner1.lastStats = new CleanerStats(time)
cleaner1.lastStats.bufferUtilization = 0.75
cleaners += cleaner1
val cleaner1 = new logCleaner.CleanerThread(1)
cleaner1.lastStats = new CleanerStats(time)
cleaner1.lastPreCleanStats.maxCompactionDelayMs = 1_000L
cleaners += cleaner1

val cleaner2 = new logCleaner.CleanerThread(2)
cleaner2.lastStats = new CleanerStats(time)
cleaner2.lastStats.bufferUtilization = 0.85
cleaners += cleaner2
val cleaner2 = new logCleaner.CleanerThread(2)
cleaner2.lastStats = new CleanerStats(time)
cleaner2.lastPreCleanStats.maxCompactionDelayMs = 2_000L
cleaners += cleaner2

val cleaner3 = new logCleaner.CleanerThread(3)
cleaner3.lastStats = new CleanerStats(time)
cleaner3.lastStats.bufferUtilization = 0.65
cleaners += cleaner3
val cleaner3 = new logCleaner.CleanerThread(3)
cleaner3.lastStats = new CleanerStats(time)
cleaner3.lastPreCleanStats.maxCompactionDelayMs = 3_000L
cleaners += cleaner3

// expect the gauge value to reflect the maximum bufferUtilization
assertMaxBufferUtilizationPercent(85)
// expect the gauge value to reflect the maximum CompactionDelay
assertMaxCompactionDelay(3)

// Update bufferUtilization and verify the gauge value updates
cleaner1.lastStats.bufferUtilization = 0.9
assertMaxBufferUtilizationPercent(90)
// Update CompactionDelay and verify the gauge value updates
cleaner1.lastPreCleanStats.maxCompactionDelayMs = 4_000L
assertMaxCompactionDelay(4)

// All CleanerThreads have the same bufferUtilization
cleaners.foreach(_.lastStats.bufferUtilization = 0.5)
assertMaxBufferUtilizationPercent(50)
// All CleanerThreads have the same CompactionDelay
cleaners.foreach(_.lastPreCleanStats.maxCompactionDelayMs = 1_500L)
assertMaxCompactionDelay(1)
} finally {
logCleaner.shutdown()
}
}

private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
Expand Down

0 comments on commit 3d4f8e0

Please sign in to comment.