Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into alamb/partitioning_doc
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 3, 2023
2 parents b3eabfe + 8c42d94 commit a447f89
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 153 deletions.
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ impl FileFormat for ArrowFormat {
const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];

/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
/// See https://github.com/apache/arrow-rs/issues/5021
/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
/// See <https://github.com/apache/arrow-rs/issues/5021>
async fn infer_schema_from_file_stream(
mut stream: BoxStream<'static, object_store::Result<Bytes>>,
) -> Result<SchemaRef> {
Expand Down
60 changes: 27 additions & 33 deletions datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,39 +35,33 @@ use datafusion_physical_expr::expressions::{col, Sum};
use datafusion_physical_expr::{AggregateExpr, PhysicalSortExpr};
use test_utils::add_empty_batches;

#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
use super::*;

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn aggregate_test() {
let test_cases = vec![
vec!["a"],
vec!["b", "a"],
vec!["c", "a"],
vec!["c", "b", "a"],
vec!["d", "a"],
vec!["d", "b", "a"],
vec!["d", "c", "a"],
vec!["d", "c", "b", "a"],
];
let n = 300;
let distincts = vec![10, 20];
for distinct in distincts {
let mut handles = Vec::new();
for i in 0..n {
let test_idx = i % test_cases.len();
let group_by_columns = test_cases[test_idx].clone();
let job = tokio::spawn(run_aggregate_test(
make_staggered_batches::<true>(1000, distinct, i as u64),
group_by_columns,
));
handles.push(job);
}
for job in handles {
job.await.unwrap();
}
/// Runs `run_aggregate_test` 300 times for each distinct-value setting (10, then 20),
/// cycling through the group-by column lists below. All runs for one setting are
/// spawned as tokio tasks first and then awaited in spawn order; a panicking task
/// fails the test through `unwrap()` on its join handle.
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn aggregate_test() {
    // Group-by column lists; run `i` uses entry `i % len`.
    let group_by_sets = vec![
        vec!["a"],
        vec!["b", "a"],
        vec!["c", "a"],
        vec!["c", "b", "a"],
        vec!["d", "a"],
        vec!["d", "b", "a"],
        vec!["d", "c", "a"],
        vec!["d", "c", "b", "a"],
    ];
    let runs = 300;
    let distinct_counts = vec![10, 20];
    for distinct in distinct_counts {
        // `collect` drives the iterator eagerly, so every task is spawned
        // before the first one is awaited.
        let tasks: Vec<_> = (0..runs)
            .map(|run| {
                let columns = group_by_sets[run % group_by_sets.len()].clone();
                // The run index is also handed to `make_staggered_batches` as a u64.
                tokio::spawn(run_aggregate_test(
                    make_staggered_batches::<true>(1000, distinct, run as u64),
                    columns,
                ))
            })
            .collect();
        for task in tasks {
            task.await.unwrap();
        }
    }
}
Expand Down
192 changes: 93 additions & 99 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,108 +44,102 @@ use hashbrown::HashMap;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
use super::*;

use datafusion_physical_plan::windows::PartitionSearchMode::{
Linear, PartiallySorted, Sorted,
};
use datafusion_physical_plan::windows::PartitionSearchMode::{
Linear, PartiallySorted, Sorted,
};

#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
async fn window_bounded_window_random_comparison() -> Result<()> {
// make_staggered_batches gives result sorted according to a, b, c
// In the test cases first entry represents partition by columns
// Second entry represents order by columns.
// Third entry represents search mode.
// In sorted mode physical plans are in the form for WindowAggExec
//```
// WindowAggExec
// MemoryExec]
// ```
// and in the form for BoundedWindowAggExec
// ```
// BoundedWindowAggExec
// MemoryExec
// ```
// In Linear and PartiallySorted mode physical plans are in the form for WindowAggExec
//```
// WindowAggExec
// SortExec(required by window function)
// MemoryExec]
// ```
// and in the form for BoundedWindowAggExec
// ```
// BoundedWindowAggExec
// MemoryExec
// ```
let test_cases = vec![
(vec!["a"], vec!["a"], Sorted),
(vec!["a"], vec!["b"], Sorted),
(vec!["a"], vec!["a", "b"], Sorted),
(vec!["a"], vec!["b", "c"], Sorted),
(vec!["a"], vec!["a", "b", "c"], Sorted),
(vec!["b"], vec!["a"], Linear),
(vec!["b"], vec!["a", "b"], Linear),
(vec!["b"], vec!["a", "c"], Linear),
(vec!["b"], vec!["a", "b", "c"], Linear),
(vec!["c"], vec!["a"], Linear),
(vec!["c"], vec!["a", "b"], Linear),
(vec!["c"], vec!["a", "c"], Linear),
(vec!["c"], vec!["a", "b", "c"], Linear),
(vec!["b", "a"], vec!["a"], Sorted),
(vec!["b", "a"], vec!["b"], Sorted),
(vec!["b", "a"], vec!["c"], Sorted),
(vec!["b", "a"], vec!["a", "b"], Sorted),
(vec!["b", "a"], vec!["b", "c"], Sorted),
(vec!["b", "a"], vec!["a", "c"], Sorted),
(vec!["b", "a"], vec!["a", "b", "c"], Sorted),
(vec!["c", "b"], vec!["a"], Linear),
(vec!["c", "b"], vec!["a", "b"], Linear),
(vec!["c", "b"], vec!["a", "c"], Linear),
(vec!["c", "b"], vec!["a", "b", "c"], Linear),
(vec!["c", "a"], vec!["a"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["b"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["c"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["a", "b"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["b", "c"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["a", "c"], PartiallySorted(vec![1])),
(
vec!["c", "a"],
vec!["a", "b", "c"],
PartiallySorted(vec![1]),
),
(vec!["c", "b", "a"], vec!["a"], Sorted),
(vec!["c", "b", "a"], vec!["b"], Sorted),
(vec!["c", "b", "a"], vec!["c"], Sorted),
(vec!["c", "b", "a"], vec!["a", "b"], Sorted),
(vec!["c", "b", "a"], vec!["b", "c"], Sorted),
(vec!["c", "b", "a"], vec!["a", "c"], Sorted),
(vec!["c", "b", "a"], vec!["a", "b", "c"], Sorted),
];
let n = 300;
let n_distincts = vec![10, 20];
for n_distinct in n_distincts {
let mut handles = Vec::new();
for i in 0..n {
let idx = i % test_cases.len();
let (pb_cols, ob_cols, search_mode) = test_cases[idx].clone();
let job = tokio::spawn(run_window_test(
make_staggered_batches::<true>(1000, n_distinct, i as u64),
i as u64,
pb_cols,
ob_cols,
search_mode,
));
handles.push(job);
}
for job in handles {
job.await.unwrap()?;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
async fn window_bounded_window_random_comparison() -> Result<()> {
// make_staggered_batches gives result sorted according to a, b, c
// In the test cases first entry represents partition by columns
// Second entry represents order by columns.
// Third entry represents search mode.
// In sorted mode physical plans are in the form for WindowAggExec
//```
// WindowAggExec
// MemoryExec]
// ```
// and in the form for BoundedWindowAggExec
// ```
// BoundedWindowAggExec
// MemoryExec
// ```
// In Linear and PartiallySorted mode physical plans are in the form for WindowAggExec
//```
// WindowAggExec
// SortExec(required by window function)
// MemoryExec]
// ```
// and in the form for BoundedWindowAggExec
// ```
// BoundedWindowAggExec
// MemoryExec
// ```
let test_cases = vec![
(vec!["a"], vec!["a"], Sorted),
(vec!["a"], vec!["b"], Sorted),
(vec!["a"], vec!["a", "b"], Sorted),
(vec!["a"], vec!["b", "c"], Sorted),
(vec!["a"], vec!["a", "b", "c"], Sorted),
(vec!["b"], vec!["a"], Linear),
(vec!["b"], vec!["a", "b"], Linear),
(vec!["b"], vec!["a", "c"], Linear),
(vec!["b"], vec!["a", "b", "c"], Linear),
(vec!["c"], vec!["a"], Linear),
(vec!["c"], vec!["a", "b"], Linear),
(vec!["c"], vec!["a", "c"], Linear),
(vec!["c"], vec!["a", "b", "c"], Linear),
(vec!["b", "a"], vec!["a"], Sorted),
(vec!["b", "a"], vec!["b"], Sorted),
(vec!["b", "a"], vec!["c"], Sorted),
(vec!["b", "a"], vec!["a", "b"], Sorted),
(vec!["b", "a"], vec!["b", "c"], Sorted),
(vec!["b", "a"], vec!["a", "c"], Sorted),
(vec!["b", "a"], vec!["a", "b", "c"], Sorted),
(vec!["c", "b"], vec!["a"], Linear),
(vec!["c", "b"], vec!["a", "b"], Linear),
(vec!["c", "b"], vec!["a", "c"], Linear),
(vec!["c", "b"], vec!["a", "b", "c"], Linear),
(vec!["c", "a"], vec!["a"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["b"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["c"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["a", "b"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["b", "c"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["a", "c"], PartiallySorted(vec![1])),
(
vec!["c", "a"],
vec!["a", "b", "c"],
PartiallySorted(vec![1]),
),
(vec!["c", "b", "a"], vec!["a"], Sorted),
(vec!["c", "b", "a"], vec!["b"], Sorted),
(vec!["c", "b", "a"], vec!["c"], Sorted),
(vec!["c", "b", "a"], vec!["a", "b"], Sorted),
(vec!["c", "b", "a"], vec!["b", "c"], Sorted),
(vec!["c", "b", "a"], vec!["a", "c"], Sorted),
(vec!["c", "b", "a"], vec!["a", "b", "c"], Sorted),
];
let n = 300;
let n_distincts = vec![10, 20];
for n_distinct in n_distincts {
let mut handles = Vec::new();
for i in 0..n {
let idx = i % test_cases.len();
let (pb_cols, ob_cols, search_mode) = test_cases[idx].clone();
let job = tokio::spawn(run_window_test(
make_staggered_batches::<true>(1000, n_distinct, i as u64),
i as u64,
pb_cols,
ob_cols,
search_mode,
));
handles.push(job);
}
for job in handles {
job.await.unwrap()?;
}
Ok(())
}
Ok(())
}

fn get_random_function(
Expand Down
75 changes: 56 additions & 19 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -725,35 +725,31 @@ pub fn array_prepend(args: &[ArrayRef]) -> Result<ArrayRef> {
}

fn align_array_dimensions(args: Vec<ArrayRef>) -> Result<Vec<ArrayRef>> {
// Find the maximum number of dimensions
let max_ndim: u64 = (*args
let args_ndim = args
.iter()
.map(|arr| compute_array_ndims(Some(arr.clone())))
.collect::<Result<Vec<Option<u64>>>>()?
.iter()
.max()
.unwrap())
.unwrap();
.map(|arg| compute_array_ndims(Some(arg.to_owned())))
.collect::<Result<Vec<_>>>()?
.into_iter()
.map(|x| x.unwrap_or(0))
.collect::<Vec<_>>();
let max_ndim = args_ndim.iter().max().unwrap_or(&0);

// Align the dimensions of the arrays
let aligned_args: Result<Vec<ArrayRef>> = args
.into_iter()
.map(|array| {
let ndim = compute_array_ndims(Some(array.clone()))?.unwrap();
.zip(args_ndim.iter())
.map(|(array, ndim)| {
if ndim < max_ndim {
let mut aligned_array = array.clone();
for _ in 0..(max_ndim - ndim) {
let data_type = aligned_array.as_ref().data_type().clone();
let offsets: Vec<i32> =
(0..downcast_arg!(aligned_array, ListArray).offsets().len())
.map(|i| i as i32)
.collect();
let field = Arc::new(Field::new("item", data_type, true));
let data_type = aligned_array.data_type().to_owned();
let array_lengths = vec![1; aligned_array.len()];
let offsets = OffsetBuffer::<i32>::from_lengths(array_lengths);

aligned_array = Arc::new(ListArray::try_new(
field,
OffsetBuffer::new(offsets.into()),
Arc::new(aligned_array.clone()),
Arc::new(Field::new("item", data_type, true)),
offsets,
aligned_array,
None,
)?)
}
Expand Down Expand Up @@ -1923,6 +1919,47 @@ mod tests {
use arrow::datatypes::Int64Type;
use datafusion_common::cast::as_uint64_array;

/// Checks that `align_array_dimensions` brings a lower-dimensional list array up to
/// the dimensionality of the deepest input, for a 1-D/2-D pair and a 1-D/3-D pair.
#[test]
fn test_align_array_dimensions() {
    // One-dimensional inputs: the first has two rows, the second has one.
    let list_1d_a = Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
        Some(vec![Some(1), Some(2), Some(3)]),
        Some(vec![Some(4), Some(5)]),
    ]));
    let list_1d_b = Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
        Some(vec![Some(6), Some(7), Some(8)]),
    ]));

    // Each 1-D array wrapped once by `array_into_list_array`.
    let list_2d_a = Arc::new(array_into_list_array(list_1d_a.clone())) as ArrayRef;
    let list_2d_b = Arc::new(array_into_list_array(list_1d_b.clone())) as ArrayRef;

    // Case 1: a 1-D array aligned against a 2-D array.
    let aligned =
        align_array_dimensions(vec![list_1d_a.to_owned(), list_2d_b.to_owned()]).unwrap();

    let wrapped = as_list_array(&list_2d_a).unwrap();
    let wrapped_ndims = compute_array_ndims(Some(list_2d_a.to_owned())).unwrap();
    // The aligned array must differ from the `array_into_list_array` wrapping yet
    // report the same number of dimensions.
    // NOTE(review): presumably the row layout differs between the two — confirm
    // against `array_into_list_array`.
    assert_ne!(as_list_array(&aligned[0]).unwrap(), wrapped);
    assert_eq!(
        compute_array_ndims(Some(aligned[0].clone())).unwrap(),
        wrapped_ndims
    );

    // Case 2: the same 1-D array aligned against a 3-D array.
    let list_3d_a = Arc::new(array_into_list_array(list_2d_a)) as ArrayRef;
    let list_3d_b = array_into_list_array(list_2d_b.to_owned());
    let aligned =
        align_array_dimensions(vec![list_1d_a, Arc::new(list_3d_b.clone())]).unwrap();

    let wrapped = as_list_array(&list_3d_a).unwrap();
    let wrapped_ndims = compute_array_ndims(Some(list_3d_a.to_owned())).unwrap();
    assert_ne!(as_list_array(&aligned[0]).unwrap(), wrapped);
    assert_eq!(
        compute_array_ndims(Some(aligned[0].clone())).unwrap(),
        wrapped_ndims
    );
}

#[test]
fn test_array() {
// make_array(1, 2, 3) = [1, 2, 3]
Expand Down

0 comments on commit a447f89

Please sign in to comment.