From 7482af039364dc45d1322a8eadad3ee26dc25a44 Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Fri, 10 Jan 2025 16:37:16 +0100 Subject: [PATCH] Much further with proper implementation --- .../chunked_array/logical/categorical/mod.rs | 10 +- .../src/chunked_array/logical/date.rs | 2 +- .../src/chunked_array/logical/datetime.rs | 2 +- .../src/chunked_array/logical/duration.rs | 2 +- .../src/chunked_array/logical/mod.rs | 28 ++ .../src/chunked_array/logical/time.rs | 2 +- crates/polars-core/src/chunked_array/mod.rs | 114 +++++++ .../src/chunked_array/ops/any_value.rs | 63 +--- .../polars-core/src/chunked_array/ops/mod.rs | 58 +++- crates/polars-core/src/datatypes/any_value.rs | 75 +++-- crates/polars-core/src/fmt.rs | 4 +- crates/polars-core/src/frame/column/mod.rs | 20 ++ crates/polars-core/src/series/any_value.rs | 8 +- .../src/series/implementations/array.rs | 13 + .../src/series/implementations/binary.rs | 13 + .../series/implementations/binary_offset.rs | 10 + .../src/series/implementations/boolean.rs | 13 + .../src/series/implementations/categorical.rs | 12 + .../src/series/implementations/date.rs | 11 + .../src/series/implementations/datetime.rs | 13 + .../src/series/implementations/decimal.rs | 11 + .../src/series/implementations/duration.rs | 11 + .../src/series/implementations/floats.rs | 13 + .../src/series/implementations/list.rs | 11 + .../src/series/implementations/mod.rs | 25 ++ .../src/series/implementations/null.rs | 37 ++ .../src/series/implementations/object.rs | 13 +- .../src/series/implementations/string.rs | 13 + .../src/series/implementations/struct_.rs | 11 + .../src/series/implementations/time.rs | 11 + crates/polars-core/src/series/series_trait.rs | 8 + crates/polars-core/src/utils/mod.rs | 318 +++++++++++++++++- .../src/plans/conversion/type_coercion/mod.rs | 2 +- crates/polars-plan/src/plans/lit.rs | 2 +- .../polars-python/src/conversion/any_value.rs | 4 +- crates/polars-python/src/functions/eager.rs | 40 +-- 36 files changed, 850 insertions(+), 153 deletions(-) diff --git a/crates/polars-core/src/chunked_array/logical/categorical/mod.rs b/crates/polars-core/src/chunked_array/logical/categorical/mod.rs index 92376f4abe4d..7c7f6ab1974d 100644 --- a/crates/polars-core/src/chunked_array/logical/categorical/mod.rs +++ b/crates/polars-core/src/chunked_array/logical/categorical/mod.rs @@ -40,6 +40,10 @@ impl CategoricalChunked { Field::new(name.clone(), self.dtype().clone()) } + pub fn logical_mut(&mut self) -> &mut Logical { + &mut self.physical + } + pub fn is_empty(&self) -> bool { self.len() == 0 } @@ -347,8 +351,8 @@ impl LogicalType for CategoricalChunked { match self.physical.0.get_unchecked(i) { Some(i) => match self.dtype() { DataType::Enum(_, _) => AnyValue::Enum(i, self.get_rev_map(), SyncPtr::new_null()), - DataType::Categorical(_, _) => { - AnyValue::Categorical(i, self.get_rev_map(), SyncPtr::new_null()) + DataType::Categorical(_, ord) => { + AnyValue::Categorical(i, self.get_rev_map(), SyncPtr::new_null(), *ord) }, _ => unimplemented!(), }, @@ -553,7 +557,7 @@ mod test { ); assert!(matches!( s.get(0)?, - AnyValue::Categorical(0, RevMapping::Local(_, _), _) + AnyValue::Categorical(0, RevMapping::Local(_, _), _, _) )); let groups = s.group_tuples(false, true); diff --git a/crates/polars-core/src/chunked_array/logical/date.rs b/crates/polars-core/src/chunked_array/logical/date.rs index ae67fd09ed0e..0f8f6c51a6ee 100644 --- a/crates/polars-core/src/chunked_array/logical/date.rs +++ b/crates/polars-core/src/chunked_array/logical/date.rs @@ -20,7 +20,7 @@ impl LogicalType for DateChunked { } fn get_any_value(&self, i: usize) -> PolarsResult> { - self.0.get_any_value(i).map(|av| av.as_date()) + self.0.try_get_any_value(i).map(|av| av.as_date()) } unsafe fn get_any_value_unchecked(&self, i: usize) -> AnyValue<'_> { diff --git a/crates/polars-core/src/chunked_array/logical/datetime.rs b/crates/polars-core/src/chunked_array/logical/datetime.rs index 3b8c2b02024a..6653db5173b6 100644 --- a/crates/polars-core/src/chunked_array/logical/datetime.rs +++ b/crates/polars-core/src/chunked_array/logical/datetime.rs @@ -18,7 +18,7 @@ impl LogicalType for DatetimeChunked { fn get_any_value(&self, i: usize) -> PolarsResult> { self.0 - .get_any_value(i) + .try_get_any_value(i) .map(|av| av.as_datetime(self.time_unit(), self.time_zone().as_ref())) } diff --git a/crates/polars-core/src/chunked_array/logical/duration.rs b/crates/polars-core/src/chunked_array/logical/duration.rs index 1dc0eab17c5d..23497eaebedb 100644 --- a/crates/polars-core/src/chunked_array/logical/duration.rs +++ b/crates/polars-core/src/chunked_array/logical/duration.rs @@ -18,7 +18,7 @@ impl LogicalType for DurationChunked { fn get_any_value(&self, i: usize) -> PolarsResult> { self.0 - .get_any_value(i) + .try_get_any_value(i) .map(|av| av.as_duration(self.time_unit())) } unsafe fn get_any_value_unchecked(&self, i: usize) -> AnyValue<'_> { diff --git a/crates/polars-core/src/chunked_array/logical/mod.rs b/crates/polars-core/src/chunked_array/logical/mod.rs index 0baa286e9f1d..7a34fcda9e01 100644 --- a/crates/polars-core/src/chunked_array/logical/mod.rs +++ b/crates/polars-core/src/chunked_array/logical/mod.rs @@ -97,8 +97,36 @@ where pub fn physical(&self) -> &ChunkedArray { &self.0 } + pub fn field(&self) -> Field { let name = self.0.ref_field().name(); Field::new(name.clone(), LogicalType::dtype(self).clone()) } } + +impl> Logical +where + Self: LogicalType, + ChunkedArray: ChunkAnyValue, +{ + pub unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + if check_dtypes { + for df in dfs.iter() { + if df.width() == 0 { + continue; + } + + let column = &df.get_columns()[i]; + polars_ensure!(self.dtype() == column.dtype(), append); + } + } + + unsafe { self.0.append_gather_unchecked(dfs, i, check_names, false) } + } +} diff --git a/crates/polars-core/src/chunked_array/logical/time.rs b/crates/polars-core/src/chunked_array/logical/time.rs index ce9b890282d8..ce7d2cd08f54 100644 --- a/crates/polars-core/src/chunked_array/logical/time.rs +++ b/crates/polars-core/src/chunked_array/logical/time.rs @@ -59,7 +59,7 @@ impl LogicalType for TimeChunked { #[cfg(feature = "dtype-time")] fn get_any_value(&self, i: usize) -> PolarsResult> { - self.0.get_any_value(i).map(|av| av.as_time()) + self.0.try_get_any_value(i).map(|av| av.as_time()) } unsafe fn get_any_value_unchecked(&self, i: usize) -> AnyValue<'_> { self.0.get_any_value_unchecked(i).as_time() diff --git a/crates/polars-core/src/chunked_array/mod.rs b/crates/polars-core/src/chunked_array/mod.rs index 3f09d6f37574..ea9fa179bf56 100644 --- a/crates/polars-core/src/chunked_array/mod.rs +++ b/crates/polars-core/src/chunked_array/mod.rs @@ -503,6 +503,120 @@ impl ChunkedArray { } } +impl ChunkedArray +where + T: PolarsDataType, + Self: ChunkAnyValue, +{ + pub unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + let estimated_num_chunks = self.chunks.len().max(1) * dfs.len(); + self.chunks.reserve(estimated_num_chunks); + + let mut flags = self.flags.get_mut(); + + for other_df in dfs { + if other_df.width() == 0 { + continue; + } + + let ca: &Self = other_df.get_columns()[i] + .as_materialized_series() + .as_ref() + .as_ref(); + + if check_names { + polars_ensure!( + self.name() == ca.name(), + ShapeMismatch: "unable to vstack, column names don't match: {:?} and {:?}", + self.name(), ca.name(), + ); + } + + if check_dtypes { + polars_ensure!(self.dtype() == ca.dtype(), append); + } + + if ca.is_empty() { + continue; + } + + let boundary_idx = self.length; + self.length += ca.len(); + self.null_count += ca.null_count(); + self.chunks.extend(ca.chunks.iter().cloned()); + + if self.len() == ca.len() { + flags = ca.get_flags() + & (StatisticsFlags::CAN_FAST_EXPLODE_LIST | StatisticsFlags::IS_SORTED_ANY); + } else if flags.is_sorted_any() || flags.can_fast_explode_list() { + let ca_flags = ca.get_flags(); + + let ca_can_fast_explode_list = ca_flags & StatisticsFlags::CAN_FAST_EXPLODE_LIST; + flags &= !(ca_can_fast_explode_list ^ StatisticsFlags::CAN_FAST_EXPLODE_LIST); + + let is_sorted = flags.is_sorted(); + let ca_is_sorted = ca_flags.is_sorted(); + + flags.set_sorted(IsSorted::Not); + + // Preserve the sorted flag if possible. + if !matches!(is_sorted, IsSorted::Not) && is_sorted == ca_is_sorted { + let lst = self.get_any_value_static_unchecked(boundary_idx - 1); + let fst = self.get_any_value_static_unchecked(boundary_idx); + + let is_still_sorted = match (&lst, &fst) { + (AnyValue::Null, AnyValue::Null) => { + ca.null_count() == ca.len() + || self.null_count() - ca.null_count() == self.len() - ca.len() + }, + (AnyValue::Null, _) => self.null_count() - ca.null_count() == 0, + (_, AnyValue::Null) => self.null_count() == ca.null_count(), + (_, _) => { + let mut res = true; + + if self.null_count() > 0 { + // SAFETY: We know that self has at least one element + let are_nulls_first = + unsafe { self.get_any_value_unchecked(0) }.is_null(); + let are_nulls_last = + unsafe { self.get_any_value_unchecked(self.len() - 1) } + .is_null(); + + res &= are_nulls_first != are_nulls_last; + } + + // @NOTE: This only works when the logical type and the physical type + // have the same ordering. This is not the case for + // Categorical(ordering = 'lexical') so that needs a special case. + res &= match is_sorted { + IsSorted::Ascending => lst <= fst, + IsSorted::Descending => lst >= fst, + IsSorted::Not => unreachable!(), + }; + + res + }, + }; + + if is_still_sorted { + flags.set_sorted(is_sorted); + } + } + } + } + + self.flags = StatisticsFlagsIM::new(flags); + + Ok(()) + } +} + impl ChunkedArray where T: PolarsDataType, diff --git a/crates/polars-core/src/chunked_array/ops/any_value.rs b/crates/polars-core/src/chunked_array/ops/any_value.rs index 6088690cedf7..968c4deba8d8 100644 --- a/crates/polars-core/src/chunked_array/ops/any_value.rs +++ b/crates/polars-core/src/chunked_array/ops/any_value.rs @@ -81,10 +81,10 @@ pub(crate) unsafe fn arr_to_any_value<'a>( } }, #[cfg(feature = "dtype-categorical")] - DataType::Categorical(rev_map, _) => { + DataType::Categorical(rev_map, ord) => { let arr = &*(arr as *const dyn Array as *const UInt32Array); let v = arr.value_unchecked(idx); - AnyValue::Categorical(v, rev_map.as_ref().unwrap().as_ref(), SyncPtr::new_null()) + AnyValue::Categorical(v, rev_map.as_ref().unwrap().as_ref(), SyncPtr::new_null(), *ord) }, #[cfg(feature = "dtype-categorical")] DataType::Enum(rev_map, _) => { @@ -163,11 +163,12 @@ impl<'a> AnyValue<'a> { if arr.is_valid_unchecked(idx) { let v = arr.value_unchecked(idx); match fld.dtype() { - DataType::Categorical(Some(rev_map), _) => { + DataType::Categorical(Some(rev_map), ord) => { AnyValue::Categorical( v, rev_map, SyncPtr::from_const(values), + *ord, ) }, DataType::Enum(Some(rev_map), _) => { @@ -210,17 +211,6 @@ macro_rules! get_any_value_unchecked { }}; } -macro_rules! get_any_value { - ($self:ident, $index:expr) => {{ - if $index >= $self.len() { - polars_bail!(oob = $index, $self.len()); - } - // SAFETY: - // bounds are checked - Ok(unsafe { $self.get_any_value_unchecked($index) }) - }}; -} - impl ChunkAnyValue for ChunkedArray where T: PolarsNumericType, @@ -229,10 +219,6 @@ where unsafe fn get_any_value_unchecked(&self, index: usize) -> AnyValue { get_any_value_unchecked!(self, index) } - - fn get_any_value(&self, index: usize) -> PolarsResult { - get_any_value!(self, index) - } } impl ChunkAnyValue for BooleanChunked { @@ -240,10 +226,6 @@ impl ChunkAnyValue for BooleanChunked { unsafe fn get_any_value_unchecked(&self, index: usize) -> AnyValue { get_any_value_unchecked!(self, index) } - - fn get_any_value(&self, index: usize) -> PolarsResult { - get_any_value!(self, index) - } } impl ChunkAnyValue for StringChunked { @@ -251,10 +233,6 @@ impl ChunkAnyValue for StringChunked { unsafe fn get_any_value_unchecked(&self, index: usize) -> AnyValue { get_any_value_unchecked!(self, index) } - - fn get_any_value(&self, index: usize) -> PolarsResult { - get_any_value!(self, index) - } } impl ChunkAnyValue for BinaryChunked { @@ -262,10 +240,6 @@ impl ChunkAnyValue for BinaryChunked { unsafe fn get_any_value_unchecked(&self, index: usize) -> AnyValue { get_any_value_unchecked!(self, index) } - - fn get_any_value(&self, index: usize) -> PolarsResult { - get_any_value!(self, index) - } } impl ChunkAnyValue for BinaryOffsetChunked { @@ -273,10 +247,6 @@ impl ChunkAnyValue for BinaryOffsetChunked { unsafe fn get_any_value_unchecked(&self, index: usize) -> AnyValue { get_any_value_unchecked!(self, index) } - - fn get_any_value(&self, index: usize) -> PolarsResult { - get_any_value!(self, index) - } } impl ChunkAnyValue for ListChunked { @@ -284,10 +254,6 @@ impl ChunkAnyValue for ListChunked { unsafe fn get_any_value_unchecked(&self, index: usize) -> AnyValue { get_any_value_unchecked!(self, index) } - - fn get_any_value(&self, index: usize) -> PolarsResult { - get_any_value!(self, index) - } } #[cfg(feature = "dtype-array")] @@ -296,10 +262,6 @@ impl ChunkAnyValue for ArrayChunked { unsafe fn get_any_value_unchecked(&self, index: usize) -> AnyValue { get_any_value_unchecked!(self, index) } - - fn get_any_value(&self, index: usize) -> PolarsResult { - get_any_value!(self, index) - } } #[cfg(feature = "object")] @@ -311,31 +273,18 @@ impl ChunkAnyValue for ObjectChunked { Some(v) => AnyValue::Object(v), } } - - fn get_any_value(&self, index: usize) -> PolarsResult { - get_any_value!(self, index) - } } impl ChunkAnyValue for NullChunked { #[inline] - unsafe fn get_any_value_unchecked(&self, _index: usize) -> AnyValue { + unsafe fn get_any_value_unchecked(&self, index: usize) -> AnyValue { + debug_assert!(index < NullChunked::len(self)); AnyValue::Null } - - fn get_any_value(&self, _index: usize) -> PolarsResult { - Ok(AnyValue::Null) - } } #[cfg(feature = "dtype-struct")] impl ChunkAnyValue for StructChunked { - /// Gets AnyValue from LogicalType - fn get_any_value(&self, i: usize) -> PolarsResult> { - polars_ensure!(i < self.len(), oob = i, self.len()); - unsafe { Ok(self.get_any_value_unchecked(i)) } - } - unsafe fn get_any_value_unchecked(&self, i: usize) -> AnyValue<'_> { let (chunk_idx, idx) = index_to_chunked_index(self.chunks.iter().map(|c| c.len()), i); if let DataType::Struct(flds) = self.dtype() { diff --git a/crates/polars-core/src/chunked_array/ops/mod.rs b/crates/polars-core/src/chunked_array/ops/mod.rs index 9c2c241ef7ee..3307878d11f6 100644 --- a/crates/polars-core/src/chunked_array/ops/mod.rs +++ b/crates/polars-core/src/chunked_array/ops/mod.rs @@ -2,6 +2,7 @@ use arrow::offset::OffsetsBuffer; use crate::prelude::*; +use crate::series::implementations::null::NullChunked; pub(crate) mod aggregate; pub(crate) mod any_value; @@ -67,7 +68,25 @@ pub(crate) trait ToBitRepr { fn to_bit_repr(&self) -> BitRepr; } -pub trait ChunkAnyValue { +#[allow(private_bounds)] +trait HasLength { + fn _has_length_get_len(&self) -> usize; +} + +impl HasLength for ChunkedArray { + fn _has_length_get_len(&self) -> usize { + ChunkedArray::len(self) + } +} + +impl HasLength for NullChunked { + fn _has_length_get_len(&self) -> usize { + NullChunked::len(self) + } +} + +#[allow(private_bounds)] +pub trait ChunkAnyValue: HasLength { /// Get a single value. Beware this is slow. /// If you need to use this slightly performant, cast Categorical to UInt32 /// @@ -75,8 +94,43 @@ pub trait ChunkAnyValue { /// Does not do any bounds checking. unsafe fn get_any_value_unchecked(&self, index: usize) -> AnyValue; + /// Get a single value as as static. Beware this is slow. + /// + /// # Safety + /// Does not do any bounds checking. + unsafe fn get_any_value_static_unchecked(&self, index: usize) -> AnyValue<'static> { + unsafe { self.get_any_value_unchecked(index) }.into_static() + } + /// Get a single value. Beware this is slow. - fn get_any_value(&self, index: usize) -> PolarsResult; + #[inline(always)] + fn try_get_any_value(&self, index: usize) -> PolarsResult { + self.get_any_value(index) + .ok_or_else(|| polars_err!(oob = index, self._has_length_get_len())) + } + + /// Get a single value. Beware this is slow. + #[inline(always)] + fn try_get_any_value_static(&self, index: usize) -> PolarsResult> { + self.get_any_value_static(index) + .ok_or_else(|| polars_err!(oob = index, self._has_length_get_len())) + } + + /// Get a single value. Beware this is slow. + fn get_any_value(&self, index: usize) -> Option { + if index >= self._has_length_get_len() { + return None; + } + + // SAFETY: We just did the bounds check + Some(unsafe { self.get_any_value_unchecked(index) }) + } + + /// Get a single value. Beware this is slow. + #[inline(always)] + fn get_any_value_static(&self, index: usize) -> Option> { + self.get_any_value(index).map(AnyValue::into_static) + } } /// Explode/flatten a List or String Series diff --git a/crates/polars-core/src/datatypes/any_value.rs b/crates/polars-core/src/datatypes/any_value.rs index 5cd41ada2f02..d54f3f986103 100644 --- a/crates/polars-core/src/datatypes/any_value.rs +++ b/crates/polars-core/src/datatypes/any_value.rs @@ -73,11 +73,21 @@ pub enum AnyValue<'a> { // If syncptr is_null the data is in the rev-map // otherwise it is in the array pointer #[cfg(feature = "dtype-categorical")] - Categorical(u32, &'a RevMapping, SyncPtr), + Categorical( + u32, + &'a RevMapping, + SyncPtr, + CategoricalOrdering, + ), // If syncptr is_null the data is in the rev-map // otherwise it is in the array pointer #[cfg(feature = "dtype-categorical")] - CategoricalOwned(u32, Arc, SyncPtr), + CategoricalOwned( + u32, + Arc, + SyncPtr, + CategoricalOrdering, + ), #[cfg(feature = "dtype-categorical")] Enum(u32, &'a RevMapping, SyncPtr), #[cfg(feature = "dtype-categorical")] @@ -421,8 +431,8 @@ impl<'a> AnyValue<'a> { #[cfg(feature = "dtype-duration")] Duration(_, tu) => DataType::Duration(*tu), #[cfg(feature = "dtype-categorical")] - Categorical(_, _, _) | CategoricalOwned(_, _, _) => { - DataType::Categorical(None, Default::default()) + Categorical(_, _, _, ordering) | CategoricalOwned(_, _, _, ordering) => { + DataType::Categorical(None, *ordering) }, #[cfg(feature = "dtype-categorical")] Enum(_, _, _) | EnumOwned(_, _, _) => DataType::Enum(None, Default::default()), @@ -767,7 +777,7 @@ impl<'a> AnyValue<'a> { Self::StringOwned(s) => Cow::Owned(s.to_string()), Self::Null => Cow::Borrowed("null"), #[cfg(feature = "dtype-categorical")] - Self::Categorical(idx, rev, arr) | AnyValue::Enum(idx, rev, arr) => { + Self::Categorical(idx, rev, arr, _) | AnyValue::Enum(idx, rev, arr) => { if arr.is_null() { Cow::Borrowed(rev.get(*idx)) } else { @@ -775,7 +785,7 @@ impl<'a> AnyValue<'a> { } }, #[cfg(feature = "dtype-categorical")] - Self::CategoricalOwned(idx, rev, arr) | AnyValue::EnumOwned(idx, rev, arr) => { + Self::CategoricalOwned(idx, rev, arr, _) | AnyValue::EnumOwned(idx, rev, arr) => { if arr.is_null() { Cow::Owned(rev.get(*idx).to_string()) } else { @@ -854,8 +864,8 @@ impl AnyValue<'_> { #[cfg(feature = "dtype-time")] Time(v) => v.hash(state), #[cfg(feature = "dtype-categorical")] - Categorical(v, _, _) - | CategoricalOwned(v, _, _) + Categorical(v, _, _, _) + | CategoricalOwned(v, _, _, _) | Enum(v, _, _) | EnumOwned(v, _, _) => v.hash(state), #[cfg(feature = "object")] @@ -1005,8 +1015,8 @@ impl<'a> AnyValue<'a> { AnyValue::Datetime(*v, *tu, tz.as_ref().map(AsRef::as_ref)) }, #[cfg(feature = "dtype-categorical")] - AnyValue::CategoricalOwned(v, rev, arr) => { - AnyValue::Categorical(*v, rev.as_ref(), *arr) + AnyValue::CategoricalOwned(v, rev, arr, ord) => { + AnyValue::Categorical(*v, rev.as_ref(), *arr, *ord) }, #[cfg(feature = "dtype-categorical")] AnyValue::EnumOwned(v, rev, arr) => AnyValue::Enum(*v, rev.as_ref(), *arr), @@ -1072,9 +1082,9 @@ impl<'a> AnyValue<'a> { #[cfg(feature = "dtype-decimal")] Decimal(val, scale) => Decimal(val, scale), #[cfg(feature = "dtype-categorical")] - Categorical(v, rev, arr) => CategoricalOwned(v, Arc::new(rev.clone()), arr), + Categorical(v, rev, arr, ord) => CategoricalOwned(v, Arc::new(rev.clone()), arr, ord), #[cfg(feature = "dtype-categorical")] - CategoricalOwned(v, rev, arr) => CategoricalOwned(v, rev, arr), + CategoricalOwned(v, rev, arr, ord) => CategoricalOwned(v, rev, arr, ord), #[cfg(feature = "dtype-categorical")] Enum(v, rev, arr) => EnumOwned(v, Arc::new(rev.clone()), arr), #[cfg(feature = "dtype-categorical")] @@ -1088,7 +1098,7 @@ impl<'a> AnyValue<'a> { AnyValue::String(s) => Some(s), AnyValue::StringOwned(s) => Some(s.as_str()), #[cfg(feature = "dtype-categorical")] - AnyValue::Categorical(idx, rev, arr) | AnyValue::Enum(idx, rev, arr) => { + AnyValue::Categorical(idx, rev, arr, _) | AnyValue::Enum(idx, rev, arr) => { let s = if arr.is_null() { rev.get(*idx) } else { @@ -1097,7 +1107,7 @@ impl<'a> AnyValue<'a> { Some(s) }, #[cfg(feature = "dtype-categorical")] - AnyValue::CategoricalOwned(idx, rev, arr) | AnyValue::EnumOwned(idx, rev, arr) => { + AnyValue::CategoricalOwned(idx, rev, arr, _) | AnyValue::EnumOwned(idx, rev, arr) => { let s = if arr.is_null() { rev.get(*idx) } else { @@ -1177,9 +1187,9 @@ impl AnyValue<'_> { *l == Datetime(*rv, *rtu, rtz.as_ref().map(|v| v.as_ref())) }, #[cfg(feature = "dtype-categorical")] - (CategoricalOwned(lv, lrev, larr), r) => Categorical(*lv, lrev.as_ref(), *larr) == *r, + (CategoricalOwned(lv, lrev, larr, lord), r) => Categorical(*lv, lrev.as_ref(), *larr, *lord) == *r, #[cfg(feature = "dtype-categorical")] - (l, CategoricalOwned(rv, rrev, rarr)) => *l == Categorical(*rv, rrev.as_ref(), *rarr), + (l, CategoricalOwned(rv, rrev, rarr, rord)) => *l == Categorical(*rv, rrev.as_ref(), *rarr, *rord), #[cfg(feature = "dtype-categorical")] (EnumOwned(lv, lrev, larr), r) => Enum(*lv, lrev.as_ref(), *larr) == *r, #[cfg(feature = "dtype-categorical")] @@ -1215,7 +1225,7 @@ impl AnyValue<'_> { }, (List(l), List(r)) => l == r, #[cfg(feature = "dtype-categorical")] - (Categorical(idx_l, rev_l, ptr_l), Categorical(idx_r, rev_r, ptr_r)) => { + (Categorical(idx_l, rev_l, ptr_l, ord_l), Categorical(idx_r, rev_r, ptr_r, ord_r)) => { if !same_revmap(rev_l, *ptr_l, rev_r, *ptr_r) { // We can't support this because our Hash impl directly hashes the index. If you // add support for this we must change the Hash impl. @@ -1223,6 +1233,7 @@ impl AnyValue<'_> { "comparing categoricals with different revmaps is not supported" ); } + assert_eq!(ord_l, ord_r); idx_l == idx_r }, @@ -1355,12 +1366,12 @@ impl PartialOrd for AnyValue<'_> { l.partial_cmp(&Datetime(*rv, *rtu, rtz.as_ref().map(|v| v.as_ref()))) }, #[cfg(feature = "dtype-categorical")] - (CategoricalOwned(lv, lrev, larr), r) => { - Categorical(*lv, lrev.as_ref(), *larr).partial_cmp(r) + (CategoricalOwned(lv, lrev, larr, lord), r) => { + Categorical(*lv, lrev.as_ref(), *larr, *lord).partial_cmp(r) }, #[cfg(feature = "dtype-categorical")] - (l, CategoricalOwned(rv, rrev, rarr)) => { - l.partial_cmp(&Categorical(*rv, rrev.as_ref(), *rarr)) + (l, CategoricalOwned(rv, rrev, rarr, rord)) => { + l.partial_cmp(&Categorical(*rv, rrev.as_ref(), *rarr, *rord)) }, #[cfg(feature = "dtype-categorical")] (EnumOwned(lv, lrev, larr), r) => Enum(*lv, lrev.as_ref(), *larr).partial_cmp(r), @@ -1409,15 +1420,25 @@ impl PartialOrd for AnyValue<'_> { }, #[cfg(feature = "dtype-time")] (Time(l), Time(r)) => l.partial_cmp(r), + #[cfg(feature = "dtype-categorical")] - (Categorical(..), Categorical(..)) => { - unimplemented!( - "can't order categoricals as AnyValues, dtype for ordering is needed" - ) + (Categorical(l, l_revmapping, _, l_ord), Categorical(r, r_revmapping, _, r_ord)) => { + if !l_revmapping.same_src(r_revmapping) { + unimplemented!("can't order enums of different types") + } + assert_eq!(l_ord, r_ord); + + match l_ord { + CategoricalOrdering::Physical => l.partial_cmp(r), + CategoricalOrdering::Lexical => self.get_str().unwrap().partial_cmp(other.get_str().unwrap()), + } }, #[cfg(feature = "dtype-categorical")] - (Enum(..), Enum(..)) => { - unimplemented!("can't order enums as AnyValues, dtype for ordering is needed") + (Enum(l, l_revmapping, _), Enum(r, r_revmapping, _)) => { + if !l_revmapping.same_src(r_revmapping) { + unimplemented!("can't order enums of different types") + } + l.partial_cmp(r) }, (List(_), List(_)) => { unimplemented!("ordering for List dtype is not supported") diff --git a/crates/polars-core/src/fmt.rs b/crates/polars-core/src/fmt.rs index fdfa3b70c40b..7bb9cb5641e6 100644 --- a/crates/polars-core/src/fmt.rs +++ b/crates/polars-core/src/fmt.rs @@ -1179,8 +1179,8 @@ impl Display for AnyValue<'_> { write!(f, "{nt}") }, #[cfg(feature = "dtype-categorical")] - AnyValue::Categorical(_, _, _) - | AnyValue::CategoricalOwned(_, _, _) + AnyValue::Categorical(_, _, _, _) + | AnyValue::CategoricalOwned(_, _, _, _) | AnyValue::Enum(_, _, _) | AnyValue::EnumOwned(_, _, _) => { let s = self.get_str().unwrap(); diff --git a/crates/polars-core/src/frame/column/mod.rs b/crates/polars-core/src/frame/column/mod.rs index 39cbdc26a83c..d0e0d63fb558 100644 --- a/crates/polars-core/src/frame/column/mod.rs +++ b/crates/polars-core/src/frame/column/mod.rs @@ -74,6 +74,8 @@ impl Column { Self::Scalar(ScalarColumn::new(name, scalar, length)) } + + // # Materialize /// Get a reference to a [`Series`] for this [`Column`] /// @@ -130,6 +132,12 @@ impl Column { Column::Scalar(s) => s.take_materialized_series(), } } + pub fn lazy_as_materialized_series(&self) -> Option<&Series> { + match self { + Column::Series(s) => Some(s), + Column::Partitioned(_) | Column::Scalar(_) => None, + } + } #[inline] pub fn dtype(&self) -> &DataType { @@ -899,6 +907,18 @@ impl Column { .vec_hash_combine(build_hasher, hashes) } + pub unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + // @scalar-opt + let slf = self.into_materialized_series()._get_inner_mut(); + unsafe { slf.append_gather_unchecked(dfs, i, check_names, check_dtypes) } + } + pub fn append(&mut self, other: &Column) -> PolarsResult<&mut Self> { // @scalar-opt self.into_materialized_series() diff --git a/crates/polars-core/src/series/any_value.rs b/crates/polars-core/src/series/any_value.rs index 00cdeb213ffa..ca0a7875d615 100644 --- a/crates/polars-core/src/series/any_value.rs +++ b/crates/polars-core/src/series/any_value.rs @@ -442,8 +442,8 @@ fn any_values_to_categorical( AnyValue::Enum(s, rev, _) => builder.append_value(rev.get(*s)), AnyValue::EnumOwned(s, rev, _) => builder.append_value(rev.get(*s)), - AnyValue::Categorical(s, rev, _) => builder.append_value(rev.get(*s)), - AnyValue::CategoricalOwned(s, rev, _) => builder.append_value(rev.get(*s)), + AnyValue::Categorical(s, rev, _, _) => builder.append_value(rev.get(*s)), + AnyValue::CategoricalOwned(s, rev, _, _) => builder.append_value(rev.get(*s)), AnyValue::Binary(_) | AnyValue::BinaryOwned(_) if !strict => builder.append_null(), AnyValue::Null => builder.append_null(), @@ -490,8 +490,8 @@ fn any_values_to_enum(values: &[AnyValue], dtype: &DataType, strict: bool) -> Po AnyValue::Enum(s, rev, _) => builder.append_enum(*s, rev)?, AnyValue::EnumOwned(s, rev, _) => builder.append_enum(*s, rev)?, - AnyValue::Categorical(s, rev, _) => builder.append_str(rev.get(*s))?, - AnyValue::CategoricalOwned(s, rev, _) => builder.append_str(rev.get(*s))?, + AnyValue::Categorical(s, rev, _, _) => builder.append_str(rev.get(*s))?, + AnyValue::CategoricalOwned(s, rev, _, _) => builder.append_str(rev.get(*s))?, AnyValue::Binary(_) | AnyValue::BinaryOwned(_) if !strict => builder.append_null(), AnyValue::Null => builder.append_null(), diff --git a/crates/polars-core/src/series/implementations/array.rs b/crates/polars-core/src/series/implementations/array.rs index f67dc7f301c8..056e7e120abc 100644 --- a/crates/polars-core/src/series/implementations/array.rs +++ b/crates/polars-core/src/series/implementations/array.rs @@ -124,6 +124,19 @@ impl SeriesTrait for SeriesWrap { (a.into_series(), b.into_series()) } + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + unsafe { + self.0 + .append_gather_unchecked(dfs, i, check_names, check_dtypes) + } + } + fn append(&mut self, other: &Series) -> PolarsResult<()> { polars_ensure!(self.0.dtype() == other.dtype(), append); let other = other.array()?; diff --git a/crates/polars-core/src/series/implementations/binary.rs b/crates/polars-core/src/series/implementations/binary.rs index b37756a2c087..ece0ab82d08e 100644 --- a/crates/polars-core/src/series/implementations/binary.rs +++ b/crates/polars-core/src/series/implementations/binary.rs @@ -125,6 +125,19 @@ impl SeriesTrait for SeriesWrap { (a.into_series(), b.into_series()) } + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + unsafe { + self.0 + .append_gather_unchecked(dfs, i, check_names, check_dtypes) + } + } + fn append(&mut self, other: &Series) -> PolarsResult<()> { polars_ensure!(self.0.dtype() == other.dtype(), append); // todo! add object diff --git a/crates/polars-core/src/series/implementations/binary_offset.rs b/crates/polars-core/src/series/implementations/binary_offset.rs index 4976240f776f..6a1305d38d9c 100644 --- a/crates/polars-core/src/series/implementations/binary_offset.rs +++ b/crates/polars-core/src/series/implementations/binary_offset.rs @@ -91,6 +91,16 @@ impl SeriesTrait for SeriesWrap { (a.into_series(), b.into_series()) } + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + self.0.append_gather_unchecked(dfs, i, check_names, check_dtypes) + } + fn append(&mut self, other: &Series) -> PolarsResult<()> { polars_ensure!(self.0.dtype() == other.dtype(), append); // todo! add object diff --git a/crates/polars-core/src/series/implementations/boolean.rs b/crates/polars-core/src/series/implementations/boolean.rs index 58a3da28e9d2..8798fc85f5c4 100644 --- a/crates/polars-core/src/series/implementations/boolean.rs +++ b/crates/polars-core/src/series/implementations/boolean.rs @@ -144,6 +144,19 @@ impl SeriesTrait for SeriesWrap { (a.into_series(), b.into_series()) } + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + unsafe { + self.0 + .append_gather_unchecked(dfs, i, check_names, check_dtypes) + } + } + fn append(&mut self, other: &Series) -> PolarsResult<()> { polars_ensure!(self.0.dtype() == other.dtype(), append); self.0.append(other.as_ref().as_ref())?; diff --git a/crates/polars-core/src/series/implementations/categorical.rs b/crates/polars-core/src/series/implementations/categorical.rs index 1b2a8a77b49f..7e48f9cd5e1e 100644 --- a/crates/polars-core/src/series/implementations/categorical.rs +++ b/crates/polars-core/src/series/implementations/categorical.rs @@ -160,6 +160,18 @@ impl SeriesTrait for SeriesWrap { (a, b) } + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + unsafe { self.0 + .logical_mut() + .append_gather_unchecked(dfs, i, check_names, check_dtypes) } + } + fn append(&mut self, other: &Series) -> PolarsResult<()> { polars_ensure!(self.0.dtype() == other.dtype(), append); self.0.append(other.categorical().unwrap()) diff --git a/crates/polars-core/src/series/implementations/date.rs b/crates/polars-core/src/series/implementations/date.rs index 021a4a7e18cf..f54ea5940440 100644 --- a/crates/polars-core/src/series/implementations/date.rs +++ b/crates/polars-core/src/series/implementations/date.rs @@ -189,6 +189,17 @@ impl SeriesTrait for SeriesWrap { self.0.median() } + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + self.0 + .append_gather_unchecked(dfs, i, check_names, check_dtypes) + } + fn append(&mut self, other: &Series) -> PolarsResult<()> { polars_ensure!(self.0.dtype() == other.dtype(), append); let other = other.to_physical_repr(); diff --git a/crates/polars-core/src/series/implementations/datetime.rs b/crates/polars-core/src/series/implementations/datetime.rs index ee4d9022782a..bc4738fbc613 100644 --- a/crates/polars-core/src/series/implementations/datetime.rs +++ b/crates/polars-core/src/series/implementations/datetime.rs @@ -195,6 +195,19 @@ impl SeriesTrait for SeriesWrap { self.0.median() } + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + unsafe { + self.0 + .append_gather_unchecked(dfs, i, check_names, check_dtypes) + } + } + fn append(&mut self, other: &Series) -> PolarsResult<()> { polars_ensure!(self.0.dtype() == other.dtype(), append); let other = other.to_physical_repr(); diff --git a/crates/polars-core/src/series/implementations/decimal.rs b/crates/polars-core/src/series/implementations/decimal.rs index 6e477ccf6c3f..ad7d57e220aa 100644 --- a/crates/polars-core/src/series/implementations/decimal.rs +++ b/crates/polars-core/src/series/implementations/decimal.rs @@ -223,6 +223,17 @@ impl SeriesTrait for SeriesWrap { (a, b) } + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + self.0 + .append_gather_unchecked(dfs, i, check_names, check_dtypes) + } + fn append(&mut self, other: &Series) -> PolarsResult<()> { polars_ensure!(self.0.dtype() == other.dtype(), append); // 3 refs diff --git a/crates/polars-core/src/series/implementations/duration.rs b/crates/polars-core/src/series/implementations/duration.rs index 51426f1b94e6..2ff8b37119c6 100644 --- a/crates/polars-core/src/series/implementations/duration.rs +++ b/crates/polars-core/src/series/implementations/duration.rs @@ -315,6 +315,17 @@ impl SeriesTrait for SeriesWrap { self.0.var(ddof) } + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + self.0 + .append_gather_unchecked(dfs, i, check_names, check_dtypes) + } + fn append(&mut self, other: &Series) -> PolarsResult<()> { polars_ensure!(self.0.dtype() == other.dtype(), append); let other = other.to_physical_repr().into_owned(); diff --git a/crates/polars-core/src/series/implementations/floats.rs b/crates/polars-core/src/series/implementations/floats.rs index 9ccbb1d8d958..ec2f1b77e370 100644 --- a/crates/polars-core/src/series/implementations/floats.rs +++ b/crates/polars-core/src/series/implementations/floats.rs @@ -178,6 +178,19 @@ macro_rules! impl_dyn_series { (a.into_series(), b.into_series()) } + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + unsafe { + self.0 + .append_gather_unchecked(dfs, i, check_names, check_dtypes) + } + } + fn append(&mut self, other: &Series) -> PolarsResult<()> { polars_ensure!(self.0.dtype() == other.dtype(), append); self.0.append(other.as_ref().as_ref())?; diff --git a/crates/polars-core/src/series/implementations/list.rs b/crates/polars-core/src/series/implementations/list.rs index f8ca26e9902d..6d63b90c9fa6 100644 --- a/crates/polars-core/src/series/implementations/list.rs +++ b/crates/polars-core/src/series/implementations/list.rs @@ -123,6 +123,17 @@ impl SeriesTrait for SeriesWrap { (a.into_series(), b.into_series()) } + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + self.0 + .append_gather_unchecked(dfs, i, check_names, check_dtypes) + } + fn append(&mut self, other: &Series) -> PolarsResult<()> { polars_ensure!(self.0.dtype() == other.dtype(), append); self.0.append(other.as_ref().as_ref()) diff --git a/crates/polars-core/src/series/implementations/mod.rs b/crates/polars-core/src/series/implementations/mod.rs index 9df0e7695127..4477edfaad32 100644 --- a/crates/polars-core/src/series/implementations/mod.rs +++ b/crates/polars-core/src/series/implementations/mod.rs @@ -51,6 +51,18 @@ impl Deref for SeriesWrap> { } } +impl AsRef> for SeriesWrap> { + fn as_ref(&self) -> &ChunkedArray { + &self.0 + } +} + +impl AsRef> for SeriesWrap> { + fn as_ref(&self) -> &ChunkedArray { + &self.0 .0 + } +} + unsafe impl IntoSeries for ChunkedArray where SeriesWrap>: SeriesTrait, @@ -248,6 +260,19 @@ macro_rules! impl_dyn_series { (a.into_series(), b.into_series()) } + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + unsafe { + self.0 + .append_gather_unchecked(dfs, i, check_names, check_dtypes) + } + } + fn append(&mut self, other: &Series) -> PolarsResult<()> { polars_ensure!(self.0.dtype() == other.dtype(), append); self.0.append(other.as_ref().as_ref())?; diff --git a/crates/polars-core/src/series/implementations/null.rs b/crates/polars-core/src/series/implementations/null.rs index 0d634b30f5d0..256061d7a7ab 100644 --- a/crates/polars-core/src/series/implementations/null.rs +++ b/crates/polars-core/src/series/implementations/null.rs @@ -306,6 +306,43 @@ impl SeriesTrait for NullChunked { self.clone().into_series() } + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + let estimated_num_chunks = self.n_chunks() * dfs.len(); + self.chunks.reserve(estimated_num_chunks); + + for df in dfs { + if df.width() == 0 { + continue; + } + + let column = &df.get_columns()[i]; + + if check_names { + polars_ensure!( + self.name() == column.name(), + ShapeMismatch: "unable to vstack, column names don't match: {:?} and {:?}", + self.name(), column.name(), + ); + } + + if check_dtypes { + polars_ensure!(column.dtype() == &DataType::Null, ComputeError: "expected null dtype"); + } + + // we don't create a new null array to keep probability of aligned chunks higher + self.chunks.extend(column.as_materialized_series().chunks().iter().cloned()); + self.length += column.len() as IdxSize; + } + + Ok(()) + } + fn append(&mut self, other: &Series) -> PolarsResult<()> { polars_ensure!(other.dtype() == &DataType::Null, ComputeError: "expected null dtype"); // we don't create a new null array to keep probability of aligned chunks higher diff --git a/crates/polars-core/src/series/implementations/object.rs b/crates/polars-core/src/series/implementations/object.rs index bc753f9b06f4..9cc5ec7a3486 100644 --- a/crates/polars-core/src/series/implementations/object.rs +++ b/crates/polars-core/src/series/implementations/object.rs @@ -119,6 +119,17 @@ where (a.into_series(), b.into_series()) } + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + self.0 + .append_gather_unchecked(dfs, i, check_names, check_dtypes) + } + fn append(&mut self, other: &Series) -> PolarsResult<()> { if self.dtype() != other.dtype() { polars_bail!(append); @@ -177,7 +188,7 @@ where } fn get(&self, index: usize) -> PolarsResult { - ObjectChunked::get_any_value(&self.0, index) + ObjectChunked::try_get_any_value(&self.0, index) } unsafe fn get_unchecked(&self, index: usize) -> AnyValue { ObjectChunked::get_any_value_unchecked(&self.0, index) diff --git a/crates/polars-core/src/series/implementations/string.rs b/crates/polars-core/src/series/implementations/string.rs index 1dffefc679b1..5cc3f6d509ae 100644 --- a/crates/polars-core/src/series/implementations/string.rs +++ b/crates/polars-core/src/series/implementations/string.rs @@ -124,6 +124,19 @@ impl SeriesTrait for SeriesWrap { (a.into_series(), b.into_series()) } + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + unsafe { + self.0 + .append_gather_unchecked(dfs, i, check_names, check_dtypes) + } + } + fn append(&mut self, other: &Series) -> PolarsResult<()> { polars_ensure!( self.0.dtype() == other.dtype(), diff --git a/crates/polars-core/src/series/implementations/struct_.rs b/crates/polars-core/src/series/implementations/struct_.rs index d741747bc32d..09aab8ce1815 100644 --- a/crates/polars-core/src/series/implementations/struct_.rs +++ b/crates/polars-core/src/series/implementations/struct_.rs @@ -110,6 +110,17 @@ impl SeriesTrait for SeriesWrap { (l.into_series(), r.into_series()) } + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + self.0 + .append_gather_unchecked(dfs, i, check_names, check_dtypes) + } + fn append(&mut self, other: &Series) -> PolarsResult<()> { polars_ensure!(self.0.dtype() == other.dtype(), append); self.0.append(other.as_ref().as_ref()) diff --git a/crates/polars-core/src/series/implementations/time.rs b/crates/polars-core/src/series/implementations/time.rs index d0f3a7e0571a..cfbcf17ad87c 100644 --- a/crates/polars-core/src/series/implementations/time.rs +++ b/crates/polars-core/src/series/implementations/time.rs @@ -171,6 +171,17 @@ impl SeriesTrait for SeriesWrap { self.0.median() } + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()> { + self.0 + .append_gather_unchecked(dfs, i, check_names, check_dtypes) + } + fn append(&mut self, other: &Series) -> PolarsResult<()> { polars_ensure!(self.0.dtype() == other.dtype(), append); let other = other.to_physical_repr(); diff --git a/crates/polars-core/src/series/series_trait.rs b/crates/polars-core/src/series/series_trait.rs index c1b8d3f97764..a04ac6647f00 100644 --- a/crates/polars-core/src/series/series_trait.rs +++ b/crates/polars-core/src/series/series_trait.rs @@ -255,6 +255,14 @@ pub trait SeriesTrait: /// end of the array fn split_at(&self, _offset: i64) -> (Series, Series); + unsafe fn append_gather_unchecked( + &mut self, + dfs: &[DataFrame], + i: usize, + check_names: bool, + check_dtypes: bool, + ) -> PolarsResult<()>; + #[doc(hidden)] fn append(&mut self, _other: &Series) -> PolarsResult<()>; diff --git a/crates/polars-core/src/utils/mod.rs b/crates/polars-core/src/utils/mod.rs index c23a4b161bda..f1f8164c19f5 100644 --- a/crates/polars-core/src/utils/mod.rs +++ b/crates/polars-core/src/utils/mod.rs @@ -21,7 +21,9 @@ pub use series::*; pub use supertype::*; pub use {arrow, rayon}; +use crate::chunked_array::flags::StatisticsFlags; use crate::prelude::*; +use crate::series::IsSorted; use crate::POOL; #[repr(transparent)] @@ -749,33 +751,315 @@ pub fn accumulate_dataframes_vertical_unchecked(dfs: I) -> DataFrame where I: IntoIterator, { + unsafe { accumulate_dataframes_vertical_unchecked_impl(dfs, false, false, true) }.unwrap() +} + +#[cold] +#[inline(never)] +fn name_mismatch(n1: &str, n2: &str) -> PolarsError { + polars_err!( + ShapeMismatch: "unable to vstack, column names don't match: {n1} and {n2}", + ) +} +#[cold] +#[inline(never)] +fn dtype_mismatch(col: &str, dt1: &DataType, dt2: &DataType) -> PolarsError { + polars_err!( + SchemaMismatch: + "cannot append series '{col}', data types don't match ({dt1:?} != {dt2:?})", + ) +} +#[cold] +#[inline(never)] +fn width_mismatch(df1: &DataFrame, df2: &DataFrame) -> PolarsError { + let mut df1_extra = Vec::new(); + let mut df2_extra = Vec::new(); + + let s1 = df1.schema(); + let s2 = df2.schema(); + + s1.field_compare(&s2, &mut df1_extra, &mut df2_extra); + + let df1_extra = df1_extra + .into_iter() + .map(|(_, (n, _))| n.as_str()) + .collect::>() + .join(", "); + let df2_extra = df2_extra + .into_iter() + .map(|(_, (n, _))| n.as_str()) + .collect::>() + .join(", "); + + polars_err!( + SchemaMismatch: r#"unable to vstack, dataframes have different widths ({} != {}). + +One dataframe has additional columns: [{df1_extra}]. +Other dataframe has additional columns: [{df2_extra}]."#, + df1.width(), + df2.width(), + ) +} + +/// Accumulate several [`DataFrame`]s by vertically stacking them. +/// +/// This will not rechunk and always check that the widths of the [`DataFrame`]s is the same. +/// Checking of matches column names and datatypes can be enabled with the `check_names` and +/// `check_dtype`s respectively. +/// +/// Parallelism with [`rayon`] can be turned on with the `parallel` option. +/// +/// # Safety +/// +/// If `check_dtypes = False`, it needs to be ensured that all the datatypes between columns are +/// the same. +pub unsafe fn accumulate_dataframes_vertical_unchecked_impl( + dfs: impl IntoIterator, + check_names: bool, + check_dtypes: bool, + parallel: bool, +) -> PolarsResult { let mut iter = dfs.into_iter(); - let additional = iter.size_hint().0; - let mut acc_df = iter.next().unwrap(); - acc_df.reserve_chunks(additional); - for df in iter { - acc_df.vstack_mut_unchecked(&df); + // Find the first dataframe that actually has all columns. + let mut height = 0; + let Some(mut fst_full_schema_df) = iter.by_ref().find(|df| { + height += df.height(); + df.width() != 0 + }) else { + // SAFETY: No columns are given. + return Ok(unsafe { DataFrame::new_no_checks(height, Vec::new()) }); + }; + + let width = fst_full_schema_df.width(); + + // Find the first non-empty dataframe. + while fst_full_schema_df.height() == 0 { + let Some(df) = iter.next() else { + return Ok(fst_full_schema_df); + }; + + if df.width() == 0 { + continue; + } + + if df.width() != width { + return Err(width_mismatch(&df, &fst_full_schema_df)); + } + + if check_names || check_dtypes { + for (new_c, old_c) in df + .get_columns() + .iter() + .zip(fst_full_schema_df.get_columns()) + { + if check_names && old_c.name() != new_c.name() { + return Err(name_mismatch(old_c.name(), new_c.name())); + } + if check_dtypes && old_c.dtype() != new_c.dtype() { + return Err(dtype_mismatch(old_c.name(), old_c.dtype(), new_c.dtype())); + } + } + } + + fst_full_schema_df = df; } - acc_df + + let mut chunk_amount_hints: Vec = fst_full_schema_df + .get_columns() + .iter() + .map(|c| c.n_chunks()) + .collect(); + let mut columns: Vec<_> = fst_full_schema_df + .take_columns() + .into_iter() + .map(|c| { + let mut cs = Vec::::with_capacity(iter.size_hint().0 + 1); + cs.push(c); + cs + }) + .collect(); + + for df in iter { + if df.width() == 0 { + continue; + } + + if df.width() != width { + // Cold path: just reconstruct the dataframe so you can give a nice error message. + let fst_full_schema_df = + DataFrame::new(columns.iter_mut().map(|c| c.pop().unwrap()).collect()).unwrap(); + return Err(width_mismatch(&df, &fst_full_schema_df)); + } + + // Empty dataframes get skipped but we still want to check the names and datatypes if + // requested. + if df.height() == 0 { + if check_names || check_dtypes { + for (new_c, old_c) in df.get_columns().iter().zip(columns.iter().map(|cs| &cs[0])) { + if check_names && old_c.name() != new_c.name() { + return Err(name_mismatch(old_c.name(), new_c.name())); + } + if check_dtypes && old_c.dtype() != new_c.dtype() { + return Err(dtype_mismatch(old_c.name(), old_c.dtype(), new_c.dtype())); + } + } + } + + continue; + } + + chunk_amount_hints + .iter_mut() + .zip(df.get_columns()) + .for_each(|(chunk_amount_hint, column)| { + *chunk_amount_hint += column + .lazy_as_materialized_series() + .map_or(1, |s| s.n_chunks()) + }); + + columns + .iter_mut() + .zip(df.take_columns()) + .for_each(|(into, from)| into.push(from)); + } + + let f = |(column, chunk_amount_hint): (Vec, usize)| { + let mut iter = column.into_iter(); + let fst = iter.next().unwrap(); + + // @NOTE: + // Yeah... objects and categoricals are very annoying because they require merging of the + // datatypes. For now, just fallback to the dumb append loop. + if fst.dtype().contains_categoricals() || fst.dtype().contains_objects() { + let mut fst = fst; + for c in iter { + fst.append(&c)?; + } + return Ok(fst); + } + + let mut chunks = Vec::with_capacity(chunk_amount_hint); + + let name = fst.name().clone(); + let dtype = fst.dtype().clone(); + let mut flags = fst.get_flags(); + + let mut length = fst.len(); + let mut null_count = fst.null_count(); + + let (has_nulls_at_start, mut lst_av) = if flags.is_sorted_any() { + ( + fst.get(0).unwrap().is_null(), + fst.get(length - 1).unwrap().into_static(), + ) + } else { + (false, AnyValue::Null) + }; + + chunks.extend(fst.take_materialized_series().into_chunks()); + + for c in iter { + if check_names && name != c.name() { + return Err(name_mismatch(name.as_str(), c.name())); + } + if check_dtypes && &dtype != c.dtype() { + return Err(dtype_mismatch(name.as_str(), &dtype, c.dtype())); + } + + let c_len = c.len(); + let c_null_count = c.null_count(); + length += c_len; + null_count += c_null_count; + + let c_flags = c.get_flags(); + + let c_can_fast_explode_list = c_flags & StatisticsFlags::CAN_FAST_EXPLODE_LIST; + flags &= !(c_can_fast_explode_list ^ StatisticsFlags::CAN_FAST_EXPLODE_LIST); + + let is_sorted = flags.is_sorted(); + let c_is_sorted = c_flags.is_sorted(); + + flags.set_sorted(IsSorted::Not); + + // Preserve the sorted flag if possible. + use DataType as D; + if !matches!(is_sorted, IsSorted::Not) + && is_sorted == c_is_sorted + && !matches!(&dtype, D::Struct(_) | D::List(_) | D::Array(_, _)) + { + let fst_av = c.get(0).unwrap(); + + let is_still_sorted = match (&lst_av, &fst_av) { + (AnyValue::Null, AnyValue::Null) => { + null_count == c_len || null_count - c_null_count == length - c_len + }, + (AnyValue::Null, _) => null_count - c_null_count == 0, + (_, AnyValue::Null) => null_count == c_null_count, + (_, _) => { + let mut res = true; + + if null_count > 0 { + // SAFETY: We know that self has at least one element + let are_nulls_first = has_nulls_at_start; + let are_nulls_last = c.get(c_len - 1).unwrap().is_null(); + + res &= are_nulls_first != are_nulls_last; + } + + res & match is_sorted { + IsSorted::Ascending => lst_av <= fst_av, + IsSorted::Descending => lst_av >= fst_av, + IsSorted::Not => unreachable!(), + } + }, + }; + + if is_still_sorted { + flags.set_sorted(is_sorted); + lst_av = c.get(c.len() - 1).unwrap().into_static(); + } + } + + chunks.extend(c.take_materialized_series().into_chunks()); + } + + // @TODO: We know the length and null count. This will recalculate these. Maybe, we can + // pass these somehow. + // SAFETY: Either we checked that all Series have the same dtype or it is an invariant of + // this function. + let series = unsafe { Series::from_chunks_and_dtype_unchecked(name, chunks, &dtype) }; + Ok(Column::from(series)) + }; + + let columns = if parallel { + POOL.install(|| { + columns + .into_par_iter() + .zip(chunk_amount_hints) + .map(f) + .collect::>>() + }) + } else { + columns + .into_iter() + .zip(chunk_amount_hints) + .map(f) + .collect::>>() + }?; + + let height = columns[0].len(); + // SAFETY: We only took whole columns from valid dataframes. + let df = unsafe { DataFrame::new_no_checks(height, columns) }; + Ok(df) } /// This takes ownership of the DataFrame so that drop is called earlier. -/// # Panics -/// Panics if `dfs` is empty. pub fn accumulate_dataframes_vertical(dfs: I) -> PolarsResult where I: IntoIterator, { - let mut iter = dfs.into_iter(); - let additional = iter.size_hint().0; - let mut acc_df = iter.next().unwrap(); - acc_df.reserve_chunks(additional); - for df in iter { - acc_df.vstack_mut(&df)?; - } - - Ok(acc_df) + unsafe { accumulate_dataframes_vertical_unchecked_impl(dfs, true, true, true) } } /// Concat the DataFrames to a single DataFrame. diff --git a/crates/polars-plan/src/plans/conversion/type_coercion/mod.rs b/crates/polars-plan/src/plans/conversion/type_coercion/mod.rs index 88ba30a69c42..6558ae6e4a98 100644 --- a/crates/polars-plan/src/plans/conversion/type_coercion/mod.rs +++ b/crates/polars-plan/src/plans/conversion/type_coercion/mod.rs @@ -449,7 +449,7 @@ fn try_inline_literal_cast( #[cfg(feature = "dtype-duration")] (AnyValue::Duration(_, _), _) => return Ok(None), #[cfg(feature = "dtype-categorical")] - (AnyValue::Categorical(_, _, _), _) | (_, DataType::Categorical(_, _)) => { + (AnyValue::Categorical(_, _, _, _), _) | (_, DataType::Categorical(_, _)) => { return Ok(None) }, #[cfg(feature = "dtype-categorical")] diff --git a/crates/polars-plan/src/plans/lit.rs b/crates/polars-plan/src/plans/lit.rs index 7ab7bb21003f..851f29799db0 100644 --- a/crates/polars-plan/src/plans/lit.rs +++ b/crates/polars-plan/src/plans/lit.rs @@ -317,7 +317,7 @@ impl From> for LiteralValue { AnyValue::List(l) => Self::Series(SpecialEq::new(l)), AnyValue::StringOwned(o) => Self::String(o), #[cfg(feature = "dtype-categorical")] - AnyValue::Categorical(c, rev_mapping, arr) | AnyValue::Enum(c, rev_mapping, arr) => { + AnyValue::Categorical(c, rev_mapping, arr, _) | AnyValue::Enum(c, rev_mapping, arr) => { if arr.is_null() { Self::String(PlSmallStr::from_str(rev_mapping.get(c))) } else { diff --git a/crates/polars-python/src/conversion/any_value.rs b/crates/polars-python/src/conversion/any_value.rs index 18c133eb5a45..76bfc78d6e99 100644 --- a/crates/polars-python/src/conversion/any_value.rs +++ b/crates/polars-python/src/conversion/any_value.rs @@ -75,7 +75,7 @@ pub(crate) fn any_value_into_py_object<'py>( AnyValue::Boolean(v) => v.into_bound_py_any(py), AnyValue::String(v) => v.into_bound_py_any(py), AnyValue::StringOwned(v) => v.into_bound_py_any(py), - AnyValue::Categorical(idx, rev, arr) | AnyValue::Enum(idx, rev, arr) => { + AnyValue::Categorical(idx, rev, arr, _) | AnyValue::Enum(idx, rev, arr) => { let s = if arr.is_null() { rev.get(idx) } else { @@ -83,7 +83,7 @@ pub(crate) fn any_value_into_py_object<'py>( }; s.into_bound_py_any(py) }, - AnyValue::CategoricalOwned(idx, rev, arr) | AnyValue::EnumOwned(idx, rev, arr) => { + AnyValue::CategoricalOwned(idx, rev, arr, _) | AnyValue::EnumOwned(idx, rev, arr) => { let s = if arr.is_null() { rev.get(idx) } else { diff --git a/crates/polars-python/src/functions/eager.rs b/crates/polars-python/src/functions/eager.rs index 5f68067824f1..d01ad72e67dd 100644 --- a/crates/polars-python/src/functions/eager.rs +++ b/crates/polars-python/src/functions/eager.rs @@ -1,5 +1,5 @@ use polars::functions; -use polars_core::prelude::*; +use polars_core::utils::accumulate_dataframes_vertical; use pyo3::prelude::*; use crate::conversion::{get_df, get_series}; @@ -7,44 +7,20 @@ use crate::error::PyPolarsErr; use crate::{PyDataFrame, PySeries}; #[pyfunction] -pub fn concat_df(dfs: &Bound<'_, PyAny>, py: Python) -> PyResult { - use polars_core::error::PolarsResult; - use polars_core::utils::rayon::prelude::*; - +pub fn concat_df(dfs: &Bound<'_, PyAny>, _py: Python) -> PyResult { let mut iter = dfs.try_iter()?; let first = iter.next().unwrap()?; let first_rdf = get_df(&first)?; - let identity_df = first_rdf.clear(); - - let mut rdfs: Vec> = vec![Ok(first_rdf)]; + let mut rdfs = vec![first_rdf]; for item in iter { - let rdf = get_df(&item?)?; - rdfs.push(Ok(rdf)); + rdfs.push(get_df(&item?)?); } - - let identity = || Ok(identity_df.clone()); - - let df = py - .allow_threads(|| { - polars_core::POOL.install(|| { - rdfs.into_par_iter() - .fold(identity, |acc: PolarsResult, df| { - let mut acc = acc?; - acc.vstack_mut(&df?)?; - Ok(acc) - }) - .reduce(identity, |acc, df| { - let mut acc = acc?; - acc.vstack_mut(&df?)?; - Ok(acc) - }) - }) - }) - .map_err(PyPolarsErr::from)?; - - Ok(df.into()) + accumulate_dataframes_vertical(rdfs) + .map(Into::into) + .map_err(PyPolarsErr::from) + .map_err(PyErr::from) } #[pyfunction]