diff --git a/crates/rayexec_execution/src/arrays/array/mod.rs b/crates/rayexec_execution/src/arrays/array/mod.rs index 3d9df31ee..cab757879 100644 --- a/crates/rayexec_execution/src/arrays/array/mod.rs +++ b/crates/rayexec_execution/src/arrays/array/mod.rs @@ -772,15 +772,6 @@ impl Array { self.selection2 = Some(selection.into()) } - pub fn make_shared(&mut self) { - if let Some(validity) = &mut self.validity2 { - validity.make_shared(); - } - if let Some(selection) = &mut self.selection2 { - selection.make_shared() - } - } - /// Updates this array's selection vector. /// /// Takes into account any existing selection. This allows for repeated diff --git a/crates/rayexec_execution/src/arrays/batch.rs b/crates/rayexec_execution/src/arrays/batch.rs index e6d989365..c9bfacaa3 100644 --- a/crates/rayexec_execution/src/arrays/batch.rs +++ b/crates/rayexec_execution/src/arrays/batch.rs @@ -1,7 +1,10 @@ use std::sync::Arc; use rayexec_error::{RayexecError, Result}; +use stdutil::iter::IntoExactSizeIterator; +use super::array::buffer_manager::NopBufferManager; +use super::datatype::DataType; use crate::arrays::array::Array; use crate::arrays::executor::scalar::concat_with_exact_total_len; use crate::arrays::row::ScalarRow; @@ -46,6 +49,28 @@ impl Batch { } } + /// Create a batch by initializing arrays for the given datatypes. + /// + /// Each array will be initialized to hold `capacity` rows. + pub fn try_new( + datatypes: impl IntoExactSizeIterator, + capacity: usize, + ) -> Result { + let datatypes = datatypes.into_iter(); + let mut arrays = Vec::with_capacity(datatypes.len()); + + for datatype in datatypes { + let array = Array::try_new(&Arc::new(NopBufferManager), datatype, capacity)?; + arrays.push(array) + } + + Ok(Batch { + arrays, + num_rows: 0, + capacity, + }) + } + /// Create a new batch from some number of arrays. /// /// All arrays should have the same logical length. @@ -91,19 +116,82 @@ impl Batch { /// /// Errors if `rows` is greater than the capacity of the batch. pub fn set_num_rows(&mut self, rows: usize) -> Result<()> { - if rows > self.capacity { - return Err(RayexecError::new("Number of rows exceeds capacity") - .with_field("capacity", self.capacity) - .with_field("requested_num_rows", rows)); - } + // TODO: Need to solidify what capacity should be with dictionaries. + // if rows > self.capacity { + // return Err(RayexecError::new("Number of rows exceeds capacity") + // .with_field("capacity", self.capacity) + // .with_field("requested_num_rows", rows)); + // } self.num_rows = rows; Ok(()) } + /// Selects rows from the batch based on `selection`. + pub fn select(&mut self, selection: &[usize]) -> Result<()> { + for arr in &mut self.arrays { + arr.select(&Arc::new(NopBufferManager), selection.iter().copied())?; + } + self.set_num_rows(selection.len())?; + + Ok(()) + } + + /// Reset all arrays in the batch for writes. + pub fn reset_for_write(&mut self) -> Result<()> { + for arr in &mut self.arrays { + arr.reset_for_write(&Arc::new(NopBufferManager))?; + } + Ok(()) + } + + /// Copy rows from this batch to another batch. + /// + /// `mapping` provides (from, to) pairs for how to copy the rows. + pub fn copy_rows(&self, mapping: &[(usize, usize)], dest: &mut Self) -> Result<()> { + if self.arrays.len() != dest.arrays.len() { + return Err(RayexecError::new( + "Attempted to copy rows to another batch with invalid number of columns", + )); + } + + for (from, to) in self.arrays.iter().zip(dest.arrays.iter_mut()) { + from.copy_rows(mapping.iter().copied(), to)?; + } + + Ok(()) + } + + /// Appends a batch to the end of self. + /// + /// Errors if this batch doesn't have enough capacity to append the other + /// batch. + pub fn append(&mut self, other: &Batch) -> Result<()> { + if self.num_rows() + other.num_rows() > self.capacity { + return Err( + RayexecError::new("Batch doesn't have sufficient capacity for append") + .with_field("self_rows", self.num_rows()) + .with_field("other_rows", other.num_rows()) + .with_field("self_capacity", self.capacity), + ); + } + + for (from, to) in other.arrays.iter().zip(self.arrays.iter_mut()) { + // [0..batch_num_rows) => [self_row_count..) + let mapping = + (0..other.num_rows()).zip(self.num_rows..(self.num_rows + other.num_rows())); + from.copy_rows(mapping, to)?; + } + + self.num_rows += other.num_rows; + + Ok(()) + } + /// Concat multiple batches into one. /// /// Batches are requried to have the same logical schemas. + #[deprecated] pub fn concat(batches: &[Batch]) -> Result { let num_cols = match batches.first() { Some(batch) => batch.num_arrays(), @@ -146,6 +234,7 @@ impl Batch { } // TODO: Owned variant + #[deprecated] pub fn project(&self, indices: &[usize]) -> Self { let cols = indices .iter() @@ -160,6 +249,7 @@ impl Batch { } // TODO: Remove + #[deprecated] pub fn slice(&self, offset: usize, count: usize) -> Self { let cols = self.arrays.iter().map(|c| c.slice(offset, count)).collect(); Batch { @@ -174,7 +264,8 @@ impl Batch { /// /// This accepts an Arc selection as it'll be cloned for each array in the /// batch. - pub fn select(&self, selection: Arc) -> Batch { + #[deprecated] + pub fn select_old(&self, selection: Arc) -> Batch { let cols = self .arrays .iter() @@ -193,6 +284,7 @@ impl Batch { } /// Get the row at some index. + #[deprecated] pub fn row(&self, idx: usize) -> Option { if idx >= self.num_rows { return None; @@ -231,4 +323,57 @@ impl Batch { pub fn into_arrays(self) -> Vec { self.arrays } + + /// Helper for returning a pretty formatted table for the batch. + /// + /// This should only be used during debugging. + #[cfg(debug_assertions)] + #[allow(unused)] + pub fn debug_table(&self) -> super::format::pretty::table::PrettyTable { + use crate::arrays::field::{Field, Schema}; + use crate::arrays::format::pretty::table::PrettyTable; + + let schema = + Schema::new(self.arrays.iter().enumerate().map(|(idx, array)| { + Field::new(format!("array{idx}"), array.datatype().clone(), true) + })); + + PrettyTable::try_new(&schema, &[self], 100, None) + .expect("to be able to create pretty table") + } +} + +#[cfg(test)] +mod tests { + use stdutil::iter::TryFromExactSizeIterator; + + use super::*; + use crate::arrays::testutil::assert_batches_eq; + + #[test] + fn append_batch_simple() { + let mut batch = Batch::try_new([DataType::Int32, DataType::Utf8], 1024).unwrap(); + + let append1 = Batch::try_from_arrays([ + Array::try_from_iter([1, 2, 3]).unwrap(), + Array::try_from_iter(["a", "b", "c"]).unwrap(), + ]) + .unwrap(); + batch.append(&append1).unwrap(); + + let append2 = Batch::try_from_arrays([ + Array::try_from_iter([4, 5, 6]).unwrap(), + Array::try_from_iter(["d", "e", "f"]).unwrap(), + ]) + .unwrap(); + batch.append(&append2).unwrap(); + + let expected = Batch::try_from_arrays([ + Array::try_from_iter([1, 2, 3, 4, 5, 6]).unwrap(), + Array::try_from_iter(["a", "b", "c", "d", "e", "f"]).unwrap(), + ]) + .unwrap(); + + assert_batches_eq(&expected, &batch); + } } diff --git a/crates/rayexec_execution/src/arrays/format/pretty/table.rs b/crates/rayexec_execution/src/arrays/format/pretty/table.rs index 81f4c2c93..a4a0c55e6 100644 --- a/crates/rayexec_execution/src/arrays/format/pretty/table.rs +++ b/crates/rayexec_execution/src/arrays/format/pretty/table.rs @@ -1,3 +1,4 @@ +use std::borrow::Borrow; use std::collections::HashMap; use std::fmt::{self, Write as _}; use std::ops::Range; @@ -38,9 +39,9 @@ pub struct PrettyTable { impl PrettyTable { /// Try to create a new pretty-formatted table. - pub fn try_new( + pub fn try_new>( schema: &Schema, - batches: &[Batch], + batches: &[B], max_width: usize, max_rows: Option, ) -> Result { @@ -86,6 +87,7 @@ impl PrettyTable { // to help determine the size of the columns. let samples = match batches.first() { Some(batch) => batch + .borrow() .arrays() .iter() .map(|col| ColumnValues::try_from_array(col, Some(0..NUM_VALS_FOR_AVG), None)) @@ -148,7 +150,7 @@ impl PrettyTable { column_widths.insert(elide_index(&column_widths), 1); } - let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + let total_rows: usize = batches.iter().map(|b| b.borrow().num_rows()).sum(); let max_rows = max_rows.unwrap_or(DEFAULT_MAX_ROWS); let (mut head_rows, mut tail_rows) = if total_rows > max_rows { @@ -190,7 +192,8 @@ impl PrettyTable { break; } - let (vals, num_rows) = Self::column_values_for_batch(batch, &format, 0..head_rows)?; + let (vals, num_rows) = + Self::column_values_for_batch(batch.borrow(), &format, 0..head_rows)?; head.push(PrettyValues::new( col_alignments.clone(), column_widths.clone(), @@ -206,13 +209,13 @@ impl PrettyTable { break; } - let num_rows = batch.num_rows(); + let num_rows = batch.borrow().num_rows(); let range = if tail_rows >= num_rows { 0..num_rows } else { (num_rows - tail_rows)..num_rows }; - let (vals, num_rows) = Self::column_values_for_batch(batch, &format, range)?; + let (vals, num_rows) = Self::column_values_for_batch(batch.borrow(), &format, range)?; tail.push(PrettyValues::new( col_alignments.clone(), column_widths.clone(), diff --git a/crates/rayexec_execution/src/execution/intermediate/planner/plan_scan.rs b/crates/rayexec_execution/src/execution/intermediate/planner/plan_scan.rs index c49db5784..f3b2f211e 100644 --- a/crates/rayexec_execution/src/execution/intermediate/planner/plan_scan.rs +++ b/crates/rayexec_execution/src/execution/intermediate/planner/plan_scan.rs @@ -16,6 +16,7 @@ use crate::logical::operator::Node; use crate::storage::table_storage::Projections; impl IntermediatePipelineBuildState<'_> { + #[allow(deprecated)] pub fn plan_scan(&mut self, id_gen: &mut PipelineIdGen, scan: Node) -> Result<()> { let location = scan.location; @@ -73,6 +74,7 @@ impl IntermediatePipelineBuildState<'_> { Ok(()) } + #[allow(deprecated)] fn create_batches_for_row_values( &self, projections: Projections, diff --git a/crates/rayexec_execution/src/execution/operators/filter.rs b/crates/rayexec_execution/src/execution/operators/filter.rs index 3315fcbf5..7871f609a 100644 --- a/crates/rayexec_execution/src/execution/operators/filter.rs +++ b/crates/rayexec_execution/src/execution/operators/filter.rs @@ -23,9 +23,10 @@ impl FilterOperation { } impl StatelessOperation for FilterOperation { + #[allow(deprecated)] fn execute(&self, batch: Batch) -> Result { let selection = self.predicate.select(&batch)?; - let batch = batch.select(Arc::new(selection)); // TODO: Select mut + let batch = batch.select_old(Arc::new(selection)); // TODO: Select mut Ok(batch) } diff --git a/crates/rayexec_execution/src/execution/operators/hash_join/condition.rs b/crates/rayexec_execution/src/execution/operators/hash_join/condition.rs index 4d6d97ebb..61cefde53 100644 --- a/crates/rayexec_execution/src/execution/operators/hash_join/condition.rs +++ b/crates/rayexec_execution/src/execution/operators/hash_join/condition.rs @@ -91,6 +91,7 @@ impl LeftPrecomputedJoinConditions { /// /// The output is the (left, right) selection vectors to use for the final /// output batch. + #[allow(deprecated)] pub fn compute_selection_for_probe( &self, left_batch_idx: usize, @@ -106,7 +107,7 @@ impl LeftPrecomputedJoinConditions { let mut results = Vec::with_capacity(self.conditions.len()); // Select rows from the right batch. - let selected_right = right.select(right_row_sel.clone()); + let selected_right = right.select_old(right_row_sel.clone()); for condition in &self.conditions { let mut left_precomputed = condition diff --git a/crates/rayexec_execution/src/execution/operators/hash_join/global_hash_table.rs b/crates/rayexec_execution/src/execution/operators/hash_join/global_hash_table.rs index edf0fbf88..b2214894b 100644 --- a/crates/rayexec_execution/src/execution/operators/hash_join/global_hash_table.rs +++ b/crates/rayexec_execution/src/execution/operators/hash_join/global_hash_table.rs @@ -124,6 +124,7 @@ impl GlobalHashTable { } /// Probe the table. + #[allow(deprecated)] pub fn probe( &self, right: &Batch, @@ -222,10 +223,10 @@ impl GlobalHashTable { // Get the left columns for this batch. let left_batch = self.batches.get(batch_idx).expect("batch to exist"); - let left_cols = left_batch.select(Arc::new(left_row_sel)).into_arrays(); + let left_cols = left_batch.select_old(Arc::new(left_row_sel)).into_arrays(); // Get the right. - let right_cols = right.select(Arc::new(right_row_sel)).into_arrays(); + let right_cols = right.select_old(Arc::new(right_row_sel)).into_arrays(); // Create final batch. let batch = Batch::try_from_arrays(left_cols.into_iter().chain(right_cols))?; diff --git a/crates/rayexec_execution/src/execution/operators/limit.rs b/crates/rayexec_execution/src/execution/operators/limit.rs index ee0fca6ff..30fb9462b 100644 --- a/crates/rayexec_execution/src/execution/operators/limit.rs +++ b/crates/rayexec_execution/src/execution/operators/limit.rs @@ -88,6 +88,7 @@ impl ExecutableOperator for PhysicalLimit { }) } + #[allow(deprecated)] fn poll_push( &self, cx: &mut Context, diff --git a/crates/rayexec_execution/src/execution/operators/nl_join.rs b/crates/rayexec_execution/src/execution/operators/nl_join.rs index f048aa4e8..9c57e3ba0 100644 --- a/crates/rayexec_execution/src/execution/operators/nl_join.rs +++ b/crates/rayexec_execution/src/execution/operators/nl_join.rs @@ -419,6 +419,7 @@ impl ExecutableOperator for PhysicalNestedLoopJoin { /// Generate a cross product of two batches, applying an optional filter to the /// result. +#[allow(deprecated)] fn cross_join( left_batch_idx: usize, left: &Batch, @@ -435,7 +436,7 @@ fn cross_join( let selection = SelectionVector::repeated(right.num_rows(), left_idx); // Columns from the left, one row repeated. - let left_columns = left.select(Arc::new(selection)).into_arrays(); + let left_columns = left.select_old(Arc::new(selection)).into_arrays(); // Columns from the right, all rows. let right_columns = right.clone().into_arrays(); @@ -444,7 +445,7 @@ fn cross_join( // If we have a filter, apply it to the output batch. if let Some(filter_expr) = &filter_expr { let selection = Arc::new(filter_expr.select(&output)?); - output = output.select(selection.clone()); + output = output.select_old(selection.clone()); // If we're left joining, compute indices in the left batch that we // visited. diff --git a/crates/rayexec_execution/src/execution/operators/util/outer_join_tracker.rs b/crates/rayexec_execution/src/execution/operators/util/outer_join_tracker.rs index 8ba2a93ee..16eae9556 100644 --- a/crates/rayexec_execution/src/execution/operators/util/outer_join_tracker.rs +++ b/crates/rayexec_execution/src/execution/operators/util/outer_join_tracker.rs @@ -120,6 +120,7 @@ impl LeftOuterJoinDrainState { /// /// This will filter out rows that have been visited, and join the remaining /// rows will null columns on the right. + #[allow(deprecated)] pub fn drain_next(&mut self) -> Result> { loop { let batch = match self.batches.get(self.batch_idx) { @@ -148,7 +149,7 @@ impl LeftOuterJoinDrainState { continue; } - let left_cols = batch.select(Arc::new(selection)).into_arrays(); + let left_cols = batch.select_old(Arc::new(selection)).into_arrays(); let right_cols = self .right_types .iter() @@ -161,6 +162,7 @@ impl LeftOuterJoinDrainState { } } + #[allow(deprecated)] pub fn drain_semi_next(&mut self) -> Result> { loop { let batch = match self.batches.get(self.batch_idx) { @@ -183,7 +185,7 @@ impl LeftOuterJoinDrainState { continue; } - let left_cols = batch.select(Arc::new(selection)).into_arrays(); + let left_cols = batch.select_old(Arc::new(selection)).into_arrays(); let right_cols = self .right_types .iter() @@ -230,6 +232,7 @@ impl RightOuterJoinTracker { /// the batch. /// /// Returns None if all row on the right were visited. + #[allow(deprecated)] pub fn into_unvisited(self, left_types: &[DataType], right: &Batch) -> Result> { let selection = SelectionVector::from_iter(self.unvisited.index_iter()); let num_rows = selection.num_rows(); @@ -237,7 +240,7 @@ impl RightOuterJoinTracker { return Ok(None); } - let right_cols = right.select(Arc::new(selection)).into_arrays(); + let right_cols = right.select_old(Arc::new(selection)).into_arrays(); let left_null_cols = left_types .iter() diff --git a/crates/rayexec_execution/src/execution/operators/util/resizer.rs b/crates/rayexec_execution/src/execution/operators/util/resizer.rs index 3e63a27b5..e50e07d2d 100644 --- a/crates/rayexec_execution/src/execution/operators/util/resizer.rs +++ b/crates/rayexec_execution/src/execution/operators/util/resizer.rs @@ -36,6 +36,7 @@ impl BatchResizer { /// Typically this will return either no batches or a single batch. However /// there is a case where this can return multiple batches if 'len(input) + /// pending_row_count > target * 2' (aka very large input batch). + #[allow(deprecated)] pub fn try_push(&mut self, batch: Batch) -> Result { if batch.num_rows() == 0 { return Ok(ComputedBatches::None); @@ -61,8 +62,8 @@ impl BatchResizer { let sel_a = SelectionVector::with_range(0..diff); let sel_b = SelectionVector::with_range(diff..batch.num_rows()); - let batch_a = batch.select(Arc::new(sel_a)); - let batch_b = batch.select(Arc::new(sel_b)); + let batch_a = batch.select_old(Arc::new(sel_a)); + let batch_b = batch.select_old(Arc::new(sel_b)); self.pending.push(batch_a); @@ -102,6 +103,7 @@ impl BatchResizer { Ok(ComputedBatches::None) } + #[allow(deprecated)] pub fn flush_remaining(&mut self) -> Result { if self.pending_row_count == 0 { return Ok(ComputedBatches::None); diff --git a/crates/rayexec_execution/src/expr/physical/case_expr.rs b/crates/rayexec_execution/src/expr/physical/case_expr.rs index ed11f0c7a..016877142 100644 --- a/crates/rayexec_execution/src/expr/physical/case_expr.rs +++ b/crates/rayexec_execution/src/expr/physical/case_expr.rs @@ -30,6 +30,7 @@ pub struct PhysicalCaseExpr { } impl PhysicalCaseExpr { + #[allow(deprecated)] pub fn eval<'a>(&self, batch: &'a Batch) -> Result> { let mut arrays = Vec::new(); let mut indices: Vec<(usize, usize)> = (0..batch.num_rows()).map(|_| (0, 0)).collect(); @@ -46,7 +47,7 @@ impl PhysicalCaseExpr { let selection = Arc::new(SelectionVector::from_iter(remaining.index_iter())); // Get batch with only remaining rows that we should consider. - let selected_batch = batch.select(selection.clone()); + let selected_batch = batch.select_old(selection.clone()); // Execute 'when'. let selected = case.when.eval(&selected_batch)?; @@ -56,7 +57,7 @@ impl PhysicalCaseExpr { SelectExecutor::select(&selected, &mut trues_sel)?; // Select rows in batch to execute on based on 'trues'. - let execute_batch = selected_batch.select(Arc::new(trues_sel.clone())); + let execute_batch = selected_batch.select_old(Arc::new(trues_sel.clone())); let output = case.then.eval(&execute_batch)?; // Store array for later interleaving. @@ -81,7 +82,7 @@ impl PhysicalCaseExpr { // Do all remaining rows. if remaining.count_trues() != 0 { let selection = Arc::new(SelectionVector::from_iter(remaining.index_iter())); - let remaining_batch = batch.select(selection.clone()); + let remaining_batch = batch.select_old(selection.clone()); let output = self.else_expr.eval(&remaining_batch)?; let array_idx = arrays.len(); diff --git a/crates/rayexec_execution/src/storage/table_storage.rs b/crates/rayexec_execution/src/storage/table_storage.rs index 258f69cff..3f3fcc7f4 100644 --- a/crates/rayexec_execution/src/storage/table_storage.rs +++ b/crates/rayexec_execution/src/storage/table_storage.rs @@ -118,6 +118,7 @@ impl ProjectedScan { ProjectedScan { projections, scan } } + #[allow(deprecated)] async fn pull_inner(&mut self) -> Result> { let batch = match self.scan.pull().await? { Some(batch) => batch, diff --git a/crates/rayexec_shell/src/result_table.rs b/crates/rayexec_shell/src/result_table.rs index 6124601be..90e25bba8 100644 --- a/crates/rayexec_shell/src/result_table.rs +++ b/crates/rayexec_shell/src/result_table.rs @@ -234,6 +234,7 @@ pub struct MaterializedRowIter<'a> { impl<'a> Iterator for MaterializedRowIter<'a> { type Item = ScalarRow<'a>; + #[allow(deprecated)] fn next(&mut self) -> Option { loop { let batch = self.table.batches.get(self.batch_idx)?;