Skip to content

Commit

Permalink
parquet: Add tests for pruning on Int8/Int16/Int64 columns (#9778)
Browse files Browse the repository at this point in the history
* parquet: Add tests for Bloom filters on Int8/Int16/Int64 columns

* Document int_tests macro

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
progval and alamb authored Mar 31, 2024
1 parent 57c0bc6 commit 2cb6f73
Show file tree
Hide file tree
Showing 4 changed files with 353 additions and 311 deletions.
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ ctor = { workspace = true }
doc-comment = { workspace = true }
env_logger = { workspace = true }
half = { workspace = true, default-features = true }
paste = "^1.0"
postgres-protocol = "0.6.4"
postgres-types = { version = "0.2.4", features = ["derive", "with-chrono-0_4"] }
rand = { workspace = true, features = ["small_rng"] }
Expand Down
48 changes: 34 additions & 14 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use arrow::array::Decimal128Array;
use arrow::{
array::{
Array, ArrayRef, BinaryArray, Date32Array, Date64Array, FixedSizeBinaryArray,
Float64Array, Int32Array, StringArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, StringArray,
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
Expand Down Expand Up @@ -62,7 +63,7 @@ fn init() {
enum Scenario {
Timestamps,
Dates,
Int32,
Int,
Int32Range,
Float64,
Decimal,
Expand Down Expand Up @@ -389,12 +390,31 @@ fn make_timestamp_batch(offset: Duration) -> RecordBatch {
/// Return record batch with i32 sequence
///
/// Columns are named
/// "i" -> Int32Array
fn make_int32_batch(start: i32, end: i32) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
let v: Vec<i32> = (start..end).collect();
let array = Arc::new(Int32Array::from(v)) as ArrayRef;
RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
/// "i8" -> Int8Array
/// "i16" -> Int16Array
/// "i32" -> Int32Array
/// "i64" -> Int64Array
fn make_int_batches(start: i8, end: i8) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("i8", DataType::Int8, true),
Field::new("i16", DataType::Int16, true),
Field::new("i32", DataType::Int32, true),
Field::new("i64", DataType::Int64, true),
]));
let v8: Vec<i8> = (start..end).collect();
let v16: Vec<i16> = (start as _..end as _).collect();
let v32: Vec<i32> = (start as _..end as _).collect();
let v64: Vec<i64> = (start as _..end as _).collect();
RecordBatch::try_new(
schema,
vec![
Arc::new(Int8Array::from(v8)) as ArrayRef,
Arc::new(Int16Array::from(v16)) as ArrayRef,
Arc::new(Int32Array::from(v32)) as ArrayRef,
Arc::new(Int64Array::from(v64)) as ArrayRef,
],
)
.unwrap()
}

fn make_int32_range(start: i32, end: i32) -> RecordBatch {
Expand Down Expand Up @@ -589,12 +609,12 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
make_date_batch(TimeDelta::try_days(3600).unwrap()),
]
}
Scenario::Int32 => {
Scenario::Int => {
vec![
make_int32_batch(-5, 0),
make_int32_batch(-4, 1),
make_int32_batch(0, 5),
make_int32_batch(5, 10),
make_int_batches(-5, 0),
make_int_batches(-4, 1),
make_int_batches(0, 5),
make_int_batches(5, 10),
]
}
Scenario::Int32Range => {
Expand Down
276 changes: 141 additions & 135 deletions datafusion/core/tests/parquet/page_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,112 +371,149 @@ async fn prune_date64() {
assert_eq!(output.result_rows, 1, "{}", output.description());
}

#[tokio::test]
// null count min max
// page-0 0 -5 -1
// page-1 0 -4 0
// page-2 0 0 4
// page-3 0 5 9
async fn prune_int32_lt() {
test_prune(
Scenario::Int32,
"SELECT * FROM t where i < 1",
Some(0),
Some(5),
11,
)
.await;
// result of sql "SELECT * FROM t where i < 1" is same as
// "SELECT * FROM t where -i > -1"
test_prune(
Scenario::Int32,
"SELECT * FROM t where -i > -1",
Some(0),
Some(5),
11,
)
.await;
}

#[tokio::test]
async fn prune_int32_gt() {
test_prune(
Scenario::Int32,
"SELECT * FROM t where i > 8",
Some(0),
Some(15),
1,
)
.await;

test_prune(
Scenario::Int32,
"SELECT * FROM t where -i < -8",
Some(0),
Some(15),
1,
)
.await;
}

#[tokio::test]
async fn prune_int32_eq() {
test_prune(
Scenario::Int32,
"SELECT * FROM t where i = 1",
Some(0),
Some(15),
1,
)
.await;
}
#[tokio::test]
async fn prune_int32_scalar_fun_and_eq() {
test_prune(
Scenario::Int32,
"SELECT * FROM t where abs(i) = 1 and i = 1",
Some(0),
Some(15),
1,
)
.await;
macro_rules! int_tests {
($bits:expr) => {
paste::item! {
#[tokio::test]
// null count min max
// page-0 0 -5 -1
// page-1 0 -4 0
// page-2 0 0 4
// page-3 0 5 9
async fn [<prune_int $bits _lt>]() {
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where i{} < 1", $bits),
Some(0),
Some(5),
11,
)
.await;
// result of sql "SELECT * FROM t where i < 1" is same as
// "SELECT * FROM t where -i > -1"
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where -i{} > -1", $bits),
Some(0),
Some(5),
11,
)
.await;
}

#[tokio::test]
async fn [<prune_int $bits _gt >]() {
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where i{} > 8", $bits),
Some(0),
Some(15),
1,
)
.await;

test_prune(
Scenario::Int,
&format!("SELECT * FROM t where -i{} < -8", $bits),
Some(0),
Some(15),
1,
)
.await;
}

#[tokio::test]
async fn [<prune_int $bits _eq >]() {
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where i{} = 1", $bits),
Some(0),
Some(15),
1,
)
.await;
}
#[tokio::test]
async fn [<prune_int $bits _scalar_fun_and_eq >]() {
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where abs(i{}) = 1 and i{} = 1", $bits, $bits),
Some(0),
Some(15),
1,
)
.await;
}

#[tokio::test]
async fn [<prune_int $bits _scalar_fun >]() {
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where abs(i{}) = 1", $bits),
Some(0),
Some(0),
3,
)
.await;
}

#[tokio::test]
async fn [<prune_int $bits _complex_expr>]() {
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where i{}+1 = 1", $bits),
Some(0),
Some(0),
2,
)
.await;
}

