Extract parquet statistics to its own module, add tests (apache#8294)
* Extract parquet statistics to its own module, add tests

* Update datafusion/core/src/datasource/physical_plan/parquet/statistics.rs

Co-authored-by: Raphael Taylor-Davies <[email protected]>

* rename enum

* Improve API

* Add test for reading struct array statistics

* Add test for column after statistics

* improve tests

* simplify

* clippy

* Update datafusion/core/src/datasource/physical_plan/parquet/statistics.rs

* Update datafusion/core/src/datasource/physical_plan/parquet/statistics.rs

* Add test showing incorrect statistics

* Rework statistics

* Fix clippy

* Update documentation and make it clear the statistics are not publicly accessible

* Add link to upstream arrow ticket

---------

Co-authored-by: Raphael Taylor-Davies <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>
3 people authored Nov 29, 2023
1 parent 167b5b7 commit 06bbe12
Showing 4 changed files with 951 additions and 166 deletions.
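
As background for the "not publicly accessible" note in the commit message above, here is a minimal sketch of the Rust visibility pattern the diffs below rely on. The module and function names come from the diffs; the reduced function body is illustrative only and omits the sign-extension handling shown later in the removed mod.rs code.

// Single-file approximation: in the real crate, `statistics` lives in its
// own file (parquet/statistics.rs) declared by `mod statistics;` in mod.rs.
// Declaring the module without `pub` keeps it internal, and `pub(crate)` on
// its items lets sibling modules such as row_groups call them while keeping
// them out of the crate's public API.
mod statistics {
    // Reduced body for illustration: decodes non-negative big-endian bytes
    // only; the real helper also sign-extends negative values.
    pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
        let mut buf = [0u8; 16];
        buf[16 - b.len()..].copy_from_slice(b);
        i128::from_be_bytes(buf)
    }
}

fn main() {
    // Big-endian [0x01, 0x00] decodes to 256.
    assert_eq!(statistics::from_bytes_to_i128(&[0x01, 0x00]), 256);
}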
24 changes: 2 additions & 22 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -66,6 +66,7 @@ mod metrics;
 pub mod page_filter;
 mod row_filter;
 mod row_groups;
+mod statistics;
 
 pub use metrics::ParquetFileMetrics;

@@ -506,6 +507,7 @@ impl FileOpener for ParquetOpener {
             let file_metadata = builder.metadata().clone();
             let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
             let mut row_groups = row_groups::prune_row_groups_by_statistics(
+                builder.parquet_schema(),
                 file_metadata.row_groups(),
                 file_range,
                 predicate,
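
For orientation, a compilable stand-in sketch of the call-site change above. Every type below is a hypothetical simplification rather than DataFusion's real API; the point is only the parameter shape, with the parquet schema now threaded through ahead of the row-group metadata, presumably so statistics can be matched to schema columns rather than assumed to line up by position.

// Hypothetical stand-in types; only the parameter shape mirrors the real
// prune_row_groups_by_statistics call site shown in the diff above.
struct SchemaDescriptor;
struct RowGroupMetaData;
struct FileRange;
struct PruningPredicate;

fn prune_row_groups_by_statistics(
    _parquet_schema: &SchemaDescriptor, // the newly added first argument
    groups: &[RowGroupMetaData],
    _range: Option<FileRange>,
    _predicate: Option<&PruningPredicate>,
) -> Vec<usize> {
    // Placeholder body: keep every row group. The real routine evaluates
    // the pruning predicate against each row group's statistics.
    (0..groups.len()).collect()
}

fn main() {
    let groups = vec![RowGroupMetaData, RowGroupMetaData];
    let kept = prune_row_groups_by_statistics(&SchemaDescriptor, &groups, None, None);
    assert_eq!(kept, vec![0, 1]);
}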
@@ -718,28 +720,6 @@ pub async fn plan_to_parquet(
     Ok(())
 }
 
-// Copy from the arrow-rs
-// https://github.com/apache/arrow-rs/blob/733b7e7fd1e8c43a404c3ce40ecf741d493c21b4/parquet/src/arrow/buffer/bit_util.rs#L55
-// Convert the byte slice to fixed length byte array with the length of 16
-fn sign_extend_be(b: &[u8]) -> [u8; 16] {
-    assert!(b.len() <= 16, "Array too large, expected less than 16");
-    let is_negative = (b[0] & 128u8) == 128u8;
-    let mut result = if is_negative { [255u8; 16] } else { [0u8; 16] };
-    for (d, s) in result.iter_mut().skip(16 - b.len()).zip(b) {
-        *d = *s;
-    }
-    result
-}
-
-// Convert the bytes array to i128.
-// The endian of the input bytes array must be big-endian.
-pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
-    // The bytes array are from parquet file and must be the big-endian.
-    // The endian is defined by parquet format, and the reference document
-    // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
-    i128::from_be_bytes(sign_extend_be(b))
-}
-
 // Convert parquet column schema to arrow data type, and just consider the
 // decimal data type.
 pub(crate) fn parquet_to_arrow_decimal_type(
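
To make the behavior of the two helpers removed above (and relocated into the new statistics module) concrete, here is a standalone sketch that reproduces them from the deleted lines and exercises both the positive path and the sign-extended negative path:

// Standalone copies of the helpers moved out of mod.rs (crate-visibility
// qualifiers dropped so the example compiles on its own).
fn sign_extend_be(b: &[u8]) -> [u8; 16] {
    assert!(b.len() <= 16, "Array too large, expected less than 16");
    let is_negative = (b[0] & 128u8) == 128u8;
    let mut result = if is_negative { [255u8; 16] } else { [0u8; 16] };
    for (d, s) in result.iter_mut().skip(16 - b.len()).zip(b) {
        *d = *s;
    }
    result
}

fn from_bytes_to_i128(b: &[u8]) -> i128 {
    // Parquet stores decimal statistics big-endian, so sign-extend to
    // 16 bytes and reinterpret as a big-endian i128.
    i128::from_be_bytes(sign_extend_be(b))
}

fn main() {
    // Positive value: big-endian [0x01, 0x00] is 256.
    assert_eq!(from_bytes_to_i128(&[0x01, 0x00]), 256);
    // Negative value: the high bit of 0xFF triggers sign extension,
    // so [0xFF, 0xFE] decodes to -2, not 65534.
    assert_eq!(from_bytes_to_i128(&[0xFF, 0xFE]), -2);
    println!("decimal byte decoding checks passed");
}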
datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -39,9 +39,8 @@ use parquet::{
 };
 use std::sync::Arc;
 
-use crate::datasource::physical_plan::parquet::{
-    from_bytes_to_i128, parquet_to_arrow_decimal_type,
-};
+use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
+use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128;
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 
 use super::metrics::ParquetFileMetrics;
