Commit: finalize

coastalwhite committed Jan 15, 2025
1 parent 067c954 commit 96400e4
Showing 15 changed files with 134 additions and 76 deletions.
9 changes: 2 additions & 7 deletions crates/polars-expr/src/expressions/binary.rs
@@ -287,13 +287,8 @@ impl PhysicalExpr for BinaryExpr {
_ => return None,
};

let Some(value) = other.evaluate_inline() else {
return None;
};

let Some(value) = value.as_scalar_column() else {
return None;
};
let value = other.evaluate_inline()?;
let value = value.as_scalar_column()?;

let scalar = value.scalar();
use Operator as O;
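Aside: the rewrite above relies on `?` propagating `None` out of a function (or closure) that itself returns `Option`, which is exactly what the deleted `let Some(..) = .. else { return None; };` blocks did by hand. A minimal standalone sketch, with hypothetical `Column`/`Scalar` stand-ins rather than the Polars types:

struct Scalar(i64);
struct Column(Option<Scalar>);

impl Column {
    fn as_scalar_column(&self) -> Option<&Scalar> {
        self.0.as_ref()
    }
}

// Before: explicit early return on None.
fn read_before(col: &Column) -> Option<i64> {
    let Some(scalar) = col.as_scalar_column() else {
        return None;
    };
    Some(scalar.0)
}

// After: `?` returns None early with identical semantics, one line per step.
fn read_after(col: &Column) -> Option<i64> {
    let scalar = col.as_scalar_column()?;
    Some(scalar.0)
}

fn main() {
    assert_eq!(read_before(&Column(None)), read_after(&Column(None)));
    assert_eq!(read_after(&Column(Some(Scalar(3)))), Some(3));
}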
105 changes: 67 additions & 38 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -297,7 +297,13 @@ fn rg_to_dfs_prefiltered(
}
}

let do_parquet_expr = std::env::var("POLARS_PARQUET_EXPR").as_deref() == Ok("1");
let do_parquet_expr = std::env::var("POLARS_NO_PARQUET_EXPR").as_deref() != Ok("1")
&& live_columns.len() == 1
&& !schema
.get(live_columns[0].as_str())
.unwrap()
.dtype()
.is_nested();
let column_exprs = do_parquet_expr.then(|| {
live_columns
.iter()
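Note the gate's polarity flip: `POLARS_PARQUET_EXPR=1` used to opt in, while `POLARS_NO_PARQUET_EXPR=1` now opts out, and the path additionally requires exactly one live column whose dtype is not nested. A rough sketch of the opt-out pattern, assuming a hypothetical `is_single_flat_column` check in place of the real schema lookup:

use std::env;

// Hypothetical stand-in for the single, non-nested live-column check.
fn is_single_flat_column(live_columns: &[&str]) -> bool {
    live_columns.len() == 1
}

fn use_parquet_expr(live_columns: &[&str]) -> bool {
    // Opt-out: enabled by default, disabled only when the variable is "1".
    env::var("POLARS_NO_PARQUET_EXPR").as_deref() != Ok("1")
        && is_single_flat_column(live_columns)
}

fn main() {
    println!("parquet expr enabled: {}", use_parquet_expr(&["price"]));
}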
@@ -381,7 +387,9 @@ fn rg_to_dfs_prefiltered(
let (mut series, pred_true_mask) =
column_idx_to_series(col_idx, part.as_slice(), filter, schema, store)?;

debug_assert!(pred_true_mask.is_empty() || pred_true_mask.len() == md.num_rows());
debug_assert!(
pred_true_mask.is_empty() || pred_true_mask.len() == md.num_rows()
);
match equals_scalar {
None => {
try_set_sorted_flag(&mut series, col_idx, &sorting_map);
@@ -400,51 +408,72 @@

// Apply the predicate to the live columns and save the dataframe and the bitmask
let md = &file_metadata.row_groups[rg_idx];
let filter_mask: Bitmap;
let mut df: DataFrame;

let filter_mask = match &filters[0] {
None => {
// Create without hive columns - the first merge phase does not handle hive partitions. This also saves
// some unnecessary filtering.
df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns) };

materialize_hive_partitions(
&mut df,
schema.as_ref(),
hive_partition_columns,
md.num_rows(),
if let Some(Some(f)) = filters.first() {
if f.set_bits() == 0 {
if config::verbose() {
eprintln!("parquet filter mask found that row group can be skipped");
}

return Ok(None);
}

if let Some(rc) = &row_index {
df = unsafe { DataFrame::new_no_checks(md.num_rows(), vec![]) };
df.with_row_index_mut(
rc.name.clone(),
Some(rg_offsets[rg_idx] + rc.offset),
);
df = df.filter(&BooleanChunked::from_chunk_iter(
PlSmallStr::EMPTY,
[BooleanArray::new(ArrowDataType::Boolean, f.clone(), None)],
))?;
unsafe { df.column_extend_unchecked(live_columns) };
} else {
df = DataFrame::new(live_columns).unwrap();
}
filter_mask = f.clone();
} else {
df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns.clone()) };

materialize_hive_partitions(
&mut df,
schema.as_ref(),
hive_partition_columns,
md.num_rows(),
);

let s = predicate.evaluate_io(&df)?;
let mask = s.bool().expect("filter predicates was not of type boolean");
let s = predicate.evaluate_io(&df)?;
let mask = s.bool().expect("filter predicates was not of type boolean");

if let Some(rc) = &row_index {
df.with_row_index_mut(
rc.name.clone(),
Some(rg_offsets[rg_idx] + rc.offset),
);
}
df = df.filter(mask)?;
// Create without hive columns - the first merge phase does not handle hive partitions. This also saves
// some unnecessary filtering.
df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns) };

let mut filter_mask = MutableBitmap::with_capacity(mask.len());
if let Some(rc) = &row_index {
df.with_row_index_mut(
rc.name.clone(),
Some(rg_offsets[rg_idx] + rc.offset),
);
}
df = df.filter(mask)?;

// We need to account for the validity of the items
for chunk in mask.downcast_iter() {
match chunk.validity() {
None => filter_mask.extend_from_bitmap(chunk.values()),
Some(validity) => {
filter_mask.extend_from_bitmap(&(validity & chunk.values()))
},
}
let mut mut_filter_mask = MutableBitmap::with_capacity(mask.len());

// We need to account for the validity of the items
for chunk in mask.downcast_iter() {
match chunk.validity() {
None => mut_filter_mask.extend_from_bitmap(chunk.values()),
Some(validity) => {
mut_filter_mask.extend_from_bitmap(&(validity & chunk.values()))
},
}
}

filter_mask.freeze()
},
Some(f) => {
df = unsafe { DataFrame::new_no_checks(f.set_bits(), live_columns) };
f.clone()
},
};
filter_mask = mut_filter_mask.freeze();
}

debug_assert_eq!(md.num_rows(), filter_mask.len());
debug_assert_eq!(df.height(), filter_mask.set_bits());
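Two invariants drive the rewritten branch above (and the `debug_assert`s that close it): a row survives the filter only when its predicate bit is both valid and true, which is why validity is AND-ed into the mask, and a precomputed mask with zero set bits means the whole row group can be skipped before any decoding. A hedged illustration with plain `bool` slices standing in for `Bitmap`/`BooleanChunked`:

/// Fold an optional validity bitmap into the value bits:
/// a null predicate result must not keep its row.
fn effective_mask(values: &[bool], validity: Option<&[bool]>) -> Vec<bool> {
    match validity {
        None => values.to_vec(),
        Some(valid) => values.iter().zip(valid).map(|(v, ok)| *v && *ok).collect(),
    }
}

fn main() {
    let values = [true, true, false, true];
    let validity = [true, false, true, true]; // second row is null
    let mask = effective_mask(&values, Some(&validity));
    assert_eq!(mask, [true, false, false, true]);

    // Mirrors the diff's invariants: the mask covers every row of the
    // group, and the filtered height equals the number of set bits.
    assert_eq!(mask.len(), values.len());
    assert_eq!(mask.iter().filter(|keep| **keep).count(), 2);
    // A mask with zero set bits would let the row group be skipped entirely.
}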
@@ -56,6 +56,7 @@ impl IndexMapping for usize {
}
}

#[allow(clippy::too_many_arguments)]
pub fn decode_dict<T: NativeType>(
values: HybridRleDecoder<'_>,
dict: &[T],
Expand All @@ -81,6 +82,7 @@ pub fn decode_dict<T: NativeType>(
}

#[inline(never)]
#[allow(clippy::too_many_arguments)]
pub fn decode_dict_dispatch<B: AlignedBytes, D: IndexMapping<Output = B>>(
mut values: HybridRleDecoder<'_>,
dict: D,
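The added `#[allow(clippy::too_many_arguments)]` attributes suppress clippy's more-than-seven-parameters lint for these decode entry points only, rather than relaxing the lint crate-wide. A toy illustration of the item-level scoping (the function is hypothetical):

// Without the attribute, clippy warns once a signature passes seven parameters.
#[allow(clippy::too_many_arguments)]
fn decode_sketch(a: u8, b: u8, c: u8, d: u8, e: u8, f: u8, g: u8, h: u8) -> u16 {
    // Stand-in body; the real decoders thread this much state through.
    [a, b, c, d, e, f, g, h].iter().map(|x| u16::from(*x)).sum()
}

fn main() {
    assert_eq!(decode_sketch(1, 2, 3, 4, 5, 6, 7, 8), 36);
}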
@@ -38,12 +38,10 @@ pub fn decode<B: AlignedBytes, D: IndexMapping<Output = B>>(
.set_bits();
target.resize(target.len() + num_values, dict.get(needle as u32).unwrap());
}
} else if predicate.include_values {
decode_multiple_values(values, dict, dict_mask, target, pred_true_mask)?;
} else {
if predicate.include_values {
decode_multiple_values(values, dict, dict_mask, target, pred_true_mask)?;
} else {
decode_multiple_no_values(values, dict_mask, pred_true_mask)?;
}
decode_multiple_no_values(values, dict_mask, pred_true_mask)?;
}

assert_eq!(expected_pred_true_mask_len, pred_true_mask.len());
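This hunk is clippy's `collapsible_else_if` rewrite: `else { if cond { .. } else { .. } }` flattens to `else if cond { .. } else { .. }` with identical behavior. A small sketch of the shape, under hypothetical names mirroring the dispatch in the diff:

// Single-needle fast path first, then a choice between decoding values
// or only the predicate-true mask.
fn dispatch(single_needle: bool, include_values: bool) -> &'static str {
    if single_needle {
        "fast path: fill from one dictionary entry"
    } else if include_values {
        // Previously nested as `else { if include_values { .. } .. }`.
        "decode values and predicate-true mask"
    } else {
        "decode predicate-true mask only"
    }
}

fn main() {
    assert_eq!(dispatch(false, true), "decode values and predicate-true mask");
}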
@@ -174,7 +174,7 @@ impl utils::Decoded for (FSBVec, MutableBitmap) {
fn len(&self) -> usize {
self.0.len()
}

fn extend_nulls(&mut self, n: usize) {
match &mut self.0 {
FSBVec::Size1(v) => v.resize(v.len() + n, Zeroable::zeroed()),
@@ -190,6 +190,7 @@ impl utils::Decoded for (FSBVec, MutableBitmap) {
}
}

#[allow(clippy::too_many_arguments)]
fn decode_fsb_plain(
size: usize,
values: &[u8],
@@ -547,8 +548,13 @@ impl Decoder for BinaryDecoder {
additional: &dyn arrow::array::Array,
is_optional: bool,
) -> ParquetResult<()> {
let additional = additional.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap();
decoded.0.extend_from_byte_slice(additional.values().as_slice());
let additional = additional
.as_any()
.downcast_ref::<FixedSizeBinaryArray>()
.unwrap();
decoded
.0
.extend_from_byte_slice(additional.values().as_slice());
match additional.validity() {
Some(v) => decoded.1.extend_from_bitmap(v),
None if is_optional => decoded.1.extend_constant(additional.len(), true),
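The reformatted chain is the standard escape hatch from type erasure: `as_any()` turns `&dyn Array` into `&dyn Any`, and `downcast_ref::<T>()` yields `Some` only when the erased value really is a `T`, with `.unwrap()` asserting the caller passed the expected array kind. A self-contained sketch of the pattern built on `std::any` rather than the arrow types:

use std::any::Any;

trait Array {
    fn as_any(&self) -> &dyn Any;
}

struct FixedSizeBinary {
    values: Vec<u8>,
    size: usize,
}

impl Array for FixedSizeBinary {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn item_count(array: &dyn Array) -> usize {
    // Panics on a type mismatch, exactly like the `.unwrap()` in the diff.
    let array = array
        .as_any()
        .downcast_ref::<FixedSizeBinary>()
        .unwrap();
    array.values.len() / array.size
}

fn main() {
    let arr = FixedSizeBinary { values: vec![0; 8], size: 4 };
    assert_eq!(item_count(&arr), 2);
}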
3 changes: 2 additions & 1 deletion crates/polars-parquet/src/arrow/read/deserialize/mod.rs
@@ -194,6 +194,7 @@ pub fn column_iter_to_arrays(
field: Field,
filter: Option<Filter>,
) -> PolarsResult<(Box<dyn Array>, Bitmap)> {
let (_, array, pred_true_mask) = columns_to_iter_recursive(columns, types, field, vec![], filter)?;
let (_, array, pred_true_mask) =
columns_to_iter_recursive(columns, types, field, vec![], filter)?;
Ok((array, pred_true_mask))
}
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/arrow/read/deserialize/nested.rs
@@ -389,7 +389,7 @@ pub fn columns_to_iter_recursive(
// Is this inefficient? Yes. Is this how we are going to do it for now? Yes.

let Some(last_field) = fields.last() else {
return Err(ParquetError::not_supported("Struct has zero fields").into());
return Err(ParquetError::not_supported("Struct has zero fields"));
};

let field_to_nested_array =
10 changes: 8 additions & 2 deletions crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs
@@ -601,7 +601,10 @@ impl<D: utils::Decoder> PageNestedDecoder<D> {
})
}

pub fn collect(mut self, filter: Option<Filter>) -> ParquetResult<(NestedState, D::Output, Bitmap)> {
pub fn collect(
mut self,
filter: Option<Filter>,
) -> ParquetResult<(NestedState, D::Output, Bitmap)> {
// @TODO: We should probably count the filter so that we don't overallocate
let mut target = self.decoder.with_capacity(self.iter.total_num_values());
// @TODO: Self capacity
@@ -719,7 +722,10 @@ impl<D: utils::Decoder> PageNestedDecoder<D> {
Ok((nested_state, array, Bitmap::new()))
}

pub fn collect_boxed(self, filter: Option<Filter>) -> ParquetResult<(NestedState, Box<dyn Array>, Bitmap)> {
pub fn collect_boxed(
self,
filter: Option<Filter>,
) -> ParquetResult<(NestedState, Box<dyn Array>, Bitmap)> {
use arrow::array::IntoBoxedArray;
let (nested, array, ptm) = self.collect(filter)?;
Ok((nested, array.into_boxed(), ptm))
1 change: 0 additions & 1 deletion crates/polars-parquet/src/arrow/read/deserialize/null.rs
@@ -85,7 +85,6 @@ impl utils::Decoder for NullDecoder {
Ok(())
}


fn finalize(
&self,
dtype: ArrowDataType,
@@ -222,7 +222,10 @@
additional: &dyn arrow::array::Array,
is_optional: bool,
) -> ParquetResult<()> {
let additional = additional.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
let additional = additional
.as_any()
.downcast_ref::<PrimitiveArray<T>>()
.unwrap();
decoded.0.extend(additional.values().iter().copied());
match additional.validity() {
Some(v) => decoded.1.extend_from_bitmap(v),
@@ -14,6 +14,7 @@ use crate::read::{Filter, ParquetError};
mod predicate;
mod required;

#[allow(clippy::too_many_arguments)]
pub fn decode<P: ParquetNativeType, T: NativeType, D: DecoderFunction<P, T>>(
values: &[u8],
is_optional: bool,
@@ -1,8 +1,7 @@
use arrow::types::AlignedBytes;

use crate::parquet::error::ParquetResult;

use super::ArrayChunks;
use crate::parquet::error::ParquetResult;

#[inline(never)]
pub fn decode<B: AlignedBytes>(
@@ -32,4 +31,3 @@ pub fn decode<B: AlignedBytes>(

Ok(())
}

4 changes: 2 additions & 2 deletions crates/polars-parquet/src/arrow/read/deserialize/simple.rs
@@ -233,9 +233,9 @@ pub fn page_iter_to_array(
)
},
(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n > 32 => {
return Err(ParquetError::not_supported(
return Err(ParquetError::not_supported(format!(
"Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}",
));
)));
},
(PhysicalType::Int32, Date64) => PageDecoder::new(
pages,
40 changes: 30 additions & 10 deletions crates/polars-parquet/src/arrow/read/expr.rs
@@ -35,16 +35,36 @@ impl ParquetScalar {

pub(crate) fn to_aligned_bytes<B: AlignedBytes>(&self) -> Option<B> {
match self {
Self::Int8(v) => <B::Unaligned>::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned),
Self::Int16(v) => <B::Unaligned>::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned),
Self::Int32(v) => <B::Unaligned>::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned),
Self::Int64(v) => <B::Unaligned>::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned),
Self::UInt8(v) => <B::Unaligned>::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned),
Self::UInt16(v) => <B::Unaligned>::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned),
Self::UInt32(v) => <B::Unaligned>::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned),
Self::UInt64(v) => <B::Unaligned>::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned),
Self::Float32(v) => <B::Unaligned>::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned),
Self::Float64(v) => <B::Unaligned>::try_from(&v.to_le_bytes()).ok().map(B::from_unaligned),
Self::Int8(v) => <B::Unaligned>::try_from(&v.to_le_bytes())
.ok()
.map(B::from_unaligned),
Self::Int16(v) => <B::Unaligned>::try_from(&v.to_le_bytes())
.ok()
.map(B::from_unaligned),
Self::Int32(v) => <B::Unaligned>::try_from(&v.to_le_bytes())
.ok()
.map(B::from_unaligned),
Self::Int64(v) => <B::Unaligned>::try_from(&v.to_le_bytes())
.ok()
.map(B::from_unaligned),
Self::UInt8(v) => <B::Unaligned>::try_from(&v.to_le_bytes())
.ok()
.map(B::from_unaligned),
Self::UInt16(v) => <B::Unaligned>::try_from(&v.to_le_bytes())
.ok()
.map(B::from_unaligned),
Self::UInt32(v) => <B::Unaligned>::try_from(&v.to_le_bytes())
.ok()
.map(B::from_unaligned),
Self::UInt64(v) => <B::Unaligned>::try_from(&v.to_le_bytes())
.ok()
.map(B::from_unaligned),
Self::Float32(v) => <B::Unaligned>::try_from(&v.to_le_bytes())
.ok()
.map(B::from_unaligned),
Self::Float64(v) => <B::Unaligned>::try_from(&v.to_le_bytes())
.ok()
.map(B::from_unaligned),
_ => None,
}
}
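Every arm above follows the same recipe: serialize the scalar to its little-endian bytes, then `try_from` those bytes into the target's unaligned byte array, which succeeds only when the widths agree (so an `Int32` needle can seed a 4-byte comparison target but never an 8-byte one). A hedged sketch of that width check with plain fixed-size arrays standing in for `AlignedBytes`:

/// Reinterpret little-endian scalar bytes as an N-byte needle,
/// returning None on a width mismatch, as the `try_from(..).ok()` arms do.
fn to_needle<const N: usize>(le_bytes: &[u8]) -> Option<[u8; N]> {
    <[u8; N]>::try_from(le_bytes).ok()
}

fn main() {
    let v: i32 = 42;
    assert_eq!(to_needle::<4>(&v.to_le_bytes()), Some(42i32.to_le_bytes()));
    assert_eq!(to_needle::<8>(&v.to_le_bytes()), None); // 4 bytes into 8: mismatch
}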
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/parquet/page/mod.rs
@@ -6,8 +6,8 @@ use crate::parquet::metadata::Descriptor;
pub use crate::parquet::parquet_bridge::{DataPageHeaderExt, PageType};
use crate::parquet::statistics::Statistics;
pub use crate::parquet::thrift_format::{
DataPageHeader as DataPageHeaderV1, DataPageHeaderV2, PageHeader as ParquetPageHeader,
Encoding as FormatEncoding,
DataPageHeader as DataPageHeaderV1, DataPageHeaderV2, Encoding as FormatEncoding,
PageHeader as ParquetPageHeader,
};

pub enum PageResult {