match Array when reading sourceTables
Signed-off-by: Sean Kao <[email protected]>
seankao-az committed Nov 7, 2024
1 parent 137e5a8 commit dd4535b
Showing 5 changed files with 45 additions and 34 deletions.
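In substance: the `sourceTables` property read back from Flint index metadata can arrive either as a `java.util.ArrayList` (e.g. when the metadata was deserialized from JSON) or as a Scala `Array` (e.g. when the metadata object was built in-process). The old reader matched only the former and silently returned an empty table list otherwise. A minimal standalone sketch of the new dispatch, in which a plain `java.util.Map` stands in for the real `FlintMetadata.properties` (an assumption for illustration only):

    // Sketch only: a plain java.util.Map stands in for FlintMetadata.properties.
    def readSourceTables(properties: java.util.Map[String, AnyRef]): Array[String] =
      properties.get("sourceTables") match {
        case list: java.util.ArrayList[_] => list.toArray.map(_.toString) // e.g. deserialized from JSON
        case array: Array[_]              => array.map(_.toString)        // e.g. built in-process
        case _                            => Array.empty[String]          // absent or unexpected type
      }

    // Both representations normalize to the same result:
    val props = new java.util.HashMap[String, AnyRef]()
    props.put("sourceTables", Array("spark_catalog.default.table1"))
    readSourceTables(props) // Array("spark_catalog.default.table1")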
FlintSparkIndexFactory.scala
@@ -14,7 +14,7 @@ import org.opensearch.flint.common.metadata.FlintMetadata
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
-import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getSourceTablesFromMetadata, MV_INDEX_TYPE}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
@@ -141,9 +141,9 @@ object FlintSparkIndexFactory extends Logging {
   }
 
   private def getMvSourceTables(spark: SparkSession, metadata: FlintMetadata): Array[String] = {
-    val sourceTables = getArrayString(metadata.properties, "sourceTables")
+    val sourceTables = getSourceTablesFromMetadata(metadata)
     if (sourceTables.isEmpty) {
-      FlintSparkMaterializedView.extractSourceTableNames(spark, metadata.source)
+      FlintSparkMaterializedView.extractSourceTablesFromQuery(spark, metadata.source)
     } else {
       sourceTables
     }
@@ -161,12 +161,4 @@
       Some(value.asInstanceOf[String])
     }
   }
-
-  private def getArrayString(map: java.util.Map[String, AnyRef], key: String): Array[String] = {
-    map.get(key) match {
-      case list: java.util.ArrayList[_] =>
-        list.toArray.map(_.toString)
-      case _ => Array.empty[String]
-    }
-  }
 }
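Note that `getMvSourceTables` above now prefers the `sourceTables` recorded in metadata and falls back to re-parsing the MV's defining query only when that property is empty. A usage sketch of the fallback, assuming a live `SparkSession` named `spark`; the expected qualification mirrors the integration test at the end of this diff:

    // Assumes a live SparkSession `spark`; unqualified relations are
    // qualified against the current catalog and database.
    val tables = FlintSparkMaterializedView.extractSourceTablesFromQuery(
      spark,
      "SELECT name, COUNT(*) AS cnt FROM table1 GROUP BY name")
    // expected: Array("spark_catalog.default.table1")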
FlintMetadataCache.scala
@@ -10,11 +10,9 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter
 import org.opensearch.flint.common.metadata.FlintMetadata
 import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
 import org.opensearch.flint.spark.FlintSparkIndexOptions
-import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getSourceTablesFromMetadata, MV_INDEX_TYPE}
 import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser
 
-import org.apache.spark.internal.Logging
-
 /**
  * Flint metadata cache defines metadata required to store in read cache for frontend user to
  *   access.
@@ -47,7 +45,7 @@ case class FlintMetadataCache(
   }
 }
 
-object FlintMetadataCache extends Logging {
+object FlintMetadataCache {
 
   val metadataCacheVersion = "1.0"
 
@@ -63,17 +61,7 @@
       None
     }
     val sourceTables = metadata.kind match {
-      case MV_INDEX_TYPE =>
-        val sourceTables = metadata.properties.get("sourceTables")
-        logInfo(s"sourceTables: $sourceTables")
-        sourceTables match {
-          case list: java.util.ArrayList[_] =>
-            logInfo("sourceTables is ArrayList")
-            list.toArray.map(_.toString)
-          case _ =>
-            logInfo(s"sourceTables is not ArrayList. It is of type: ${sourceTables.getClass.getName}")
-            Array.empty[String]
-        }
+      case MV_INDEX_TYPE => getSourceTablesFromMetadata(metadata)
       case _ => Array(metadata.source)
     }
     val lastRefreshTime: Option[Long] = metadata.latestLogEntry.flatMap { entry =>
FlintOpenSearchMetadataCacheWriter.scala
@@ -98,7 +98,6 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions)
    */
   private def buildPropertiesMap(metadata: FlintMetadata): util.Map[String, AnyRef] = {
     val metadataCacheProperties = FlintMetadataCache(metadata).toMap
-    logInfo(s"metadataCacheProperties: $metadataCacheProperties")
 
     if (includeSpec) {
       (metadataCacheProperties ++ metadata.properties.asScala).asJava
FlintSparkMaterializedView.scala
@@ -7,6 +7,7 @@ package org.opensearch.flint.spark.mv
 
 import java.util.Locale
 
+import scala.collection.JavaConverters.asScalaBufferConverter
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 import scala.collection.convert.ImplicitConversions.`map AsScala`
 
@@ -18,6 +19,7 @@ import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
 import org.opensearch.flint.spark.function.TumbleFunction
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getFlintIndexName, MV_INDEX_TYPE}
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
@@ -147,7 +149,7 @@ case class FlintSparkMaterializedView(
   }
 }
 
-object FlintSparkMaterializedView {
+object FlintSparkMaterializedView extends Logging {
 
   /** MV index type name */
   val MV_INDEX_TYPE = "mv"
@@ -179,13 +181,43 @@
    * @return
    *   source table names
    */
-  def extractSourceTableNames(spark: SparkSession, query: String): Array[String] = {
-    spark.sessionState.sqlParser
+  def extractSourceTablesFromQuery(spark: SparkSession, query: String): Array[String] = {
+    logInfo(s"Extracting source tables from query $query")
+    val sourceTables = spark.sessionState.sqlParser
       .parsePlan(query)
       .collect { case relation: UnresolvedRelation =>
         qualifyTableName(spark, relation.tableName)
       }
       .toArray
+    logInfo(s"Extracted tables: [${sourceTables.mkString(", ")}]")
+    sourceTables
   }
 
+  /**
+   * Get source tables from Flint metadata properties field. TODO: test case
+   *
+   * @param metadata
+   *   Flint metadata
+   * @return
+   *   source table names
+   */
+  def getSourceTablesFromMetadata(metadata: FlintMetadata): Array[String] = {
+    logInfo(s"Getting source tables from metadata $metadata")
+    metadata.properties.get("sourceTables") match {
+      case list: java.util.ArrayList[_] =>
+        logInfo(s"sourceTables is java.util.ArrayList: [${list.asScala.mkString(", ")}]")
+        list.toArray.map(_.toString)
+      case array: Array[_] =>
+        logInfo(s"sourceTables is Array: [${array.mkString(", ")}]")
+        array.map(_.toString)
+      case null =>
+        logInfo("sourceTables property does not exist")
+        Array.empty[String]
+      case _ =>
+        logInfo(
+          s"sourceTables is of type: ${metadata.properties.get("sourceTables").getClass.getName}")
+        Array.empty[String]
+    }
+  }
+
 /** Builder class for MV build */
@@ -217,7 +249,7 @@
      */
     def query(query: String): Builder = {
       this.query = query
-      this.sourceTables = extractSourceTableNames(flint.spark, query)
+      this.sourceTables = extractSourceTablesFromQuery(flint.spark, query)
       this
     }
 
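The doc comment on `getSourceTablesFromMetadata` carries a `TODO: test case`. One possible shape for that test, written in the `AnyFunSuite` style the repository's suites already use; it exercises a duplicated standalone stand-in rather than a real `FlintMetadata`, whose construction is outside this diff:

    import org.scalatest.funsuite.AnyFunSuite
    import org.scalatest.matchers.should.Matchers

    // Hypothetical sketch for the TODO above, not repository code.
    class SourceTablesDispatchSuite extends AnyFunSuite with Matchers {

      // Stand-in under test, duplicated from the earlier sketch for self-containment.
      private def readSourceTables(properties: java.util.Map[String, AnyRef]): Array[String] =
        properties.get("sourceTables") match {
          case list: java.util.ArrayList[_] => list.toArray.map(_.toString)
          case array: Array[_]              => array.map(_.toString)
          case _                            => Array.empty[String]
        }

      test("reads sourceTables stored as java.util.ArrayList") {
        val list = new java.util.ArrayList[String]()
        list.add("spark_catalog.default.table1")
        val props = new java.util.HashMap[String, AnyRef]()
        props.put("sourceTables", list)
        readSourceTables(props) shouldBe Array("spark_catalog.default.table1")
      }

      test("reads sourceTables stored as a Scala Array") {
        val props = new java.util.HashMap[String, AnyRef]()
        props.put("sourceTables", Array("spark_catalog.default.table1"))
        readSourceTables(props) shouldBe Array("spark_catalog.default.table1")
      }

      test("returns empty when the property is absent") {
        readSourceTables(new java.util.HashMap[String, AnyRef]()) shouldBe empty
      }
    }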
FlintSparkMaterializedViewITSuite.scala
@@ -18,7 +18,7 @@ import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
 import org.opensearch.flint.spark.FlintSparkIndex.quotedTableName
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
-import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{extractSourceTableNames, getFlintIndexName}
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{extractSourceTablesFromQuery, getFlintIndexName}
 import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler
 import org.scalatest.matchers.must.Matchers._
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
@@ -65,14 +65,14 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
         | FROM spark_catalog.default.`table/3`
         | INNER JOIN spark_catalog.default.`table.4`
         |""".stripMargin
-    extractSourceTableNames(flint.spark, testComplexQuery) should contain theSameElementsAs
+    extractSourceTablesFromQuery(flint.spark, testComplexQuery) should contain theSameElementsAs
       Array(
         "spark_catalog.default.table1",
         "spark_catalog.default.table2",
         "spark_catalog.default.`table/3`",
         "spark_catalog.default.`table.4`")
 
-    extractSourceTableNames(flint.spark, "SELECT 1") should have size 0
+    extractSourceTablesFromQuery(flint.spark, "SELECT 1") should have size 0
   }
 
   test("create materialized view with metadata successfully") {
