From 92accf93c46ebf2266280ca7574ff2246bbe1165 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 18 May 2024 09:29:42 +0800 Subject: [PATCH] limit distinct and fmt Signed-off-by: jayzhan211 --- .../aggregate_statistics.rs | 4 +- .../combine_partial_final_agg.rs | 24 ++- .../limited_distinct_aggregation.rs | 138 +++++++++--------- 3 files changed, 82 insertions(+), 84 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs index eec8ae82c3a9..bea688e376e0 100644 --- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs @@ -276,7 +276,7 @@ fn take_optimizable_max( #[cfg(test)] pub(crate) mod tests { use super::*; - + use crate::logical_expr::Operator; use crate::physical_plan::aggregates::PhysicalGroupBy; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -292,8 +292,8 @@ pub(crate) mod tests { use datafusion_functions_aggregate::count::count_udaf; use datafusion_physical_expr::expressions::cast; use datafusion_physical_expr::PhysicalExpr; - use datafusion_physical_plan::aggregates::AggregateMode; use datafusion_physical_expr_common::aggregate::create_aggregate_expr; + use datafusion_physical_plan::aggregates::AggregateMode; /// Mock data using a MemoryExec which has an exact count statistic fn mock_data() -> Result> { diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index d3dd4cbe55c6..e3375277a3e5 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -311,7 +311,11 @@ mod tests { } // Return appropriate expr depending if COUNT is for col or table (*) - fn count_expr(expr: Arc, name: &str, schema: &Schema) -> Arc { + fn count_expr( + expr: Arc, + name: &str, + schema: &Schema, + ) -> Arc { create_aggregate_expr( &count_udaf(), &[expr], @@ -328,10 +332,8 @@ mod tests { #[test] fn aggregations_not_combined() -> Result<()> { let schema = schema(); - - let aggr_expr = vec![ - count_expr(lit(1i8), "COUNT(1)", &schema), - ]; + + let aggr_expr = vec![count_expr(lit(1i8), "COUNT(1)", &schema)]; let plan = final_aggregate_exec( repartition_exec(partial_aggregate_exec( @@ -351,12 +353,8 @@ mod tests { ]; assert_optimized!(expected, plan); - let aggr_expr1 = vec![ - count_expr(lit(1i8), "COUNT(1)", &schema), - ]; - let aggr_expr2 = vec![ - count_expr(lit(1i8), "COUNT(2)", &schema), - ]; + let aggr_expr1 = vec![count_expr(lit(1i8), "COUNT(1)", &schema)]; + let aggr_expr2 = vec![count_expr(lit(1i8), "COUNT(2)", &schema)]; let plan = final_aggregate_exec( partial_aggregate_exec( @@ -382,9 +380,7 @@ mod tests { #[test] fn aggregations_combined() -> Result<()> { let schema = schema(); - let aggr_expr = vec![ - count_expr(lit(1i8), "COUNT(1)", &schema), - ]; + let aggr_expr = vec![count_expr(lit(1i8), "COUNT(1)", &schema)]; let plan = final_aggregate_exec( partial_aggregate_exec( diff --git a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs index c32709718e50..7b9858239cae 100644 --- a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs @@ -193,6 +193,7 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation { mod tests { use super::*; + use crate::physical_optimizer::aggregate_statistics::tests::TestAggregate; use crate::physical_optimizer::enforce_distribution::tests::{ parquet_exec_with_sort, schema, trim_plan_display, }; @@ -208,7 +209,8 @@ mod tests { use arrow::util::pretty::pretty_format_batches; use arrow_schema::SchemaRef; use datafusion_execution::config::SessionConfig; - use datafusion_physical_expr::expressions::col; + use datafusion_expr::Operator; + use datafusion_physical_expr::expressions::{cast, col}; use datafusion_physical_expr::{expressions, PhysicalExpr, PhysicalSortExpr}; use datafusion_physical_plan::aggregates::AggregateMode; use datafusion_physical_plan::displayable; @@ -505,73 +507,73 @@ mod tests { Ok(()) } - // #[test] - // fn test_has_aggregate_expression() -> Result<()> { - // let source = mock_data()?; - // let schema = source.schema(); - // let agg = TestAggregate::new_count_star(); - - // // `SELECT FROM MemoryExec LIMIT 10;`, Single AggregateExec - // let single_agg = AggregateExec::try_new( - // AggregateMode::Single, - // build_group_by(&schema.clone(), vec!["a".to_string()]), - // vec![agg.count_expr()], /* aggr_expr */ - // vec![None], /* filter_expr */ - // source, /* input */ - // schema.clone(), /* input_schema */ - // )?; - // let limit_exec = LocalLimitExec::new( - // Arc::new(single_agg), - // 10, // fetch - // ); - // // expected not to push the limit to the AggregateExec - // let expected = [ - // "LocalLimitExec: fetch=10", - // "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]", - // "MemoryExec: partitions=1, partition_sizes=[1]", - // ]; - // let plan: Arc = Arc::new(limit_exec); - // assert_plan_matches_expected(&plan, &expected)?; - // Ok(()) - // } - - // #[test] - // fn test_has_filter() -> Result<()> { - // let source = mock_data()?; - // let schema = source.schema(); - - // // `SELECT a FROM MemoryExec WHERE a > 1 GROUP BY a LIMIT 10;`, Single AggregateExec - // // the `a > 1` filter is applied in the AggregateExec - // let filter_expr = Some(expressions::binary( - // expressions::col("a", &schema)?, - // Operator::Gt, - // cast(expressions::lit(1u32), &schema, DataType::Int32)?, - // &schema, - // )?); - // let agg = TestAggregate::new_count_star(); - // let single_agg = AggregateExec::try_new( - // AggregateMode::Single, - // build_group_by(&schema.clone(), vec!["a".to_string()]), - // vec![agg.count_expr()], /* aggr_expr */ - // vec![filter_expr], /* filter_expr */ - // source, /* input */ - // schema.clone(), /* input_schema */ - // )?; - // let limit_exec = LocalLimitExec::new( - // Arc::new(single_agg), - // 10, // fetch - // ); - // // expected not to push the limit to the AggregateExec - // // TODO(msirek): open an issue for `filter_expr` of `AggregateExec` not printing out - // let expected = [ - // "LocalLimitExec: fetch=10", - // "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]", - // "MemoryExec: partitions=1, partition_sizes=[1]", - // ]; - // let plan: Arc = Arc::new(limit_exec); - // assert_plan_matches_expected(&plan, &expected)?; - // Ok(()) - // } + #[test] + fn test_has_aggregate_expression() -> Result<()> { + let source = mock_data()?; + let schema = source.schema(); + let agg = TestAggregate::new_count_star(); + + // `SELECT FROM MemoryExec LIMIT 10;`, Single AggregateExec + let single_agg = AggregateExec::try_new( + AggregateMode::Single, + build_group_by(&schema.clone(), vec!["a".to_string()]), + vec![agg.count_expr(&schema)], /* aggr_expr */ + vec![None], /* filter_expr */ + source, /* input */ + schema.clone(), /* input_schema */ + )?; + let limit_exec = LocalLimitExec::new( + Arc::new(single_agg), + 10, // fetch + ); + // expected not to push the limit to the AggregateExec + let expected = [ + "LocalLimitExec: fetch=10", + "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]", + "MemoryExec: partitions=1, partition_sizes=[1]", + ]; + let plan: Arc = Arc::new(limit_exec); + assert_plan_matches_expected(&plan, &expected)?; + Ok(()) + } + + #[test] + fn test_has_filter() -> Result<()> { + let source = mock_data()?; + let schema = source.schema(); + + // `SELECT a FROM MemoryExec WHERE a > 1 GROUP BY a LIMIT 10;`, Single AggregateExec + // the `a > 1` filter is applied in the AggregateExec + let filter_expr = Some(expressions::binary( + expressions::col("a", &schema)?, + Operator::Gt, + cast(expressions::lit(1u32), &schema, DataType::Int32)?, + &schema, + )?); + let agg = TestAggregate::new_count_star(); + let single_agg = AggregateExec::try_new( + AggregateMode::Single, + build_group_by(&schema.clone(), vec!["a".to_string()]), + vec![agg.count_expr(&schema)], /* aggr_expr */ + vec![filter_expr], /* filter_expr */ + source, /* input */ + schema.clone(), /* input_schema */ + )?; + let limit_exec = LocalLimitExec::new( + Arc::new(single_agg), + 10, // fetch + ); + // expected not to push the limit to the AggregateExec + // TODO(msirek): open an issue for `filter_expr` of `AggregateExec` not printing out + let expected = [ + "LocalLimitExec: fetch=10", + "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]", + "MemoryExec: partitions=1, partition_sizes=[1]", + ]; + let plan: Arc = Arc::new(limit_exec); + assert_plan_matches_expected(&plan, &expected)?; + Ok(()) + } #[test] fn test_has_order_by() -> Result<()> {