Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Oct 31, 2024
1 parent 5f41292 commit 1d0ba80
Show file tree
Hide file tree
Showing 10 changed files with 36 additions and 38 deletions.
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/physical_plan/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion_common::{DataFusionError, Result};
use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
use datafusion_physical_expr_common::sort_expr::{LexOrdering};
use datafusion_physical_expr_common::sort_expr::LexOrdering;

/// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
/// The min/max values are ordered by [`Self::sort_order`].
Expand Down Expand Up @@ -67,7 +67,7 @@ impl MinMaxStatistics {

pub fn new_from_files<'a>(
projected_sort_order: &LexOrdering, // Sort order with respect to projected schema
projected_schema: &SchemaRef, // Projected schema
projected_schema: &SchemaRef, // Projected schema
projection: Option<&[usize]>, // Indices of projection in full table schema (None = all columns)
files: impl IntoIterator<Item = &'a PartitionedFile>,
) -> Result<Self> {
Expand Down
10 changes: 2 additions & 8 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -936,10 +936,7 @@ fn add_spm_on_top(input: DistributionContext) -> DistributionContext {

let new_plan = if let Some(ordering) = ordering {
// should_preserve_ordering
Arc::new(SortPreservingMergeExec::new(
ordering,
input.plan.clone(),
)) as _
Arc::new(SortPreservingMergeExec::new(ordering, input.plan.clone())) as _
} else {
// no ordering to preserve
Arc::new(CoalescePartitionsExec::new(input.plan.clone())) as _
Expand Down Expand Up @@ -1421,7 +1418,6 @@ pub(crate) mod tests {
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use datafusion_physical_expr::{
expressions::binary, expressions::lit, LexOrdering, PhysicalSortExpr,
PhysicalSortRequirement,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::PlanProperties;
Expand Down Expand Up @@ -1494,9 +1490,7 @@ pub(crate) mod tests {
if self.expr.is_empty() {
vec![None]
} else {
vec![Some(PhysicalSortRequirement::from_sort_exprs(
self.expr.iter(),
))]
vec![Some(LexRequirement::from(self.expr.clone()))]
}
}

Expand Down
13 changes: 9 additions & 4 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use crate::physical_plan::{Distribution, ExecutionPlan, InputOrderMode};

use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::{Partitioning};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
Expand Down Expand Up @@ -221,7 +221,7 @@ fn replace_with_partial_sort(
// Here we try to find the common prefix of the sort columns that is both required by the
// sort and already satisfied by the child's existing ordering.
let child_eq_properties = child.equivalence_properties();
let sort_req = LexRequirement::from(sort_plan.expr());
let sort_req = LexRequirement::from(sort_plan.expr().clone());

let mut common_prefix_length = 0;
while child_eq_properties
Expand Down Expand Up @@ -276,6 +276,7 @@ fn parallelize_sorts(
// Take the initial sort expressions and requirements
let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?;
let sort_reqs = LexRequirement::from(sort_exprs.clone());
let sort_exprs = sort_exprs.clone();

// If there is a connection between a `CoalescePartitionsExec` and a
// global sort that satisfy the requirements (i.e. intermediate
Expand All @@ -289,7 +290,7 @@ fn parallelize_sorts(

requirements = add_sort_above_with_check(requirements, sort_reqs, fetch);

let spm = SortPreservingMergeExec::new(sort_exprs.clone(), requirements.plan.clone());
let spm = SortPreservingMergeExec::new(sort_exprs, requirements.plan.clone());
Ok(Transformed::yes(
PlanWithCorrespondingCoalescePartitions::new(
Arc::new(spm.with_fetch(fetch)),
Expand Down Expand Up @@ -391,7 +392,11 @@ fn analyze_immediate_sort_removal(
let sort_input = sort_exec.input();
// If this sort is unnecessary, we should remove it:
if sort_input.equivalence_properties().ordering_satisfy(
sort_exec.properties().output_ordering().unwrap_or_default(),
&sort_exec
.properties()
.output_ordering()
.cloned()
.unwrap_or_default(),
) {
node.plan = if !sort_exec.preserve_partitioning()
&& sort_input.output_partitioning().partition_count() > 1
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ fn try_swapping_with_sort(
}

let mut updated_exprs = LexOrdering::default();
for sort in sort.expr() {
for sort in sort.expr().iter() {
let Some(new_expr) = update_expr(&sort.expr, projection.expr(), false)? else {
return Ok(None);
};
Expand Down Expand Up @@ -498,7 +498,7 @@ fn try_swapping_with_sort_preserving_merge(
}

let mut updated_exprs = LexOrdering::default();
for sort in spm.expr() {
for sort in spm.expr().iter() {
let Some(updated_expr) = update_expr(&sort.expr, projection.expr(), false)?
else {
return Ok(None);
Expand Down
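14 changes: 9 additions & 5 deletions datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
Original file line number Diff line number Diff line change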
Expand Up @@ -33,7 +33,6 @@ use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::tree_node::PlanContext;
use datafusion_physical_plan::ExecutionPlanProperties;

use datafusion_physical_expr_common::sort_expr::LexOrdering;
use itertools::izip;

/// For a given `plan`, this object carries the information one needs from its
Expand Down Expand Up @@ -129,11 +128,10 @@ fn plan_with_order_preserving_variants(
return Ok(sort_input);
} else if is_coalesce_partitions(&sort_input.plan) && is_spm_better {
let child = &sort_input.children[0].plan;
if let Some(ordering) = child.output_ordering().map(Vec::from) {
if let Some(ordering) = child.output_ordering() {
// When the input of a `CoalescePartitionsExec` has an ordering,
// replace it with a `SortPreservingMergeExec` if appropriate:
let spm =
SortPreservingMergeExec::new(LexOrdering::new(ordering), child.clone());
let spm = SortPreservingMergeExec::new(ordering.clone(), child.clone());
sort_input.plan = Arc::new(spm) as _;
sort_input.children[0].data = true;
return Ok(sort_input);
Expand Down Expand Up @@ -257,7 +255,13 @@ pub(crate) fn replace_with_order_preserving_variants(
if alternate_plan
.plan
.equivalence_properties()
.ordering_satisfy(requirements.plan.output_ordering().unwrap_or_default())
.ordering_satisfy(
&requirements
.plan
.output_ordering()
.cloned()
.unwrap_or_default(),
)
{
for child in alternate_plan.children.iter_mut() {
child.data = false;
Expand Down
9 changes: 2 additions & 7 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{LexRequirementRef, PhysicalSortRequirement};
use datafusion_physical_expr_common::sort_expr::{
LexOrdering, LexRequirement,
};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};

use hashbrown::HashSet;

Expand Down Expand Up @@ -86,10 +84,7 @@ fn pushdown_sorts_helper(
mut requirements: SortPushDown,
) -> Result<Transformed<SortPushDown>> {
let plan = &requirements.plan;
let parent_reqs = requirements
.data
.ordering_requirement
.unwrap_or_default();
let parent_reqs = requirements.data.ordering_requirement.unwrap_or_default();
let satisfy_parent = plan
.equivalence_properties()
.ordering_satisfy_requirement(&parent_reqs);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ pub fn bounded_window_exec(
"count".to_owned(),
&[col(col_name, &schema).unwrap()],
&[],
sort_exprs.as_ref(),
&sort_exprs,
Arc::new(WindowFrame::new(Some(false))),
schema.as_ref(),
false,
Expand Down
11 changes: 5 additions & 6 deletions datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::{
reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::aggregates::concat_slices;
use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
Expand Down Expand Up @@ -138,12 +139,10 @@ fn try_convert_aggregate_if_better(
aggr_exprs
.into_iter()
.map(|aggr_expr| {
let aggr_sort_exprs = &aggr_expr.order_bys().unwrap_or_default();
let reverse_aggr_sort_exprs = reverse_order_bys(aggr_sort_exprs);
let aggr_sort_reqs =
PhysicalSortRequirement::from_sort_exprs(aggr_sort_exprs.iter());
let reverse_aggr_req =
PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_sort_exprs.inner);
let aggr_sort_exprs = aggr_expr.order_bys().cloned().unwrap_or_default();
let reverse_aggr_sort_exprs = reverse_order_bys(&aggr_sort_exprs);
let aggr_sort_reqs = LexRequirement::from(aggr_sort_exprs);
let reverse_aggr_req = LexRequirement::from(reverse_aggr_sort_exprs);

// If the aggregate expression benefits from input ordering, and
// there is an actual ordering enabling this, try to update the
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use datafusion_physical_expr::{LexRequirement, PhysicalSortRequirement};
use datafusion_physical_expr::LexRequirement;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::tree_node::PlanContext;

Expand All @@ -38,7 +39,7 @@ pub fn add_sort_above<T: Clone + Default>(
sort_requirements: LexRequirement,
fetch: Option<usize>,
) -> PlanContext<T> {
let mut sort_expr = PhysicalSortRequirement::to_sort_exprs(sort_requirements);
let mut sort_expr = LexOrdering::from(sort_requirements);
sort_expr.inner.retain(|sort_expr| {
!node
.plan
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1516,7 +1516,7 @@ pub fn create_window_expr_with_name(
name,
&physical_args,
&partition_by,
order_by.as_ref(),
&order_by,
window_frame,
physical_schema,
ignore_nulls,
Expand Down

0 comments on commit 1d0ba80

Please sign in to comment.