diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index 1d8d87ada784..ebe94450c1f8 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -173,6 +173,39 @@ fn criterion_benchmark(c: &mut Criterion) { ) }) }); + + c.bench_function("first_last_many_columns", |b| { + b.iter(|| { + query( + ctx.clone(), + "SELECT first_value(u64_wide order by f64, u64_narrow, utf8),\ + last_value(u64_wide order by f64, u64_narrow, utf8) \ + FROM t GROUP BY u64_narrow", + ) + }) + }); + + c.bench_function("first_last_ignore_nulls", |b| { + b.iter(|| { + query( + ctx.clone(), + "SELECT first_value(u64_wide ignore nulls order by f64, u64_narrow, utf8), \ + last_value(u64_wide ignore nulls order by f64, u64_narrow, utf8) \ + FROM t GROUP BY u64_narrow", + ) + }) + }); + + c.bench_function("first_last_one_column", |b| { + b.iter(|| { + query( + ctx.clone(), + "SELECT first_value(u64_wide order by f64), \ + last_value(u64_wide order by f64) \ + FROM t GROUP BY u64_narrow", + ) + }) + }); } criterion_group!(benches, criterion_benchmark); diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 8ef139ae6123..7c3728226132 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -23,7 +23,7 @@ use std::mem::size_of_val; use std::sync::Arc; use arrow::array::{ArrayRef, AsArray, BooleanArray}; -use arrow::compute::{self, lexsort_to_indices, take_arrays, SortColumn}; +use arrow::compute::{self, LexicographicalComparator, SortColumn}; use arrow::datatypes::{DataType, Field}; use datafusion_common::utils::{compare_rows, get_row_at_idx}; use datafusion_common::{ @@ -250,6 +250,7 @@ impl FirstValueAccumulator { return Ok((!value.is_empty()).then_some(0)); } } + let sort_columns = ordering_values .iter() .zip(self.ordering_req.iter()) @@ -259,19 +260,12 @@ impl FirstValueAccumulator { }) .collect::>(); - if self.ignore_nulls { - let indices = lexsort_to_indices(&sort_columns, None)?; - // If ignoring nulls, find the first non-null value. - for index in indices.iter().flatten() { - if !value.is_null(index as usize) { - return Ok(Some(index as usize)); - } - } - Ok(None) - } else { - let indices = lexsort_to_indices(&sort_columns, Some(1))?; - Ok((!indices.is_empty()).then_some(indices.value(0) as _)) - } + let comparator = LexicographicalComparator::try_new(&sort_columns)?; + let best = (0..value.len()) + .filter(|&index| !(self.ignore_nulls && value.is_null(index))) + .min_by(|&a, &b| comparator.compare(a, b)); + + Ok(best) } } @@ -312,22 +306,19 @@ impl Accumulator for FirstValueAccumulator { // last index contains is_set flag. let is_set_idx = states.len() - 1; let flags = states[is_set_idx].as_boolean(); - let filtered_states = filter_states_according_to_is_set(states, flags)?; + let filtered_states = + filter_states_according_to_is_set(&states[0..is_set_idx], flags)?; // 1..is_set_idx range corresponds to ordering section - let sort_cols = convert_to_sort_cols( + let sort_columns = convert_to_sort_cols( &filtered_states[1..is_set_idx], self.ordering_req.as_ref(), ); - let ordered_states = if sort_cols.is_empty() { - // When no ordering is given, use the existing state as is: - filtered_states - } else { - let indices = lexsort_to_indices(&sort_cols, None)?; - take_arrays(&filtered_states, &indices, None)? - }; - if !ordered_states[0].is_empty() { - let first_row = get_row_at_idx(&ordered_states, 0)?; + let comparator = LexicographicalComparator::try_new(&sort_columns)?; + let min = (0..filtered_states[0].len()).min_by(|&a, &b| comparator.compare(a, b)); + + if let Some(first_idx) = min { + let first_row = get_row_at_idx(&filtered_states, first_idx)?; // When collecting orderings, we exclude the is_set flag from the state. let first_ordering = &first_row[1..is_set_idx]; let sort_options = get_sort_options(self.ordering_req.as_ref()); @@ -559,29 +550,18 @@ impl LastValueAccumulator { let sort_columns = ordering_values .iter() .zip(self.ordering_req.iter()) - .map(|(values, req)| { - // Take the reverse ordering requirement. This enables us to - // use "fetch = 1" to get the last value. - SortColumn { - values: Arc::clone(values), - options: Some(!req.options), - } + .map(|(values, req)| SortColumn { + values: Arc::clone(values), + options: Some(req.options), }) .collect::>(); - if self.ignore_nulls { - let indices = lexsort_to_indices(&sort_columns, None)?; - // If ignoring nulls, find the last non-null value. - for index in indices.iter().flatten() { - if !value.is_null(index as usize) { - return Ok(Some(index as usize)); - } - } - Ok(None) - } else { - let indices = lexsort_to_indices(&sort_columns, Some(1))?; - Ok((!indices.is_empty()).then_some(indices.value(0) as _)) - } + let comparator = LexicographicalComparator::try_new(&sort_columns)?; + let best = (0..value.len()) + .filter(|&index| !(self.ignore_nulls && value.is_null(index))) + .max_by(|&a, &b| comparator.compare(a, b)); + + Ok(best) } fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> Self { @@ -627,24 +607,19 @@ impl Accumulator for LastValueAccumulator { // last index contains is_set flag. let is_set_idx = states.len() - 1; let flags = states[is_set_idx].as_boolean(); - let filtered_states = filter_states_according_to_is_set(states, flags)?; + let filtered_states = + filter_states_according_to_is_set(&states[0..is_set_idx], flags)?; // 1..is_set_idx range corresponds to ordering section - let sort_cols = convert_to_sort_cols( + let sort_columns = convert_to_sort_cols( &filtered_states[1..is_set_idx], self.ordering_req.as_ref(), ); - let ordered_states = if sort_cols.is_empty() { - // When no ordering is given, use existing state as is: - filtered_states - } else { - let indices = lexsort_to_indices(&sort_cols, None)?; - take_arrays(&filtered_states, &indices, None)? - }; + let comparator = LexicographicalComparator::try_new(&sort_columns)?; + let max = (0..filtered_states[0].len()).max_by(|&a, &b| comparator.compare(a, b)); - if !ordered_states[0].is_empty() { - let last_idx = ordered_states[0].len() - 1; - let last_row = get_row_at_idx(&ordered_states, last_idx)?; + if let Some(last_idx) = max { + let last_row = get_row_at_idx(&filtered_states, last_idx)?; // When collecting orderings, we exclude the is_set flag from the state. let last_ordering = &last_row[1..is_set_idx]; let sort_options = get_sort_options(self.ordering_req.as_ref()); diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 056f88450c9f..66dd99d2ef63 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -3003,7 +3003,7 @@ SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1, LAST_VALUE(amount ORDER BY ts ASC) AS fv2 FROM sales_global ---- -30 100 +30 80 # Conversion in between FIRST_VALUE and LAST_VALUE to resolve # contradictory requirements should work in multi partitions.