Add jvmGCTime metrics #889

Merged: 2 commits merged on Nov 13, 2024
@@ -175,6 +175,16 @@ public final class MetricConstants {
*/
public static final String INITIAL_CONDITION_CHECK_FAILED_PREFIX = "initialConditionCheck.failed.";

+/**
+* Metric for tracking the JVM GC time per task
+*/
+public static final String TASK_JVM_GC_TIME_METRIC = "task.jvmGCTime.count";
+
+/**
+* Metric for tracking the total JVM GC time for query
+*/
+public static final String TOTAL_JVM_GC_TIME_METRIC = "query.totalJvmGCTime.count";
+
private MetricConstants() {
// Private constructor to prevent instantiation
}
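
The two new constants follow the existing historic-gauge pattern. Below is a minimal sketch of how they might be reported, assuming the `MetricsUtil.addHistoricGauge(String, Long)` helper that the listener changes later in this diff rely on; the values passed in are placeholders for illustration only.

```scala
import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}

// Hypothetical reporting helper: publishes per-task and per-query GC time
// under the new metric names. `taskGcTimeMs` and `totalGcTimeMs` are
// placeholder inputs, not part of this PR.
object GcTimeReportingSketch {
  def report(taskGcTimeMs: Long, totalGcTimeMs: Long): Unit = {
    MetricsUtil.addHistoricGauge(MetricConstants.TASK_JVM_GC_TIME_METRIC, taskGcTimeMs)
    MetricsUtil.addHistoricGauge(MetricConstants.TOTAL_JVM_GC_TIME_METRIC, totalGcTimeMs)
  }
}
```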

@@ -6,17 +6,18 @@
package org.opensearch.flint.core.metrics

import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

/**
- * Collect and emit bytesRead/Written and recordsRead/Written metrics
+ * Collect and emit metrics by listening to Spark events
*/
-class ReadWriteBytesSparkListener extends SparkListener with Logging {
+class MetricsSparkListener extends SparkListener with Logging {
var bytesRead: Long = 0
var recordsRead: Long = 0
var bytesWritten: Long = 0
var recordsWritten: Long = 0
+var totalJvmGcTime: Long = 0

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
val inputMetrics = taskEnd.taskMetrics.inputMetrics
@@ -31,21 +32,28 @@ class ReadWriteBytesSparkListener extends SparkListener with Logging {
recordsRead += inputMetrics.recordsRead
bytesWritten += outputMetrics.bytesWritten
recordsWritten += outputMetrics.recordsWritten
+totalJvmGcTime += taskEnd.taskMetrics.jvmGCTime
+
+MetricsUtil.addHistoricGauge(
+  MetricConstants.TASK_JVM_GC_TIME_METRIC,
+  taskEnd.taskMetrics.jvmGCTime)
}

def emitMetrics(): Unit = {
logInfo(s"Input: totalBytesRead=${bytesRead}, totalRecordsRead=${recordsRead}")
logInfo(s"Output: totalBytesWritten=${bytesWritten}, totalRecordsWritten=${recordsWritten}")
logInfo(s"totalJvmGcTime=${totalJvmGcTime}")
MetricsUtil.addHistoricGauge(MetricConstants.INPUT_TOTAL_BYTES_READ, bytesRead)
MetricsUtil.addHistoricGauge(MetricConstants.INPUT_TOTAL_RECORDS_READ, recordsRead)
MetricsUtil.addHistoricGauge(MetricConstants.OUTPUT_TOTAL_BYTES_WRITTEN, bytesWritten)
MetricsUtil.addHistoricGauge(MetricConstants.OUTPUT_TOTAL_RECORDS_WRITTEN, recordsWritten)
+MetricsUtil.addHistoricGauge(MetricConstants.TOTAL_JVM_GC_TIME_METRIC, totalJvmGcTime)
}
}

-object ReadWriteBytesSparkListener {
+object MetricsSparkListener {
def withMetrics[T](spark: SparkSession, lambda: () => T): T = {
-val listener = new ReadWriteBytesSparkListener()
+val listener = new MetricsSparkListener()
spark.sparkContext.addSparkListener(listener)

val result = lambda()
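
The renamed companion object keeps the `withMetrics` helper, which registers the listener, runs the supplied block, and then, in the portion of the method collapsed out of this diff, is expected to emit the collected gauges and detach the listener. A hedged usage sketch, with `spark` and the workload as placeholders:

```scala
import org.apache.spark.sql.SparkSession
import org.opensearch.flint.core.metrics.MetricsSparkListener

// Hypothetical caller: collect read/write and GC-time metrics for exactly
// this block, the same way AutoIndexRefresh and FlintREPL use it below.
def countWithMetrics(spark: SparkSession): Long =
  MetricsSparkListener.withMetrics(spark, () => {
    spark.range(0, 1000000).count() // placeholder workload
  })
```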

@@ -7,7 +7,7 @@ package org.opensearch.flint.spark.refresh

import java.util.Collections

-import org.opensearch.flint.core.metrics.ReadWriteBytesSparkListener
+import org.opensearch.flint.core.metrics.MetricsSparkListener
import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions, FlintSparkValidationHelper}
import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode}
@@ -68,7 +68,7 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
// Flint index has specialized logic and capability for incremental refresh
case refresh: StreamingRefresh =>
logInfo("Start refreshing index in streaming style")
-val job = ReadWriteBytesSparkListener.withMetrics(
+val job = MetricsSparkListener.withMetrics(
spark,
() =>
refresh

@@ -17,7 +17,7 @@ import com.codahale.metrics.Timer
import org.opensearch.flint.common.model.{FlintStatement, InteractiveSession, SessionStates}
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.logging.CustomLogging
-import org.opensearch.flint.core.metrics.{MetricConstants, ReadWriteBytesSparkListener}
+import org.opensearch.flint.core.metrics.{MetricConstants, MetricsSparkListener}
import org.opensearch.flint.core.metrics.MetricsUtil.{getTimerContext, incrementCounter, registerGauge, stopTimer}

import org.apache.spark.SparkConf
@@ -525,7 +525,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
val statementTimerContext = getTimerContext(
MetricConstants.STATEMENT_PROCESSING_TIME_METRIC)
val (dataToWrite, returnedVerificationResult) =
-ReadWriteBytesSparkListener.withMetrics(
+MetricsSparkListener.withMetrics(
spark,
() => {
processStatementOnVerification(

@@ -14,7 +14,7 @@ import scala.util.{Failure, Success, Try}

import org.opensearch.flint.common.model.FlintStatement
import org.opensearch.flint.common.scheduler.model.LangType
-import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil, ReadWriteBytesSparkListener}
+import org.opensearch.flint.core.metrics.{MetricConstants, MetricsSparkListener, MetricsUtil}
import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter
import org.opensearch.flint.spark.FlintSpark

@@ -70,7 +70,7 @@ case class JobOperator(
val statementExecutionManager =
instantiateStatementExecutionManager(commandContext, resultIndex, osClient)

-val readWriteBytesSparkListener = new ReadWriteBytesSparkListener()
+val readWriteBytesSparkListener = new MetricsSparkListener()
sparkSession.sparkContext.addSparkListener(readWriteBytesSparkListener)

val statement =
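
Unlike the call sites above, JobOperator attaches the listener by hand. The cleanup path is not visible in this diff, so the following is only a plausible sketch of that manual lifecycle, with the `finally` block marked as an assumption:

```scala
import org.apache.spark.sql.SparkSession
import org.opensearch.flint.core.metrics.MetricsSparkListener

// Hypothetical manual-lifecycle wrapper mirroring the JobOperator change.
def runWithManualListener[T](sparkSession: SparkSession)(body: => T): T = {
  val listener = new MetricsSparkListener()
  sparkSession.sparkContext.addSparkListener(listener)
  try {
    body
  } finally {
    // Assumed cleanup, not shown in this diff: publish the gauges and
    // detach the listener so later jobs start from zero.
    listener.emitMetrics()
    sparkSession.sparkContext.removeSparkListener(listener)
  }
}
```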