From dd4535bf2c99772100dddd8b4cd93cefc8de1d41 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Thu, 7 Nov 2024 11:19:46 -0800 Subject: [PATCH] match Array when reading sourceTables Signed-off-by: Sean Kao --- .../flint/spark/FlintSparkIndexFactory.scala | 14 ++----- .../metadatacache/FlintMetadataCache.scala | 18 ++------- .../FlintOpenSearchMetadataCacheWriter.scala | 1 - .../spark/mv/FlintSparkMaterializedView.scala | 40 +++++++++++++++++-- .../FlintSparkMaterializedViewITSuite.scala | 6 +-- 5 files changed, 45 insertions(+), 34 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala index ca659550d..3a12b63fe 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala @@ -14,7 +14,7 @@ import org.opensearch.flint.common.metadata.FlintMetadata import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE import org.opensearch.flint.spark.mv.FlintSparkMaterializedView -import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getSourceTablesFromMetadata, MV_INDEX_TYPE} import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind @@ -141,9 +141,9 @@ object FlintSparkIndexFactory extends Logging { } private def getMvSourceTables(spark: SparkSession, metadata: FlintMetadata): Array[String] = { - val sourceTables = getArrayString(metadata.properties, "sourceTables") + val sourceTables = getSourceTablesFromMetadata(metadata) if (sourceTables.isEmpty) { - FlintSparkMaterializedView.extractSourceTableNames(spark, metadata.source) + FlintSparkMaterializedView.extractSourceTablesFromQuery(spark, metadata.source) } else { sourceTables } @@ -161,12 +161,4 @@ object FlintSparkIndexFactory extends Logging { Some(value.asInstanceOf[String]) } } - - private def getArrayString(map: java.util.Map[String, AnyRef], key: String): Array[String] = { - map.get(key) match { - case list: java.util.ArrayList[_] => - list.toArray.map(_.toString) - case _ => Array.empty[String] - } - } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala index 150514783..86267c881 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala @@ -10,11 +10,9 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter import org.opensearch.flint.common.metadata.FlintMetadata import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.spark.FlintSparkIndexOptions -import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getSourceTablesFromMetadata, MV_INDEX_TYPE} import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser -import org.apache.spark.internal.Logging - /** * Flint metadata cache defines metadata required to store in read cache for frontend user to * access. @@ -47,7 +45,7 @@ case class FlintMetadataCache( } } -object FlintMetadataCache extends Logging { +object FlintMetadataCache { val metadataCacheVersion = "1.0" @@ -63,17 +61,7 @@ object FlintMetadataCache extends Logging { None } val sourceTables = metadata.kind match { - case MV_INDEX_TYPE => - val sourceTables = metadata.properties.get("sourceTables") - logInfo(s"sourceTables: $sourceTables") - sourceTables match { - case list: java.util.ArrayList[_] => - logInfo("sourceTables is ArrayList") - list.toArray.map(_.toString) - case _ => - logInfo(s"sourceTables is not ArrayList. It is of type: ${sourceTables.getClass.getName}") - Array.empty[String] - } + case MV_INDEX_TYPE => getSourceTablesFromMetadata(metadata) case _ => Array(metadata.source) } val lastRefreshTime: Option[Long] = metadata.latestLogEntry.flatMap { entry => diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala index c181a1bf3..f6fc0ba6f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala @@ -98,7 +98,6 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions) */ private def buildPropertiesMap(metadata: FlintMetadata): util.Map[String, AnyRef] = { val metadataCacheProperties = FlintMetadataCache(metadata).toMap - logInfo(s"metadataCacheProperties: $metadataCacheProperties") if (includeSpec) { (metadataCacheProperties ++ metadata.properties.asScala).asJava diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index aecfc99df..549ed049c 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -7,6 +7,7 @@ package org.opensearch.flint.spark.mv import java.util.Locale +import scala.collection.JavaConverters.asScalaBufferConverter import scala.collection.JavaConverters.mapAsJavaMapConverter import scala.collection.convert.ImplicitConversions.`map AsScala` @@ -18,6 +19,7 @@ import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.function.TumbleFunction import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getFlintIndexName, MV_INDEX_TYPE} +import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation} @@ -147,7 +149,7 @@ case class FlintSparkMaterializedView( } } -object FlintSparkMaterializedView { +object FlintSparkMaterializedView extends Logging { /** MV index type name */ val MV_INDEX_TYPE = "mv" @@ -179,13 +181,43 @@ object FlintSparkMaterializedView { * @return * source table names */ - def extractSourceTableNames(spark: SparkSession, query: String): Array[String] = { - spark.sessionState.sqlParser + def extractSourceTablesFromQuery(spark: SparkSession, query: String): Array[String] = { + logInfo(s"Extracting source tables from query $query") + val sourceTables = spark.sessionState.sqlParser .parsePlan(query) .collect { case relation: UnresolvedRelation => qualifyTableName(spark, relation.tableName) } .toArray + logInfo(s"Extracted tables: [${sourceTables.mkString(", ")}]") + sourceTables + } + + /** + * Get source tables from Flint metadata properties field. TODO: test case + * + * @param metadata + * Flint metadata + * @return + * source table names + */ + def getSourceTablesFromMetadata(metadata: FlintMetadata): Array[String] = { + logInfo(s"Getting source tables from metadata $metadata") + metadata.properties.get("sourceTables") match { + case list: java.util.ArrayList[_] => + logInfo(s"sourceTables is java.util.ArrayList: [${list.asScala.mkString(", ")}]") + list.toArray.map(_.toString) + case array: Array[_] => + logInfo(s"sourceTables is Array: [${array.mkString(", ")}]") + array.map(_.toString) + case null => + logInfo("sourceTables property does not exist") + Array.empty[String] + case _ => + logInfo( + s"sourceTables is of type: ${metadata.properties.get("sourceTables").getClass.getName}") + Array.empty[String] + } } /** Builder class for MV build */ @@ -217,7 +249,7 @@ object FlintSparkMaterializedView { */ def query(query: String): Builder = { this.query = query - this.sourceTables = extractSourceTableNames(flint.spark, query) + this.sourceTables = extractSourceTablesFromQuery(flint.spark, query) this } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index fc77faaea..710d5e2a3 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -18,7 +18,7 @@ import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils} import org.opensearch.flint.spark.FlintSparkIndex.quotedTableName import org.opensearch.flint.spark.mv.FlintSparkMaterializedView -import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{extractSourceTableNames, getFlintIndexName} +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{extractSourceTablesFromQuery, getFlintIndexName} import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler import org.scalatest.matchers.must.Matchers._ import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper @@ -65,14 +65,14 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | FROM spark_catalog.default.`table/3` | INNER JOIN spark_catalog.default.`table.4` |""".stripMargin - extractSourceTableNames(flint.spark, testComplexQuery) should contain theSameElementsAs + extractSourceTablesFromQuery(flint.spark, testComplexQuery) should contain theSameElementsAs Array( "spark_catalog.default.table1", "spark_catalog.default.table2", "spark_catalog.default.`table/3`", "spark_catalog.default.`table.4`") - extractSourceTableNames(flint.spark, "SELECT 1") should have size 0 + extractSourceTablesFromQuery(flint.spark, "SELECT 1") should have size 0 } test("create materialized view with metadata successfully") {