From d477f3ba798970c5445858a75523b1e623dfd064 Mon Sep 17 00:00:00 2001
From: Jagdish Parihar
Date: Wed, 18 Dec 2024 22:05:25 +0530
Subject: [PATCH 01/10] wip: array remove

---
 native/core/src/execution/planner.rs        | 19 ++++++++++++++++++
 native/proto/src/proto/expr.proto           |  1 +
 .../apache/comet/serde/QueryPlanSerde.scala |  6 ++++++
 .../apache/comet/CometExpressionSuite.scala | 20 +++++++++++++++++++
 4 files changed, 46 insertions(+)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 3ac830c04..0865e6bd3 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -66,6 +66,7 @@ use datafusion::{
 };
 use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr};
 use datafusion_functions_nested::concat::ArrayAppend;
+use datafusion_functions_nested::remove::array_remove_all_udf;
 use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
 
 use crate::execution::spark_plan::SparkPlan;
@@ -719,6 +720,24 @@ impl PhysicalPlanner {
                     expr.legacy_negative_index,
                 )))
             }
+            ExprStruct::ArrayRemove(expr) => {
+                let src_array_expr =
+                    self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
+                let key_expr =
+                    self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
+                let args = vec![Arc::clone(&src_array_expr), key_expr];
+                let return_type = src_array_expr.data_type(&input_schema)?;
+
+                let datafusion_array_remove = array_remove_all_udf();
+
+                let array_remove_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
+                    "array_remove",
+                    datafusion_array_remove,
+                    args,
+                    return_type,
+                ));
+                Ok(array_remove_expr)
+            }
             expr => Err(ExecutionError::GeneralError(format!(
                 "Not implemented: {:?}",
                 expr
diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto
index 7a8ea78d5..450477b40 100644
--- a/native/proto/src/proto/expr.proto
+++ b/native/proto/src/proto/expr.proto
@@ -84,6 +84,7 @@ message Expr {
     GetArrayStructFields get_array_struct_fields = 57;
     BinaryExpr array_append = 58;
     ArrayInsert array_insert = 59;
+    BinaryExpr array_remove = 60;
   }
 }
 
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index b33f6b5a6..a9b9204d4 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2266,6 +2266,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
           withInfo(expr, "unsupported arguments for GetArrayStructFields", child)
           None
         }
+      case expr if expr.prettyName == "array_remove" =>
+        createBinaryExpr(
+          expr.children(0),
+          expr.children(1),
+          inputs,
+          (builder, binaryExpr) => builder.setArrayRemove(binaryExpr))
       case _ if expr.prettyName == "array_append" =>
         createBinaryExpr(
           expr.children(0),
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index cce7cb20a..825fb1c1b 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2517,4 +2517,24 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       checkSparkAnswer(df.select("arrUnsupportedArgs"))
     }
   }
+  test("array_remove") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        makeParquetFileAllTypes(path, dictionaryEnabled, 10000)
+
+        // Test basic array_remove functionality
+        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(1, 2, 3, null, 3), 2)"))
+
+        // Test removing multiple occurrences
+        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(1, 3, 3, null, 3), 3)"))
+
+        // Test removing null
+        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(1, 2, null, 4, null), null)"))
+
+        // Test when element doesn't exist
+        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(1, 2, 3), 5)"))
+      }
+    }
+  }
 }
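Context for readers outside the patch series: Spark's array_remove(arr, elem) drops every element equal to elem, whereas DataFusion's own array_remove drops only the first match, which is presumably why this patch binds DataFusion's array_remove_all UDF instead. A quick spark-shell sketch of the behaviour the native path has to reproduce (assumes an active SparkSession named spark; not code from the patch):

    // Plain Spark reference behaviour for array_remove.
    spark.sql("SELECT array_remove(array(1, 2, 3, 2, 2), 2)").show(false)  // [1, 3]        -- every occurrence is removed
    spark.sql("SELECT array_remove(array(1, null, 2, 3), 2)").show(false)  // [1, null, 3]  -- null elements are kept
    spark.sql("SELECT array_remove(array(1, 2, 3), 5)").show(false)        // [1, 2, 3]     -- no match, array unchanged
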
From 988e021593313f72af4d4c027f25d71e7068afe4 Mon Sep 17 00:00:00 2001
From: Jagdish Parihar
Date: Fri, 20 Dec 2024 16:03:34 +0530
Subject: [PATCH 02/10] added comet expression test

---
 .../org/apache/comet/CometExpressionSuite.scala | 16 ++++------------
 1 file changed, 4 insertions(+), 12 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 825fb1c1b..b40650a7a 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2522,18 +2522,10 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
         makeParquetFileAllTypes(path, dictionaryEnabled, 10000)
-
-        // Test basic array_remove functionality
-        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(1, 2, 3, null, 3), 2)"))
-
-        // Test removing multiple occurrences
-        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(1, 3, 3, null, 3), 3)"))
-
-        // Test removing null
-        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(1, 2, null, 4, null), null)"))
-
-        // Test when element doesn't exist
-        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(1, 2, 3), 5)"))
+        spark.read.parquet(path.toString).createOrReplaceTempView("t1");
+        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(_2, _3,_4), 2) from t1"))
+        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(_2, _3,_4), 3) from t1"))
+        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(_2, _3,_4), 5) from t1"))
       }
     }
   }

From 1dbb3d35ff959170c70828286346db317ced3b36 Mon Sep 17 00:00:00 2001
From: Jagdish Parihar
Date: Sat, 21 Dec 2024 11:32:17 +0530
Subject: [PATCH 03/10] updated test cases

---
 .../test/scala/org/apache/comet/CometExpressionSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index b40650a7a..19fbd97d7 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2523,9 +2523,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
         val path = new Path(dir.toURI.toString, "test.parquet")
         makeParquetFileAllTypes(path, dictionaryEnabled, 10000)
         spark.read.parquet(path.toString).createOrReplaceTempView("t1");
-        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(_2, _3,_4), 2) from t1"))
+        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(_2, _3,_4), _2) from t1"))
         checkSparkAnswerAndOperator(sql("SELECT array_remove(array(_2, _3,_4), 3) from t1"))
-        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(_2, _3,_4), 5) from t1"))
+        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(_2, _3,_4), 10) from t1"))
       }
     }
   }
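Context: from patch 02 onward the test reads the shared makeParquetFileAllTypes fixture into a temp view t1 and builds arrays from its _2, _3 and _4 columns. For a hypothetical fixture row with _2 = 2, _3 = 3, _4 = 4 (made-up values, not the real fixture data), the patch-03 queries reduce to literals like these (spark is an ambient SparkSession):

    spark.sql("SELECT array_remove(array(2, 3, 4), 2)").show(false)   // [3, 4]     -- like array_remove(array(_2, _3, _4), _2)
    spark.sql("SELECT array_remove(array(2, 3, 4), 10)").show(false)  // [2, 3, 4]  -- key 10 never matches, array unchanged
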
From 68ba5add45d191cc0b1c0bb0057a10ac2876d979 Mon Sep 17 00:00:00 2001
From: Jagdish Parihar
Date: Tue, 7 Jan 2025 22:11:08 +0530
Subject: [PATCH 04/10] fixed array_remove function for null values

---
 native/core/src/execution/planner.rs | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 8b95b85c8..f6c86a0fe 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -741,7 +741,7 @@ impl PhysicalPlanner {
                     self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
                 let key_expr =
                     self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
-                let args = vec![Arc::clone(&src_array_expr), key_expr];
+                let args = vec![Arc::clone(&src_array_expr), Arc::clone(&key_expr)];
                 let return_type = src_array_expr.data_type(&input_schema)?;
 
                 let datafusion_array_remove = array_remove_all_udf();
@@ -752,7 +752,19 @@ impl PhysicalPlanner {
                     args,
                     return_type,
                 ));
-                Ok(array_remove_expr)
+                let is_null_expr: Arc<dyn PhysicalExpr> = Arc::new(IsNullExpr::new(key_expr));
+
+                let null_literal_expr: Arc<dyn PhysicalExpr> =
+                    Arc::new(Literal::new(ScalarValue::Null));
+
+                let case_expr = CaseExpr::try_new(
+                    None,
+                    vec![(is_null_expr, null_literal_expr)],
+                    Some(array_remove_expr),
+                )?;
+
+                Ok(Arc::new(case_expr))
+                // Ok(array_remove_expr)
             }
             expr => Err(ExecutionError::GeneralError(format!(
                 "Not implemented: {:?}",

From 866423bc88c9bac96d836a4c2ec9065b20fecfa2 Mon Sep 17 00:00:00 2001
From: Jagdish Parihar
Date: Tue, 7 Jan 2025 23:58:14 +0530
Subject: [PATCH 05/10] removed commented code

---
 native/core/src/execution/planner.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index f6c86a0fe..f1c584e8c 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -764,7 +764,6 @@ impl PhysicalPlanner {
                 )?;
 
                 Ok(Arc::new(case_expr))
-                // Ok(array_remove_expr)
             }
             expr => Err(ExecutionError::GeneralError(format!(
                 "Not implemented: {:?}",

From 10a4b47b6ea1e93b74e313d181ce6f2078045607 Mon Sep 17 00:00:00 2001
From: Jagdish Parihar
Date: Wed, 8 Jan 2025 00:07:47 +0530
Subject: [PATCH 06/10] remove unnecessary code

---
 .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 024e0ef3d..84fffe77d 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2865,11 +2865,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         case RightOuter => JoinType.RightOuter
         case FullOuter => JoinType.FullOuter
         case LeftSemi => JoinType.LeftSemi
-        // TODO: DF SMJ with join condition fails TPCH q21
-        case LeftAnti if condition.isEmpty => JoinType.LeftAnti
-        case LeftAnti =>
-          withInfo(join, "LeftAnti SMJ join with condition is not supported")
-          return None
+        case LeftAnti => JoinType.LeftAnti
        case _ =>
           // Spark doesn't support other join types
           withInfo(op, s"Unsupported join type ${join.joinType}")

From de3d5997b61e0d911f82419c8696fa06765aa7cb Mon Sep 17 00:00:00 2001
From: Jagdish Parihar
Date: Wed, 8 Jan 2025 00:14:22 +0530
Subject: [PATCH 07/10] updated the test for 'array_remove'

---
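Context: the CASE expression built in patch 04 exists because Spark returns NULL when the element to remove is NULL, which the bare DataFusion UDF call from patch 01 evidently did not reproduce on its own. Expressed with Spark's DataFrame API, the wrapper is equivalent to the following sketch (not code from the patch; the column names arr and key are invented):

    import org.apache.spark.sql.functions.{array_remove, col, lit, when}

    // CASE WHEN key IS NULL THEN NULL ELSE <remove-all semantics> END, as built by the planner.
    val guarded = when(col("key").isNull, lit(null))
      .otherwise(array_remove(col("arr"), col("key")))
    // e.g. df.select(guarded.as("result")) for a DataFrame with columns arr and key.
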
 .../test/scala/org/apache/comet/CometExpressionSuite.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index ff553ca96..9c2d9545b 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2535,8 +2535,10 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
         val path = new Path(dir.toURI.toString, "test.parquet")
         makeParquetFileAllTypes(path, dictionaryEnabled, 10000)
         spark.read.parquet(path.toString).createOrReplaceTempView("t1");
-        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(_2, _3,_4), _2) from t1"))
-        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(_2, _3,_4), 3) from t1"))
+        checkSparkAnswerAndOperator(
+          sql("SELECT array_remove(array(_2, _3,_4), _2) from t1 where _2 is null"))
+        checkSparkAnswerAndOperator(
+          sql("SELECT array_remove(array(_2, _3,_4), _3) from t1 where _3 is not null"))
         checkSparkAnswerAndOperator(sql("SELECT array_remove(array(_2, _3,_4), 10) from t1"))
       }
     }

From f8485c514ed96e1bf64f37c825d1384cd2335f93 Mon Sep 17 00:00:00 2001
From: Jagdish Parihar
Date: Fri, 10 Jan 2025 00:07:40 +0530
Subject: [PATCH 08/10] added test for array_remove in case the input array is null

---
 .../src/test/scala/org/apache/comet/CometExpressionSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 9c2d9545b..4eb32feef 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2539,7 +2539,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
           sql("SELECT array_remove(array(_2, _3,_4), _2) from t1 where _2 is null"))
         checkSparkAnswerAndOperator(
           sql("SELECT array_remove(array(_2, _3,_4), _3) from t1 where _3 is not null"))
-        checkSparkAnswerAndOperator(sql("SELECT array_remove(array(_2, _3,_4), 10) from t1"))
+        checkSparkAnswerAndOperator(sql(
+          "SELECT array_remove(case when _2 = _3 THEN array(_2, _3,_4) ELSE null END, _3) from t1"))
       }
     }
   }

From 3594ded24807c1e90994de3ea99b8c64f9ada12d Mon Sep 17 00:00:00 2001
From: Jagdish Parihar
Date: Fri, 10 Jan 2025 00:10:57 +0530
Subject: [PATCH 09/10] wip: case array is empty

---
 .../src/test/scala/org/apache/comet/CometExpressionSuite.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 4eb32feef..46e447294 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2541,6 +2541,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
           sql("SELECT array_remove(array(_2, _3,_4), _3) from t1 where _3 is not null"))
         checkSparkAnswerAndOperator(sql(
           "SELECT array_remove(case when _2 = _3 THEN array(_2, _3,_4) ELSE null END, _3) from t1"))
+        checkSparkAnswerAndOperator(sql(
+          "SELECT array_remove(case when _2 = _3 THEN array(_2, _3,_4) ELSE array() END, _3) FROM t1"))
       }
     }
   }

From 4966d7c02f7bcde68ad9f9f1bac8c52305664007 Mon Sep 17 00:00:00 2001
From: Jagdish Parihar
Date: Fri, 10 Jan 2025 11:05:53 +0530
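Context: patches 07-09 shift the test toward the NULL paths — a NULL search key and a NULL input array must both yield NULL, which is exactly what the CASE wrapper from patch 04 provides. The same expectations can be checked against plain Spark with literals, without the parquet fixture (illustrative only; spark is an ambient SparkSession):

    spark.sql("SELECT array_remove(array(1, 2, 3), CAST(NULL AS INT))").show(false)  // NULL -- NULL key
    spark.sql("SELECT array_remove(CAST(NULL AS ARRAY<INT>), 3)").show(false)        // NULL -- NULL input array
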
Subject: [PATCH 10/10] removed test case for empty array

---
 .../test/scala/org/apache/comet/CometExpressionSuite.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 46e447294..8c2759a38 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2529,20 +2529,19 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
         spark.sql("SELECT array_contains((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1"));
     }
   }
+
   test("array_remove") {
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
         makeParquetFileAllTypes(path, dictionaryEnabled, 10000)
-        spark.read.parquet(path.toString).createOrReplaceTempView("t1");
+        spark.read.parquet(path.toString).createOrReplaceTempView("t1")
         checkSparkAnswerAndOperator(
           sql("SELECT array_remove(array(_2, _3,_4), _2) from t1 where _2 is null"))
         checkSparkAnswerAndOperator(
           sql("SELECT array_remove(array(_2, _3,_4), _3) from t1 where _3 is not null"))
         checkSparkAnswerAndOperator(sql(
           "SELECT array_remove(case when _2 = _3 THEN array(_2, _3,_4) ELSE null END, _3) from t1"))
-        checkSparkAnswerAndOperator(sql(
-          "SELECT array_remove(case when _2 = _3 THEN array(_2, _3,_4) ELSE array() END, _3) FROM t1"))
       }
     }
   }
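For reference, a self-contained recap of what the final shape of the test expects, runnable against plain Spark; the DataFrame below is a made-up stand-in for the makeParquetFileAllTypes fixture, not the real one:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[1]").appName("array_remove_recap").getOrCreate()
    import spark.implicits._

    // Nullable columns named like the fixture's _2, _3, _4 (values invented).
    Seq((Some(2), Some(3), 4), (None, Some(7), 8))
      .toDF("_2", "_3", "_4")
      .createOrReplaceTempView("t1")

    spark.sql("SELECT array_remove(array(_2, _3, _4), _2) FROM t1 WHERE _2 IS NULL").show(false)     // NULL: the key is NULL
    spark.sql("SELECT array_remove(array(_2, _3, _4), _3) FROM t1 WHERE _3 IS NOT NULL").show(false) // every _3 removed from the array
    spark.sql("SELECT array_remove(CASE WHEN _2 = _3 THEN array(_2, _3, _4) ELSE NULL END, _3) FROM t1").show(false) // NULL: the array is NULL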