Skip to content

Commit

Permalink
Add stacker and recursive (#13310)
Browse files Browse the repository at this point in the history
* Add stacker and recursive

* add test

* fine tune set_expr_to_plan min stack size

* format tomls

* add recursive annotation to more custom recursive rules

* fix comments

* fix comment about min stack requirement

---------

Co-authored-by: blaginin <[email protected]>
  • Loading branch information
peter-toth and blaginin authored Nov 11, 2024
1 parent 5309d83 commit bab02b6
Show file tree
Hide file tree
Showing 18 changed files with 109 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ pbjson = { version = "0.7.0" }
prost = "0.13.1"
prost-derive = "0.13.1"
rand = "0.8"
recursive = "0.1.1"
regex = "1.8"
rstest = "0.23.0"
serde_json = "1"
Expand Down
47 changes: 47 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ object_store = { workspace = true, optional = true }
parquet = { workspace = true, optional = true, default-features = true }
paste = "1.0.15"
pyo3 = { version = "0.22.0", optional = true }
recursive = { workspace = true }
sqlparser = { workspace = true }
tokio = { workspace = true }

Expand Down
20 changes: 20 additions & 0 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! [`TreeNode`] for visiting and rewriting expression and plan trees

use recursive::recursive;
use std::sync::Arc;

use crate::Result;
Expand Down Expand Up @@ -123,6 +124,7 @@ pub trait TreeNode: Sized {
/// TreeNodeVisitor::f_up(ChildNode2)
/// TreeNodeVisitor::f_up(ParentNode)
/// ```
#[recursive]
fn visit<'n, V: TreeNodeVisitor<'n, Node = Self>>(
&'n self,
visitor: &mut V,
Expand Down Expand Up @@ -172,6 +174,7 @@ pub trait TreeNode: Sized {
/// TreeNodeRewriter::f_up(ChildNode2)
/// TreeNodeRewriter::f_up(ParentNode)
/// ```
#[recursive]
fn rewrite<R: TreeNodeRewriter<Node = Self>>(
self,
rewriter: &mut R,
Expand All @@ -194,6 +197,7 @@ pub trait TreeNode: Sized {
&'n self,
mut f: F,
) -> Result<TreeNodeRecursion> {
#[recursive]
fn apply_impl<'n, N: TreeNode, F: FnMut(&'n N) -> Result<TreeNodeRecursion>>(
node: &'n N,
f: &mut F,
Expand Down Expand Up @@ -228,6 +232,7 @@ pub trait TreeNode: Sized {
self,
mut f: F,
) -> Result<Transformed<Self>> {
#[recursive]
fn transform_down_impl<N: TreeNode, F: FnMut(N) -> Result<Transformed<N>>>(
node: N,
f: &mut F,
Expand All @@ -251,6 +256,7 @@ pub trait TreeNode: Sized {
self,
mut f: F,
) -> Result<Transformed<Self>> {
#[recursive]
fn transform_up_impl<N: TreeNode, F: FnMut(N) -> Result<Transformed<N>>>(
node: N,
f: &mut F,
Expand Down Expand Up @@ -365,6 +371,7 @@ pub trait TreeNode: Sized {
mut f_down: FD,
mut f_up: FU,
) -> Result<Transformed<Self>> {
#[recursive]
fn transform_down_up_impl<
N: TreeNode,
FD: FnMut(N) -> Result<Transformed<N>>,
Expand Down Expand Up @@ -2079,4 +2086,17 @@ pub(crate) mod tests {

Ok(())
}

#[test]
fn test_large_tree() {
let mut item = TestTreeNode::new_leaf("initial".to_string());
for i in 0..3000 {
item = TestTreeNode::new(vec![item], format!("parent-{}", i));
}

let mut visitor =
TestVisitor::new(Box::new(visit_continue), Box::new(visit_continue));

item.visit(&mut visitor).unwrap();
}
}
1 change: 1 addition & 0 deletions datafusion/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ datafusion-functions-window-common = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
indexmap = { workspace = true }
paste = "^1.0"
recursive = { workspace = true }
serde_json = { workspace = true }
sqlparser = { workspace = true }
strum = { version = "0.26.1", features = ["derive"] }
Expand Down
7 changes: 7 additions & 0 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::{
LogicalPlan, Partitioning, Projection, RecursiveQuery, Repartition, Sort, Subquery,
SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, Values, Window,
};
use recursive::recursive;
use std::ops::Deref;
use std::sync::Arc;

Expand Down Expand Up @@ -745,6 +746,7 @@ impl LogicalPlan {

/// Visits a plan similarly to [`Self::visit`], including subqueries that
/// may appear in expressions such as `IN (SELECT ...)`.
#[recursive]
pub fn visit_with_subqueries<V: for<'n> TreeNodeVisitor<'n, Node = Self>>(
&self,
visitor: &mut V,
Expand All @@ -761,6 +763,7 @@ impl LogicalPlan {
/// Similarly to [`Self::rewrite`], rewrites this node and its inputs using `f`,
/// including subqueries that may appear in expressions such as `IN (SELECT
/// ...)`.
#[recursive]
pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
self,
rewriter: &mut R,
Expand All @@ -779,6 +782,7 @@ impl LogicalPlan {
&self,
mut f: F,
) -> Result<TreeNodeRecursion> {
#[recursive]
fn apply_with_subqueries_impl<
F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
>(
Expand Down Expand Up @@ -814,6 +818,7 @@ impl LogicalPlan {
self,
mut f: F,
) -> Result<Transformed<Self>> {
#[recursive]
fn transform_down_with_subqueries_impl<
F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
>(
Expand All @@ -839,6 +844,7 @@ impl LogicalPlan {
self,
mut f: F,
) -> Result<Transformed<Self>> {
#[recursive]
fn transform_up_with_subqueries_impl<
F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
>(
Expand Down Expand Up @@ -866,6 +872,7 @@ impl LogicalPlan {
mut f_down: FD,
mut f_up: FU,
) -> Result<Transformed<Self>> {
#[recursive]
fn transform_down_up_with_subqueries_impl<
FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ indexmap = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
paste = "1.0.14"
recursive = { workspace = true }
regex = { workspace = true }
regex-syntax = "0.8.0"

Expand Down
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/analyzer/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use crate::analyzer::check_plan;
use crate::utils::collect_subquery_cols;
use recursive::recursive;

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{plan_err, Result};
Expand Down Expand Up @@ -128,6 +129,7 @@ fn check_correlations_in_subquery(inner_plan: &LogicalPlan) -> Result<()> {
}

// Recursively check the unsupported outer references in the sub query plan.
#[recursive]
fn check_inner_plan(inner_plan: &LogicalPlan, can_contain_outer_ref: bool) -> Result<()> {
if !can_contain_outer_ref && inner_plan.contains_outer_reference() {
return plan_err!("Accessing outer reference columns is not allowed in the plan");
Expand Down
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::fmt::Debug;
use std::sync::Arc;

use crate::{OptimizerConfig, OptimizerRule};
use recursive::recursive;

use crate::optimizer::ApplyOrder;
use crate::utils::NamePreserver;
Expand Down Expand Up @@ -531,6 +532,7 @@ impl OptimizerRule for CommonSubexprEliminate {
None
}

#[recursive]
fn rewrite(
&self,
plan: LogicalPlan,
Expand Down
5 changes: 3 additions & 2 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
// under the License.

//! [`EliminateCrossJoin`] converts `CROSS JOIN` to `INNER JOIN` if join predicates are available.
use std::sync::Arc;

use crate::{OptimizerConfig, OptimizerRule};
use recursive::recursive;
use std::sync::Arc;

use crate::join_key_set::JoinKeySet;
use datafusion_common::tree_node::{Transformed, TreeNode};
Expand Down Expand Up @@ -80,6 +80,7 @@ impl OptimizerRule for EliminateCrossJoin {
true
}

#[recursive]
fn rewrite(
&self,
plan: LogicalPlan,
Expand Down
7 changes: 4 additions & 3 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

mod required_indices;

use std::collections::HashSet;
use std::sync::Arc;

use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use recursive::recursive;
use std::collections::HashSet;
use std::sync::Arc;

use datafusion_common::{
get_required_group_by_exprs_indices, internal_datafusion_err, internal_err, Column,
Expand Down Expand Up @@ -110,6 +110,7 @@ impl OptimizerRule for OptimizeProjections {
/// columns.
/// - `Ok(None)`: Signal that the given logical plan did not require any change.
/// - `Err(error)`: An error occurred during the optimization process.
#[recursive]
fn optimize_projections(
plan: LogicalPlan,
config: &dyn OptimizerConfig,
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ datafusion-expr-common = { workspace = true, default-features = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
itertools = { workspace = true }
recursive = { workspace = true }

[dev-dependencies]
datafusion-functions-aggregate = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions datafusion/physical-optimizer/src/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
// under the License.

//! Utilizing exact statistics from sources to avoid scanning data
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::scalar::ScalarValue;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
Expand All @@ -27,6 +25,8 @@ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::udaf::{AggregateFunctionExpr, StatisticsArgs};
use datafusion_physical_plan::{expressions, ExecutionPlan};
use recursive::recursive;
use std::sync::Arc;

use crate::PhysicalOptimizerRule;

Expand All @@ -42,6 +42,7 @@ impl AggregateStatistics {
}

impl PhysicalOptimizerRule for AggregateStatistics {
#[recursive]
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
indexmap = { workspace = true }
log = { workspace = true }
recursive = { workspace = true }
regex = { workspace = true }
sqlparser = { workspace = true }
strum = { version = "0.26.1", features = ["derive"] }
Expand Down
Loading

0 comments on commit bab02b6

Please sign in to comment.