From 7ec12fd3f605d383e78faf6ff5c4314375b1171b Mon Sep 17 00:00:00 2001
From: Tomoyuki Morita
Date: Mon, 11 Nov 2024 12:56:25 -0800
Subject: [PATCH 1/2] Add jvmGCTime metrics

Signed-off-by: Tomoyuki Morita
---
 .../flint/core/metrics/MetricConstants.java   | 10 ++++++++
 ...tener.scala => MetricsSparkListener.scala} | 23 +++++++++++++++----
 .../spark/refresh/AutoIndexRefresh.scala      |  4 ++--
 .../org/apache/spark/sql/FlintREPL.scala      |  4 ++--
 .../org/apache/spark/sql/JobOperator.scala    |  4 ++--
 5 files changed, 34 insertions(+), 11 deletions(-)
 rename flint-core/src/main/scala/org/opensearch/flint/core/metrics/{ReadWriteBytesSparkListener.scala => MetricsSparkListener.scala} (64%)

diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
index ef4d01652..1ab8cf073 100644
--- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
+++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
@@ -175,6 +175,16 @@ public final class MetricConstants {
   */
  public static final String INITIAL_CONDITION_CHECK_FAILED_PREFIX = "initialConditionCheck.failed.";
 
+  /**
+   * Metric for tracking the JVM GC time per task
+   */
+  public static final String TASK_JVM_GC_TIME_METRIC = "task.jvmGCTime.count";
+
+  /**
+   * Metric for tracking the total JVM GC time for a query
+   */
+  public static final String TOTAL_JVM_GC_TIME_METRIC = "query.totalJvmGCTime.count";
+
  private MetricConstants() {
    // Private constructor to prevent instantiation
  }
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metrics/ReadWriteBytesSparkListener.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metrics/MetricsSparkListener.scala
similarity index 64%
rename from flint-core/src/main/scala/org/opensearch/flint/core/metrics/ReadWriteBytesSparkListener.scala
rename to flint-core/src/main/scala/org/opensearch/flint/core/metrics/MetricsSparkListener.scala
index bfafd3eb3..1138545c7 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/metrics/ReadWriteBytesSparkListener.scala
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/metrics/MetricsSparkListener.scala
@@ -6,17 +6,18 @@
 package org.opensearch.flint.core.metrics
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerTaskEnd}
 import org.apache.spark.sql.SparkSession
 
 /**
- * Collect and emit bytesRead/Written and recordsRead/Written metrics
+ * Collect and emit metrics by listening to Spark events
  */
-class ReadWriteBytesSparkListener extends SparkListener with Logging {
+class MetricsSparkListener extends SparkListener with Logging {
   var bytesRead: Long = 0
   var recordsRead: Long = 0
   var bytesWritten: Long = 0
   var recordsWritten: Long = 0
+  var totalJvmGcTime: Long = 0
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
     val inputMetrics = taskEnd.taskMetrics.inputMetrics
@@ -31,21 +32,33 @@ class ReadWriteBytesSparkListener extends SparkListener with Logging {
     recordsRead += inputMetrics.recordsRead
     bytesWritten += outputMetrics.bytesWritten
     recordsWritten += outputMetrics.recordsWritten
+    totalJvmGcTime += taskEnd.taskMetrics.jvmGCTime
+
+    MetricsUtil.addHistoricGauge(MetricConstants.TASK_JVM_GC_TIME_METRIC, taskEnd.taskMetrics.jvmGCTime)
+  }
+
+  override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
+    executorMetricsUpdate.executorUpdates.foreach { case (taskId, metrics) =>
+      val totalGcTime = metrics.getMetricValue("totalGCTime")
+      logInfo(s"ExecutorID: ${executorMetricsUpdate.execId}, Task ID: $taskId, Executor totalGcTime: $totalGcTime")
+    }
   }
 
   def emitMetrics(): Unit = {
     logInfo(s"Input: totalBytesRead=${bytesRead}, totalRecordsRead=${recordsRead}")
     logInfo(s"Output: totalBytesWritten=${bytesWritten}, totalRecordsWritten=${recordsWritten}")
+    logInfo(s"totalJvmGcTime=${totalJvmGcTime}")
     MetricsUtil.addHistoricGauge(MetricConstants.INPUT_TOTAL_BYTES_READ, bytesRead)
     MetricsUtil.addHistoricGauge(MetricConstants.INPUT_TOTAL_RECORDS_READ, recordsRead)
     MetricsUtil.addHistoricGauge(MetricConstants.OUTPUT_TOTAL_BYTES_WRITTEN, bytesWritten)
     MetricsUtil.addHistoricGauge(MetricConstants.OUTPUT_TOTAL_RECORDS_WRITTEN, recordsWritten)
+    MetricsUtil.addHistoricGauge(MetricConstants.TOTAL_JVM_GC_TIME_METRIC, totalJvmGcTime)
   }
 }
 
-object ReadWriteBytesSparkListener {
+object MetricsSparkListener {
   def withMetrics[T](spark: SparkSession, lambda: () => T): T = {
-    val listener = new ReadWriteBytesSparkListener()
+    val listener = new MetricsSparkListener()
     spark.sparkContext.addSparkListener(listener)
 
     val result = lambda()
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala
index bedeeba54..ba605d3bf 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala
@@ -7,7 +7,7 @@ package org.opensearch.flint.spark.refresh
 
 import java.util.Collections
 
-import org.opensearch.flint.core.metrics.ReadWriteBytesSparkListener
+import org.opensearch.flint.core.metrics.MetricsSparkListener
 import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions, FlintSparkValidationHelper}
 import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode}
@@ -68,7 +68,7 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
       // Flint index has specialized logic and capability for incremental refresh
       case refresh: StreamingRefresh =>
         logInfo("Start refreshing index in streaming style")
-        val job = ReadWriteBytesSparkListener.withMetrics(
+        val job = MetricsSparkListener.withMetrics(
           spark,
           () =>
             refresh
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
index ef0e76557..7f819415c 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
@@ -17,7 +17,7 @@ import com.codahale.metrics.Timer
 import org.opensearch.flint.common.model.{FlintStatement, InteractiveSession, SessionStates}
 import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.logging.CustomLogging
-import org.opensearch.flint.core.metrics.{MetricConstants, ReadWriteBytesSparkListener}
+import org.opensearch.flint.core.metrics.{MetricConstants, MetricsSparkListener}
 import org.opensearch.flint.core.metrics.MetricsUtil.{getTimerContext, incrementCounter, registerGauge, stopTimer}
 
 import org.apache.spark.SparkConf
@@ -525,7 +525,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
     val statementTimerContext = getTimerContext(
       MetricConstants.STATEMENT_PROCESSING_TIME_METRIC)
     val (dataToWrite, returnedVerificationResult) =
-      ReadWriteBytesSparkListener.withMetrics(
+      MetricsSparkListener.withMetrics(
         spark,
         () => {
           processStatementOnVerification(
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
index 6cdbdb16d..316b91c2e 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
@@ -14,7 +14,7 @@ import scala.util.{Failure, Success, Try}
 
 import org.opensearch.flint.common.model.FlintStatement
 import org.opensearch.flint.common.scheduler.model.LangType
-import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil, ReadWriteBytesSparkListener}
+import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil, MetricsSparkListener}
 import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter
 import org.opensearch.flint.spark.FlintSpark
 
@@ -70,7 +70,7 @@ case class JobOperator(
     val statementExecutionManager =
       instantiateStatementExecutionManager(commandContext, resultIndex, osClient)
 
-    val readWriteBytesSparkListener = new ReadWriteBytesSparkListener()
+    val readWriteBytesSparkListener = new MetricsSparkListener()
     sparkSession.sparkContext.addSparkListener(readWriteBytesSparkListener)
 
     val statement =

From e762481b3ff3df3ae612634898de2d03197f28ca Mon Sep 17 00:00:00 2001
From: Tomoyuki Morita
Date: Mon, 11 Nov 2024 14:48:37 -0800
Subject: [PATCH 2/2] Fix style

Signed-off-by: Tomoyuki Morita
---
 .../flint/core/metrics/MetricsSparkListener.scala     | 11 +++--------
 .../main/scala/org/apache/spark/sql/JobOperator.scala |  2 +-
 2 files changed, 4 insertions(+), 9 deletions(-)

diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metrics/MetricsSparkListener.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metrics/MetricsSparkListener.scala
index 1138545c7..2ee941260 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/metrics/MetricsSparkListener.scala
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/metrics/MetricsSparkListener.scala
@@ -34,14 +34,9 @@ class MetricsSparkListener extends SparkListener with Logging {
     recordsWritten += outputMetrics.recordsWritten
     totalJvmGcTime += taskEnd.taskMetrics.jvmGCTime
 
-    MetricsUtil.addHistoricGauge(MetricConstants.TASK_JVM_GC_TIME_METRIC, taskEnd.taskMetrics.jvmGCTime)
-  }
-
-  override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
-    executorMetricsUpdate.executorUpdates.foreach { case (taskId, metrics) =>
-      val totalGcTime = metrics.getMetricValue("totalGCTime")
-      logInfo(s"ExecutorID: ${executorMetricsUpdate.execId}, Task ID: $taskId, Executor totalGcTime: $totalGcTime")
-    }
+    MetricsUtil.addHistoricGauge(
+      MetricConstants.TASK_JVM_GC_TIME_METRIC,
+      taskEnd.taskMetrics.jvmGCTime)
   }
 
   def emitMetrics(): Unit = {
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
index 316b91c2e..8582d3037 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
@@ -14,7 +14,7 @@ import scala.util.{Failure, Success, Try}
 
 import org.opensearch.flint.common.model.FlintStatement
 import org.opensearch.flint.common.scheduler.model.LangType
-import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil, MetricsSparkListener}
+import org.opensearch.flint.core.metrics.{MetricConstants, MetricsSparkListener, MetricsUtil}
 import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter
 import org.opensearch.flint.spark.FlintSpark
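
A minimal usage sketch of the renamed MetricsSparkListener.withMetrics wrapper follows. The listener, the withMetrics signature, and the gauge names come from the diffs above; the local SparkSession setup, the example job, and the object name are illustrative assumptions, and the historic gauges are only published where the Flint metrics registry behind MetricsUtil is wired up:

import org.apache.spark.sql.SparkSession
import org.opensearch.flint.core.metrics.MetricsSparkListener

object MetricsSparkListenerExample {
  def main(args: Array[String]): Unit = {
    // Illustrative local session; FlintREPL and JobOperator pass in their own SparkSession.
    val spark = SparkSession
      .builder()
      .appName("metrics-spark-listener-example")
      .master("local[*]")
      .getOrCreate()

    // withMetrics registers the listener and runs the lambda; per this patch the
    // listener aggregates per-task jvmGCTime into query.totalJvmGCTime.count
    // alongside the existing bytes/records read/written gauges.
    val rowCount = MetricsSparkListener.withMetrics(
      spark,
      () => spark.range(0, 1000000L).count())

    println(s"rowCount=$rowCount")
    spark.stop()
  }
}

Wrapping the job in a lambda keeps listener registration and metric emission in one place, so call sites such as AutoIndexRefresh and FlintREPL do not each have to manage the listener lifecycle.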