-
Notifications
You must be signed in to change notification settings - Fork 173
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add support for array_remove expression #1179
Changes from 6 commits
d477f3b
988e021
1dbb3d3
ec89c3a
04bee72
68ba5ad
866423b
1ed00e1
10a4b47
de3d599
f8485c5
3594ded
4966d7c
cefc7f8
3caf95c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -66,6 +66,7 @@ use datafusion::{ | |||
}; | ||||
use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr}; | ||||
use datafusion_functions_nested::concat::ArrayAppend; | ||||
use datafusion_functions_nested::remove::array_remove_all_udf; | ||||
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; | ||||
|
||||
use crate::execution::shuffle::CompressionCodec; | ||||
|
@@ -735,6 +736,36 @@ impl PhysicalPlanner { | |||
)); | ||||
Ok(array_has_expr) | ||||
} | ||||
ExprStruct::ArrayRemove(expr) => { | ||||
let src_array_expr = | ||||
self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; | ||||
let key_expr = | ||||
self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?; | ||||
let args = vec![Arc::clone(&src_array_expr), Arc::clone(&key_expr)]; | ||||
let return_type = src_array_expr.data_type(&input_schema)?; | ||||
|
||||
let datafusion_array_remove = array_remove_all_udf(); | ||||
|
||||
let array_remove_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new( | ||||
"array_remove", | ||||
datafusion_array_remove, | ||||
args, | ||||
return_type, | ||||
)); | ||||
let is_null_expr: Arc<dyn PhysicalExpr> = Arc::new(IsNullExpr::new(key_expr)); | ||||
|
||||
let null_literal_expr: Arc<dyn PhysicalExpr> = | ||||
Arc::new(Literal::new(ScalarValue::Null)); | ||||
|
||||
let case_expr = CaseExpr::try_new( | ||||
None, | ||||
vec![(is_null_expr, null_literal_expr)], | ||||
Some(array_remove_expr), | ||||
)?; | ||||
|
||||
Ok(Arc::new(case_expr)) | ||||
// Ok(array_remove_expr) | ||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could you remove this commented-out code?
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed! 😅 |
||||
} | ||||
expr => Err(ExecutionError::GeneralError(format!( | ||||
"Not implemented: {:?}", | ||||
expr | ||||
|
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -2529,4 +2529,16 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { | |||||||||||
spark.sql("SELECT array_contains((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1")); | ||||||||||||
} | ||||||||||||
} | ||||||||||||
test("array_remove") { | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: there should be a blank line between tests:
Suggested change
|
||||||||||||
Seq(true, false).foreach { dictionaryEnabled => | ||||||||||||
withTempDir { dir => | ||||||||||||
val path = new Path(dir.toURI.toString, "test.parquet") | ||||||||||||
makeParquetFileAllTypes(path, dictionaryEnabled, 10000) | ||||||||||||
spark.read.parquet(path.toString).createOrReplaceTempView("t1"); | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: no need for a semicolon:
Suggested change
|
||||||||||||
checkSparkAnswerAndOperator(sql("SELECT array_remove(array(_2, _3,_4), _2) from t1")) | ||||||||||||
checkSparkAnswerAndOperator(sql("SELECT array_remove(array(_2, _3,_4), 3) from t1")) | ||||||||||||
checkSparkAnswerAndOperator(sql("SELECT array_remove(array(_2, _3,_4), 10) from t1")) | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we add a test case where the key is null? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Also, can we add tests where the array is null, the array is empty, and the array contains nulls? |
||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: This can be inlined