Skip to content

Commit

Permalink
Convert LexOrdering type to struct. (apache#13146)
Browse files Browse the repository at this point in the history
* Conversion types for LexOrdering and LexOrderingRef to structs.

* Format and fix type errors. Adjusted expected output when using `LexOrdering`.

* Updated usage of `FromIterator` and removed `empty()` in favor of `default()`.

* Adjusted chained `map` and `flatten` calls to `flat_map`, and swapped `unwrap_or` to `unwrap_or_default`.

* Adjusted slt files to include a space after commas, when relating to LexOrdering and LexOrderingRef.

* Removed unnecessary path prefixes in `sort_expr`.

* Fixed tpch slt files.

* Removed LexOrderingRef struct.

* Removed dereferences to `LexOrderingRef` left over from the struct removal.

* Removed remaining usage of the raw `LexOrderingRef` type.

* Formatting.

* Apply suggestions from code review, along with formatting.

* Merged with main.

* Merged with main.

---------

Co-authored-by: nglime <[email protected]>
  • Loading branch information
ngli-me and nglime authored Nov 1, 2024
1 parent 9ff0800 commit 752561a
Show file tree
Hide file tree
Showing 103 changed files with 1,238 additions and 1,022 deletions.
6 changes: 3 additions & 3 deletions benchmarks/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt};

