diff --git a/crates/polars-expr/src/expressions/binary.rs b/crates/polars-expr/src/expressions/binary.rs index a446276ce9b6..b3ba90a21447 100644 --- a/crates/polars-expr/src/expressions/binary.rs +++ b/crates/polars-expr/src/expressions/binary.rs @@ -287,13 +287,8 @@ impl PhysicalExpr for BinaryExpr { _ => return None, }; - let Some(value) = other.evaluate_inline() else { - return None; - }; - - let Some(value) = value.as_scalar_column() else { - return None; - }; + let value = other.evaluate_inline()?; + let value = value.as_scalar_column()?; let scalar = value.scalar(); use Operator as O; diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index c41ca33ff294..6f39e634ebc2 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -297,7 +297,13 @@ fn rg_to_dfs_prefiltered( } } - let do_parquet_expr = std::env::var("POLARS_PARQUET_EXPR").as_deref() == Ok("1"); + let do_parquet_expr = std::env::var("POLARS_NO_PARQUET_EXPR").as_deref() != Ok("1") + && live_columns.len() == 1 + && !schema + .get(live_columns[0].as_str()) + .unwrap() + .dtype() + .is_nested(); let column_exprs = do_parquet_expr.then(|| { live_columns .iter() @@ -381,7 +387,9 @@ fn rg_to_dfs_prefiltered( let (mut series, pred_true_mask) = column_idx_to_series(col_idx, part.as_slice(), filter, schema, store)?; - debug_assert!(pred_true_mask.is_empty() || pred_true_mask.len() == md.num_rows()); + debug_assert!( + pred_true_mask.is_empty() || pred_true_mask.len() == md.num_rows() + ); match equals_scalar { None => { try_set_sorted_flag(&mut series, col_idx, &sorting_map); @@ -400,51 +408,72 @@ fn rg_to_dfs_prefiltered( // Apply the predicate to the live columns and save the dataframe and the bitmask let md = &file_metadata.row_groups[rg_idx]; + let filter_mask: Bitmap; let mut df: DataFrame; - let filter_mask = match &filters[0] { - None => { - // Create without hive columns - the first merge phase does not handle hive partitions. This also saves - // some unnecessary filtering. - df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns) }; - - materialize_hive_partitions( - &mut df, - schema.as_ref(), - hive_partition_columns, - md.num_rows(), + if let Some(Some(f)) = filters.first() { + if f.set_bits() == 0 { + if config::verbose() { + eprintln!("parquet filter mask found that row group can be skipped"); + } + + return Ok(None); + } + + if let Some(rc) = &row_index { + df = unsafe { DataFrame::new_no_checks(md.num_rows(), vec![]) }; + df.with_row_index_mut( + rc.name.clone(), + Some(rg_offsets[rg_idx] + rc.offset), ); + df = df.filter(&BooleanChunked::from_chunk_iter( + PlSmallStr::EMPTY, + [BooleanArray::new(ArrowDataType::Boolean, f.clone(), None)], + ))?; + unsafe { df.column_extend_unchecked(live_columns) }; + } else { + df = DataFrame::new(live_columns).unwrap(); + } + filter_mask = f.clone(); + } else { + df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns.clone()) }; + + materialize_hive_partitions( + &mut df, + schema.as_ref(), + hive_partition_columns, + md.num_rows(), + ); - let s = predicate.evaluate_io(&df)?; - let mask = s.bool().expect("filter predicates was not of type boolean"); + let s = predicate.evaluate_io(&df)?; + let mask = s.bool().expect("filter predicates was not of type boolean"); - if let Some(rc) = &row_index { - df.with_row_index_mut( - rc.name.clone(), - Some(rg_offsets[rg_idx] + rc.offset), - ); - } - df = df.filter(mask)?; + // Create without hive columns - the first merge phase does not handle hive partitions. This also saves + // some unnecessary filtering. + df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns) }; - let mut filter_mask = MutableBitmap::with_capacity(mask.len()); + if let Some(rc) = &row_index { + df.with_row_index_mut( + rc.name.clone(), + Some(rg_offsets[rg_idx] + rc.offset), + ); + } + df = df.filter(mask)?; - // We need to account for the validity of the items - for chunk in mask.downcast_iter() { - match chunk.validity() { - None => filter_mask.extend_from_bitmap(chunk.values()), - Some(validity) => { - filter_mask.extend_from_bitmap(&(validity & chunk.values())) - }, - } + let mut mut_filter_mask = MutableBitmap::with_capacity(mask.len()); + + // We need to account for the validity of the items + for chunk in mask.downcast_iter() { + match chunk.validity() { + None => mut_filter_mask.extend_from_bitmap(chunk.values()), + Some(validity) => { + mut_filter_mask.extend_from_bitmap(&(validity & chunk.values())) + }, } + } - filter_mask.freeze() - }, - Some(f) => { - df = unsafe { DataFrame::new_no_checks(f.set_bits(), live_columns) }; - f.clone() - }, - }; + filter_mask = mut_filter_mask.freeze(); + } debug_assert_eq!(md.num_rows(), filter_mask.len()); debug_assert_eq!(df.height(), filter_mask.set_bits()); diff --git a/crates/polars-parquet/src/arrow/read/deserialize/dictionary_encoded/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/dictionary_encoded/mod.rs index a74413ee9169..84908554435e 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/dictionary_encoded/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/dictionary_encoded/mod.rs @@ -56,6 +56,7 @@ impl IndexMapping for usize { } } +#[allow(clippy::too_many_arguments)] pub fn decode_dict( values: HybridRleDecoder<'_>, dict: &[T], @@ -81,6 +82,7 @@ pub fn decode_dict( } #[inline(never)] +#[allow(clippy::too_many_arguments)] pub fn decode_dict_dispatch>( mut values: HybridRleDecoder<'_>, dict: D, diff --git a/crates/polars-parquet/src/arrow/read/deserialize/dictionary_encoded/predicate.rs b/crates/polars-parquet/src/arrow/read/deserialize/dictionary_encoded/predicate.rs index 07db87fab9f9..2af9c3cf933d 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/dictionary_encoded/predicate.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/dictionary_encoded/predicate.rs @@ -38,12 +38,10 @@ pub fn decode>( .set_bits(); target.resize(target.len() + num_values, dict.get(needle as u32).unwrap()); } + } else if predicate.include_values { + decode_multiple_values(values, dict, dict_mask, target, pred_true_mask)?; } else { - if predicate.include_values { - decode_multiple_values(values, dict, dict_mask, target, pred_true_mask)?; - } else { - decode_multiple_no_values(values, dict_mask, pred_true_mask)?; - } + decode_multiple_no_values(values, dict_mask, pred_true_mask)?; } assert_eq!(expected_pred_true_mask_len, pred_true_mask.len()); diff --git a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary.rs b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary.rs index 039d2e90ab5a..2c8e606d0059 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary.rs @@ -174,7 +174,7 @@ impl utils::Decoded for (FSBVec, MutableBitmap) { fn len(&self) -> usize { self.0.len() } - + fn extend_nulls(&mut self, n: usize) { match &mut self.0 { FSBVec::Size1(v) => v.resize(v.len() + n, Zeroable::zeroed()), @@ -190,6 +190,7 @@ impl utils::Decoded for (FSBVec, MutableBitmap) { } } +#[allow(clippy::too_many_arguments)] fn decode_fsb_plain( size: usize, values: &[u8], @@ -547,8 +548,13 @@ impl Decoder for BinaryDecoder { additional: &dyn arrow::array::Array, is_optional: bool, ) -> ParquetResult<()> { - let additional = additional.as_any().downcast_ref::().unwrap(); - decoded.0.extend_from_byte_slice(additional.values().as_slice()); + let additional = additional + .as_any() + .downcast_ref::() + .unwrap(); + decoded + .0 + .extend_from_byte_slice(additional.values().as_slice()); match additional.validity() { Some(v) => decoded.1.extend_from_bitmap(v), None if is_optional => decoded.1.extend_constant(additional.len(), true), diff --git a/crates/polars-parquet/src/arrow/read/deserialize/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/mod.rs index c8cde37226d9..fd491ef4da7d 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/mod.rs @@ -194,6 +194,7 @@ pub fn column_iter_to_arrays( field: Field, filter: Option, ) -> PolarsResult<(Box, Bitmap)> { - let (_, array, pred_true_mask) = columns_to_iter_recursive(columns, types, field, vec![], filter)?; + let (_, array, pred_true_mask) = + columns_to_iter_recursive(columns, types, field, vec![], filter)?; Ok((array, pred_true_mask)) } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/nested.rs b/crates/polars-parquet/src/arrow/read/deserialize/nested.rs index 681edf4c7cb9..b6df03d894c4 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/nested.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/nested.rs @@ -389,7 +389,7 @@ pub fn columns_to_iter_recursive( // Is this inefficient? Yes. Is this how we are going to do it for now? Yes. let Some(last_field) = fields.last() else { - return Err(ParquetError::not_supported("Struct has zero fields").into()); + return Err(ParquetError::not_supported("Struct has zero fields")); }; let field_to_nested_array = diff --git a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs index a01c90432d26..1a16b8ff1e56 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs @@ -601,7 +601,10 @@ impl PageNestedDecoder { }) } - pub fn collect(mut self, filter: Option) -> ParquetResult<(NestedState, D::Output, Bitmap)> { + pub fn collect( + mut self, + filter: Option, + ) -> ParquetResult<(NestedState, D::Output, Bitmap)> { // @TODO: We should probably count the filter so that we don't overallocate let mut target = self.decoder.with_capacity(self.iter.total_num_values()); // @TODO: Self capacity @@ -719,7 +722,10 @@ impl PageNestedDecoder { Ok((nested_state, array, Bitmap::new())) } - pub fn collect_boxed(self, filter: Option) -> ParquetResult<(NestedState, Box, Bitmap)> { + pub fn collect_boxed( + self, + filter: Option, + ) -> ParquetResult<(NestedState, Box, Bitmap)> { use arrow::array::IntoBoxedArray; let (nested, array, ptm) = self.collect(filter)?; Ok((nested, array.into_boxed(), ptm)) diff --git a/crates/polars-parquet/src/arrow/read/deserialize/null.rs b/crates/polars-parquet/src/arrow/read/deserialize/null.rs index 05ec35f4d9e9..ac639dba2b98 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/null.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/null.rs @@ -85,7 +85,6 @@ impl utils::Decoder for NullDecoder { Ok(()) } - fn finalize( &self, dtype: ArrowDataType, diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs index 49c574471880..1e97f45c38e6 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs @@ -222,7 +222,10 @@ where additional: &dyn arrow::array::Array, is_optional: bool, ) -> ParquetResult<()> { - let additional = additional.as_any().downcast_ref::>().unwrap(); + let additional = additional + .as_any() + .downcast_ref::>() + .unwrap(); decoded.0.extend(additional.values().iter().copied()); match additional.validity() { Some(v) => decoded.1.extend_from_bitmap(v), diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/plain/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/plain/mod.rs index 6f4311d7229b..c9a840b80dea 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/plain/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/plain/mod.rs @@ -14,6 +14,7 @@ use crate::read::{Filter, ParquetError}; mod predicate; mod required; +#[allow(clippy::too_many_arguments)] pub fn decode>( values: &[u8], is_optional: bool, diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/plain/required.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/plain/required.rs index 3d133daf2705..d3a6196a8e23 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/plain/required.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/plain/required.rs @@ -1,8 +1,7 @@ use arrow::types::AlignedBytes; -use crate::parquet::error::ParquetResult; - use super::ArrayChunks; +use crate::parquet::error::ParquetResult; #[inline(never)] pub fn decode( @@ -32,4 +31,3 @@ pub fn decode( Ok(()) } - diff --git a/crates/polars-parquet/src/arrow/read/deserialize/simple.rs b/crates/polars-parquet/src/arrow/read/deserialize/simple.rs index 4938f0450146..35ed6a2071bf 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/simple.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/simple.rs @@ -233,9 +233,9 @@ pub fn page_iter_to_array( ) }, (PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n > 32 => { - return Err(ParquetError::not_supported( + return Err(ParquetError::not_supported(format!( "Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}", - )); + ))); }, (PhysicalType::Int32, Date64) => PageDecoder::new( pages, diff --git a/crates/polars-parquet/src/arrow/read/expr.rs b/crates/polars-parquet/src/arrow/read/expr.rs index 3b59211e48b9..8a18e724cf56 100644 --- a/crates/polars-parquet/src/arrow/read/expr.rs +++ b/crates/polars-parquet/src/arrow/read/expr.rs @@ -35,16 +35,36 @@ impl ParquetScalar { pub(crate) fn to_aligned_bytes(&self) -> Option { match self { - Self::Int8(v) => ::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned), - Self::Int16(v) => ::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned), - Self::Int32(v) => ::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned), - Self::Int64(v) => ::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned), - Self::UInt8(v) => ::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned), - Self::UInt16(v) => ::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned), - Self::UInt32(v) => ::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned), - Self::UInt64(v) => ::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned), - Self::Float32(v) => ::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned), - Self::Float64(v) => ::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned), + Self::Int8(v) => ::try_from(&v.to_le_bytes()) + .ok() + .map(B::from_unaligned), + Self::Int16(v) => ::try_from(&v.to_le_bytes()) + .ok() + .map(B::from_unaligned), + Self::Int32(v) => ::try_from(&v.to_le_bytes()) + .ok() + .map(B::from_unaligned), + Self::Int64(v) => ::try_from(&v.to_le_bytes()) + .ok() + .map(B::from_unaligned), + Self::UInt8(v) => ::try_from(&v.to_le_bytes()) + .ok() + .map(B::from_unaligned), + Self::UInt16(v) => ::try_from(&v.to_le_bytes()) + .ok() + .map(B::from_unaligned), + Self::UInt32(v) => ::try_from(&v.to_le_bytes()) + .ok() + .map(B::from_unaligned), + Self::UInt64(v) => ::try_from(&v.to_le_bytes()) + .ok() + .map(B::from_unaligned), + Self::Float32(v) => ::try_from(&v.to_le_bytes()) + .ok() + .map(B::from_unaligned), + Self::Float64(v) => ::try_from(&v.to_le_bytes()) + .ok() + .map(B::from_unaligned), _ => None, } } diff --git a/crates/polars-parquet/src/parquet/page/mod.rs b/crates/polars-parquet/src/parquet/page/mod.rs index d8f39ddfda2e..ac0889d30a7e 100644 --- a/crates/polars-parquet/src/parquet/page/mod.rs +++ b/crates/polars-parquet/src/parquet/page/mod.rs @@ -6,8 +6,8 @@ use crate::parquet::metadata::Descriptor; pub use crate::parquet::parquet_bridge::{DataPageHeaderExt, PageType}; use crate::parquet::statistics::Statistics; pub use crate::parquet::thrift_format::{ - DataPageHeader as DataPageHeaderV1, DataPageHeaderV2, PageHeader as ParquetPageHeader, - Encoding as FormatEncoding, + DataPageHeader as DataPageHeaderV1, DataPageHeaderV2, Encoding as FormatEncoding, + PageHeader as ParquetPageHeader, }; pub enum PageResult {