Skip to content

Commit

Permalink
implement proto
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 9, 2023
1 parent 71a2742 commit b643d72
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 34 deletions.
4 changes: 3 additions & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,9 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
Expr::Sort { .. } => {
internal_err!("Create physical name does not support sort expression")
}
Expr::Wildcard { .. } => internal_err!("Create physical name does not support wildcard"),
Expr::Wildcard { .. } => {
internal_err!("Create physical name does not support wildcard")
}
Expr::Placeholder(_) => {
internal_err!("Create physical name does not support placeholder")
}
Expand Down
6 changes: 5 additions & 1 deletion datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
use datafusion_common::{DataFusionError, ScalarValue, UnnestOptions};
use datafusion_execution::config::SessionConfig;
use datafusion_expr::expr::{GroupingSet, Sort};
use datafusion_expr::{array_agg, avg, col, count, exists, expr, in_subquery, lit, max, out_ref_col, scalar_subquery, sum, AggregateFunction, Expr, ExprSchemable, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunction, wildcard};
use datafusion_expr::{
array_agg, avg, col, count, exists, expr, in_subquery, lit, max, out_ref_col,
scalar_subquery, sum, wildcard, AggregateFunction, Expr, ExprSchemable, WindowFrame,
WindowFrameBound, WindowFrameUnits, WindowFunction,
};
use datafusion_physical_expr::var_provider::{VarProvider, VarType};

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1595,7 +1595,7 @@ mod tests {

let plan = table_scan(Some("t1"), &employee_schema(), None)?
.join_using(t2, JoinType::Inner, vec!["id"])?
.project(vec![Expr::Wildcard{qualifier: None}])?
.project(vec![Expr::Wildcard { qualifier: None }])?
.build()?;

// id column should only show up once in projection
Expand Down
42 changes: 25 additions & 17 deletions datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,17 @@ impl TreeNodeRewriter for CountWildcardRewriter {
order_by,
window_frame,
}) if args.len() == 1 => match args[0] {
Expr::Wildcard { qualifier: None } => Expr::WindowFunction(expr::WindowFunction {
fun: window_function::WindowFunction::AggregateFunction(
aggregate_function::AggregateFunction::Count,
),
args: vec![lit(COUNT_STAR_EXPANSION)],
partition_by,
order_by,
window_frame,
}),
Expr::Wildcard { qualifier: None } => {
Expr::WindowFunction(expr::WindowFunction {
fun: window_function::WindowFunction::AggregateFunction(
aggregate_function::AggregateFunction::Count,
),
args: vec![lit(COUNT_STAR_EXPANSION)],
partition_by,
order_by,
window_frame,
})
}

_ => old_expr,
},
Expand All @@ -148,13 +150,15 @@ impl TreeNodeRewriter for CountWildcardRewriter {
filter,
order_by,
}) if args.len() == 1 => match args[0] {
Expr::Wildcard { qualifier: None } => Expr::AggregateFunction(AggregateFunction {
fun: aggregate_function::AggregateFunction::Count,
args: vec![lit(COUNT_STAR_EXPANSION)],
distinct,
filter,
order_by,
}),
Expr::Wildcard { qualifier: None } => {
Expr::AggregateFunction(AggregateFunction {
fun: aggregate_function::AggregateFunction::Count,
args: vec![lit(COUNT_STAR_EXPANSION)],
distinct,
filter,
order_by,
})
}
_ => old_expr,
},

Expand Down Expand Up @@ -219,7 +223,11 @@ mod tests {
use arrow::datatypes::DataType;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::expr::Sort;
use datafusion_expr::{col, count, exists, expr, in_subquery, lit, logical_plan::LogicalPlanBuilder, max, out_ref_col, scalar_subquery, AggregateFunction, Expr, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunction, wildcard};
use datafusion_expr::{
col, count, exists, expr, in_subquery, lit, logical_plan::LogicalPlanBuilder,
max, out_ref_col, scalar_subquery, wildcard, AggregateFunction, Expr,
WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunction,
};

fn assert_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
assert_analyzed_plan_eq_display_indent(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/analyzer/inline_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ fn generate_projection_expr(
));
}
} else {
exprs.push(Expr::Wildcard { qualifier: None});
exprs.push(Expr::Wildcard { qualifier: None });
}
Ok(exprs)
}
Expand Down
6 changes: 5 additions & 1 deletion datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ message LogicalExprNode {
SortExprNode sort = 12;
NegativeNode negative = 13;
InListNode in_list = 14;
bool wildcard = 15;
Wildcard wildcard = 15;
ScalarFunctionNode scalar_function = 16;
TryCastNode try_cast = 17;

Expand Down Expand Up @@ -399,6 +399,10 @@ message LogicalExprNode {
}
}

message Wildcard {
optional string qualifier = 1;
}

message PlaceholderNode {
string id = 1;
ArrowType data_type = 2;
Expand Down
94 changes: 93 additions & 1 deletion datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,9 @@ pub fn parse_expr(
.collect::<Result<Vec<_>, _>>()?,
in_list.negated,
))),
ExprType::Wildcard(_) => Ok(Expr::Wildcard),
ExprType::Wildcard(protobuf::Wildcard { qualifier }) => Ok(Expr::Wildcard {
qualifier: qualifier.clone(),
}),
ExprType::ScalarFunction(expr) => {
let scalar_function = protobuf::ScalarFunction::try_from(expr.fun)
.map_err(|_| Error::unknown("ScalarFunction", expr.fun))?;
Expand Down
11 changes: 4 additions & 7 deletions datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -960,8 +960,10 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
expr_type: Some(ExprType::InList(expr)),
}
}
Expr::Wildcard => Self {
expr_type: Some(ExprType::Wildcard(true)),
Expr::Wildcard { qualifier } => Self {
expr_type: Some(ExprType::Wildcard(protobuf::Wildcard {
qualifier: qualifier.clone(),
})),
},
Expr::ScalarSubquery(_)
| Expr::InSubquery(_)
Expand Down Expand Up @@ -1052,11 +1054,6 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
})),
}
}

Expr::QualifiedWildcard { .. } => return Err(Error::General(
"Proto serialization error: Expr::QualifiedWildcard { .. } not supported"
.to_string(),
)),
};

Ok(expr_node)
Expand Down
12 changes: 11 additions & 1 deletion datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,17 @@ fn roundtrip_inlist() {

#[test]
fn roundtrip_wildcard() {
let test_expr = Expr::Wildcard;
let test_expr = Expr::Wildcard { qualifier: None };

let ctx = SessionContext::new();
roundtrip_expr_test(test_expr, ctx);
}

#[test]
fn roundtrip_qualified_wildcard() {
let test_expr = Expr::Wildcard {
qualifier: Some("foo".into()),
};

let ctx = SessionContext::new();
roundtrip_expr_test(test_expr, ctx);
Expand Down

0 comments on commit b643d72

Please sign in to comment.