#[tokio::test]
async fn [<prune_int $bits _complex_expr_subtract >]() {
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where 1-i{} > 1", $bits),
Some(0),
Some(0),
9,
)
.await;
}

#[tokio::test]
async fn [<prune_int $bits _eq_in_list >]() {
// result of sql "SELECT * FROM t where in (1)"
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where i{} in (1)", $bits),
Some(0),
Some(15),
1,
)
.await;
}

#[tokio::test]
async fn [<prune_int $bits _eq_in_list_negated >]() {
// result of sql "SELECT * FROM t where not in (1)" prune nothing
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where i{} not in (1)", $bits),
Some(0),
Some(0),
19,
)
.await;
}
}
}
}

#[tokio::test]
async fn prune_int32_scalar_fun() {
test_prune(
Scenario::Int32,
"SELECT * FROM t where abs(i) = 1",
Some(0),
Some(0),
3,
)
.await;
}

#[tokio::test]
async fn prune_int32_complex_expr() {
test_prune(
Scenario::Int32,
"SELECT * FROM t where i+1 = 1",
Some(0),
Some(0),
2,
)
.await;
}

#[tokio::test]
async fn prune_int32_complex_expr_subtract() {
test_prune(
Scenario::Int32,
"SELECT * FROM t where 1-i > 1",
Some(0),
Some(0),
9,
)
.await;
}
int_tests!(8);
int_tests!(16);
int_tests!(32);
int_tests!(64);

#[tokio::test]
// null count min max
Expand Down Expand Up @@ -556,37 +593,6 @@ async fn prune_f64_complex_expr_subtract() {
.await;
}

#[tokio::test]
// null count min max
// page-0 0 -5 -1
// page-1 0 -4 0
// page-2 0 0 4
// page-3 0 5 9
async fn prune_int32_eq_in_list() {
// result of sql "SELECT * FROM t where in (1)"
test_prune(
Scenario::Int32,
"SELECT * FROM t where i in (1)",
Some(0),
Some(15),
1,
)
.await;
}

#[tokio::test]
async fn prune_int32_eq_in_list_negated() {
// result of sql "SELECT * FROM t where not in (1)" prune nothing
test_prune(
Scenario::Int32,
"SELECT * FROM t where i not in (1)",
Some(0),
Some(0),
19,
)
.await;
}

#[tokio::test]
async fn prune_decimal_lt() {
// The data type of decimal_col is decimal(9,2)
Expand Down
Loading

0 comments on commit 2cb6f73

Please sign in to comment.