Skip to content

Commit

Permalink
add test cases
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Nov 8, 2024
1 parent dd4535b commit efd2207
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ object FlintSparkMaterializedView extends Logging {
}

/**
* Get source tables from Flint metadata properties field. TODO: test case
* Get source tables from Flint metadata properties field.
*
* @param metadata
* Flint metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
import org.opensearch.flint.spark.FlintSparkIndex.quotedTableName
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{extractSourceTablesFromQuery, getFlintIndexName}
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{extractSourceTablesFromQuery, getFlintIndexName, getSourceTablesFromMetadata, MV_INDEX_TYPE}
import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler
import org.scalatest.matchers.must.Matchers._
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
Expand Down Expand Up @@ -75,6 +75,70 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
extractSourceTablesFromQuery(flint.spark, "SELECT 1") should have size 0
}

test("get source table names from index metadata successfully") {
  // Build an MV whose constructor records its source tables explicitly, then
  // verify they survive the round trip into the metadata properties field.
  val materializedView = FlintSparkMaterializedView(
    "spark_catalog.default.mv",
    s"SELECT 1 FROM $testTable",
    Array(testTable),
    Map("1" -> "integer"))
  val mvMetadata = materializedView.metadata()

  // Freshly-built (not deserialized) metadata keeps the tables as a native
  // Scala array; Array element types are reified on the JVM, so this check
  // really does verify Array[String].
  mvMetadata.properties.get("sourceTables") shouldBe a[Array[String]]
  // The accessor must normalize that representation back to the table list.
  getSourceTablesFromMetadata(mvMetadata) should contain theSameElementsAs Array(testTable)
}

test("get source table names from deserialized metadata successfully") {
  // Metadata parsed from an index-mapping JSON stores "sourceTables" as a
  // Java list (produced by JSON deserialization), unlike freshly-built
  // metadata which holds a Scala Array — getSourceTablesFromMetadata must
  // handle both representations.
  val metadata = FlintOpenSearchIndexMetadataService.deserialize(s""" {
    |   "_meta": {
    |     "kind": "$MV_INDEX_TYPE",
    |     "properties": {
    |       "sourceTables": [
    |         "$testTable"
    |       ]
    |     }
    |   },
    |   "properties": {
    |     "age": {
    |       "type": "integer"
    |     }
    |   }
    | }
    |""".stripMargin)

  // Use a wildcard type argument: the element type is erased at runtime, so
  // asserting ArrayList[String] would only ever check ArrayList anyway and
  // gives false confidence about the element type.
  metadata.properties.get("sourceTables") shouldBe a[java.util.ArrayList[_]]
  getSourceTablesFromMetadata(metadata) should contain theSameElementsAs Array(testTable)
}

test("get empty source tables from invalid field in metadata") {
  // Two malformed variants of the metadata: "sourceTables" present but not an
  // array, and "sourceTables" absent altogether. Both must degrade to an
  // empty result rather than fail.
  val wrongTypeJson = s""" {
    |   "_meta": {
    |     "kind": "$MV_INDEX_TYPE",
    |     "properties": {
    |       "sourceTables": "$testTable"
    |     }
    |   },
    |   "properties": {
    |     "age": {
    |       "type": "integer"
    |     }
    |   }
    | }
    |""".stripMargin
  val missingFieldJson = s""" {
    |   "_meta": {
    |     "kind": "$MV_INDEX_TYPE",
    |     "properties": { }
    |   },
    |   "properties": {
    |     "age": {
    |       "type": "integer"
    |     }
    |   }
    | }
    |""".stripMargin

  val wrongTypeMetadata = FlintOpenSearchIndexMetadataService.deserialize(wrongTypeJson)
  val missingFieldMetadata = FlintOpenSearchIndexMetadataService.deserialize(missingFieldJson)

  getSourceTablesFromMetadata(wrongTypeMetadata) shouldBe empty
  getSourceTablesFromMetadata(missingFieldMetadata) shouldBe empty
}

test("create materialized view with metadata successfully") {
withTempDir { checkpointDir =>
val indexOptions =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService}
import org.opensearch.flint.spark.{FlintSparkIndexOptions, FlintSparkSuite}
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}
import org.scalatest.Entry
Expand Down Expand Up @@ -166,7 +167,25 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
}
}

test(s"write metadata cache to materialized view index mappings with source tables") {
test("write metadata cache with source tables from index metadata") {
  // Build an MV index whose metadata carries its source tables, attach a
  // metadata log entry, create the physical index, then verify the cache
  // writer propagates "sourceTables" into the persisted index mappings.
  val mv = FlintSparkMaterializedView(
    "spark_catalog.default.mv",
    s"SELECT 1 FROM $testTable",
    Array(testTable),
    Map("1" -> "integer"))
  // latestLogEntry is required by the cache writer; flintMetadataLogEntry is
  // a suite-level fixture.
  val metadata = mv.metadata().copy(latestLogEntry = Some(flintMetadataLogEntry))

  flintClient.createIndex(testFlintIndex, metadata)
  flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata)

  // Read the mappings back through the index metadata service and check the
  // cached source-table list.
  val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties
  // NOTE(review): unchecked cast — the String element type is erased, and the
  // concrete list type of the value read back depends on the metadata
  // service's deserialization; presumably it yields a scala.List here, but
  // confirm, since a java.util.List would make this cast throw at runtime.
  properties
    .get("sourceTables")
    .asInstanceOf[List[String]]
    .toArray should contain theSameElementsAs Array(testTable)
}

test("write metadata cache with source tables from deserialized metadata") {
val testTable2 = "spark_catalog.default.metadatacache_test2"
val content =
s""" {
Expand Down

0 comments on commit efd2207

Please sign in to comment.