Skip to content

Commit

Permalink
Combine Wildcard and QualifiedWildcard, add wildcard() expr fn
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 9, 2023
1 parent 4512805 commit 6347205
Show file tree
Hide file tree
Showing 21 changed files with 229 additions and 108 deletions.
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
| Expr::AggregateFunction { .. }
| Expr::Sort { .. }
| Expr::WindowFunction { .. }
| Expr::Wildcard
| Expr::QualifiedWildcard { .. }
| Expr::Wildcard { .. }
| Expr::Placeholder(_) => {
is_applicable = false;
VisitRecursion::Stop
Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,8 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
Expr::Sort { .. } => {
internal_err!("Create physical name does not support sort expression")
}
Expr::Wildcard => internal_err!("Create physical name does not support wildcard"),
Expr::QualifiedWildcard { .. } => {
internal_err!("Create physical name does not support qualified wildcard")
Expr::Wildcard { .. } => {
internal_err!("Create physical name does not support wildcard")
}
Expr::Placeholder(_) => {
internal_err!("Create physical name does not support placeholder")
Expand Down
29 changes: 14 additions & 15 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
use datafusion_common::{DataFusionError, ScalarValue, UnnestOptions};
use datafusion_execution::config::SessionConfig;
use datafusion_expr::expr::{GroupingSet, Sort};
use datafusion_expr::Expr::Wildcard;
use datafusion_expr::{
array_agg, avg, col, count, exists, expr, in_subquery, lit, max, out_ref_col,
scalar_subquery, sum, AggregateFunction, Expr, ExprSchemable, WindowFrame,
scalar_subquery, sum, wildcard, AggregateFunction, Expr, ExprSchemable, WindowFrame,
WindowFrameBound, WindowFrameUnits, WindowFunction,
};
use datafusion_physical_expr::var_provider::{VarProvider, VarType};
Expand All @@ -64,8 +63,8 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
let df_results = ctx
.table("t1")
.await?
.aggregate(vec![col("b")], vec![count(Wildcard)])?
.sort(vec![count(Wildcard).sort(true, false)])?
.aggregate(vec![col("b")], vec![count(wildcard())])?
.sort(vec![count(wildcard()).sort(true, false)])?
.explain(false, false)?
.collect()
.await?;
Expand Down Expand Up @@ -99,8 +98,8 @@ async fn test_count_wildcard_on_where_in() -> Result<()> {
Arc::new(
ctx.table("t2")
.await?
.aggregate(vec![], vec![count(Expr::Wildcard)])?
.select(vec![count(Expr::Wildcard)])?
.aggregate(vec![], vec![count(wildcard())])?
.select(vec![count(wildcard())])?
.into_unoptimized_plan(),
// Usually, into_optimized_plan() should be used here, but due to
// https://github.com/apache/arrow-datafusion/issues/5771,
Expand Down Expand Up @@ -136,8 +135,8 @@ async fn test_count_wildcard_on_where_exist() -> Result<()> {
.filter(exists(Arc::new(
ctx.table("t2")
.await?
.aggregate(vec![], vec![count(Expr::Wildcard)])?
.select(vec![count(Expr::Wildcard)])?
.aggregate(vec![], vec![count(wildcard())])?
.select(vec![count(wildcard())])?
.into_unoptimized_plan(),
// Usually, into_optimized_plan() should be used here, but due to
// https://github.com/apache/arrow-datafusion/issues/5771,
Expand Down Expand Up @@ -172,7 +171,7 @@ async fn test_count_wildcard_on_window() -> Result<()> {
.await?
.select(vec![Expr::WindowFunction(expr::WindowFunction::new(
WindowFunction::AggregateFunction(AggregateFunction::Count),
vec![Expr::Wildcard],
vec![wildcard()],
vec![],
vec![Expr::Sort(Sort::new(Box::new(col("a")), false, true))],
WindowFrame {
Expand Down Expand Up @@ -202,17 +201,17 @@ async fn test_count_wildcard_on_aggregate() -> Result<()> {
let sql_results = ctx
.sql("select count(*) from t1")
.await?
.select(vec![count(Expr::Wildcard)])?
.select(vec![count(wildcard())])?
.explain(false, false)?
.collect()
.await?;

// add `.select(vec![count(Expr::Wildcard)])?` to make sure we can analyze all node instead of just top node.
// add `.select(vec![count(wildcard())])?` to make sure we can analyze all node instead of just top node.
let df_results = ctx
.table("t1")
.await?
.aggregate(vec![], vec![count(Expr::Wildcard)])?
.select(vec![count(Expr::Wildcard)])?
.aggregate(vec![], vec![count(wildcard())])?
.select(vec![count(wildcard())])?
.explain(false, false)?
.collect()
.await?;
Expand Down Expand Up @@ -248,8 +247,8 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
ctx.table("t2")
.await?
.filter(out_ref_col(DataType::UInt32, "t1.a").eq(col("t2.a")))?
.aggregate(vec![], vec![count(Wildcard)])?
.select(vec![col(count(Wildcard).to_string())])?
.aggregate(vec![], vec![count(wildcard())])?
.select(vec![col(count(wildcard()).to_string())])?
.into_unoptimized_plan(),
))
.gt(lit(ScalarValue::UInt8(Some(0)))),
Expand Down
29 changes: 14 additions & 15 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,12 @@ pub enum Expr {
InSubquery(InSubquery),
/// Scalar subquery
ScalarSubquery(Subquery),
/// Represents a reference to all available fields.
/// Represents a reference to all available fields in a specific schema,
/// with an optional (schema) qualifier.
///
/// This expr has to be resolved to a list of columns before translating logical
/// plan into physical plan.
Wildcard,
/// Represents a reference to all available fields in a specific schema.
///
/// This expr has to be resolved to a list of columns before translating logical
/// plan into physical plan.
QualifiedWildcard { qualifier: String },
Wildcard { qualifier: Option<String> },
/// List of grouping set expressions. Only valid in the context of an aggregate
/// GROUP BY expression list
GroupingSet(GroupingSet),
Expand Down Expand Up @@ -729,15 +725,14 @@ impl Expr {
Expr::Negative(..) => "Negative",
Expr::Not(..) => "Not",
Expr::Placeholder(_) => "Placeholder",
Expr::QualifiedWildcard { .. } => "QualifiedWildcard",
Expr::ScalarFunction(..) => "ScalarFunction",
Expr::ScalarSubquery { .. } => "ScalarSubquery",
Expr::ScalarUDF(..) => "ScalarUDF",
Expr::ScalarVariable(..) => "ScalarVariable",
Expr::Sort { .. } => "Sort",
Expr::TryCast { .. } => "TryCast",
Expr::WindowFunction { .. } => "WindowFunction",
Expr::Wildcard => "Wildcard",
Expr::Wildcard { .. } => "Wildcard",
}
}

Expand Down Expand Up @@ -1292,8 +1287,10 @@ impl fmt::Display for Expr {
write!(f, "{expr} IN ([{}])", expr_vec_fmt!(list))
}
}
Expr::Wildcard => write!(f, "*"),
Expr::QualifiedWildcard { qualifier } => write!(f, "{qualifier}.*"),
Expr::Wildcard { qualifier } => match qualifier {
Some(qualifier) => write!(f, "{qualifier}.*"),
None => write!(f, "*"),
},
Expr::GetIndexedField(GetIndexedField { field, expr }) => match field {
GetFieldAccess::NamedStructField { name } => {
write!(f, "({expr})[{name}]")
Expand Down Expand Up @@ -1613,10 +1610,12 @@ fn create_name(e: &Expr) -> Result<String> {
Expr::Sort { .. } => {
internal_err!("Create name does not support sort expression")
}
Expr::Wildcard => Ok("*".to_string()),
Expr::QualifiedWildcard { .. } => {
internal_err!("Create name does not support qualified wildcard")
}
Expr::Wildcard { qualifier } => match qualifier {
Some(qualifier) => internal_err!(
"Create name does not support qualified wildcard, got {qualifier}"
),
None => Ok("*".to_string()),
},
Expr::Placeholder(Placeholder { id, .. }) => Ok((*id).to_string()),
}
}
Expand Down
13 changes: 13 additions & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,19 @@ pub fn placeholder(id: impl Into<String>) -> Expr {
})
}

/// Create an '*' [`Expr::Wildcard`] expression that matches all columns
///
/// # Example
///
/// ```rust
/// # use datafusion_expr::{wildcard};
/// let p = wildcard();
/// assert_eq!(p.to_string(), "*")
/// ```
pub fn wildcard() -> Expr {
Expr::Wildcard { qualifier: None }
}

/// Return a new expression `left <op> right`
pub fn binary_expr(left: Expr, op: Operator, right: Expr) -> Expr {
Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
Expand Down
15 changes: 6 additions & 9 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,13 @@ impl ExprSchemable for Expr {
plan_datafusion_err!("Placeholder type could not be resolved")
})
}
Expr::Wildcard => {
Expr::Wildcard { qualifier } => {
// Wildcard do not really have a type and do not appear in projections
Ok(DataType::Null)
match qualifier {
Some(_) => internal_err!("QualifiedWildcard expressions are not valid in a logical query plan"),
None => Ok(DataType::Null)
}
}
Expr::QualifiedWildcard { .. } => internal_err!(
"QualifiedWildcard expressions are not valid in a logical query plan"
),
Expr::GroupingSet(_) => {
// grouping sets do not really have a type and do not appear in projections
Ok(DataType::Null)
Expand Down Expand Up @@ -270,12 +270,9 @@ impl ExprSchemable for Expr {
| Expr::SimilarTo(Like { expr, pattern, .. }) => {
Ok(expr.nullable(input_schema)? || pattern.nullable(input_schema)?)
}
Expr::Wildcard => internal_err!(
Expr::Wildcard { .. } => internal_err!(
"Wildcard expressions are not valid in a logical query plan"
),
Expr::QualifiedWildcard { .. } => internal_err!(
"QualifiedWildcard expressions are not valid in a logical query plan"
),
Expr::GetIndexedField(GetIndexedField { expr, field }) => {
field_for_index(expr, field, input_schema).map(|x| x.is_nullable())
}
Expand Down
13 changes: 9 additions & 4 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1287,11 +1287,16 @@ pub fn project(
for e in expr {
let e = e.into();
match e {
Expr::Wildcard => {
Expr::Wildcard { qualifier: None } => {
projected_expr.extend(expand_wildcard(input_schema, &plan, None)?)
}
Expr::QualifiedWildcard { ref qualifier } => projected_expr
.extend(expand_qualified_wildcard(qualifier, input_schema, None)?),
Expr::Wildcard {
qualifier: Some(qualifier),
} => projected_expr.extend(expand_qualified_wildcard(
&qualifier,
input_schema,
None,
)?),
_ => projected_expr
.push(columnize_expr(normalize_col(e, &plan)?, input_schema)),
}
Expand Down Expand Up @@ -1590,7 +1595,7 @@ mod tests {

let plan = table_scan(Some("t1"), &employee_schema(), None)?
.join_using(t2, JoinType::Inner, vec!["id"])?
.project(vec![Expr::Wildcard])?
.project(vec![Expr::Wildcard { qualifier: None }])?
.build()?;

// id column should only show up once in projection
Expand Down
8 changes: 2 additions & 6 deletions datafusion/expr/src/tree_node/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ impl TreeNode for Expr {
| Expr::Literal(_)
| Expr::Exists { .. }
| Expr::ScalarSubquery(_)
| Expr::Wildcard
| Expr::QualifiedWildcard { .. }
| Expr::Wildcard {..}
| Expr::Placeholder (_) => vec![],
Expr::BinaryExpr(BinaryExpr { left, right, .. }) => {
vec![left.as_ref().clone(), right.as_ref().clone()]
Expand Down Expand Up @@ -350,10 +349,7 @@ impl TreeNode for Expr {
transform_vec(list, &mut transform)?,
negated,
)),
Expr::Wildcard => Expr::Wildcard,
Expr::QualifiedWildcard { qualifier } => {
Expr::QualifiedWildcard { qualifier }
}
Expr::Wildcard { qualifier } => Expr::Wildcard { qualifier },
Expr::GetIndexedField(GetIndexedField { expr, field }) => {
Expr::GetIndexedField(GetIndexedField::new(
transform_boxed(expr, &mut transform)?,
Expand Down
3 changes: 1 addition & 2 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,7 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
| Expr::Exists { .. }
| Expr::InSubquery(_)
| Expr::ScalarSubquery(_)
| Expr::Wildcard
| Expr::QualifiedWildcard { .. }
| Expr::Wildcard { .. }
| Expr::GetIndexedField { .. }
| Expr::Placeholder(_)
| Expr::OuterReferenceColumn { .. } => {}
Expand Down
Loading

0 comments on commit 6347205

Please sign in to comment.