Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not sort rows in FirstValueAccumulator #14402

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions datafusion/core/benches/aggregate_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,39 @@ fn criterion_benchmark(c: &mut Criterion) {
)
})
});

c.bench_function("first_last_many_columns", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT first_value(u64_wide order by f64, u64_narrow, utf8),\
last_value(u64_wide order by f64, u64_narrow, utf8) \
FROM t GROUP BY u64_narrow",
)
})
});

c.bench_function("first_last_ignore_nulls", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT first_value(u64_wide ignore nulls order by f64, u64_narrow, utf8), \
last_value(u64_wide ignore nulls order by f64, u64_narrow, utf8) \
FROM t GROUP BY u64_narrow",
)
})
});

c.bench_function("first_last_one_column", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT first_value(u64_wide order by f64), \
last_value(u64_wide order by f64) \
FROM t GROUP BY u64_narrow",
)
})
});
}

criterion_group!(benches, criterion_benchmark);
Expand Down
89 changes: 32 additions & 57 deletions datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::mem::size_of_val;
use std::sync::Arc;

use arrow::array::{ArrayRef, AsArray, BooleanArray};
use arrow::compute::{self, lexsort_to_indices, take_arrays, SortColumn};
use arrow::compute::{self, LexicographicalComparator, SortColumn};
use arrow::datatypes::{DataType, Field};
use datafusion_common::utils::{compare_rows, get_row_at_idx};
use datafusion_common::{
Expand Down Expand Up @@ -250,6 +250,7 @@ impl FirstValueAccumulator {
return Ok((!value.is_empty()).then_some(0));
}
}

let sort_columns = ordering_values
.iter()
.zip(self.ordering_req.iter())
Expand All @@ -259,19 +260,12 @@ impl FirstValueAccumulator {
})
.collect::<Vec<_>>();

if self.ignore_nulls {
let indices = lexsort_to_indices(&sort_columns, None)?;
// If ignoring nulls, find the first non-null value.
for index in indices.iter().flatten() {
if !value.is_null(index as usize) {
return Ok(Some(index as usize));
}
}
Ok(None)
} else {
let indices = lexsort_to_indices(&sort_columns, Some(1))?;
Ok((!indices.is_empty()).then_some(indices.value(0) as _))
}
let comparator = LexicographicalComparator::try_new(&sort_columns)?;
let best = (0..value.len())
.filter(|&index| !(self.ignore_nulls && value.is_null(index)))
.min_by(|&a, &b| comparator.compare(a, b));

Ok(best)
}
}

Expand Down Expand Up @@ -312,22 +306,19 @@ impl Accumulator for FirstValueAccumulator {
// last index contains is_set flag.
let is_set_idx = states.len() - 1;
let flags = states[is_set_idx].as_boolean();
let filtered_states = filter_states_according_to_is_set(states, flags)?;
let filtered_states =
filter_states_according_to_is_set(&states[0..is_set_idx], flags)?;
// 1..is_set_idx range corresponds to ordering section
let sort_cols = convert_to_sort_cols(
let sort_columns = convert_to_sort_cols(
&filtered_states[1..is_set_idx],
self.ordering_req.as_ref(),
);

let ordered_states = if sort_cols.is_empty() {
// When no ordering is given, use the existing state as is:
filtered_states
} else {
let indices = lexsort_to_indices(&sort_cols, None)?;
take_arrays(&filtered_states, &indices, None)?
};
if !ordered_states[0].is_empty() {
let first_row = get_row_at_idx(&ordered_states, 0)?;
let comparator = LexicographicalComparator::try_new(&sort_columns)?;
let min = (0..filtered_states[0].len()).min_by(|&a, &b| comparator.compare(a, b));

if let Some(first_idx) = min {
let first_row = get_row_at_idx(&filtered_states, first_idx)?;
// When collecting orderings, we exclude the is_set flag from the state.
let first_ordering = &first_row[1..is_set_idx];
let sort_options = get_sort_options(self.ordering_req.as_ref());
Expand Down Expand Up @@ -559,29 +550,18 @@ impl LastValueAccumulator {
let sort_columns = ordering_values
.iter()
.zip(self.ordering_req.iter())
.map(|(values, req)| {
// Take the reverse ordering requirement. This enables us to
// use "fetch = 1" to get the last value.
SortColumn {
values: Arc::clone(values),
options: Some(!req.options),
}
.map(|(values, req)| SortColumn {
values: Arc::clone(values),
options: Some(req.options),
})
.collect::<Vec<_>>();

if self.ignore_nulls {
let indices = lexsort_to_indices(&sort_columns, None)?;
// If ignoring nulls, find the last non-null value.
for index in indices.iter().flatten() {
if !value.is_null(index as usize) {
return Ok(Some(index as usize));
}
}
Ok(None)
} else {
let indices = lexsort_to_indices(&sort_columns, Some(1))?;
Ok((!indices.is_empty()).then_some(indices.value(0) as _))
}
let comparator = LexicographicalComparator::try_new(&sort_columns)?;
let best = (0..value.len())
.filter(|&index| !(self.ignore_nulls && value.is_null(index)))
Copy link
Contributor

@jayzhan211 jayzhan211 Feb 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we make 'ignore nulls' a constant generic, it might be faster than we can avoid filter logic for null if we don't ignore nulls

.max_by(|&a, &b| comparator.compare(a, b));

Ok(best)
}

fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> Self {
Expand Down Expand Up @@ -627,24 +607,19 @@ impl Accumulator for LastValueAccumulator {
// last index contains is_set flag.
let is_set_idx = states.len() - 1;
let flags = states[is_set_idx].as_boolean();
let filtered_states = filter_states_according_to_is_set(states, flags)?;
let filtered_states =
filter_states_according_to_is_set(&states[0..is_set_idx], flags)?;
// 1..is_set_idx range corresponds to ordering section
let sort_cols = convert_to_sort_cols(
let sort_columns = convert_to_sort_cols(
&filtered_states[1..is_set_idx],
self.ordering_req.as_ref(),
);

let ordered_states = if sort_cols.is_empty() {
// When no ordering is given, use existing state as is:
filtered_states
} else {
let indices = lexsort_to_indices(&sort_cols, None)?;
take_arrays(&filtered_states, &indices, None)?
};
let comparator = LexicographicalComparator::try_new(&sort_columns)?;
Comment on lines -641 to +618
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In lexsort_to_indices there are some additional optimizations when there's just one column - so I also wrote a version which reuses lexsort_to_indices - but the speed increase there is actually 20% smaller than here

let max = (0..filtered_states[0].len()).max_by(|&a, &b| comparator.compare(a, b));

if !ordered_states[0].is_empty() {
let last_idx = ordered_states[0].len() - 1;
let last_row = get_row_at_idx(&ordered_states, last_idx)?;
if let Some(last_idx) = max {
let last_row = get_row_at_idx(&filtered_states, last_idx)?;
// When collecting orderings, we exclude the is_set flag from the state.
let last_ordering = &last_row[1..is_set_idx];
let sort_options = get_sort_options(self.ordering_req.as_ref());
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3003,7 +3003,7 @@ SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
LAST_VALUE(amount ORDER BY ts ASC) AS fv2
FROM sales_global
----
30 100
30 80
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, it actually makes more sense because that line has the same ORDER BY value but appears last in the table.


# Conversion in between FIRST_VALUE and LAST_VALUE to resolve
# contradictory requirements should work in multi partitions.
Expand Down