Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate LexOrderingRef and LexRequirementRef #13205

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions benchmarks/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt};

use arrow::util::pretty;
use datafusion::common::Result;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_expr::{LexOrdering, LexOrderingRef, PhysicalSortExpr};
use datafusion::physical_plan::collect;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::{SessionConfig, SessionContext};
Expand Down Expand Up @@ -170,13 +170,13 @@ impl RunOpt {

async fn exec_sort(
ctx: &SessionContext,
expr: &[PhysicalSortExpr],
expr: LexOrderingRef<'_>,
test_file: &TestParquetFile,
debug: bool,
) -> Result<(usize, std::time::Duration)> {
let start = Instant::now();
let scan = test_file.create_scan(ctx, None).await?;
let exec = Arc::new(SortExec::new(expr.to_owned(), scan));
let exec = Arc::new(SortExec::new(LexOrdering::new(expr.to_owned()), scan));
let task_ctx = ctx.task_ctx();
let result = collect(exec, task_ctx).await?;
let elapsed = start.elapsed();
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/benches/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use datafusion::physical_plan::{
memory::MemoryExec,
};
use datafusion::prelude::SessionContext;
use datafusion_physical_expr_common::sort_expr::LexOrdering;

// Initialise the operator using the provided record batches and the sort key
// as inputs. All record batches must have the same schema.
Expand All @@ -52,7 +53,7 @@ fn sort_preserving_merge_operator(
expr: col(name, &schema).unwrap(),
options: Default::default(),
})
.collect::<Vec<_>>();
.collect::<LexOrdering>();

let exec = MemoryExec::try_new(
&batches.into_iter().map(|rb| vec![rb]).collect::<Vec<_>>(),
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/benches/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};

/// Benchmarks for SortPreservingMerge stream
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use futures::StreamExt;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
Expand Down Expand Up @@ -257,7 +258,7 @@ impl BenchCase {
}

/// Make sort exprs for each column in `schema`
fn make_sort_exprs(schema: &Schema) -> Vec<PhysicalSortExpr> {
fn make_sort_exprs(schema: &Schema) -> LexOrdering {
schema
.fields()
.iter()
Expand Down
37 changes: 21 additions & 16 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1283,29 +1283,34 @@ mod tests {
// ok with one column
(
vec![vec![col("string_col").sort(true, false)]],
Ok(vec![vec![PhysicalSortExpr {
expr: physical_col("string_col", &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: false,
},
}]])
Ok(vec![LexOrdering {
inner: vec![PhysicalSortExpr {
expr: physical_col("string_col", &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: false,
},
}],
}
])
),
// ok with two columns, different options
(
vec![vec![
col("string_col").sort(true, false),
col("int_col").sort(false, true),
]],
Ok(vec![vec![
PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
.asc()
.nulls_last(),

PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
.desc()
.nulls_first()
]])
Ok(vec![LexOrdering {
inner: vec![
PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
.asc()
.nulls_last(),
PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
.desc()
.nulls_first()
],
}
])
),
];

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ fn create_ordering(

for exprs in sort_order {
// Construct PhysicalSortExpr objects from Expr objects:
let mut sort_exprs = vec![];
let mut sort_exprs = LexOrdering::default();
for sort in exprs {
match &sort.expr {
Expr::Column(col) => match expressions::col(&col.name, schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_common::stats::Precision;
use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_expr::LexOrdering;

use log::warn;

Expand Down Expand Up @@ -307,7 +307,7 @@ impl FileScanConfig {
pub fn split_groups_by_statistics(
table_schema: &SchemaRef,
file_groups: &[Vec<PartitionedFile>],
sort_order: &[PhysicalSortExpr],
sort_order: &LexOrdering,
) -> Result<Vec<Vec<PartitionedFile>>> {
let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
// First Fit:
Expand Down Expand Up @@ -1129,7 +1129,7 @@ mod tests {
let result = FileScanConfig::split_groups_by_statistics(
&table_schema,
&[partitioned_files.clone()],
&sort_order,
&LexOrdering::from(sort_order),
);
let results_by_name = result
.as_ref()
Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use crate::{
use arrow::datatypes::{DataType, SchemaRef};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr_common::sort_expr::LexOrdering;

use futures::StreamExt;
use log::debug;
Expand Down Expand Up @@ -328,11 +329,11 @@ impl From<ObjectMeta> for FileMeta {
fn get_projected_output_ordering(
base_config: &FileScanConfig,
projected_schema: &SchemaRef,
) -> Vec<Vec<PhysicalSortExpr>> {
) -> Vec<LexOrdering> {
let mut all_orderings = vec![];
for output_ordering in &base_config.output_ordering {
let mut new_ordering = vec![];
for PhysicalSortExpr { expr, options } in output_ordering {
let mut new_ordering = LexOrdering::default();
for PhysicalSortExpr { expr, options } in output_ordering.iter() {
if let Some(col) = expr.as_any().downcast_ref::<Column>() {
let name = col.name();
if let Some((idx, _)) = projected_schema.column_with_name(name) {
Expand Down
21 changes: 11 additions & 10 deletions datafusion/core/src/datasource/physical_plan/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

use std::sync::Arc;

use crate::datasource::listing::PartitionedFile;

use arrow::{
compute::SortColumn,
row::{Row, Rows},
Expand All @@ -34,22 +36,21 @@ use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion_common::{DataFusionError, Result};
use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};

use crate::datasource::listing::PartitionedFile;
use datafusion_physical_expr_common::sort_expr::LexOrdering;

/// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
/// The min/max values are ordered by [`Self::sort_order`].
/// Furthermore, any columns that are reversed in the sort order have their min/max values swapped.
pub(crate) struct MinMaxStatistics {
min_by_sort_order: Rows,
max_by_sort_order: Rows,
sort_order: Vec<PhysicalSortExpr>,
sort_order: LexOrdering,
}

impl MinMaxStatistics {
/// Sort order used to sort the statistics
#[allow(unused)]
pub fn sort_order(&self) -> &[PhysicalSortExpr] {
pub fn sort_order(&self) -> &LexOrdering {
&self.sort_order
}

Expand All @@ -65,8 +66,8 @@ impl MinMaxStatistics {
}

pub fn new_from_files<'a>(
projected_sort_order: &[PhysicalSortExpr], // Sort order with respect to projected schema
projected_schema: &SchemaRef, // Projected schema
projected_sort_order: &LexOrdering, // Sort order with respect to projected schema
projected_schema: &SchemaRef, // Projected schema
projection: Option<&[usize]>, // Indices of projection in full table schema (None = all columns)
files: impl IntoIterator<Item = &'a PartitionedFile>,
) -> Result<Self> {
Expand Down Expand Up @@ -150,7 +151,7 @@ impl MinMaxStatistics {
.unzip();

Self::new(
&min_max_sort_order,
&LexOrdering::from(min_max_sort_order),
&min_max_schema,
RecordBatch::try_new(Arc::clone(&min_max_schema), min_values).map_err(
|e| {
Expand All @@ -166,7 +167,7 @@ impl MinMaxStatistics {
}

pub fn new(
sort_order: &[PhysicalSortExpr],
sort_order: &LexOrdering,
schema: &SchemaRef,
min_values: RecordBatch,
max_values: RecordBatch,
Expand Down Expand Up @@ -256,7 +257,7 @@ impl MinMaxStatistics {
Ok(Self {
min_by_sort_order: min.map_err(|e| e.context("build min rows"))?,
max_by_sort_order: max.map_err(|e| e.context("build max rows"))?,
sort_order: sort_order.to_vec(),
sort_order: LexOrdering::from(sort_order.clone()),
})
}

Expand All @@ -277,7 +278,7 @@ impl MinMaxStatistics {
}

fn sort_columns_from_physical_sort_exprs(
sort_order: &[PhysicalSortExpr],
sort_order: &LexOrdering,
) -> Option<Vec<&Column>> {
sort_order
.iter()
Expand Down
Loading
Loading