use arrow::util::pretty;
use datafusion::common::Result;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_expr::{LexOrdering, LexOrderingRef, PhysicalSortExpr};
use datafusion::physical_plan::collect;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::{SessionConfig, SessionContext};
Expand Down Expand Up @@ -170,13 +170,13 @@ impl RunOpt {

async fn exec_sort(
ctx: &SessionContext,
expr: &[PhysicalSortExpr],
expr: LexOrderingRef<'_>,
test_file: &TestParquetFile,
debug: bool,
) -> Result<(usize, std::time::Duration)> {
let start = Instant::now();
let scan = test_file.create_scan(ctx, None).await?;
let exec = Arc::new(SortExec::new(expr.to_owned(), scan));
let exec = Arc::new(SortExec::new(LexOrdering::new(expr.to_owned()), scan));
let task_ctx = ctx.task_ctx();
let result = collect(exec, task_ctx).await?;
let elapsed = start.elapsed();
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/benches/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use datafusion::physical_plan::{
memory::MemoryExec,
};
use datafusion::prelude::SessionContext;
use datafusion_physical_expr_common::sort_expr::LexOrdering;

// Initialise the operator using the provided record batches and the sort key
// as inputs. All record batches must have the same schema.
Expand All @@ -52,7 +53,7 @@ fn sort_preserving_merge_operator(
expr: col(name, &schema).unwrap(),
options: Default::default(),
})
.collect::<Vec<_>>();
.collect::<LexOrdering>();

let exec = MemoryExec::try_new(
&batches.into_iter().map(|rb| vec![rb]).collect::<Vec<_>>(),
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/benches/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};

/// Benchmarks for SortPreservingMerge stream
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use futures::StreamExt;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
Expand Down Expand Up @@ -257,7 +258,7 @@ impl BenchCase {
}

/// Make sort exprs for each column in `schema`
fn make_sort_exprs(schema: &Schema) -> Vec<PhysicalSortExpr> {
fn make_sort_exprs(schema: &Schema) -> LexOrdering {
schema
.fields()
.iter()
Expand Down
37 changes: 21 additions & 16 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1283,29 +1283,34 @@ mod tests {
// ok with one column
(
vec![vec![col("string_col").sort(true, false)]],
Ok(vec![vec![PhysicalSortExpr {
expr: physical_col("string_col", &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: false,
},
}]])
Ok(vec![LexOrdering {
inner: vec![PhysicalSortExpr {
expr: physical_col("string_col", &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: false,
},
}],
}
])
),
// ok with two columns, different options
(
vec![vec![
col("string_col").sort(true, false),
col("int_col").sort(false, true),
]],
Ok(vec![vec![
PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
.asc()
.nulls_last(),

PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
.desc()
.nulls_first()
]])
Ok(vec![LexOrdering {
inner: vec![
PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
.asc()
.nulls_last(),
PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
.desc()
.nulls_first()
],
}
])
),
];

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ fn create_ordering(

for exprs in sort_order {
// Construct PhysicalSortExpr objects from Expr objects:
let mut sort_exprs = vec![];
let mut sort_exprs = LexOrdering::default();
for sort in exprs {
match &sort.expr {
Expr::Column(col) => match expressions::col(&col.name, schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_common::stats::Precision;
use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_expr_common::sort_expr::LexOrderingRef;

use log::warn;

Expand Down Expand Up @@ -307,7 +308,7 @@ impl FileScanConfig {
pub fn split_groups_by_statistics(
table_schema: &SchemaRef,
file_groups: &[Vec<PartitionedFile>],
sort_order: &[PhysicalSortExpr],
sort_order: LexOrderingRef,
) -> Result<Vec<Vec<PartitionedFile>>> {
let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
// First Fit:
Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use crate::{
use arrow::datatypes::{DataType, SchemaRef};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr_common::sort_expr::LexOrdering;

use futures::StreamExt;
use log::debug;
Expand Down Expand Up @@ -328,11 +329,11 @@ impl From<ObjectMeta> for FileMeta {
fn get_projected_output_ordering(
base_config: &FileScanConfig,
projected_schema: &SchemaRef,
) -> Vec<Vec<PhysicalSortExpr>> {
) -> Vec<LexOrdering> {
let mut all_orderings = vec![];
for output_ordering in &base_config.output_ordering {
let mut new_ordering = vec![];
for PhysicalSortExpr { expr, options } in output_ordering {
let mut new_ordering = LexOrdering::default();
for PhysicalSortExpr { expr, options } in output_ordering.iter() {
if let Some(col) = expr.as_any().downcast_ref::<Column>() {
let name = col.name();
if let Some((idx, _)) = projected_schema.column_with_name(name) {
Expand Down
19 changes: 10 additions & 9 deletions datafusion/core/src/datasource/physical_plan/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

use std::sync::Arc;

use crate::datasource::listing::PartitionedFile;

use arrow::{
compute::SortColumn,
row::{Row, Rows},
Expand All @@ -34,22 +36,21 @@ use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion_common::{DataFusionError, Result};
use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};

use crate::datasource::listing::PartitionedFile;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};

/// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
/// The min/max values are ordered by [`Self::sort_order`].
/// Furthermore, any columns that are reversed in the sort order have their min/max values swapped.
pub(crate) struct MinMaxStatistics {
min_by_sort_order: Rows,
max_by_sort_order: Rows,
sort_order: Vec<PhysicalSortExpr>,
sort_order: LexOrdering,
}

impl MinMaxStatistics {
/// Sort order used to sort the statistics
#[allow(unused)]
pub fn sort_order(&self) -> &[PhysicalSortExpr] {
pub fn sort_order(&self) -> LexOrderingRef {
&self.sort_order
}

Expand All @@ -65,8 +66,8 @@ impl MinMaxStatistics {
}

pub fn new_from_files<'a>(
projected_sort_order: &[PhysicalSortExpr], // Sort order with respect to projected schema
projected_schema: &SchemaRef, // Projected schema
projected_sort_order: LexOrderingRef, // Sort order with respect to projected schema
projected_schema: &SchemaRef, // Projected schema
projection: Option<&[usize]>, // Indices of projection in full table schema (None = all columns)
files: impl IntoIterator<Item = &'a PartitionedFile>,
) -> Result<Self> {
Expand Down Expand Up @@ -166,7 +167,7 @@ impl MinMaxStatistics {
}

pub fn new(
sort_order: &[PhysicalSortExpr],
sort_order: LexOrderingRef,
schema: &SchemaRef,
min_values: RecordBatch,
max_values: RecordBatch,
Expand Down Expand Up @@ -256,7 +257,7 @@ impl MinMaxStatistics {
Ok(Self {
min_by_sort_order: min.map_err(|e| e.context("build min rows"))?,
max_by_sort_order: max.map_err(|e| e.context("build max rows"))?,
sort_order: sort_order.to_vec(),
sort_order: LexOrdering::from_ref(sort_order),
})
}

Expand All @@ -277,7 +278,7 @@ impl MinMaxStatistics {
}

fn sort_columns_from_physical_sort_exprs(
sort_order: &[PhysicalSortExpr],
sort_order: LexOrderingRef,
) -> Option<Vec<&Column>> {
sort_order
.iter()
Expand Down
Loading

0 comments on commit 752561a

Please sign in to comment.