Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into alamb/partitioning_doc
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 4, 2023
2 parents 7ca8a9b + 2af326a commit 8fccbae
Show file tree
Hide file tree
Showing 53 changed files with 4,140 additions and 3,956 deletions.
2 changes: 1 addition & 1 deletion .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ If there are user-facing changes then we may require documentation to be updated

<!--
If there are any breaking changes to public APIs, please add the `api change` label.
-->
-->
9 changes: 3 additions & 6 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@ use crate::physical_plan::{
use arrow_schema::SchemaRef;
use datafusion_common::Statistics;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
PhysicalSortExpr,
};
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};

use futures::StreamExt;
use object_store::{GetResultPayload, ObjectStore};
Expand Down Expand Up @@ -106,8 +103,8 @@ impl ExecutionPlan for ArrowExec {
.map(|ordering| ordering.as_slice())
}

fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
ordering_equivalence_properties_helper(
fn equivalence_properties(&self) -> EquivalenceProperties {
EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
)
Expand Down
8 changes: 3 additions & 5 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ use crate::physical_plan::{

use arrow::datatypes::SchemaRef;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
};
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

/// Execution plan for scanning Avro data source
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -101,8 +99,8 @@ impl ExecutionPlan for AvroExec {
.map(|ordering| ordering.as_slice())
}

fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
ordering_equivalence_properties_helper(
fn equivalence_properties(&self) -> EquivalenceProperties {
EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
)
Expand Down
8 changes: 3 additions & 5 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ use crate::physical_plan::{
use arrow::csv;
use arrow::datatypes::SchemaRef;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
};
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

use bytes::{Buf, Bytes};
use datafusion_common::config::ConfigOptions;
Expand Down Expand Up @@ -159,8 +157,8 @@ impl ExecutionPlan for CsvExec {
.map(|ordering| ordering.as_slice())
}

fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
ordering_equivalence_properties_helper(
fn equivalence_properties(&self) -> EquivalenceProperties {
EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
)
Expand Down
8 changes: 3 additions & 5 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ use crate::physical_plan::{
use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
};
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

use bytes::{Buf, Bytes};
use futures::{ready, stream, StreamExt, TryStreamExt};
Expand Down Expand Up @@ -122,8 +120,8 @@ impl ExecutionPlan for NdJsonExec {
.map(|ordering| ordering.as_slice())
}

fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
ordering_equivalence_properties_helper(
fn equivalence_properties(&self) -> EquivalenceProperties {
EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
)
Expand Down
7 changes: 3 additions & 4 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ use crate::{
use arrow::datatypes::{DataType, SchemaRef};
use arrow::error::ArrowError;
use datafusion_physical_expr::{
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
PhysicalExpr, PhysicalSortExpr,
EquivalenceProperties, LexOrdering, PhysicalExpr, PhysicalSortExpr,
};

use bytes::Bytes;
Expand Down Expand Up @@ -315,8 +314,8 @@ impl ExecutionPlan for ParquetExec {
.map(|ordering| ordering.as_slice())
}

fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
ordering_equivalence_properties_helper(
fn equivalence_properties(&self) -> EquivalenceProperties {
EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
)
Expand Down
104 changes: 61 additions & 43 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,11 @@ use crate::physical_plan::{
use arrow::compute::SortOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::equivalence::EquivalenceProperties;
use datafusion_physical_expr::expressions::{Column, NoOp};
use datafusion_physical_expr::utils::{
map_columns_before_projection, ordering_satisfy_requirement_concrete,
use datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
physical_exprs_equal, EquivalenceProperties, PhysicalExpr,
};
use datafusion_physical_expr::{expr_list_eq_strict_order, PhysicalExpr};
use datafusion_physical_plan::unbounded_output;
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};

Expand Down Expand Up @@ -498,7 +497,7 @@ fn reorder_aggregate_keys(

if parent_required.len() != output_exprs.len()
|| !agg_exec.group_by().null_expr().is_empty()
|| expr_list_eq_strict_order(&output_exprs, parent_required)
|| physical_exprs_equal(&output_exprs, parent_required)
{
Ok(PlanWithKeyRequirements::new(agg_plan))
} else {
Expand Down Expand Up @@ -564,13 +563,11 @@ fn reorder_aggregate_keys(
Arc::new(Column::new(
name,
agg_schema.index_of(name).unwrap(),
))
as Arc<dyn PhysicalExpr>,
)) as _,
name.to_owned(),
)
})
.collect::<Vec<_>>();
let agg_schema = new_final_agg.schema();
let agg_fields = agg_schema.fields();
for (idx, field) in
agg_fields.iter().enumerate().skip(output_columns.len())
Expand Down Expand Up @@ -706,10 +703,9 @@ pub(crate) fn reorder_join_keys_to_inputs(
) {
if !new_positions.is_empty() {
let new_join_on = new_join_conditions(&left_keys, &right_keys);
let mut new_sort_options = vec![];
for idx in 0..sort_options.len() {
new_sort_options.push(sort_options[new_positions[idx]])
}
let new_sort_options = (0..sort_options.len())
.map(|idx| sort_options[new_positions[idx]])
.collect();
return Ok(Arc::new(SortMergeJoinExec::try_new(
left.clone(),
right.clone(),
Expand Down Expand Up @@ -757,39 +753,40 @@ fn try_reorder(
expected: &[Arc<dyn PhysicalExpr>],
equivalence_properties: &EquivalenceProperties,
) -> Option<(JoinKeyPairs, Vec<usize>)> {
let eq_groups = equivalence_properties.eq_group();
let mut normalized_expected = vec![];
let mut normalized_left_keys = vec![];
let mut normalized_right_keys = vec![];
if join_keys.left_keys.len() != expected.len() {
return None;
}
if expr_list_eq_strict_order(expected, &join_keys.left_keys)
|| expr_list_eq_strict_order(expected, &join_keys.right_keys)
if physical_exprs_equal(expected, &join_keys.left_keys)
|| physical_exprs_equal(expected, &join_keys.right_keys)
{
return Some((join_keys, vec![]));
} else if !equivalence_properties.classes().is_empty() {
} else if !equivalence_properties.eq_group().is_empty() {
normalized_expected = expected
.iter()
.map(|e| equivalence_properties.normalize_expr(e.clone()))
.map(|e| eq_groups.normalize_expr(e.clone()))
.collect::<Vec<_>>();
assert_eq!(normalized_expected.len(), expected.len());

normalized_left_keys = join_keys
.left_keys
.iter()
.map(|e| equivalence_properties.normalize_expr(e.clone()))
.map(|e| eq_groups.normalize_expr(e.clone()))
.collect::<Vec<_>>();
assert_eq!(join_keys.left_keys.len(), normalized_left_keys.len());

normalized_right_keys = join_keys
.right_keys
.iter()
.map(|e| equivalence_properties.normalize_expr(e.clone()))
.map(|e| eq_groups.normalize_expr(e.clone()))
.collect::<Vec<_>>();
assert_eq!(join_keys.right_keys.len(), normalized_right_keys.len());

if expr_list_eq_strict_order(&normalized_expected, &normalized_left_keys)
|| expr_list_eq_strict_order(&normalized_expected, &normalized_right_keys)
if physical_exprs_equal(&normalized_expected, &normalized_left_keys)
|| physical_exprs_equal(&normalized_expected, &normalized_right_keys)
{
return Some((join_keys, vec![]));
}
Expand Down Expand Up @@ -870,7 +867,7 @@ fn new_join_conditions(
r_key.as_any().downcast_ref::<Column>().unwrap().clone(),
)
})
.collect::<Vec<_>>()
.collect()
}

/// Updates `dist_onward` such that, to keep track of
Expand Down Expand Up @@ -935,9 +932,9 @@ fn add_roundrobin_on_top(
let should_preserve_ordering = input.output_ordering().is_some();

let partitioning = Partitioning::RoundRobinBatch(n_target);
let repartition = RepartitionExec::try_new(input, partitioning)?
.with_preserve_order(should_preserve_ordering);
let new_plan = Arc::new(repartition) as Arc<dyn ExecutionPlan>;
let repartition = RepartitionExec::try_new(input, partitioning)?;
let new_plan = Arc::new(repartition.with_preserve_order(should_preserve_ordering))
as Arc<dyn ExecutionPlan>;

// update distribution onward with new operator
update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
Expand Down Expand Up @@ -1011,9 +1008,9 @@ fn add_hash_on_top(
input
};
let partitioning = Partitioning::Hash(hash_exprs, n_target);
let repartition = RepartitionExec::try_new(new_plan, partitioning)?
.with_preserve_order(should_preserve_ordering);
new_plan = Arc::new(repartition) as _;
let repartition = RepartitionExec::try_new(new_plan, partitioning)?;
new_plan =
Arc::new(repartition.with_preserve_order(should_preserve_ordering)) as _;

// update distribution onward with new operator
update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
Expand Down Expand Up @@ -1302,16 +1299,12 @@ fn ensure_distribution(

// There is an ordering requirement of the operator:
if let Some(required_input_ordering) = required_input_ordering {
let existing_ordering = child.output_ordering().unwrap_or(&[]);
// Either:
// - the ordering requirement cannot be satisfied by preserving ordering through repartitions, or
// - using order-preserving variants is not desirable.
let ordering_satisfied = ordering_satisfy_requirement_concrete(
existing_ordering,
required_input_ordering,
|| child.equivalence_properties(),
|| child.ordering_equivalence_properties(),
);
let ordering_satisfied = child
.equivalence_properties()
.ordering_satisfy_requirement(required_input_ordering);
if !ordering_satisfied || !order_preserving_variants_desirable {
replace_order_preserving_variants(&mut child, dist_onward)?;
// If ordering requirements were satisfied before repartitioning,
Expand Down Expand Up @@ -3763,14 +3756,14 @@ mod tests {
fn repartition_transitively_past_sort_with_filter() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
expr: col("a", &schema).unwrap(),
options: SortOptions::default(),
}];
let plan = sort_exec(sort_key, filter_exec(parquet_exec()), false);

let expected = &[
"SortPreservingMergeExec: [c@2 ASC]",
"SortExec: expr=[c@2 ASC]",
"SortPreservingMergeExec: [a@0 ASC]",
"SortExec: expr=[a@0 ASC]",
// Expect repartition on the input to the sort (as it can benefit from additional parallelism)
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
Expand All @@ -3780,7 +3773,7 @@ mod tests {
assert_optimized!(expected, plan.clone(), true);

let expected_first_sort_enforcement = &[
"SortExec: expr=[c@2 ASC]",
"SortExec: expr=[a@0 ASC]",
"CoalescePartitionsExec",
"FilterExec: c@2 = 0",
// Expect repartition on the input of the filter (as it can benefit from additional parallelism)
Expand Down Expand Up @@ -4357,29 +4350,54 @@ mod tests {
fn do_not_preserve_ordering_through_repartition() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
expr: col("a", &schema).unwrap(),
options: SortOptions::default(),
}];
let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input));

let expected = &[
"SortPreservingMergeExec: [c@2 ASC]",
"SortExec: expr=[c@2 ASC]",
"SortPreservingMergeExec: [a@0 ASC]",
"SortExec: expr=[a@0 ASC]",
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
];

assert_optimized!(expected, physical_plan.clone(), true);

let expected = &[
"SortExec: expr=[c@2 ASC]",
"SortExec: expr=[a@0 ASC]",
"CoalescePartitionsExec",
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
];
assert_optimized!(expected, physical_plan, false);

Ok(())
}

#[test]
fn no_need_for_sort_after_filter() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input));

let expected = &[
// After CoalescePartitionsExec, c is still constant, so the c@2 ASC ordering is already satisfied.
"CoalescePartitionsExec",
// After this stage c is constant, so the c@2 ASC ordering is already satisfied.
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];

assert_optimized!(expected, physical_plan.clone(), true);
assert_optimized!(expected, physical_plan, false);

Ok(())
Expand Down
Loading

0 comments on commit 8fccbae

Please sign in to comment.