diff --git a/crates/rayexec_execution/src/arrays/array/mod.rs b/crates/rayexec_execution/src/arrays/array/mod.rs index f2e864110..3d9df31ee 100644 --- a/crates/rayexec_execution/src/arrays/array/mod.rs +++ b/crates/rayexec_execution/src/arrays/array/mod.rs @@ -13,13 +13,15 @@ mod shared_or_owned; use std::fmt::Debug; use std::sync::Arc; -use array_buffer::{ArrayBuffer, DictionaryBuffer, ListBuffer, SecondaryBuffer}; +use array_buffer::{ArrayBuffer, DictionaryBuffer, ListBuffer, ListItemMetadata, SecondaryBuffer}; use array_data::ArrayData; use buffer_manager::{BufferManager, NopBufferManager}; use flat::FlatArrayView; use half::f16; use physical_type::{ + Addressable, AddressableMut, + MutablePhysicalStorage, PhysicalAny, PhysicalBinary, PhysicalBool, @@ -34,6 +36,7 @@ use physical_type::{ PhysicalI8, PhysicalInterval, PhysicalList, + PhysicalStorage, PhysicalType, PhysicalU128, PhysicalU16, @@ -282,6 +285,400 @@ where Ok(()) } + + /// Resets self to prepare for writing to the array. + /// + /// This will: + /// - Reset validity to all 'valid'. + /// - Create or reuse a writeable buffer for array data. No guarantees are + /// made about the contents of the buffer. + /// + /// Buffer values _must_ be written for a row before attempting to read a + /// value for that row after calling this function. Underlying storage may + /// be cleared resulting in stale metadata (and thus invalid reads). + pub fn reset_for_write(&mut self, manager: &Arc) -> Result<()> { + let cap = self.capacity(); + let next = self.next.as_mut().unwrap(); + + next.validity = Validity::new_all_valid(cap); + + // Check if dictionary first since we want to try to get the underlying + // buffer from that. We should only have one layer of "dictionary", so we + // shouldn't need to recurse. 
+ if next.data.as_ref().physical_type() == PhysicalType::Dictionary { + let secondary = next.data.try_as_mut()?.get_secondary_mut(); + let dict = match std::mem::replace(secondary, SecondaryBuffer::None) { + SecondaryBuffer::Dictionary(dict) => dict, + other => { + return Err(RayexecError::new(format!( + "Expected dictionary secondary buffer, got {other:?}", + ))) + } + }; + + // TODO: Not sure what to do if capacities don't match. Currently + // dictionaries are only created through 'select' and the index + // buffer gets initialized to the length of the selection. + next.data = dict.buffer; + } + + if let Err(()) = next.data.try_reset_for_write() { + // Need to create a new buffer and set that. + let buffer = array_buffer_for_datatype(manager, &self.datatype, cap)?; + next.data = ArrayData::owned(buffer) + } + + // Reset secondary buffers. + match next.data.try_as_mut()?.get_secondary_mut() { + SecondaryBuffer::StringViewHeap(heap) => { + heap.clear(); + // All metadata is stale. Panics may occur if attempting to read + // prior to writing new values for a row. + } + SecondaryBuffer::List(list) => { + list.entries = 0; + // Child array keeps its capacity, it'll be overwritten. List + // item metadata will become stale, but technically won't error. + } + SecondaryBuffer::Dictionary(_) => (), + SecondaryBuffer::None => (), + } + + Ok(()) + } + + /// "Clones" some other array into this array. + /// + /// This will try to make the buffer from the other array managed to make it + /// cheaply cloneable and shared with this array. + /// + /// Array capacities and datatypes must be the same for both arrays. 
+ pub fn try_clone_from(&mut self, manager: &B, other: &mut Self) -> Result<()> { + if self.datatype != other.datatype { + return Err(RayexecError::new( + "Attempted clone array from other array with different data types", + ) + .with_field("own_datatype", self.datatype.clone()) + .with_field("other_datatype", other.datatype.clone())); + } + + // TODO: Do we want this check? Dictionaries right now can have differing capacities based + // on selection inputs. + // if self.capacity() != other.capacity() { + // return Err(RayexecError::new( + // "Attempted to clone into array from other array with different capacity", + // ) + // .with_field("own_capacity", self.capacity()) + // .with_field("other_capacity", other.capacity())); + // } + + let managed = other.next_mut().data.make_managed(manager)?; + self.next_mut().data.set_managed(managed)?; + self.next_mut().validity = other.next().validity.clone(); + + Ok(()) + } + + /// Copy rows from self to another array. + /// + /// `mapping` provides a mapping of source indices to destination indices in + /// (source, dest) pairs. 
+ pub fn copy_rows( + &self, + mapping: impl stdutil::iter::IntoExactSizeIterator, + dest: &mut Self, + ) -> Result<()> { + match self.datatype.physical_type() { + PhysicalType::Boolean => copy_rows::(self, mapping, dest)?, + PhysicalType::Int8 => copy_rows::(self, mapping, dest)?, + PhysicalType::Int16 => copy_rows::(self, mapping, dest)?, + PhysicalType::Int32 => copy_rows::(self, mapping, dest)?, + PhysicalType::Int64 => copy_rows::(self, mapping, dest)?, + PhysicalType::Int128 => copy_rows::(self, mapping, dest)?, + PhysicalType::UInt8 => copy_rows::(self, mapping, dest)?, + PhysicalType::UInt16 => copy_rows::(self, mapping, dest)?, + PhysicalType::UInt32 => copy_rows::(self, mapping, dest)?, + PhysicalType::UInt64 => copy_rows::(self, mapping, dest)?, + PhysicalType::UInt128 => copy_rows::(self, mapping, dest)?, + PhysicalType::Float16 => copy_rows::(self, mapping, dest)?, + PhysicalType::Float32 => copy_rows::(self, mapping, dest)?, + PhysicalType::Float64 => copy_rows::(self, mapping, dest)?, + PhysicalType::Interval => copy_rows::(self, mapping, dest)?, + PhysicalType::Utf8 => copy_rows::(self, mapping, dest)?, + _ => unimplemented!(), + } + + Ok(()) + } + + pub fn get_value(&self, idx: usize) -> Result { + if idx >= self.capacity() { + return Err(RayexecError::new("Index out of bounds") + .with_field("idx", idx) + .with_field("capacity", self.capacity())); + } + + let flat = self.flat_view()?; + let idx = flat.selection.get(idx).expect("Index to be in bounds"); + + if !flat.validity.is_valid(idx) { + return Ok(ScalarValue::Null); + } + + match &self.datatype { + DataType::Boolean => { + let v = PhysicalBool::get_addressable(flat.array_buffer)? + .get(idx) + .unwrap(); + Ok(ScalarValue::Boolean(*v)) + } + DataType::Int8 => { + let v = PhysicalI8::get_addressable(flat.array_buffer)? + .get(idx) + .unwrap(); + Ok(ScalarValue::Int8(*v)) + } + DataType::Int16 => { + let v = PhysicalI16::get_addressable(flat.array_buffer)? 
+ .get(idx) + .unwrap(); + Ok(ScalarValue::Int16(*v)) + } + DataType::Int32 => { + let v = PhysicalI32::get_addressable(flat.array_buffer)? + .get(idx) + .unwrap(); + Ok(ScalarValue::Int32(*v)) + } + DataType::Int64 => { + let v = PhysicalI64::get_addressable(flat.array_buffer)? + .get(idx) + .unwrap(); + Ok(ScalarValue::Int64(*v)) + } + DataType::Int128 => { + let v = PhysicalI128::get_addressable(flat.array_buffer)? + .get(idx) + .unwrap(); + Ok(ScalarValue::Int128(*v)) + } + DataType::UInt8 => { + let v = PhysicalU8::get_addressable(flat.array_buffer)? + .get(idx) + .unwrap(); + Ok(ScalarValue::UInt8(*v)) + } + DataType::UInt16 => { + let v = PhysicalU16::get_addressable(flat.array_buffer)? + .get(idx) + .unwrap(); + Ok(ScalarValue::UInt16(*v)) + } + DataType::UInt32 => { + let v = PhysicalU32::get_addressable(flat.array_buffer)? + .get(idx) + .unwrap(); + Ok(ScalarValue::UInt32(*v)) + } + DataType::UInt64 => { + let v = PhysicalU64::get_addressable(flat.array_buffer)? + .get(idx) + .unwrap(); + Ok(ScalarValue::UInt64(*v)) + } + DataType::UInt128 => { + let v = PhysicalU128::get_addressable(flat.array_buffer)? + .get(idx) + .unwrap(); + Ok(ScalarValue::UInt128(*v)) + } + DataType::Float16 => { + let v = PhysicalF16::get_addressable(flat.array_buffer)? + .get(idx) + .unwrap(); + Ok(ScalarValue::Float16(*v)) + } + DataType::Float32 => { + let v = PhysicalF32::get_addressable(flat.array_buffer)? + .get(idx) + .unwrap(); + Ok(ScalarValue::Float32(*v)) + } + DataType::Float64 => { + let v = PhysicalF64::get_addressable(flat.array_buffer)? + .get(idx) + .unwrap(); + Ok(ScalarValue::Float64(*v)) + } + DataType::Decimal64(m) => { + let v = PhysicalI64::get_addressable(flat.array_buffer)? + .get(idx) + .unwrap(); + Ok(ScalarValue::Decimal64(Decimal64Scalar { + precision: m.precision, + scale: m.scale, + value: *v, + })) + } + DataType::Decimal128(m) => { + let v = PhysicalI128::get_addressable(flat.array_buffer)? 
+ .get(idx) + .unwrap(); + Ok(ScalarValue::Decimal128(Decimal128Scalar { + precision: m.precision, + scale: m.scale, + value: *v, + })) + } + DataType::Interval => { + let v = PhysicalInterval::get_addressable(flat.array_buffer)? + .get(idx) + .unwrap(); + Ok(ScalarValue::Interval(*v)) + } + DataType::Timestamp(m) => { + let v = PhysicalI64::get_addressable(flat.array_buffer)? + .get(idx) + .unwrap(); + Ok(ScalarValue::Timestamp(TimestampScalar { + unit: m.unit, + value: *v, + })) + } + DataType::Utf8 => { + let addressable = PhysicalUtf8::get_addressable(flat.array_buffer)?; + let v = addressable.get(idx).unwrap(); + Ok(ScalarValue::Utf8(v.into())) + } + DataType::Binary => { + let addressable = PhysicalBinary::get_addressable(flat.array_buffer)?; + let v = addressable.get(idx).unwrap(); + Ok(ScalarValue::Binary(v.into())) + } + + _ => not_implemented!("get value for scalar type"), + } + } + + /// Set a scalar value at a given index. + pub fn set_value(&mut self, idx: usize, val: &ScalarValue) -> Result<()> { + if idx >= self.capacity() { + return Err(RayexecError::new("Index out of bounds") + .with_field("idx", idx) + .with_field("capacity", self.capacity())); + } + + let next = self.next_mut(); + next.validity.set_valid(idx); + let data = next.data.try_as_mut()?; + + match val { + ScalarValue::Null => { + next.validity.set_invalid(idx); + } + ScalarValue::Boolean(val) => { + PhysicalBool::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::Int8(val) => { + PhysicalI8::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::Int16(val) => { + PhysicalI16::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::Int32(val) => { + PhysicalI32::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::Int64(val) => { + PhysicalI64::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::Int128(val) => { + PhysicalI128::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::UInt8(val) => { + 
PhysicalU8::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::UInt16(val) => { + PhysicalU16::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::UInt32(val) => { + PhysicalU32::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::UInt64(val) => { + PhysicalU64::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::UInt128(val) => { + PhysicalU128::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::Float16(val) => { + PhysicalF16::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::Float32(val) => { + PhysicalF32::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::Float64(val) => { + PhysicalF64::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::Decimal64(val) => { + PhysicalI64::get_addressable_mut(data)?.put(idx, &val.value); + } + ScalarValue::Decimal128(val) => { + PhysicalI128::get_addressable_mut(data)?.put(idx, &val.value); + } + ScalarValue::Date32(val) => { + PhysicalI32::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::Date64(val) => { + PhysicalI64::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::Timestamp(val) => { + PhysicalI64::get_addressable_mut(data)?.put(idx, &val.value); + } + ScalarValue::Interval(val) => { + PhysicalInterval::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::Utf8(val) => { + PhysicalUtf8::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::Binary(val) => { + PhysicalBinary::get_addressable_mut(data)?.put(idx, val); + } + ScalarValue::List(list) => { + let secondary = next.data.try_as_mut()?.get_secondary_mut().get_list_mut()?; + + // Ensure we have space to push. + let rem_cap = secondary.child.capacity() - secondary.entries; + if rem_cap < list.len() { + // TODO: Just resize secondary. 
+ return Err(RayexecError::new( + "Secondary list buffer does not have required capacity", + ) + .with_field("remaining", rem_cap) + .with_field("need", list.len())); + } + + for (child_idx, val) in (secondary.entries..).zip(list) { + secondary.child.set_value(child_idx, val)?; + } + + // Now update entry count in child. Original value is our offset + // index. + let start_offset = secondary.entries; + secondary.entries += list.len(); + + // Set metadata pointing to new list. + PhysicalList::get_addressable_mut(next.data.try_as_mut()?)?.put( + idx, + &ListItemMetadata { + offset: start_offset as i32, + len: list.len() as i32, + }, + ); + } + ScalarValue::Struct(_) => not_implemented!("set value for struct"), + } + + Ok(()) + } } impl Array { @@ -307,7 +704,7 @@ impl Array { pub fn new_typed_null_array(datatype: DataType, len: usize) -> Result { // Create physical array data of length 1, and use a selection vector to // extend it out to the desired size. - let data = datatype.physical_type()?.zeroed_array_data(1); + let data = datatype.physical_type().zeroed_array_data(1); let validity = Bitmap::new_with_all_false(1); let selection = SelectionVector::repeated(len, 0); @@ -1120,6 +1517,44 @@ impl From for ArrayData2 { } } +/// Helper for copying rows. 
+fn copy_rows( + from: &Array, + mapping: impl stdutil::iter::IntoExactSizeIterator, + to: &mut Array, +) -> Result<()> +where + S: MutablePhysicalStorage, + B: BufferManager, +{ + let from_flat = from.flat_view()?; + let from_storage = S::get_addressable(from_flat.array_buffer)?; + + let next = to.next_mut(); + let to_data = next.data.try_as_mut()?; + let mut to_storage = S::get_addressable_mut(to_data)?; + + if from_flat.validity.all_valid() && next.validity.all_valid() { + for (from_idx, to_idx) in mapping.into_iter() { + let from_idx = from_flat.selection.get(from_idx).unwrap(); + let v = from_storage.get(from_idx).unwrap(); + to_storage.put(to_idx, v); + } + } else { + for (from_idx, to_idx) in mapping.into_iter() { + let from_idx = from_flat.selection.get(from_idx).unwrap(); + if from_flat.validity.is_valid(from_idx) { + let v = from_storage.get(from_idx).unwrap(); + to_storage.put(to_idx, v); + } else { + next.validity.set_invalid(to_idx); + } + } + } + + Ok(()) +} + /// Create a new array buffer for a datatype. fn array_buffer_for_datatype( manager: &Arc, @@ -1129,7 +1564,7 @@ fn array_buffer_for_datatype( where B: BufferManager, { - let buffer = match datatype.physical_type()? { + let buffer = match datatype.physical_type() { PhysicalType::UntypedNull => { ArrayBuffer::with_primary_capacity::(manager, capacity)? } @@ -1237,7 +1672,7 @@ macro_rules! impl_primitive_from_iter { }; } -// TODO: Bool +impl_primitive_from_iter!(bool, PhysicalBool, Boolean); impl_primitive_from_iter!(i8, PhysicalI8, Int8); impl_primitive_from_iter!(i16, PhysicalI16, Int16); @@ -1257,10 +1692,25 @@ impl_primitive_from_iter!(f64, PhysicalF64, Float64); impl_primitive_from_iter!(Interval, PhysicalInterval, Interval); -impl<'a> TryFromExactSizeIterator<&'a str> for Array { +/// Trait that provides `AsRef` for use with creating arrays from an +/// iterator. 
+/// +/// We don't use `AsRef` directly as the implementation of +/// `TryFromExactSizeIterator` could conflict with other types (the above impls +/// for primitives). A separate trait just lets us limit it to just `&str` and +/// `String`. +pub trait AsRefStr: AsRef {} + +impl AsRefStr for &str {} +impl AsRefStr for String {} + +impl TryFromExactSizeIterator for Array +where + S: AsRefStr, +{ type Error = RayexecError; - fn try_from_iter>( + fn try_from_iter>( iter: T, ) -> Result { let iter = iter.into_iter(); @@ -1273,7 +1723,7 @@ impl<'a> TryFromExactSizeIterator<&'a str> for Array { let mut addressable = buffer.try_as_string_view_addressable_mut()?; for (idx, v) in iter.enumerate() { - addressable.put(idx, v); + addressable.put(idx, v.as_ref()); } Ok(Array { @@ -1326,77 +1776,140 @@ where mod tests { use super::*; + use crate::arrays::testutil::assert_arrays_eq; #[test] - fn select_mut_no_change() { - let mut arr = Array::from_iter(["a", "b", "c"]); - let selection = SelectionVector::with_range(0..3); - - arr.select_mut2(selection); + fn select_no_change() { + let mut arr = Array::try_from_iter(["a", "b", "c"]).unwrap(); + arr.select(&Arc::new(NopBufferManager), [0, 1, 2]).unwrap(); - assert_eq!(ScalarValue::from("a"), arr.logical_value(0).unwrap()); - assert_eq!(ScalarValue::from("b"), arr.logical_value(1).unwrap()); - assert_eq!(ScalarValue::from("c"), arr.logical_value(2).unwrap()); + let expected = Array::try_from_iter(["a", "b", "c"]).unwrap(); + assert_arrays_eq(&expected, &arr); } #[test] - fn select_mut_prune_rows() { - let mut arr = Array::from_iter(["a", "b", "c"]); - let selection = SelectionVector::from_iter([0, 2]); + fn select_prune_rows() { + let mut arr = Array::try_from_iter(["a", "b", "c"]).unwrap(); + arr.select(&Arc::new(NopBufferManager), [0, 2]).unwrap(); - arr.select_mut2(selection); + let expected = Array::try_from_iter(["a", "c"]).unwrap(); + assert_arrays_eq(&expected, &arr); + } + + #[test] + fn select_expand_rows() { + let mut arr 
= Array::try_from_iter(["a", "b", "c"]).unwrap(); + arr.select(&Arc::new(NopBufferManager), [0, 1, 1, 2]) + .unwrap(); - assert_eq!(ScalarValue::from("a"), arr.logical_value(0).unwrap()); - assert_eq!(ScalarValue::from("c"), arr.logical_value(1).unwrap()); - assert!(arr.logical_value(2).is_err()); + let expected = Array::try_from_iter(["a", "b", "b", "c"]).unwrap(); + assert_arrays_eq(&expected, &arr); } #[test] - fn select_mut_expand_rows() { - let mut arr = Array::from_iter(["a", "b", "c"]); - let selection = SelectionVector::from_iter([0, 1, 1, 2]); + fn select_existing_selection() { + let mut arr = Array::try_from_iter(["a", "b", "c"]).unwrap(); + // => ["a", "c"] + arr.select(&Arc::new(NopBufferManager), [0, 2]).unwrap(); - arr.select_mut2(selection); + // => ["c", "c", "a"] + arr.select(&Arc::new(NopBufferManager), [1, 1, 0]).unwrap(); - assert_eq!(ScalarValue::from("a"), arr.logical_value(0).unwrap()); - assert_eq!(ScalarValue::from("b"), arr.logical_value(1).unwrap()); - assert_eq!(ScalarValue::from("b"), arr.logical_value(2).unwrap()); - assert_eq!(ScalarValue::from("c"), arr.logical_value(3).unwrap()); - assert!(arr.logical_value(4).is_err()); + let expected = Array::try_from_iter(["c", "c", "a"]).unwrap(); + assert_arrays_eq(&expected, &arr); } #[test] - fn select_mut_existing_selection() { - let mut arr = Array::from_iter(["a", "b", "c"]); - let selection = SelectionVector::from_iter([0, 2]); + fn get_value_simple() { + let arr = Array::try_from_iter(["a", "b", "c"]).unwrap(); + let val = arr.get_value(1).unwrap(); + assert_eq!(ScalarValue::Utf8("b".into()), val); + } + + #[test] + fn get_value_null() { + let arr = Array::try_from_iter([Some("a"), None, Some("c")]).unwrap(); + + let val = arr.get_value(0).unwrap(); + assert_eq!(ScalarValue::Utf8("a".into()), val); + + let val = arr.get_value(1).unwrap(); + assert_eq!(ScalarValue::Null, val); + } + #[test] + fn get_value_with_selection() { + let mut arr = Array::try_from_iter(["a", "b", "c"]).unwrap(); 
// => ["a", "c"] - arr.select_mut2(selection); + arr.select(&Arc::new(NopBufferManager), [0, 2]).unwrap(); + let val = arr.get_value(1).unwrap(); + + assert_eq!(ScalarValue::Utf8("c".into()), val); + } + + #[test] + fn copy_rows_simple() { + let from = Array::try_from_iter(["a", "b", "c"]).unwrap(); + let mut to = Array::try_from_iter(["d", "d", "d"]).unwrap(); + + from.copy_rows([(0, 1), (1, 2)], &mut to).unwrap(); - let selection = SelectionVector::from_iter([1, 1, 0]); - arr.select_mut2(selection); + let expected = Array::try_from_iter(["d", "a", "b"]).unwrap(); - assert_eq!(ScalarValue::from("c"), arr.logical_value(0).unwrap()); - assert_eq!(ScalarValue::from("c"), arr.logical_value(1).unwrap()); - assert_eq!(ScalarValue::from("a"), arr.logical_value(2).unwrap()); - assert!(arr.logical_value(3).is_err()); + assert_arrays_eq(&expected, &to); } #[test] - fn scalar_value_logical_eq_i32() { - let arr = Array::from_iter([1, 2, 3]); - let scalar = ScalarValue::Int32(2); + fn copy_rows_from_dict() { + let mut from = Array::try_from_iter(["a", "b", "c"]).unwrap(); + // => ["b", "a", "c"] + from.select(&Arc::new(NopBufferManager), [1, 0, 2]).unwrap(); + + let mut to = Array::try_from_iter(["d", "d", "d"]).unwrap(); + + from.copy_rows([(0, 1), (1, 2)], &mut to).unwrap(); + + let expected = Array::try_from_iter(["d", "b", "a"]).unwrap(); + + assert_arrays_eq(&expected, &to); + } + + #[test] + fn reset_after_clone_from() { + let mut a1 = Array::try_from_iter(["a", "bb", "ccc"]).unwrap(); + let mut a2 = Array::try_from_iter(["d", "ee", "fff"]).unwrap(); + + a1.try_clone_from(&NopBufferManager, &mut a2).unwrap(); + + let expected = Array::try_from_iter(["d", "ee", "fff"]).unwrap(); + assert_arrays_eq(&expected, &a1); + assert_arrays_eq(&expected, &a2); + + a1.reset_for_write(&Arc::new(NopBufferManager)).unwrap(); + + // Ensure we can write to it. 
+ let mut strings = a1 + .next_mut() + .data + .try_as_mut() + .unwrap() + .try_as_string_view_addressable_mut() + .unwrap(); + + strings.put(0, "hello"); + strings.put(1, "world"); + strings.put(2, "goodbye"); - assert!(!arr.scalar_value_logically_eq(&scalar, 0).unwrap()); - assert!(arr.scalar_value_logically_eq(&scalar, 1).unwrap()); + let expected = Array::try_from_iter(["hello", "world", "goodbye"]).unwrap(); + assert_arrays_eq(&expected, &a1); } #[test] - fn scalar_value_logical_eq_null() { - let arr = Array::from_iter([Some(1), None, Some(3)]); - let scalar = ScalarValue::Null; + fn reset_resets_validity() { + let mut a = Array::try_from_iter([Some("a"), None, Some("c")]).unwrap(); + assert!(!a.next().validity.all_valid()); - assert!(!arr.scalar_value_logically_eq(&scalar, 0).unwrap()); - assert!(arr.scalar_value_logically_eq(&scalar, 1).unwrap()); + a.reset_for_write(&Arc::new(NopBufferManager)).unwrap(); + assert!(a.next().validity.all_valid()); } } diff --git a/crates/rayexec_execution/src/arrays/array/physical_type.rs b/crates/rayexec_execution/src/arrays/array/physical_type.rs index 4e305b7b9..baa8ca7a8 100644 --- a/crates/rayexec_execution/src/arrays/array/physical_type.rs +++ b/crates/rayexec_execution/src/arrays/array/physical_type.rs @@ -209,7 +209,7 @@ impl ProtoConv for PhysicalType { } /// Represents an in-memory array that can be indexed into to retrieve values. -pub trait Addressable: Debug { +pub trait Addressable<'a>: Debug { /// The type that get's returned. type T: Send + Debug + ?Sized; @@ -220,10 +220,10 @@ pub trait Addressable: Debug { } /// Get a value at the given index. 
- fn get(&self, idx: usize) -> Option<&Self::T>; + fn get(&self, idx: usize) -> Option<&'a Self::T>; } -impl Addressable for &[T] +impl<'a, T> Addressable<'a> for &'a [T] where T: Debug + Send, { @@ -233,7 +233,7 @@ where (**self).len() } - fn get(&self, idx: usize) -> Option<&Self::T> { + fn get(&self, idx: usize) -> Option<&'a Self::T> { (**self).get(idx) } } @@ -355,7 +355,7 @@ pub trait PhysicalStorage: Debug + Sync + Send + Clone + Copy + 'static { type StorageType: Sync + Send + ?Sized; /// The type of the addressable storage. - type Addressable<'a>: Addressable; + type Addressable<'a>: Addressable<'a, T = Self::StorageType>; /// Get addressable storage for indexing into the array. fn get_addressable(buffer: &ArrayBuffer) -> Result>; @@ -744,6 +744,16 @@ impl PhysicalStorage for PhysicalList { } } +impl MutablePhysicalStorage for PhysicalList { + type AddressableMut<'a> = &'a mut [Self::StorageType]; + + fn get_addressable_mut( + buffer: &mut ArrayBuffer, + ) -> Result> { + buffer.try_as_slice_mut::() + } +} + /// Dictionary arrays have the selection vector as the primary data buffer. 
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] pub struct PhysicalDictionary; diff --git a/crates/rayexec_execution/src/arrays/array/string_view.rs b/crates/rayexec_execution/src/arrays/array/string_view.rs index 2bfb476cd..1fb2cdff4 100644 --- a/crates/rayexec_execution/src/arrays/array/string_view.rs +++ b/crates/rayexec_execution/src/arrays/array/string_view.rs @@ -8,14 +8,14 @@ pub struct StringViewAddressable<'a> { pub(crate) heap: &'a StringViewHeap, } -impl Addressable for StringViewAddressable<'_> { +impl<'a> Addressable<'a> for StringViewAddressable<'a> { type T = str; fn len(&self) -> usize { self.metadata.len() } - fn get(&self, idx: usize) -> Option<&Self::T> { + fn get(&self, idx: usize) -> Option<&'a Self::T> { let m = self.metadata.get(idx)?; let bs = self.heap.get(m)?; Some(unsafe { std::str::from_utf8_unchecked(bs) }) @@ -54,14 +54,14 @@ pub struct BinaryViewAddressable<'a> { pub(crate) heap: &'a StringViewHeap, } -impl Addressable for BinaryViewAddressable<'_> { +impl<'a> Addressable<'a> for BinaryViewAddressable<'a> { type T = [u8]; fn len(&self) -> usize { self.metadata.len() } - fn get(&self, idx: usize) -> Option<&Self::T> { + fn get(&self, idx: usize) -> Option<&'a Self::T> { let m = self.metadata.get(idx)?; self.heap.get(m) } diff --git a/crates/rayexec_execution/src/arrays/compute/cast/array.rs b/crates/rayexec_execution/src/arrays/compute/cast/array.rs index 87ed46e9b..1c33d41fe 100644 --- a/crates/rayexec_execution/src/arrays/compute/cast/array.rs +++ b/crates/rayexec_execution/src/arrays/compute/cast/array.rs @@ -88,7 +88,7 @@ pub fn cast_array(arr: &Array, to: DataType, behavior: CastFailBehavior) -> Resu let arr = match arr.datatype() { DataType::Null => { // Can cast NULL to anything else. 
- let data = to.physical_type()?.zeroed_array_data(arr.logical_len()); + let data = to.physical_type().zeroed_array_data(arr.logical_len()); let validity = Bitmap::new_with_all_false(arr.logical_len()); Array::new_with_validity_and_array_data(to, validity, data) } diff --git a/crates/rayexec_execution/src/arrays/datatype.rs b/crates/rayexec_execution/src/arrays/datatype.rs index 1f55f5cf0..8eb04c320 100644 --- a/crates/rayexec_execution/src/arrays/datatype.rs +++ b/crates/rayexec_execution/src/arrays/datatype.rs @@ -1,6 +1,6 @@ use std::fmt; -use rayexec_error::{not_implemented, OptionExt, RayexecError, Result, ResultExt}; +use rayexec_error::{OptionExt, RayexecError, Result, ResultExt}; use rayexec_proto::ProtoConv; use serde::{Deserialize, Serialize}; @@ -445,8 +445,8 @@ impl DataType { } } - pub fn physical_type(&self) -> Result { - Ok(match self { + pub fn physical_type(&self) -> PhysicalType { + match self { DataType::Null => PhysicalType::UntypedNull, DataType::Boolean => PhysicalType::Boolean, DataType::Int8 => PhysicalType::Int8, @@ -470,9 +470,9 @@ impl DataType { DataType::Interval => PhysicalType::Interval, DataType::Utf8 => PhysicalType::Utf8, DataType::Binary => PhysicalType::Binary, - DataType::Struct(_) => not_implemented!("struct data type to physical type"), + DataType::Struct(_) => PhysicalType::Struct, DataType::List(_) => PhysicalType::List, - }) + } } /// Return if this datatype is null. diff --git a/crates/rayexec_execution/src/arrays/executor/scalar/fill.rs b/crates/rayexec_execution/src/arrays/executor/scalar/fill.rs index 85c6c4e19..80aea5110 100644 --- a/crates/rayexec_execution/src/arrays/executor/scalar/fill.rs +++ b/crates/rayexec_execution/src/arrays/executor/scalar/fill.rs @@ -153,7 +153,7 @@ pub(crate) fn concat_with_exact_total_len(arrays: &[&Array], total_len: usize) - None => return Err(RayexecError::new("Cannot concat zero arrays")), }; - match datatype.physical_type()? 
{ + match datatype.physical_type() { PhysicalType::UntypedNull => Ok(Array { datatype: datatype.clone(), selection2: None, @@ -380,7 +380,7 @@ pub fn interleave(arrays: &[&Array], indices: &[(usize, usize)]) -> Result return Err(RayexecError::new("Cannot interleave zero arrays")), }; - match datatype.physical_type()? { + match datatype.physical_type() { PhysicalType::UntypedNull => Ok(Array { datatype: datatype.clone(), selection2: None, diff --git a/crates/rayexec_execution/src/arrays/testutil.rs b/crates/rayexec_execution/src/arrays/testutil.rs index 8a3b9c7a8..39df6ca22 100644 --- a/crates/rayexec_execution/src/arrays/testutil.rs +++ b/crates/rayexec_execution/src/arrays/testutil.rs @@ -68,7 +68,7 @@ pub fn assert_arrays_eq_sel( let flat1 = array1.flat_view().unwrap(); let flat2 = array2.flat_view().unwrap(); - match array1.datatype.physical_type().unwrap() { + match array1.datatype.physical_type() { PhysicalType::Boolean => { assert_arrays_eq_sel_inner::(flat1, sel1, flat2, sel2) } diff --git a/crates/rayexec_execution/src/functions/aggregate/builtin/first.rs b/crates/rayexec_execution/src/functions/aggregate/builtin/first.rs index f5fafa8e8..056666aa9 100644 --- a/crates/rayexec_execution/src/functions/aggregate/builtin/first.rs +++ b/crates/rayexec_execution/src/functions/aggregate/builtin/first.rs @@ -81,7 +81,7 @@ impl AggregateFunction for First { let datatype = inputs[0].datatype(table_list)?; - let function_impl: Box = match datatype.physical_type()? 
{ + let function_impl: Box = match datatype.physical_type() { PhysicalType::UntypedNull => Box::new(FirstUntypedNullImpl), PhysicalType::Boolean => Box::new(FirstBoolImpl), PhysicalType::Float16 => Box::new(FirstPrimitiveImpl::::new( diff --git a/crates/rayexec_execution/src/functions/aggregate/builtin/minmax.rs b/crates/rayexec_execution/src/functions/aggregate/builtin/minmax.rs index eb58f9cc3..5f5290a98 100644 --- a/crates/rayexec_execution/src/functions/aggregate/builtin/minmax.rs +++ b/crates/rayexec_execution/src/functions/aggregate/builtin/minmax.rs @@ -81,7 +81,7 @@ impl AggregateFunction for Min { let datatype = inputs[0].datatype(table_list)?; - let function_impl: Box = match datatype.physical_type()? { + let function_impl: Box = match datatype.physical_type() { PhysicalType::UntypedNull => Box::new(MinMaxUntypedNull), PhysicalType::Boolean => Box::new(MinBoolImpl::new()), PhysicalType::Float16 => { @@ -175,7 +175,7 @@ impl AggregateFunction for Max { let datatype = inputs[0].datatype(table_list)?; - let function_impl: Box = match datatype.physical_type()? { + let function_impl: Box = match datatype.physical_type() { PhysicalType::UntypedNull => Box::new(MinMaxUntypedNull), PhysicalType::Boolean => Box::new(MaxBoolImpl::new()), PhysicalType::Float16 => { diff --git a/crates/rayexec_execution/src/functions/scalar/builtin/comparison.rs b/crates/rayexec_execution/src/functions/scalar/builtin/comparison.rs index 8d3ed61b9..1540d4ef4 100644 --- a/crates/rayexec_execution/src/functions/scalar/builtin/comparison.rs +++ b/crates/rayexec_execution/src/functions/scalar/builtin/comparison.rs @@ -585,7 +585,7 @@ fn new_comparison_impl( } (DataType::List(m1), DataType::List(m2)) if m1 == m2 => { // TODO: We'll want to figure out casting for lists. - Box::new(ListComparisonImpl::::new(m1.datatype.physical_type()?)) + Box::new(ListComparisonImpl::::new(m1.datatype.physical_type())) } (a, b) => return Err(invalid_input_types_error(func, &[a, b])), },