Skip to content

Commit

Permalink
limit distinct and fmt
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 committed May 18, 2024
1 parent ef06589 commit 92accf9
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ fn take_optimizable_max(
#[cfg(test)]
pub(crate) mod tests {
use super::*;

use crate::logical_expr::Operator;
use crate::physical_plan::aggregates::PhysicalGroupBy;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
Expand All @@ -292,8 +292,8 @@ pub(crate) mod tests {
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::expressions::cast;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::aggregates::AggregateMode;
use datafusion_physical_expr_common::aggregate::create_aggregate_expr;
use datafusion_physical_plan::aggregates::AggregateMode;

/// Mock data using a MemoryExec which has an exact count statistic
fn mock_data() -> Result<Arc<MemoryExec>> {
Expand Down
24 changes: 10 additions & 14 deletions datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,11 @@ mod tests {
}

// Return appropriate expr depending if COUNT is for col or table (*)
fn count_expr(expr: Arc<dyn PhysicalExpr>, name: &str, schema: &Schema) -> Arc<dyn AggregateExpr> {
fn count_expr(
expr: Arc<dyn PhysicalExpr>,
name: &str,
schema: &Schema,
) -> Arc<dyn AggregateExpr> {
create_aggregate_expr(
&count_udaf(),
&[expr],
Expand All @@ -328,10 +332,8 @@ mod tests {
#[test]
fn aggregations_not_combined() -> Result<()> {
let schema = schema();

let aggr_expr = vec![
count_expr(lit(1i8), "COUNT(1)", &schema),
];

let aggr_expr = vec![count_expr(lit(1i8), "COUNT(1)", &schema)];

let plan = final_aggregate_exec(
repartition_exec(partial_aggregate_exec(
Expand All @@ -351,12 +353,8 @@ mod tests {
];
assert_optimized!(expected, plan);

let aggr_expr1 = vec![
count_expr(lit(1i8), "COUNT(1)", &schema),
];
let aggr_expr2 = vec![
count_expr(lit(1i8), "COUNT(2)", &schema),
];
let aggr_expr1 = vec![count_expr(lit(1i8), "COUNT(1)", &schema)];
let aggr_expr2 = vec![count_expr(lit(1i8), "COUNT(2)", &schema)];

let plan = final_aggregate_exec(
partial_aggregate_exec(
Expand All @@ -382,9 +380,7 @@ mod tests {
#[test]
fn aggregations_combined() -> Result<()> {
let schema = schema();
let aggr_expr = vec![
count_expr(lit(1i8), "COUNT(1)", &schema),
];
let aggr_expr = vec![count_expr(lit(1i8), "COUNT(1)", &schema)];

let plan = final_aggregate_exec(
partial_aggregate_exec(
Expand Down
138 changes: 70 additions & 68 deletions datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation {
mod tests {

use super::*;
use crate::physical_optimizer::aggregate_statistics::tests::TestAggregate;
use crate::physical_optimizer::enforce_distribution::tests::{
parquet_exec_with_sort, schema, trim_plan_display,
};
Expand All @@ -208,7 +209,8 @@ mod tests {
use arrow::util::pretty::pretty_format_batches;
use arrow_schema::SchemaRef;
use datafusion_execution::config::SessionConfig;
use datafusion_physical_expr::expressions::col;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{cast, col};
use datafusion_physical_expr::{expressions, PhysicalExpr, PhysicalSortExpr};
use datafusion_physical_plan::aggregates::AggregateMode;
use datafusion_physical_plan::displayable;
Expand Down Expand Up @@ -505,73 +507,73 @@ mod tests {
Ok(())
}

// #[test]
// fn test_has_aggregate_expression() -> Result<()> {
// let source = mock_data()?;
// let schema = source.schema();
// let agg = TestAggregate::new_count_star();

// // `SELECT <aggregate with no expressions> FROM MemoryExec LIMIT 10;`, Single AggregateExec
// let single_agg = AggregateExec::try_new(
// AggregateMode::Single,
// build_group_by(&schema.clone(), vec!["a".to_string()]),
// vec![agg.count_expr()], /* aggr_expr */
// vec![None], /* filter_expr */
// source, /* input */
// schema.clone(), /* input_schema */
// )?;
// let limit_exec = LocalLimitExec::new(
// Arc::new(single_agg),
// 10, // fetch
// );
// // expected not to push the limit to the AggregateExec
// let expected = [
// "LocalLimitExec: fetch=10",
// "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]",
// "MemoryExec: partitions=1, partition_sizes=[1]",
// ];
// let plan: Arc<dyn ExecutionPlan> = Arc::new(limit_exec);
// assert_plan_matches_expected(&plan, &expected)?;
// Ok(())
// }

// #[test]
// fn test_has_filter() -> Result<()> {
// let source = mock_data()?;
// let schema = source.schema();

// // `SELECT a FROM MemoryExec WHERE a > 1 GROUP BY a LIMIT 10;`, Single AggregateExec
// // the `a > 1` filter is applied in the AggregateExec
// let filter_expr = Some(expressions::binary(
// expressions::col("a", &schema)?,
// Operator::Gt,
// cast(expressions::lit(1u32), &schema, DataType::Int32)?,
// &schema,
// )?);
// let agg = TestAggregate::new_count_star();
// let single_agg = AggregateExec::try_new(
// AggregateMode::Single,
// build_group_by(&schema.clone(), vec!["a".to_string()]),
// vec![agg.count_expr()], /* aggr_expr */
// vec![filter_expr], /* filter_expr */
// source, /* input */
// schema.clone(), /* input_schema */
// )?;
// let limit_exec = LocalLimitExec::new(
// Arc::new(single_agg),
// 10, // fetch
// );
// // expected not to push the limit to the AggregateExec
// // TODO(msirek): open an issue for `filter_expr` of `AggregateExec` not printing out
// let expected = [
// "LocalLimitExec: fetch=10",
// "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]",
// "MemoryExec: partitions=1, partition_sizes=[1]",
// ];
// let plan: Arc<dyn ExecutionPlan> = Arc::new(limit_exec);
// assert_plan_matches_expected(&plan, &expected)?;
// Ok(())
// }
/// A `LocalLimitExec` placed over a single-mode `AggregateExec` that carries a
/// `COUNT(*)` aggregate expression: the expected plan keeps the limit above the
/// aggregate (it is not pushed into the `AggregateExec`).
#[test]
fn test_has_aggregate_expression() -> Result<()> {
    let source = mock_data()?;
    let schema = source.schema();
    let agg = TestAggregate::new_count_star();

    // `SELECT <aggregate with no expressions> FROM MemoryExec LIMIT 10;`, Single AggregateExec
    // The arguments are named here in the same order they are handed to `try_new`.
    let group_by = build_group_by(&schema.clone(), vec![String::from("a")]);
    let aggr_exprs = vec![agg.count_expr(&schema)];
    let filter_exprs = vec![None];
    let aggregate = AggregateExec::try_new(
        AggregateMode::Single,
        group_by,
        aggr_exprs,
        filter_exprs,
        source,
        schema.clone(),
    )?;

    // Wrap the aggregate in a local limit with fetch = 10.
    let plan: Arc<dyn ExecutionPlan> =
        Arc::new(LocalLimitExec::new(Arc::new(aggregate), 10));

    // expected not to push the limit to the AggregateExec
    let expected = [
        "LocalLimitExec: fetch=10",
        "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]",
        "MemoryExec: partitions=1, partition_sizes=[1]",
    ];
    assert_plan_matches_expected(&plan, &expected)?;
    Ok(())
}

/// A `LocalLimitExec` placed over a single-mode `AggregateExec` whose aggregate
/// carries an `a > 1` filter expression: the expected plan keeps the limit
/// above the aggregate (it is not pushed into the `AggregateExec`).
#[test]
fn test_has_filter() -> Result<()> {
    let source = mock_data()?;
    let schema = source.schema();

    // `SELECT a FROM MemoryExec WHERE a > 1 GROUP BY a LIMIT 10;`, Single AggregateExec
    // the `a > 1` filter is applied in the AggregateExec
    let column_a = expressions::col("a", &schema)?;
    let one_as_i32 = cast(expressions::lit(1u32), &schema, DataType::Int32)?;
    let filter_expr = Some(expressions::binary(
        column_a,
        Operator::Gt,
        one_as_i32,
        &schema,
    )?);

    let agg = TestAggregate::new_count_star();
    // The arguments are named here in the same order they are handed to `try_new`.
    let group_by = build_group_by(&schema.clone(), vec![String::from("a")]);
    let aggr_exprs = vec![agg.count_expr(&schema)];
    let filter_exprs = vec![filter_expr];
    let aggregate = AggregateExec::try_new(
        AggregateMode::Single,
        group_by,
        aggr_exprs,
        filter_exprs,
        source,
        schema.clone(),
    )?;

    // Wrap the aggregate in a local limit with fetch = 10.
    let plan: Arc<dyn ExecutionPlan> =
        Arc::new(LocalLimitExec::new(Arc::new(aggregate), 10));

    // expected not to push the limit to the AggregateExec
    // TODO(msirek): open an issue for `filter_expr` of `AggregateExec` not printing out
    let expected = [
        "LocalLimitExec: fetch=10",
        "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]",
        "MemoryExec: partitions=1, partition_sizes=[1]",
    ];
    assert_plan_matches_expected(&plan, &expected)?;
    Ok(())
}

#[test]
fn test_has_order_by() -> Result<()> {
Expand Down

0 comments on commit 92accf9

Please sign in to comment.