From 1d0ba803f9216576c2ce0aec171028d44a3d068f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 31 Oct 2024 13:09:51 -0400 Subject: [PATCH] more --- .../src/datasource/physical_plan/statistics.rs | 4 ++-- .../src/physical_optimizer/enforce_distribution.rs | 10 ++-------- .../core/src/physical_optimizer/enforce_sorting.rs | 13 +++++++++---- .../src/physical_optimizer/projection_pushdown.rs | 4 ++-- .../replace_with_order_preserving_variants.rs | 14 +++++++++----- .../core/src/physical_optimizer/sort_pushdown.rs | 9 ++------- .../core/src/physical_optimizer/test_utils.rs | 2 +- .../src/physical_optimizer/update_aggr_exprs.rs | 11 +++++------ datafusion/core/src/physical_optimizer/utils.rs | 5 +++-- datafusion/core/src/physical_planner.rs | 2 +- 10 files changed, 36 insertions(+), 38 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/statistics.rs b/datafusion/core/src/datasource/physical_plan/statistics.rs index 8c6e8409101c..8e6cfd799fa2 100644 --- a/datafusion/core/src/datasource/physical_plan/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/statistics.rs @@ -36,7 +36,7 @@ use arrow_array::RecordBatch; use arrow_schema::SchemaRef; use datafusion_common::{DataFusionError, Result}; use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr}; -use datafusion_physical_expr_common::sort_expr::{LexOrdering}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; /// A normalized representation of file min/max statistics that allows for efficient sorting & comparison. /// The min/max values are ordered by [`Self::sort_order`]. @@ -67,7 +67,7 @@ impl MinMaxStatistics { pub fn new_from_files<'a>( projected_sort_order: &LexOrdering, // Sort order with respect to projected schema - projected_schema: &SchemaRef, // Projected schema + projected_schema: &SchemaRef, // Projected schema projection: Option<&[usize]>, // Indices of projection in full table schema (None = all columns) files: impl IntoIterator, ) -> Result { diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 7084fed027ea..4deb042e84a3 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -936,10 +936,7 @@ fn add_spm_on_top(input: DistributionContext) -> DistributionContext { let new_plan = if let Some(ordering) = ordering { // should_preserve_ordering - Arc::new(SortPreservingMergeExec::new( - ordering, - input.plan.clone(), - )) as _ + Arc::new(SortPreservingMergeExec::new(ordering, input.plan.clone())) as _ } else { // no ordering to preserve Arc::new(CoalescePartitionsExec::new(input.plan.clone())) as _ @@ -1421,7 +1418,6 @@ pub(crate) mod tests { use datafusion_physical_expr::expressions::{BinaryExpr, Literal}; use datafusion_physical_expr::{ expressions::binary, expressions::lit, LexOrdering, PhysicalSortExpr, - PhysicalSortRequirement, }; use datafusion_physical_expr_common::sort_expr::LexRequirement; use datafusion_physical_plan::PlanProperties; @@ -1494,9 +1490,7 @@ pub(crate) mod tests { if self.expr.is_empty() { vec![None] } else { - vec![Some(PhysicalSortRequirement::from_sort_exprs( - self.expr.iter(), - ))] + vec![Some(LexRequirement::from(self.expr.clone()))] } } diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 23d784419f17..770e89658386 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -61,7 +61,7 @@ use crate::physical_plan::{Distribution, ExecutionPlan, InputOrderMode}; use datafusion_common::plan_err; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_physical_expr::{Partitioning}; +use datafusion_physical_expr::Partitioning; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; @@ -221,7 +221,7 @@ fn replace_with_partial_sort( // here we're trying to find the common prefix for sorted columns that is required for the // sort and already satisfied by the given ordering let child_eq_properties = child.equivalence_properties(); - let sort_req = LexRequirement::from(sort_plan.expr()); + let sort_req = LexRequirement::from(sort_plan.expr().clone()); let mut common_prefix_length = 0; while child_eq_properties @@ -276,6 +276,7 @@ fn parallelize_sorts( // Take the initial sort expressions and requirements let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?; let sort_reqs = LexRequirement::from(sort_exprs.clone()); + let sort_exprs = sort_exprs.clone(); // If there is a connection between a `CoalescePartitionsExec` and a // global sort that satisfy the requirements (i.e. intermediate @@ -289,7 +290,7 @@ fn parallelize_sorts( requirements = add_sort_above_with_check(requirements, sort_reqs, fetch); - let spm = SortPreservingMergeExec::new(sort_exprs.clone(), requirements.plan.clone()); + let spm = SortPreservingMergeExec::new(sort_exprs, requirements.plan.clone()); Ok(Transformed::yes( PlanWithCorrespondingCoalescePartitions::new( Arc::new(spm.with_fetch(fetch)), @@ -391,7 +392,11 @@ fn analyze_immediate_sort_removal( let sort_input = sort_exec.input(); // If this sort is unnecessary, we should remove it: if sort_input.equivalence_properties().ordering_satisfy( - sort_exec.properties().output_ordering().unwrap_or_default(), + &sort_exec + .properties() + .output_ordering() + .cloned() + .unwrap_or_default(), ) { node.plan = if !sort_exec.preserve_partitioning() && sort_input.output_partitioning().partition_count() > 1 diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 5aecf036ce18..584dd7229fab 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -468,7 +468,7 @@ fn try_swapping_with_sort( } let mut updated_exprs = LexOrdering::default(); - for sort in sort.expr() { + for sort in sort.expr().iter() { let Some(new_expr) = update_expr(&sort.expr, projection.expr(), false)? else { return Ok(None); }; @@ -498,7 +498,7 @@ fn try_swapping_with_sort_preserving_merge( } let mut updated_exprs = LexOrdering::default(); - for sort in spm.expr() { + for sort in spm.expr().iter() { let Some(updated_expr) = update_expr(&sort.expr, projection.expr(), false)? else { return Ok(None); diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 930ce52e6fa2..01168b19d247 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -33,7 +33,6 @@ use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::tree_node::PlanContext; use datafusion_physical_plan::ExecutionPlanProperties; -use datafusion_physical_expr_common::sort_expr::LexOrdering; use itertools::izip; /// For a given `plan`, this object carries the information one needs from its @@ -129,11 +128,10 @@ fn plan_with_order_preserving_variants( return Ok(sort_input); } else if is_coalesce_partitions(&sort_input.plan) && is_spm_better { let child = &sort_input.children[0].plan; - if let Some(ordering) = child.output_ordering().map(Vec::from) { + if let Some(ordering) = child.output_ordering() { // When the input of a `CoalescePartitionsExec` has an ordering, // replace it with a `SortPreservingMergeExec` if appropriate: - let spm = - SortPreservingMergeExec::new(LexOrdering::new(ordering), child.clone()); + let spm = SortPreservingMergeExec::new(ordering.clone(), child.clone()); sort_input.plan = Arc::new(spm) as _; sort_input.children[0].data = true; return Ok(sort_input); @@ -257,7 +255,13 @@ pub(crate) fn replace_with_order_preserving_variants( if alternate_plan .plan .equivalence_properties() - .ordering_satisfy(requirements.plan.output_ordering().unwrap_or_default()) + .ordering_satisfy( + &requirements + .plan + .output_ordering() + .cloned() + .unwrap_or_default(), + ) { for child in alternate_plan.children.iter_mut() { child.data = false; diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index 2294a2be4779..e137ac101490 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -37,9 +37,7 @@ use datafusion_expr::JoinType; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{LexRequirementRef, PhysicalSortRequirement}; -use datafusion_physical_expr_common::sort_expr::{ - LexOrdering, LexRequirement, -}; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use hashbrown::HashSet; @@ -86,10 +84,7 @@ fn pushdown_sorts_helper( mut requirements: SortPushDown, ) -> Result> { let plan = &requirements.plan; - let parent_reqs = requirements - .data - .ordering_requirement - .unwrap_or_default(); + let parent_reqs = requirements.data.ordering_requirement.unwrap_or_default(); let satisfy_parent = plan .equivalence_properties() .ordering_satisfy_requirement(&parent_reqs); diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index bdf16300ea87..403936d3f355 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -253,7 +253,7 @@ pub fn bounded_window_exec( "count".to_owned(), &[col(col_name, &schema).unwrap()], &[], - sort_exprs.as_ref(), + &sort_exprs, Arc::new(WindowFrame::new(Some(false))), schema.as_ref(), false, diff --git a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs index d85278556cc4..de56e231630c 100644 --- a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs +++ b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs @@ -27,6 +27,7 @@ use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::{ reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement, }; +use datafusion_physical_expr_common::sort_expr::LexRequirement; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::aggregates::concat_slices; use datafusion_physical_plan::windows::get_ordered_partition_by_indices; @@ -138,12 +139,10 @@ fn try_convert_aggregate_if_better( aggr_exprs .into_iter() .map(|aggr_expr| { - let aggr_sort_exprs = &aggr_expr.order_bys().unwrap_or_default(); - let reverse_aggr_sort_exprs = reverse_order_bys(aggr_sort_exprs); - let aggr_sort_reqs = - PhysicalSortRequirement::from_sort_exprs(aggr_sort_exprs.iter()); - let reverse_aggr_req = - PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_sort_exprs.inner); + let aggr_sort_exprs = aggr_expr.order_bys().cloned().unwrap_or_default(); + let reverse_aggr_sort_exprs = reverse_order_bys(&aggr_sort_exprs); + let aggr_sort_reqs = LexRequirement::from(aggr_sort_exprs); + let reverse_aggr_req = LexRequirement::from(reverse_aggr_sort_exprs); // If the aggregate expression benefits from input ordering, and // there is an actual ordering enabling this, try to update the diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index 8007d8cc7f00..9acd3f67c272 100644 --- a/datafusion/core/src/physical_optimizer/utils.rs +++ b/datafusion/core/src/physical_optimizer/utils.rs @@ -27,7 +27,8 @@ use crate::physical_plan::union::UnionExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; -use datafusion_physical_expr::{LexRequirement, PhysicalSortRequirement}; +use datafusion_physical_expr::LexRequirement; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::tree_node::PlanContext; @@ -38,7 +39,7 @@ pub fn add_sort_above( sort_requirements: LexRequirement, fetch: Option, ) -> PlanContext { - let mut sort_expr = PhysicalSortRequirement::to_sort_exprs(sort_requirements); + let mut sort_expr = LexOrdering::from(sort_requirements); sort_expr.inner.retain(|sort_expr| { !node .plan diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index b7784d43540f..0e42b66b9f2c 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1516,7 +1516,7 @@ pub fn create_window_expr_with_name( name, &physical_args, &partition_by, - order_by.as_ref(), + &order_by, window_frame, physical_schema, ignore_nulls,