diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 731672ceb8b8..95aae71c779e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -66,6 +66,7 @@ mod metrics; pub mod page_filter; mod row_filter; mod row_groups; +mod statistics; pub use metrics::ParquetFileMetrics; @@ -506,6 +507,7 @@ impl FileOpener for ParquetOpener { let file_metadata = builder.metadata().clone(); let predicate = pruning_predicate.as_ref().map(|p| p.as_ref()); let mut row_groups = row_groups::prune_row_groups_by_statistics( + builder.parquet_schema(), file_metadata.row_groups(), file_range, predicate, @@ -718,28 +720,6 @@ pub async fn plan_to_parquet( Ok(()) } -// Copy from the arrow-rs -// https://github.com/apache/arrow-rs/blob/733b7e7fd1e8c43a404c3ce40ecf741d493c21b4/parquet/src/arrow/buffer/bit_util.rs#L55 -// Convert the byte slice to fixed length byte array with the length of 16 -fn sign_extend_be(b: &[u8]) -> [u8; 16] { - assert!(b.len() <= 16, "Array too large, expected less than 16"); - let is_negative = (b[0] & 128u8) == 128u8; - let mut result = if is_negative { [255u8; 16] } else { [0u8; 16] }; - for (d, s) in result.iter_mut().skip(16 - b.len()).zip(b) { - *d = *s; - } - result -} - -// Convert the bytes array to i128. -// The endian of the input bytes array must be big-endian. -pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 { - // The bytes array are from parquet file and must be the big-endian. - // The endian is defined by parquet format, and the reference document - // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66 - i128::from_be_bytes(sign_extend_be(b)) -} - // Convert parquet column schema to arrow data type, and just consider the // decimal data type. pub(crate) fn parquet_to_arrow_decimal_type( diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index b5b5f154f7a0..42bfef35996e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -39,9 +39,8 @@ use parquet::{ }; use std::sync::Arc; -use crate::datasource::physical_plan::parquet::{ - from_bytes_to_i128, parquet_to_arrow_decimal_type, -}; +use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type; +use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128; use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use super::metrics::ParquetFileMetrics; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 0079368f9cdd..0ab2046097c4 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -15,25 +15,25 @@ // specific language governing permissions and limitations // under the License. 
-use arrow::{ - array::ArrayRef, - datatypes::{DataType, Schema}, -}; +use arrow::{array::ArrayRef, datatypes::Schema}; +use arrow_schema::FieldRef; use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_common::{Column, DataFusionError, Result, ScalarValue}; +use parquet::file::metadata::ColumnChunkMetaData; +use parquet::schema::types::SchemaDescriptor; use parquet::{ arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder}, bloom_filter::Sbbf, - file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics}, + file::metadata::RowGroupMetaData, }; use std::{ collections::{HashMap, HashSet}, sync::Arc, }; -use crate::datasource::{ - listing::FileRange, - physical_plan::parquet::{from_bytes_to_i128, parquet_to_arrow_decimal_type}, +use crate::datasource::listing::FileRange; +use crate::datasource::physical_plan::parquet::statistics::{ + max_statistics, min_statistics, parquet_column, }; use crate::logical_expr::Operator; use crate::physical_expr::expressions as phys_expr; @@ -51,7 +51,11 @@ use super::ParquetFileMetrics; /// /// If an index IS present in the returned Vec it means the predicate /// did not filter out that row group. +/// +/// Note: This method currently ignores ColumnOrder +/// pub(crate) fn prune_row_groups_by_statistics( + parquet_schema: &SchemaDescriptor, groups: &[RowGroupMetaData], range: Option<FileRange>, predicate: Option<&PruningPredicate>, @@ -74,8 +78,9 @@ pub(crate) fn prune_row_groups_by_statistics( if let Some(predicate) = predicate { let pruning_stats = RowGroupPruningStatistics { + parquet_schema, row_group_metadata: metadata, - parquet_schema: predicate.schema().as_ref(), + arrow_schema: predicate.schema().as_ref(), }; match predicate.prune(&pruning_stats) { Ok(values) => { @@ -296,146 +301,33 @@ impl BloomFilterPruningPredicate { } } -/// Wraps parquet statistics in a way -/// that implements [`PruningStatistics`] +/// Wraps [`RowGroupMetaData`] in a way that implements [`PruningStatistics`] +/// +/// Note: This should be implemented for an array of [`RowGroupMetaData`] instead +/// of per row-group struct RowGroupPruningStatistics<'a> { + parquet_schema: &'a SchemaDescriptor, row_group_metadata: &'a RowGroupMetaData, - parquet_schema: &'a Schema, + arrow_schema: &'a Schema, } -/// Extract the min/max statistics from a `ParquetStatistics` object -macro_rules!
get_statistic { - ($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{ - if !$column_statistics.has_min_max_set() { - return None; - } - match $column_statistics { - ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.$func()))), - ParquetStatistics::Int32(s) => { - match $target_arrow_type { - // int32 to decimal with the precision and scale - Some(DataType::Decimal128(precision, scale)) => { - Some(ScalarValue::Decimal128( - Some(*s.$func() as i128), - precision, - scale, - )) - } - _ => Some(ScalarValue::Int32(Some(*s.$func()))), - } - } - ParquetStatistics::Int64(s) => { - match $target_arrow_type { - // int64 to decimal with the precision and scale - Some(DataType::Decimal128(precision, scale)) => { - Some(ScalarValue::Decimal128( - Some(*s.$func() as i128), - precision, - scale, - )) - } - _ => Some(ScalarValue::Int64(Some(*s.$func()))), - } - } - // 96 bit ints not supported - ParquetStatistics::Int96(_) => None, - ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.$func()))), - ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.$func()))), - ParquetStatistics::ByteArray(s) => { - match $target_arrow_type { - // decimal data type - Some(DataType::Decimal128(precision, scale)) => { - Some(ScalarValue::Decimal128( - Some(from_bytes_to_i128(s.$bytes_func())), - precision, - scale, - )) - } - _ => { - let s = std::str::from_utf8(s.$bytes_func()) - .map(|s| s.to_string()) - .ok(); - Some(ScalarValue::Utf8(s)) - } - } - } - // type not supported yet - ParquetStatistics::FixedLenByteArray(s) => { - match $target_arrow_type { - // just support the decimal data type - Some(DataType::Decimal128(precision, scale)) => { - Some(ScalarValue::Decimal128( - Some(from_bytes_to_i128(s.$bytes_func())), - precision, - scale, - )) - } - _ => None, - } - } - } - }}; -} - -// Extract the min or max value calling `func` or `bytes_func` on the ParquetStatistics as appropriate -macro_rules! get_min_max_values { - ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{ - let (_column_index, field) = - if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) { - (v, f) - } else { - // Named column was not present - return None; - }; - - let data_type = field.data_type(); - // The result may be None, because DataFusion doesn't have support for ScalarValues of the column type - let null_scalar: ScalarValue = data_type.try_into().ok()?; - - $self.row_group_metadata - .columns() - .iter() - .find(|c| c.column_descr().name() == &$column.name) - .and_then(|c| if c.statistics().is_some() {Some((c.statistics().unwrap(), c.column_descr()))} else {None}) - .map(|(stats, column_descr)| - { - let target_data_type = parquet_to_arrow_decimal_type(column_descr); - get_statistic!(stats, $func, $bytes_func, target_data_type) - }) - .flatten() - // column either didn't have statistics at all or didn't have min/max values - .or_else(|| Some(null_scalar.clone())) - .and_then(|s| s.to_array().ok()) - }} -} - -// Extract the null count value on the ParquetStatistics -macro_rules! 
get_null_count_values { - ($self:expr, $column:expr) => {{ - let value = ScalarValue::UInt64( - if let Some(col) = $self - .row_group_metadata - .columns() - .iter() - .find(|c| c.column_descr().name() == &$column.name) - { - col.statistics().map(|s| s.null_count()) - } else { - Some($self.row_group_metadata.num_rows() as u64) - }, - ); - - value.to_array().ok() - }}; +impl<'a> RowGroupPruningStatistics<'a> { + /// Lookups up the parquet column by name + fn column(&self, name: &str) -> Option<(&ColumnChunkMetaData, &FieldRef)> { + let (idx, field) = parquet_column(self.parquet_schema, self.arrow_schema, name)?; + Some((self.row_group_metadata.column(idx), field)) + } } impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { fn min_values(&self, column: &Column) -> Option { - get_min_max_values!(self, column, min, min_bytes) + let (column, field) = self.column(&column.name)?; + min_statistics(field.data_type(), std::iter::once(column.statistics())).ok() } fn max_values(&self, column: &Column) -> Option { - get_min_max_values!(self, column, max, max_bytes) + let (column, field) = self.column(&column.name)?; + max_statistics(field.data_type(), std::iter::once(column.statistics())).ok() } fn num_containers(&self) -> usize { @@ -443,7 +335,9 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { } fn null_counts(&self, column: &Column) -> Option { - get_null_count_values!(self, column) + let (c, _) = self.column(&column.name)?; + let scalar = ScalarValue::UInt64(Some(c.statistics()?.null_count())); + scalar.to_array().ok() } } @@ -463,6 +357,7 @@ mod tests { use datafusion_physical_expr::execution_props::ExecutionProps; use datafusion_physical_expr::{create_physical_expr, PhysicalExpr}; use datafusion_sql::planner::ContextProvider; + use parquet::arrow::arrow_to_parquet_schema; use parquet::arrow::async_reader::ParquetObjectReader; use parquet::basic::LogicalType; use parquet::data_type::{ByteArray, FixedLenByteArray}; @@ -540,6 +435,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &[rgm1, rgm2], None, Some(&pruning_predicate), @@ -574,6 +470,7 @@ mod tests { // is null / undefined so the first row group can't be filtered out assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &[rgm1, rgm2], None, Some(&pruning_predicate), @@ -621,6 +518,7 @@ mod tests { // when conditions are joined using AND assert_eq!( prune_row_groups_by_statistics( + &schema_descr, groups, None, Some(&pruning_predicate), @@ -639,6 +537,7 @@ mod tests { // this bypasses the entire predicate expression and no row groups are filtered out assert_eq!( prune_row_groups_by_statistics( + &schema_descr, groups, None, Some(&pruning_predicate), @@ -678,6 +577,7 @@ mod tests { Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Boolean, false), ])); + let schema_descr = arrow_to_parquet_schema(&schema).unwrap(); let expr = col("c1").gt(lit(15)).and(col("c2").is_null()); let expr = logical2physical(&expr, &schema); let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap(); @@ -687,6 +587,7 @@ mod tests { // First row group was filtered out because it contains no null value on "c2". 
assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &groups, None, Some(&pruning_predicate), @@ -706,6 +607,7 @@ mod tests { Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Boolean, false), ])); + let schema_descr = arrow_to_parquet_schema(&schema).unwrap(); let expr = col("c1") .gt(lit(15)) .and(col("c2").eq(lit(ScalarValue::Boolean(None)))); @@ -718,6 +620,7 @@ mod tests { // pass predicates. Ideally these should both be false assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &groups, None, Some(&pruning_predicate), @@ -776,6 +679,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &[rgm1, rgm2, rgm3], None, Some(&pruning_predicate), @@ -839,6 +743,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &[rgm1, rgm2, rgm3, rgm4], None, Some(&pruning_predicate), @@ -886,6 +791,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &[rgm1, rgm2, rgm3], None, Some(&pruning_predicate), @@ -956,6 +862,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &[rgm1, rgm2, rgm3], None, Some(&pruning_predicate), @@ -1015,6 +922,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &[rgm1, rgm2, rgm3], None, Some(&pruning_predicate), @@ -1028,7 +936,6 @@ mod tests { schema_descr: &SchemaDescPtr, column_statistics: Vec, ) -> RowGroupMetaData { - use parquet::file::metadata::ColumnChunkMetaData; let mut columns = vec![]; for (i, s) in column_statistics.iter().enumerate() { let column = ColumnChunkMetaData::builder(schema_descr.column(i)) @@ -1046,7 +953,7 @@ mod tests { } fn get_test_schema_descr(fields: Vec) -> SchemaDescPtr { - use parquet::schema::types::{SchemaDescriptor, Type as SchemaType}; + use parquet::schema::types::Type as SchemaType; let schema_fields = fields .iter() .map(|field| { diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs new file mode 100644 index 000000000000..4e472606da51 --- /dev/null +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -0,0 +1,899 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`min_statistics`] and [`max_statistics`] convert statistics in parquet format to arrow [`ArrayRef`]. 
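+//!
+//! A sketch of the intended usage, assuming a parquet [`SchemaDescriptor`], the
+//! corresponding arrow [`Schema`], and the file's `RowGroupMetaData` are already
+//! in hand (the bindings below are illustrative, not defined in this module):
+//!
+//! ```ignore
+//! // map the arrow field name to its (non-nested) parquet leaf column index
+//! let (idx, field) = parquet_column(parquet_schema, arrow_schema, "c1").unwrap();
+//! // one `Option<&Statistics>` per row group, in row group order
+//! let stats = row_groups.iter().map(|rg| rg.column(idx).statistics());
+//! // one entry per row group: the minimum value of "c1", or null if unknown
+//! let mins: ArrayRef = min_statistics(field.data_type(), stats).unwrap();
+//! ```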
+ +// TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328 + +use arrow::{array::ArrayRef, datatypes::DataType}; +use arrow_array::new_empty_array; +use arrow_schema::{FieldRef, Schema}; +use datafusion_common::{Result, ScalarValue}; +use parquet::file::statistics::Statistics as ParquetStatistics; +use parquet::schema::types::SchemaDescriptor; + +// Convert the bytes array to i128. +// The endian of the input bytes array must be big-endian. +pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 { + // The bytes array are from parquet file and must be the big-endian. + // The endian is defined by parquet format, and the reference document + // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66 + i128::from_be_bytes(sign_extend_be(b)) +} + +// Copy from arrow-rs +// https://github.com/apache/arrow-rs/blob/733b7e7fd1e8c43a404c3ce40ecf741d493c21b4/parquet/src/arrow/buffer/bit_util.rs#L55 +// Convert the byte slice to fixed length byte array with the length of 16 +fn sign_extend_be(b: &[u8]) -> [u8; 16] { + assert!(b.len() <= 16, "Array too large, expected less than 16"); + let is_negative = (b[0] & 128u8) == 128u8; + let mut result = if is_negative { [255u8; 16] } else { [0u8; 16] }; + for (d, s) in result.iter_mut().skip(16 - b.len()).zip(b) { + *d = *s; + } + result +} + +/// Extracts a single min/max statistic from a [`ParquetStatistics`] object +/// +/// * `$column_statistics` is the `ParquetStatistics` object +/// * `$func` is the function (`min`/`max`) to call to get the value +/// * `$bytes_func` is the function (`min_bytes`/`max_bytes`) to call to get the value as bytes +/// * `$target_arrow_type` is the [`DataType`] of the target statistics +macro_rules!
get_statistic { + ($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{ + if !$column_statistics.has_min_max_set() { + return None; + } + match $column_statistics { + ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.$func()))), + ParquetStatistics::Int32(s) => { + match $target_arrow_type { + // int32 to decimal with the precision and scale + Some(DataType::Decimal128(precision, scale)) => { + Some(ScalarValue::Decimal128( + Some(*s.$func() as i128), + *precision, + *scale, + )) + } + _ => Some(ScalarValue::Int32(Some(*s.$func()))), + } + } + ParquetStatistics::Int64(s) => { + match $target_arrow_type { + // int64 to decimal with the precision and scale + Some(DataType::Decimal128(precision, scale)) => { + Some(ScalarValue::Decimal128( + Some(*s.$func() as i128), + *precision, + *scale, + )) + } + _ => Some(ScalarValue::Int64(Some(*s.$func()))), + } + } + // 96 bit ints not supported + ParquetStatistics::Int96(_) => None, + ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.$func()))), + ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.$func()))), + ParquetStatistics::ByteArray(s) => { + match $target_arrow_type { + // decimal data type + Some(DataType::Decimal128(precision, scale)) => { + Some(ScalarValue::Decimal128( + Some(from_bytes_to_i128(s.$bytes_func())), + *precision, + *scale, + )) + } + _ => { + let s = std::str::from_utf8(s.$bytes_func()) + .map(|s| s.to_string()) + .ok(); + Some(ScalarValue::Utf8(s)) + } + } + } + // type not supported yet + ParquetStatistics::FixedLenByteArray(s) => { + match $target_arrow_type { + // just support the decimal data type + Some(DataType::Decimal128(precision, scale)) => { + Some(ScalarValue::Decimal128( + Some(from_bytes_to_i128(s.$bytes_func())), + *precision, + *scale, + )) + } + _ => None, + } + } + } + }}; +} + +/// Lookups up the parquet column by name +/// +/// Returns the parquet column index and the corresponding arrow field +pub(crate) fn parquet_column<'a>( + parquet_schema: &SchemaDescriptor, + arrow_schema: &'a Schema, + name: &str, +) -> Option<(usize, &'a FieldRef)> { + let (root_idx, field) = arrow_schema.fields.find(name)?; + if field.data_type().is_nested() { + // Nested fields are not supported and require non-trivial logic + // to correctly walk the parquet schema accounting for the + // logical type rules - + // + // For example a ListArray could correspond to anything from 1 to 3 levels + // in the parquet schema + return None; + } + + // This could be made more efficient (#TBD) + let parquet_idx = (0..parquet_schema.columns().len()) + .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?; + Some((parquet_idx, field)) +} + +/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`] +pub(crate) fn min_statistics<'a, I: Iterator>>( + data_type: &DataType, + iterator: I, +) -> Result { + let scalars = iterator + .map(|x| x.and_then(|s| get_statistic!(s, min, min_bytes, Some(data_type)))); + collect_scalars(data_type, scalars) +} + +/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`] +pub(crate) fn max_statistics<'a, I: Iterator>>( + data_type: &DataType, + iterator: I, +) -> Result { + let scalars = iterator + .map(|x| x.and_then(|s| get_statistic!(s, max, max_bytes, Some(data_type)))); + collect_scalars(data_type, scalars) +} + +/// Builds an array from an iterator of ScalarValue +fn collect_scalars>>( + data_type: &DataType, + iterator: I, +) -> Result { + let 
mut scalars = iterator.peekable(); + match scalars.peek().is_none() { + true => Ok(new_empty_array(data_type)), + false => { + let null = ScalarValue::try_from(data_type)?; + ScalarValue::iter_to_array(scalars.map(|x| x.unwrap_or_else(|| null.clone()))) + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use arrow_array::{ + new_null_array, Array, BinaryArray, BooleanArray, Decimal128Array, Float32Array, + Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, StructArray, + TimestampNanosecondArray, + }; + use arrow_schema::{Field, SchemaRef}; + use bytes::Bytes; + use datafusion_common::test_util::parquet_test_data; + use parquet::arrow::arrow_reader::ArrowReaderBuilder; + use parquet::arrow::arrow_writer::ArrowWriter; + use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; + use parquet::file::properties::{EnabledStatistics, WriterProperties}; + use std::path::PathBuf; + use std::sync::Arc; + + // TODO error cases (with parquet statistics that are mismatched in expected type) + + #[test] + fn roundtrip_empty() { + let empty_bool_array = new_empty_array(&DataType::Boolean); + Test { + input: empty_bool_array.clone(), + expected_min: empty_bool_array.clone(), + expected_max: empty_bool_array.clone(), + } + .run() + } + + #[test] + fn roundtrip_bool() { + Test { + input: bool_array([ + // row group 1 + Some(true), + None, + Some(true), + // row group 2 + Some(true), + Some(false), + None, + // row group 3 + None, + None, + None, + ]), + expected_min: bool_array([Some(true), Some(false), None]), + expected_max: bool_array([Some(true), Some(true), None]), + } + .run() + } + + #[test] + fn roundtrip_int32() { + Test { + input: i32_array([ + // row group 1 + Some(1), + None, + Some(3), + // row group 2 + Some(0), + Some(5), + None, + // row group 3 + None, + None, + None, + ]), + expected_min: i32_array([Some(1), Some(0), None]), + expected_max: i32_array([Some(3), Some(5), None]), + } + .run() + } + + #[test] + fn roundtrip_int64() { + Test { + input: i64_array([ + // row group 1 + Some(1), + None, + Some(3), + // row group 2 + Some(0), + Some(5), + None, + // row group 3 + None, + None, + None, + ]), + expected_min: i64_array([Some(1), Some(0), None]), + expected_max: i64_array(vec![Some(3), Some(5), None]), + } + .run() + } + + #[test] + fn roundtrip_f32() { + Test { + input: f32_array([ + // row group 1 + Some(1.0), + None, + Some(3.0), + // row group 2 + Some(-1.0), + Some(5.0), + None, + // row group 3 + None, + None, + None, + ]), + expected_min: f32_array([Some(1.0), Some(-1.0), None]), + expected_max: f32_array([Some(3.0), Some(5.0), None]), + } + .run() + } + + #[test] + fn roundtrip_f64() { + Test { + input: f64_array([ + // row group 1 + Some(1.0), + None, + Some(3.0), + // row group 2 + Some(-1.0), + Some(5.0), + None, + // row group 3 + None, + None, + None, + ]), + expected_min: f64_array([Some(1.0), Some(-1.0), None]), + expected_max: f64_array([Some(3.0), Some(5.0), None]), + } + .run() + } + + #[test] + #[should_panic( + expected = "Inconsistent types in ScalarValue::iter_to_array. 
Expected Int64, got TimestampNanosecond(NULL, None)" + )] + // Due to https://github.com/apache/arrow-datafusion/issues/8295 + fn roundtrip_timestamp() { + Test { + input: timestamp_array([ + // row group 1 + Some(1), + None, + Some(3), + // row group 2 + Some(9), + Some(5), + None, + // row group 3 + None, + None, + None, + ]), + expected_min: timestamp_array([Some(1), Some(5), None]), + expected_max: timestamp_array([Some(3), Some(9), None]), + } + .run() + } + + #[test] + fn roundtrip_decimal() { + Test { + input: Arc::new( + Decimal128Array::from(vec![ + // row group 1 + Some(100), + None, + Some(22000), + // row group 2 + Some(500000), + Some(330000), + None, + // row group 3 + None, + None, + None, + ]) + .with_precision_and_scale(9, 2) + .unwrap(), + ), + expected_min: Arc::new( + Decimal128Array::from(vec![Some(100), Some(330000), None]) + .with_precision_and_scale(9, 2) + .unwrap(), + ), + expected_max: Arc::new( + Decimal128Array::from(vec![Some(22000), Some(500000), None]) + .with_precision_and_scale(9, 2) + .unwrap(), + ), + } + .run() + } + + #[test] + fn roundtrip_utf8() { + Test { + input: utf8_array([ + // row group 1 + Some("A"), + None, + Some("Q"), + // row group 2 + Some("ZZ"), + Some("AA"), + None, + // row group 3 + None, + None, + None, + ]), + expected_min: utf8_array([Some("A"), Some("AA"), None]), + expected_max: utf8_array([Some("Q"), Some("ZZ"), None]), + } + .run() + } + + #[test] + fn roundtrip_struct() { + let mut test = Test { + input: struct_array(vec![ + // row group 1 + (Some(true), Some(1)), + (None, None), + (Some(true), Some(3)), + // row group 2 + (Some(true), Some(0)), + (Some(false), Some(5)), + (None, None), + // row group 3 + (None, None), + (None, None), + (None, None), + ]), + expected_min: struct_array(vec![ + (Some(true), Some(1)), + (Some(true), Some(0)), + (None, None), + ]), + + expected_max: struct_array(vec![ + (Some(true), Some(3)), + (Some(true), Some(0)), + (None, None), + ]), + }; + // Due to https://github.com/apache/arrow-datafusion/issues/8334, + // statistics for struct arrays are not supported + test.expected_min = + new_null_array(test.input.data_type(), test.expected_min.len()); + test.expected_max = + new_null_array(test.input.data_type(), test.expected_min.len()); + test.run() + } + + #[test] + #[should_panic( + expected = "Inconsistent types in ScalarValue::iter_to_array. 
Expected Utf8, got Binary(NULL)" + )] + // Due to https://github.com/apache/arrow-datafusion/issues/8295 + fn roundtrip_binary() { + Test { + input: Arc::new(BinaryArray::from_opt_vec(vec![ + // row group 1 + Some(b"A"), + None, + Some(b"Q"), + // row group 2 + Some(b"ZZ"), + Some(b"AA"), + None, + // row group 3 + None, + None, + None, + ])), + expected_min: Arc::new(BinaryArray::from_opt_vec(vec![ + Some(b"A"), + Some(b"AA"), + None, + ])), + expected_max: Arc::new(BinaryArray::from_opt_vec(vec![ + Some(b"Q"), + Some(b"ZZ"), + None, + ])), + } + .run() + } + + #[test] + fn struct_and_non_struct() { + // Ensures that statistics for an array that appears *after* a struct + // array are not wrong + let struct_col = struct_array(vec![ + // row group 1 + (Some(true), Some(1)), + (None, None), + (Some(true), Some(3)), + ]); + let int_col = i32_array([Some(100), Some(200), Some(300)]); + let expected_min = i32_array([Some(100)]); + let expected_max = i32_array(vec![Some(300)]); + + // use a name that shadows a name in the struct column + match struct_col.data_type() { + DataType::Struct(fields) => { + assert_eq!(fields.get(1).unwrap().name(), "int_col") + } + _ => panic!("unexpected data type for struct column"), + }; + + let input_batch = RecordBatch::try_from_iter([ + ("struct_col", struct_col), + ("int_col", int_col), + ]) + .unwrap(); + + let schema = input_batch.schema(); + + let metadata = parquet_metadata(schema.clone(), input_batch); + let parquet_schema = metadata.file_metadata().schema_descr(); + + // read the int_col statistics + let (idx, _) = parquet_column(parquet_schema, &schema, "int_col").unwrap(); + assert_eq!(idx, 2); + + let row_groups = metadata.row_groups(); + let iter = row_groups.iter().map(|x| x.column(idx).statistics()); + + let min = min_statistics(&DataType::Int32, iter.clone()).unwrap(); + assert_eq!( + &min, + &expected_min, + "Min. Statistics\n\n{}\n\n", + DisplayStats(row_groups) + ); + + let max = max_statistics(&DataType::Int32, iter).unwrap(); + assert_eq!( + &max, + &expected_max, + "Max. 
Statistics\n\n{}\n\n", + DisplayStats(row_groups) + ); + } + + #[test] + fn nan_in_stats() { + // /parquet-testing/data/nan_in_stats.parquet + // row_groups: 1 + // "x": Double({min: Some(1.0), max: Some(NaN), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + + TestFile::new("nan_in_stats.parquet") + .with_column(ExpectedColumn { + name: "x", + expected_min: Arc::new(Float64Array::from(vec![Some(1.0)])), + expected_max: Arc::new(Float64Array::from(vec![Some(f64::NAN)])), + }) + .run(); + } + + #[test] + fn alltypes_plain() { + // /parquet-testing/data/datapage_v1-snappy-compressed-checksum.parquet + // row_groups: 1 + // (has no statistics) + TestFile::new("alltypes_plain.parquet") + // No column statistics should be read as NULL, but with the right type + .with_column(ExpectedColumn { + name: "id", + expected_min: i32_array([None]), + expected_max: i32_array([None]), + }) + .with_column(ExpectedColumn { + name: "bool_col", + expected_min: bool_array([None]), + expected_max: bool_array([None]), + }) + .run(); + } + + #[test] + fn alltypes_tiny_pages() { + // /parquet-testing/data/alltypes_tiny_pages.parquet + // row_groups: 1 + // "id": Int32({min: Some(0), max: Some(7299), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "bool_col": Boolean({min: Some(false), max: Some(true), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "tinyint_col": Int32({min: Some(0), max: Some(9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "smallint_col": Int32({min: Some(0), max: Some(9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "int_col": Int32({min: Some(0), max: Some(9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "bigint_col": Int64({min: Some(0), max: Some(90), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "float_col": Float({min: Some(0.0), max: Some(9.9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "double_col": Double({min: Some(0.0), max: Some(90.89999999999999), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "date_string_col": ByteArray({min: Some(ByteArray { data: "01/01/09" }), max: Some(ByteArray { data: "12/31/10" }), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "string_col": ByteArray({min: Some(ByteArray { data: "0" }), max: Some(ByteArray { data: "9" }), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "timestamp_col": Int96({min: None, max: None, distinct_count: None, null_count: 0, min_max_deprecated: true, min_max_backwards_compatible: true}) + // "year": Int32({min: Some(2009), max: Some(2010), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "month": Int32({min: Some(1), max: Some(12), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + TestFile::new("alltypes_tiny_pages.parquet") + .with_column(ExpectedColumn { + name: "id", + expected_min: i32_array([Some(0)]), + expected_max: i32_array([Some(7299)]), + }) + 
.with_column(ExpectedColumn { + name: "bool_col", + expected_min: bool_array([Some(false)]), + expected_max: bool_array([Some(true)]), + }) + .with_column(ExpectedColumn { + name: "tinyint_col", + expected_min: i32_array([Some(0)]), + expected_max: i32_array([Some(9)]), + }) + .with_column(ExpectedColumn { + name: "smallint_col", + expected_min: i32_array([Some(0)]), + expected_max: i32_array([Some(9)]), + }) + .with_column(ExpectedColumn { + name: "int_col", + expected_min: i32_array([Some(0)]), + expected_max: i32_array([Some(9)]), + }) + .with_column(ExpectedColumn { + name: "bigint_col", + expected_min: i64_array([Some(0)]), + expected_max: i64_array([Some(90)]), + }) + .with_column(ExpectedColumn { + name: "float_col", + expected_min: f32_array([Some(0.0)]), + expected_max: f32_array([Some(9.9)]), + }) + .with_column(ExpectedColumn { + name: "double_col", + expected_min: f64_array([Some(0.0)]), + expected_max: f64_array([Some(90.89999999999999)]), + }) + .with_column(ExpectedColumn { + name: "date_string_col", + expected_min: utf8_array([Some("01/01/09")]), + expected_max: utf8_array([Some("12/31/10")]), + }) + .with_column(ExpectedColumn { + name: "string_col", + expected_min: utf8_array([Some("0")]), + expected_max: utf8_array([Some("9")]), + }) + // File has no min/max for timestamp_col + .with_column(ExpectedColumn { + name: "timestamp_col", + expected_min: timestamp_array([None]), + expected_max: timestamp_array([None]), + }) + .with_column(ExpectedColumn { + name: "year", + expected_min: i32_array([Some(2009)]), + expected_max: i32_array([Some(2010)]), + }) + .with_column(ExpectedColumn { + name: "month", + expected_min: i32_array([Some(1)]), + expected_max: i32_array([Some(12)]), + }) + .run(); + } + + #[test] + fn fixed_length_decimal_legacy() { + // /parquet-testing/data/fixed_length_decimal_legacy.parquet + // row_groups: 1 + // "value": FixedLenByteArray({min: Some(FixedLenByteArray(ByteArray { data: Some(ByteBufferPtr { data: b"\0\0\0\0\0\xc8" }) })), max: Some(FixedLenByteArray(ByteArray { data: "\0\0\0\0\t`" })), distinct_count: None, null_count: 0, min_max_deprecated: true, min_max_backwards_compatible: true}) + + TestFile::new("fixed_length_decimal_legacy.parquet") + .with_column(ExpectedColumn { + name: "value", + expected_min: Arc::new( + Decimal128Array::from(vec![Some(200)]) + .with_precision_and_scale(13, 2) + .unwrap(), + ), + expected_max: Arc::new( + Decimal128Array::from(vec![Some(2400)]) + .with_precision_and_scale(13, 2) + .unwrap(), + ), + }) + .run(); + } + + const ROWS_PER_ROW_GROUP: usize = 3; + + /// Writes the input batch into a parquet file, with every every three rows as + /// their own row group, and compares the min/maxes to the expected values + struct Test { + input: ArrayRef, + expected_min: ArrayRef, + expected_max: ArrayRef, + } + + impl Test { + fn run(self) { + let Self { + input, + expected_min, + expected_max, + } = self; + + let input_batch = RecordBatch::try_from_iter([("c1", input)]).unwrap(); + + let schema = input_batch.schema(); + + let metadata = parquet_metadata(schema.clone(), input_batch); + let parquet_schema = metadata.file_metadata().schema_descr(); + + let row_groups = metadata.row_groups(); + + for field in schema.fields() { + if field.data_type().is_nested() { + let lookup = parquet_column(parquet_schema, &schema, field.name()); + assert_eq!(lookup, None); + continue; + } + + let (idx, f) = + parquet_column(parquet_schema, &schema, field.name()).unwrap(); + assert_eq!(f, field); + + let iter = row_groups.iter().map(|x| 
x.column(idx).statistics()); + let min = min_statistics(f.data_type(), iter.clone()).unwrap(); + assert_eq!( + &min, + &expected_min, + "Min. Statistics\n\n{}\n\n", + DisplayStats(row_groups) + ); + + let max = max_statistics(f.data_type(), iter).unwrap(); + assert_eq!( + &max, + &expected_max, + "Max. Statistics\n\n{}\n\n", + DisplayStats(row_groups) + ); + } + } + } + + /// Write the specified batches out as parquet and return the metadata + fn parquet_metadata(schema: SchemaRef, batch: RecordBatch) -> Arc<ParquetMetaData> { + let props = WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::Chunk) + .set_max_row_group_size(ROWS_PER_ROW_GROUP) + .build(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let reader = ArrowReaderBuilder::try_new(Bytes::from(buffer)).unwrap(); + reader.metadata().clone() + } + + /// Formats the statistics nicely for display + struct DisplayStats<'a>(&'a [RowGroupMetaData]); + impl<'a> std::fmt::Display for DisplayStats<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let row_groups = self.0; + writeln!(f, " row_groups: {}", row_groups.len())?; + for rg in row_groups { + for col in rg.columns() { + if let Some(statistics) = col.statistics() { + writeln!(f, " {}: {:?}", col.column_path(), statistics)?; + } + } + } + Ok(()) + } + } + + struct ExpectedColumn { + name: &'static str, + expected_min: ArrayRef, + expected_max: ArrayRef, + } + + /// Reads statistics out of the specified file and compares them to the expected values + struct TestFile { + file_name: &'static str, + expected_columns: Vec<ExpectedColumn>, + } + + impl TestFile { + fn new(file_name: &'static str) -> Self { + Self { + file_name, + expected_columns: Vec::new(), + } + } + + fn with_column(mut self, column: ExpectedColumn) -> Self { + self.expected_columns.push(column); + self + } + + /// Reads the specified parquet file and validates that the expected min/max + /// values for the specified columns are as expected.
+ fn run(self) { + let path = PathBuf::from(parquet_test_data()).join(self.file_name); + let file = std::fs::File::open(path).unwrap(); + let reader = ArrowReaderBuilder::try_new(file).unwrap(); + let arrow_schema = reader.schema(); + let metadata = reader.metadata(); + let row_groups = metadata.row_groups(); + let parquet_schema = metadata.file_metadata().schema_descr(); + + for expected_column in self.expected_columns { + let ExpectedColumn { + name, + expected_min, + expected_max, + } = expected_column; + + let (idx, field) = + parquet_column(parquet_schema, arrow_schema, name).unwrap(); + + let iter = row_groups.iter().map(|x| x.column(idx).statistics()); + let actual_min = min_statistics(field.data_type(), iter.clone()).unwrap(); + assert_eq!(&expected_min, &actual_min, "column {name}"); + + let actual_max = max_statistics(field.data_type(), iter).unwrap(); + assert_eq!(&expected_max, &actual_max, "column {name}"); + } + } + } + + fn bool_array(input: impl IntoIterator<Item = Option<bool>>) -> ArrayRef { + let array: BooleanArray = input.into_iter().collect(); + Arc::new(array) + } + + fn i32_array(input: impl IntoIterator<Item = Option<i32>>) -> ArrayRef { + let array: Int32Array = input.into_iter().collect(); + Arc::new(array) + } + + fn i64_array(input: impl IntoIterator<Item = Option<i64>>) -> ArrayRef { + let array: Int64Array = input.into_iter().collect(); + Arc::new(array) + } + + fn f32_array(input: impl IntoIterator<Item = Option<f32>>) -> ArrayRef { + let array: Float32Array = input.into_iter().collect(); + Arc::new(array) + } + + fn f64_array(input: impl IntoIterator<Item = Option<f64>>) -> ArrayRef { + let array: Float64Array = input.into_iter().collect(); + Arc::new(array) + } + + fn timestamp_array(input: impl IntoIterator<Item = Option<i64>>) -> ArrayRef { + let array: TimestampNanosecondArray = input.into_iter().collect(); + Arc::new(array) + } + + fn utf8_array<'a>(input: impl IntoIterator<Item = Option<&'a str>>) -> ArrayRef { + let array: StringArray = input + .into_iter() + .map(|s| s.map(|s| s.to_string())) + .collect(); + Arc::new(array) + } + + // returns a struct array with columns "bool_col" and "int_col" with the specified values + fn struct_array(input: Vec<(Option<bool>, Option<i32>)>) -> ArrayRef { + let boolean: BooleanArray = input.iter().map(|(b, _i)| b).collect(); + let int: Int32Array = input.iter().map(|(_b, i)| i).collect(); + + let nullable = true; + let struct_array = StructArray::from(vec![ + ( + Arc::new(Field::new("bool_col", DataType::Boolean, nullable)), + Arc::new(boolean) as ArrayRef, + ), + ( + Arc::new(Field::new("int_col", DataType::Int32, nullable)), + Arc::new(int) as ArrayRef, + ), + ]); + Arc::new(struct_array) + } +}
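
The doc note on `RowGroupPruningStatistics` ("this should be implemented for an array of `RowGroupMetaData` instead of per row-group") points at the intended evolution of this API: because `min_statistics` and `max_statistics` take an iterator of per-row-group statistics, a wrapper over all row groups can produce one array entry per container in a single call. A hypothetical sketch of that shape, using only items `row_groups.rs` already imports (the struct name and wiring are illustrative, not part of this diff):

```rust
// Hypothetical sketch: a statistics provider spanning all row groups of a
// file, so each PruningStatistics call returns one entry per row group
// rather than a single-element array per call.
struct AllRowGroupsPruningStatistics<'a> {
    parquet_schema: &'a SchemaDescriptor,
    row_groups: &'a [RowGroupMetaData],
    arrow_schema: &'a Schema,
}

impl<'a> PruningStatistics for AllRowGroupsPruningStatistics<'a> {
    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
        let (idx, field) =
            parquet_column(self.parquet_schema, self.arrow_schema, &column.name)?;
        // one Option<&Statistics> per row group, in row group order
        let stats = self.row_groups.iter().map(|rg| rg.column(idx).statistics());
        min_statistics(field.data_type(), stats).ok()
    }

    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        let (idx, field) =
            parquet_column(self.parquet_schema, self.arrow_schema, &column.name)?;
        let stats = self.row_groups.iter().map(|rg| rg.column(idx).statistics());
        max_statistics(field.data_type(), stats).ok()
    }

    fn num_containers(&self) -> usize {
        self.row_groups.len()
    }

    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
        let (idx, _) =
            parquet_column(self.parquet_schema, self.arrow_schema, &column.name)?;
        let counts = self
            .row_groups
            .iter()
            .map(|rg| rg.column(idx).statistics().map(|s| s.null_count()));
        ScalarValue::iter_to_array(counts.map(ScalarValue::UInt64)).ok()
    }
}
```

Keeping the `min_statistics` / `max_statistics` entry points iterator-based is what makes this batched form a drop-in extension of the per-row-group wrapper introduced above.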