From 4651aab15b48a3c8e3c132f77572688bdf10b582 Mon Sep 17 00:00:00 2001 From: Nick Del Nano Date: Tue, 7 Jan 2025 13:09:29 -0800 Subject: [PATCH 1/5] Update SmokeTest tests to reflect Getting Started Spark docs --- .../java/org/apache/iceberg/spark/SmokeTest.java | 14 ++++++-------- .../java/org/apache/iceberg/spark/SmokeTest.java | 14 ++++++-------- .../java/org/apache/iceberg/spark/SmokeTest.java | 16 ++++++---------- 3 files changed, 18 insertions(+), 26 deletions(-) diff --git a/spark/v3.3/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java b/spark/v3.3/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java index 51e3721aea4f..9545892d18da 100644 --- a/spark/v3.3/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java +++ b/spark/v3.3/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java @@ -40,8 +40,6 @@ public void dropTable() { } // Run through our Doc's Getting Started Example - // TODO Update doc example so that it can actually be run, modifications were required for this - // test suite to run @Test public void testGettingStarted() throws IOException { // Creating a table @@ -66,25 +64,25 @@ public void testGettingStarted() throws IOException { sql( "CREATE TABLE updates (id bigint, data string) USING parquet LOCATION '%s'", temp.newFolder()); - sql("INSERT INTO updates VALUES (1, 'x'), (2, 'x'), (4, 'z')"); + sql("INSERT INTO updates VALUES (1, 'x'), (2, 'y'), (4, 'z')"); sql( "MERGE INTO %s t USING (SELECT * FROM updates) u ON t.id = u.id\n" + "WHEN MATCHED THEN UPDATE SET t.data = u.data\n" + "WHEN NOT MATCHED THEN INSERT *", tableName); + + // Reading Assert.assertEquals( "Table should now have 5 rows", 5L, scalarSql("SELECT COUNT(*) FROM %s", tableName)); Assert.assertEquals( "Record 1 should now have data x", "x", scalarSql("SELECT data FROM %s WHERE id = 1", tableName)); - - // Reading Assert.assertEquals( - "There should be 2 records with data x", - 2L, - scalarSql("SELECT count(1) as count FROM %s WHERE data = 'x' GROUP BY data ", tableName)); + "Record 2 should now have data y", + "y", + scalarSql("SELECT data FROM %s WHERE id = 2", tableName)); // Not supported because of Spark limitation if (!catalogName.equals("spark_catalog")) { diff --git a/spark/v3.4/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java b/spark/v3.4/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java index 20be98d17bb2..ada2db3a6e5e 100644 --- a/spark/v3.4/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java +++ b/spark/v3.4/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java @@ -42,8 +42,6 @@ public void dropTable() { } // Run through our Doc's Getting Started Example - // TODO Update doc example so that it can actually be run, modifications were required for this - // test suite to run @Test public void testGettingStarted() throws IOException { // Creating a table @@ -68,25 +66,25 @@ public void testGettingStarted() throws IOException { sql( "CREATE TABLE updates (id bigint, data string) USING parquet LOCATION '%s'", temp.newFolder()); - sql("INSERT INTO updates VALUES (1, 'x'), (2, 'x'), (4, 'z')"); + sql("INSERT INTO updates VALUES (1, 'x'), (2, 'y'), (4, 'z')"); sql( "MERGE INTO %s t USING (SELECT * FROM updates) u ON t.id = u.id\n" + "WHEN MATCHED THEN UPDATE SET t.data = u.data\n" + "WHEN NOT MATCHED THEN INSERT *", tableName); + + // Reading Assert.assertEquals( "Table should now have 5 rows", 5L, scalarSql("SELECT 
COUNT(*) FROM %s", tableName)); Assert.assertEquals( "Record 1 should now have data x", "x", scalarSql("SELECT data FROM %s WHERE id = 1", tableName)); - - // Reading Assert.assertEquals( - "There should be 2 records with data x", - 2L, - scalarSql("SELECT count(1) as count FROM %s WHERE data = 'x' GROUP BY data ", tableName)); + "Record 2 should now have data y", + "y", + scalarSql("SELECT data FROM %s WHERE id = 2", tableName)); // Not supported because of Spark limitation if (!catalogName.equals("spark_catalog")) { diff --git a/spark/v3.5/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java b/spark/v3.5/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java index ec445774a452..34d2011f0760 100644 --- a/spark/v3.5/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java +++ b/spark/v3.5/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java @@ -38,8 +38,6 @@ public void dropTable() { } // Run through our Doc's Getting Started Example - // TODO Update doc example so that it can actually be run, modifications were required for this - // test suite to run @TestTemplate public void testGettingStarted() throws IOException { // Creating a table @@ -66,26 +64,24 @@ public void testGettingStarted() throws IOException { sql( "CREATE TABLE updates (id bigint, data string) USING parquet LOCATION '%s'", Files.createTempDirectory(temp, "junit")); - sql("INSERT INTO updates VALUES (1, 'x'), (2, 'x'), (4, 'z')"); + sql("INSERT INTO updates VALUES (1, 'x'), (2, 'y'), (4, 'z')"); sql( "MERGE INTO %s t USING (SELECT * FROM updates) u ON t.id = u.id\n" + "WHEN MATCHED THEN UPDATE SET t.data = u.data\n" + "WHEN NOT MATCHED THEN INSERT *", tableName); + + // Reading assertThat(scalarSql("SELECT COUNT(*) FROM %s", tableName)) .as("Table should now have 5 rows") .isEqualTo(5L); assertThat(scalarSql("SELECT data FROM %s WHERE id = 1", tableName)) .as("Record 1 should now have data x") .isEqualTo("x"); - - // Reading - assertThat( - scalarSql( - "SELECT count(1) as count FROM %s WHERE data = 'x' GROUP BY data ", tableName)) - .as("There should be 2 records with data x") - .isEqualTo(2L); + assertThat(scalarSql("SELECT data FROM %s WHERE id = 2", tableName)) + .as("Record 2 should now have data y") + .isEqualTo("y"); // Not supported because of Spark limitation if (!catalogName.equals("spark_catalog")) { From 3c979494c38229ae96c7db1cb59e877bfbbdb793 Mon Sep 17 00:00:00 2001 From: Nick Del Nano Date: Tue, 7 Jan 2025 14:20:41 -0800 Subject: [PATCH 2/5] Update Getting Started Spark docs to have valid examples --- docs/docs/spark-getting-started.md | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/docs/docs/spark-getting-started.md b/docs/docs/spark-getting-started.md index 2bcdbd23eb1e..33c33c421dc1 100644 --- a/docs/docs/spark-getting-started.md +++ b/docs/docs/spark-getting-started.md @@ -77,21 +77,24 @@ Once your table is created, insert data using [`INSERT INTO`](spark-writes.md#in ```sql INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c'); -INSERT INTO local.db.table SELECT id, data FROM source WHERE length(data) = 1; ``` Iceberg also adds row-level SQL updates to Spark, [`MERGE INTO`](spark-writes.md#merge-into) and [`DELETE FROM`](spark-writes.md#delete-from): ```sql -MERGE INTO local.db.target t USING (SELECT * FROM updates) u ON t.id = u.id -WHEN MATCHED THEN UPDATE SET t.count = t.count + u.count +CREATE TABLE local.db.updates (id bigint, data string) USING iceberg; +INSERT INTO 
local.db.updates VALUES (1, 'x'), (2, 'y'), (4, 'z'); + +MERGE INTO local.db.table t +USING (SELECT * FROM local.db.updates) u ON t.id = u.id +WHEN MATCHED THEN UPDATE SET t.data = u.data WHEN NOT MATCHED THEN INSERT *; ``` Iceberg supports writing DataFrames using the new [v2 DataFrame write API](spark-writes.md#writing-with-dataframes): ```scala -spark.table("source").select("id", "data") +spark.table("local.db.updates").select("id", "data") .writeTo("local.db.table").append() ``` @@ -160,7 +163,7 @@ This type conversion table describes how Spark types are converted to the Iceber | map | map | | !!! info - The table is based on representing conversion during creating table. In fact, broader supports are applied on write. Here're some points on write: + The table is based on type conversions during table creation. Broader type conversions are applied on write: * Iceberg numeric types (`integer`, `long`, `float`, `double`, `decimal`) support promotion during writes. e.g. You can write Spark types `short`, `byte`, `integer`, `long` to Iceberg type `long`. * You can write to Iceberg `fixed` type using Spark `binary` type. Note that assertion on the length will be performed. From ea5951c526c233277cd406b67057d60c6c9f9657 Mon Sep 17 00:00:00 2001 From: Nick Del Nano Date: Fri, 10 Jan 2025 13:44:59 -0800 Subject: [PATCH 3/5] Review feedback on Getting Started Spark docs --- docs/docs/spark-getting-started.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docs/docs/spark-getting-started.md b/docs/docs/spark-getting-started.md index 33c33c421dc1..a52b8ff6a5be 100644 --- a/docs/docs/spark-getting-started.md +++ b/docs/docs/spark-getting-started.md @@ -83,12 +83,16 @@ Iceberg also adds row-level SQL updates to Spark, [`MERGE INTO`](spark-writes.md ```sql CREATE TABLE local.db.updates (id bigint, data string) USING iceberg; + INSERT INTO local.db.updates VALUES (1, 'x'), (2, 'y'), (4, 'z'); MERGE INTO local.db.table t USING (SELECT * FROM local.db.updates) u ON t.id = u.id WHEN MATCHED THEN UPDATE SET t.data = u.data WHEN NOT MATCHED THEN INSERT *; + +-- ((1, 'x'), (2, 'y'), (3, 'c'), (4, 'z')) +SELECT * FROM local.db.table; ``` Iceberg supports writing DataFrames using the new [v2 DataFrame write API](spark-writes.md#writing-with-dataframes): @@ -163,7 +167,7 @@ This type conversion table describes how Spark types are converted to the Iceber | map | map | | !!! info - The table is based on type conversions during table creation. Broader type conversions are applied on write: + Broader type conversions are applied on write: * Iceberg numeric types (`integer`, `long`, `float`, `double`, `decimal`) support promotion during writes. e.g. You can write Spark types `short`, `byte`, `integer`, `long` to Iceberg type `long`. * You can write to Iceberg `fixed` type using Spark `binary` type. Note that assertion on the length will be performed. 
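
The type-conversion notes above mention that narrower Spark types are promoted to wider Iceberg types on write. A minimal SQL sketch of that behavior follows; the table name is hypothetical and it assumes the `local` catalog configured earlier in the Getting Started page:

```sql
-- Illustrative only: local.db.longs is a made-up table, `local` is the quickstart catalog.
CREATE TABLE local.db.longs (id bigint, measure double) USING iceberg;

-- Narrower Spark inputs (int, float) are widened to the Iceberg long/double columns on write.
INSERT INTO local.db.longs SELECT CAST(1 AS INT), CAST(1.5 AS FLOAT);
```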
From 98c99f1010e1c78b973461f20c0fe874c535d38e Mon Sep 17 00:00:00 2001 From: Nick Del Nano Date: Sat, 11 Jan 2025 23:15:17 -0800 Subject: [PATCH 4/5] run linter --- .../integration/java/org/apache/iceberg/spark/SmokeTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/v3.5/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java b/spark/v3.5/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java index 34d2011f0760..3e49cd0fdd8d 100644 --- a/spark/v3.5/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java +++ b/spark/v3.5/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java @@ -80,8 +80,8 @@ public void testGettingStarted() throws IOException { .as("Record 1 should now have data x") .isEqualTo("x"); assertThat(scalarSql("SELECT data FROM %s WHERE id = 2", tableName)) - .as("Record 2 should now have data y") - .isEqualTo("y"); + .as("Record 2 should now have data y") + .isEqualTo("y"); // Not supported because of Spark limitation if (!catalogName.equals("spark_catalog")) { From a9cbadd3dc11ed082e42fdae12a640373027bb38 Mon Sep 17 00:00:00 2001 From: Nick Del Nano Date: Fri, 24 Jan 2025 08:54:38 -0800 Subject: [PATCH 5/5] run spotless on all Spark versions --- .../java/org/apache/iceberg/spark/SmokeTest.java | 6 +++--- .../java/org/apache/iceberg/spark/SmokeTest.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/spark/v3.3/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java b/spark/v3.3/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java index 9545892d18da..757d4dada23a 100644 --- a/spark/v3.3/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java +++ b/spark/v3.3/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java @@ -80,9 +80,9 @@ public void testGettingStarted() throws IOException { "x", scalarSql("SELECT data FROM %s WHERE id = 1", tableName)); Assert.assertEquals( - "Record 2 should now have data y", - "y", - scalarSql("SELECT data FROM %s WHERE id = 2", tableName)); + "Record 2 should now have data y", + "y", + scalarSql("SELECT data FROM %s WHERE id = 2", tableName)); // Not supported because of Spark limitation if (!catalogName.equals("spark_catalog")) { diff --git a/spark/v3.4/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java b/spark/v3.4/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java index ada2db3a6e5e..2494f14604d2 100644 --- a/spark/v3.4/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java +++ b/spark/v3.4/spark-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java @@ -82,9 +82,9 @@ public void testGettingStarted() throws IOException { "x", scalarSql("SELECT data FROM %s WHERE id = 1", tableName)); Assert.assertEquals( - "Record 2 should now have data y", - "y", - scalarSql("SELECT data FROM %s WHERE id = 2", tableName)); + "Record 2 should now have data y", + "y", + scalarSql("SELECT data FROM %s WHERE id = 2", tableName)); // Not supported because of Spark limitation if (!catalogName.equals("spark_catalog")) {
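
Taken together, the sequence that the updated docs and SmokeTest now share looks roughly like the sketch below, assuming the `local` catalog and table names from the Getting Started page (the SmokeTest inserts additional rows earlier in the test, which is why its COUNT assertion expects 5 rows rather than the 4 shown here):

```sql
CREATE TABLE local.db.table (id bigint, data string) USING iceberg;
INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');

CREATE TABLE local.db.updates (id bigint, data string) USING iceberg;
INSERT INTO local.db.updates VALUES (1, 'x'), (2, 'y'), (4, 'z');

MERGE INTO local.db.table t
USING (SELECT * FROM local.db.updates) u ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.data = u.data
WHEN NOT MATCHED THEN INSERT *;

-- Expected result: (1, 'x'), (2, 'y'), (3, 'c'), (4, 'z')
SELECT * FROM local.db.table;
```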