Skip to content

Commit

Permalink
chore: Add more batch methods (#3400)
Browse files Browse the repository at this point in the history
Split from mutable buffers branch
  • Loading branch information
scsmithr authored Jan 14, 2025
1 parent e69750f commit ac8a38c
Show file tree
Hide file tree
Showing 14 changed files with 189 additions and 35 deletions.
9 changes: 0 additions & 9 deletions crates/rayexec_execution/src/arrays/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,15 +772,6 @@ impl Array {
self.selection2 = Some(selection.into())
}

pub fn make_shared(&mut self) {
if let Some(validity) = &mut self.validity2 {
validity.make_shared();
}
if let Some(selection) = &mut self.selection2 {
selection.make_shared()
}
}

/// Updates this array's selection vector.
///
/// Takes into account any existing selection. This allows for repeated
Expand Down
157 changes: 151 additions & 6 deletions crates/rayexec_execution/src/arrays/batch.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::sync::Arc;

use rayexec_error::{RayexecError, Result};
use stdutil::iter::IntoExactSizeIterator;

use super::array::buffer_manager::NopBufferManager;
use super::datatype::DataType;
use crate::arrays::array::Array;
use crate::arrays::executor::scalar::concat_with_exact_total_len;
use crate::arrays::row::ScalarRow;
Expand Down Expand Up @@ -46,6 +49,28 @@ impl Batch {
}
}

/// Create a batch by initializing arrays for the given datatypes.
///
/// Each array will be initialized to hold `capacity` rows.
pub fn try_new(
datatypes: impl IntoExactSizeIterator<Item = DataType>,
capacity: usize,
) -> Result<Self> {
let datatypes = datatypes.into_iter();
let mut arrays = Vec::with_capacity(datatypes.len());

for datatype in datatypes {
let array = Array::try_new(&Arc::new(NopBufferManager), datatype, capacity)?;
arrays.push(array)
}

Ok(Batch {
arrays,
num_rows: 0,
capacity,
})
}

/// Create a new batch from some number of arrays.
///
/// All arrays should have the same logical length.
Expand Down Expand Up @@ -91,19 +116,82 @@ impl Batch {
///
/// Errors if `rows` is greater than the capacity of the batch.
pub fn set_num_rows(&mut self, rows: usize) -> Result<()> {
if rows > self.capacity {
return Err(RayexecError::new("Number of rows exceeds capacity")
.with_field("capacity", self.capacity)
.with_field("requested_num_rows", rows));
}
// TODO: Need to solidify what capacity should be with dictionaries.
// if rows > self.capacity {
// return Err(RayexecError::new("Number of rows exceeds capacity")
// .with_field("capacity", self.capacity)
// .with_field("requested_num_rows", rows));
// }
self.num_rows = rows;

Ok(())
}

/// Selects rows from the batch based on `selection`.
pub fn select(&mut self, selection: &[usize]) -> Result<()> {
for arr in &mut self.arrays {
arr.select(&Arc::new(NopBufferManager), selection.iter().copied())?;
}
self.set_num_rows(selection.len())?;

Ok(())
}

/// Reset all arrays in the batch for writes.
pub fn reset_for_write(&mut self) -> Result<()> {
for arr in &mut self.arrays {
arr.reset_for_write(&Arc::new(NopBufferManager))?;
}
Ok(())
}

/// Copy rows from this batch to another batch.
///
/// `mapping` provides (from, to) pairs for how to copy the rows.
pub fn copy_rows(&self, mapping: &[(usize, usize)], dest: &mut Self) -> Result<()> {
if self.arrays.len() != dest.arrays.len() {
return Err(RayexecError::new(
"Attempted to copy rows to another batch with invalid number of columns",
));
}

for (from, to) in self.arrays.iter().zip(dest.arrays.iter_mut()) {
from.copy_rows(mapping.iter().copied(), to)?;
}

Ok(())
}

/// Appends a batch to the end of self.
///
/// Errors if this batch doesn't have enough capacity to append the other
/// batch.
pub fn append(&mut self, other: &Batch) -> Result<()> {
if self.num_rows() + other.num_rows() > self.capacity {
return Err(
RayexecError::new("Batch doesn't have sufficient capacity for append")
.with_field("self_rows", self.num_rows())
.with_field("other_rows", other.num_rows())
.with_field("self_capacity", self.capacity),
);
}

for (from, to) in other.arrays.iter().zip(self.arrays.iter_mut()) {
// [0..batch_num_rows) => [self_row_count..)
let mapping =
(0..other.num_rows()).zip(self.num_rows..(self.num_rows + other.num_rows()));
from.copy_rows(mapping, to)?;
}

self.num_rows += other.num_rows;

Ok(())
}

/// Concat multiple batches into one.
///
/// Batches are requried to have the same logical schemas.
#[deprecated]
pub fn concat(batches: &[Batch]) -> Result<Self> {
let num_cols = match batches.first() {
Some(batch) => batch.num_arrays(),
Expand Down Expand Up @@ -146,6 +234,7 @@ impl Batch {
}

// TODO: Owned variant
#[deprecated]
pub fn project(&self, indices: &[usize]) -> Self {
let cols = indices
.iter()
Expand All @@ -160,6 +249,7 @@ impl Batch {
}

// TODO: Remove
#[deprecated]
pub fn slice(&self, offset: usize, count: usize) -> Self {
let cols = self.arrays.iter().map(|c| c.slice(offset, count)).collect();
Batch {
Expand All @@ -174,7 +264,8 @@ impl Batch {
///
/// This accepts an Arc selection as it'll be cloned for each array in the
/// batch.
pub fn select(&self, selection: Arc<SelectionVector>) -> Batch {
#[deprecated]
pub fn select_old(&self, selection: Arc<SelectionVector>) -> Batch {
let cols = self
.arrays
.iter()
Expand All @@ -193,6 +284,7 @@ impl Batch {
}

/// Get the row at some index.
#[deprecated]
pub fn row(&self, idx: usize) -> Option<ScalarRow> {
if idx >= self.num_rows {
return None;
Expand Down Expand Up @@ -231,4 +323,57 @@ impl Batch {
pub fn into_arrays(self) -> Vec<Array> {
self.arrays
}

/// Helper for returning a pretty formatted table for the batch.
///
/// This should only be used during debugging.
#[cfg(debug_assertions)]
#[allow(unused)]
pub fn debug_table(&self) -> super::format::pretty::table::PrettyTable {
use crate::arrays::field::{Field, Schema};
use crate::arrays::format::pretty::table::PrettyTable;

let schema =
Schema::new(self.arrays.iter().enumerate().map(|(idx, array)| {
Field::new(format!("array{idx}"), array.datatype().clone(), true)
}));

PrettyTable::try_new(&schema, &[self], 100, None)
.expect("to be able to create pretty table")
}
}

#[cfg(test)]
mod tests {
use stdutil::iter::TryFromExactSizeIterator;

use super::*;
use crate::arrays::testutil::assert_batches_eq;

#[test]
fn append_batch_simple() {
let mut batch = Batch::try_new([DataType::Int32, DataType::Utf8], 1024).unwrap();

let append1 = Batch::try_from_arrays([
Array::try_from_iter([1, 2, 3]).unwrap(),
Array::try_from_iter(["a", "b", "c"]).unwrap(),
])
.unwrap();
batch.append(&append1).unwrap();

let append2 = Batch::try_from_arrays([
Array::try_from_iter([4, 5, 6]).unwrap(),
Array::try_from_iter(["d", "e", "f"]).unwrap(),
])
.unwrap();
batch.append(&append2).unwrap();

let expected = Batch::try_from_arrays([
Array::try_from_iter([1, 2, 3, 4, 5, 6]).unwrap(),
Array::try_from_iter(["a", "b", "c", "d", "e", "f"]).unwrap(),
])
.unwrap();

assert_batches_eq(&expected, &batch);
}
}
15 changes: 9 additions & 6 deletions crates/rayexec_execution/src/arrays/format/pretty/table.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::borrow::Borrow;
use std::collections::HashMap;
use std::fmt::{self, Write as _};
use std::ops::Range;
Expand Down Expand Up @@ -38,9 +39,9 @@ pub struct PrettyTable {

impl PrettyTable {
/// Try to create a new pretty-formatted table.
pub fn try_new(
pub fn try_new<B: Borrow<Batch>>(
schema: &Schema,
batches: &[Batch],
batches: &[B],
max_width: usize,
max_rows: Option<usize>,
) -> Result<Self> {
Expand Down Expand Up @@ -86,6 +87,7 @@ impl PrettyTable {
// to help determine the size of the columns.
let samples = match batches.first() {
Some(batch) => batch
.borrow()
.arrays()
.iter()
.map(|col| ColumnValues::try_from_array(col, Some(0..NUM_VALS_FOR_AVG), None))
Expand Down Expand Up @@ -148,7 +150,7 @@ impl PrettyTable {
column_widths.insert(elide_index(&column_widths), 1);
}

let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
let total_rows: usize = batches.iter().map(|b| b.borrow().num_rows()).sum();
let max_rows = max_rows.unwrap_or(DEFAULT_MAX_ROWS);

let (mut head_rows, mut tail_rows) = if total_rows > max_rows {
Expand Down Expand Up @@ -190,7 +192,8 @@ impl PrettyTable {
break;
}

let (vals, num_rows) = Self::column_values_for_batch(batch, &format, 0..head_rows)?;
let (vals, num_rows) =
Self::column_values_for_batch(batch.borrow(), &format, 0..head_rows)?;
head.push(PrettyValues::new(
col_alignments.clone(),
column_widths.clone(),
Expand All @@ -206,13 +209,13 @@ impl PrettyTable {
break;
}

let num_rows = batch.num_rows();
let num_rows = batch.borrow().num_rows();
let range = if tail_rows >= num_rows {
0..num_rows
} else {
(num_rows - tail_rows)..num_rows
};
let (vals, num_rows) = Self::column_values_for_batch(batch, &format, range)?;
let (vals, num_rows) = Self::column_values_for_batch(batch.borrow(), &format, range)?;
tail.push(PrettyValues::new(
col_alignments.clone(),
column_widths.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::logical::operator::Node;
use crate::storage::table_storage::Projections;

impl IntermediatePipelineBuildState<'_> {
#[allow(deprecated)]
pub fn plan_scan(&mut self, id_gen: &mut PipelineIdGen, scan: Node<LogicalScan>) -> Result<()> {
let location = scan.location;

Expand Down Expand Up @@ -73,6 +74,7 @@ impl IntermediatePipelineBuildState<'_> {
Ok(())
}

#[allow(deprecated)]
fn create_batches_for_row_values(
&self,
projections: Projections,
Expand Down
3 changes: 2 additions & 1 deletion crates/rayexec_execution/src/execution/operators/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ impl FilterOperation {
}

impl StatelessOperation for FilterOperation {
#[allow(deprecated)]
fn execute(&self, batch: Batch) -> Result<Batch> {
let selection = self.predicate.select(&batch)?;
let batch = batch.select(Arc::new(selection)); // TODO: Select mut
let batch = batch.select_old(Arc::new(selection)); // TODO: Select mut

Ok(batch)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl LeftPrecomputedJoinConditions {
///
/// The output is the (left, right) selection vectors to use for the final
/// output batch.
#[allow(deprecated)]
pub fn compute_selection_for_probe(
&self,
left_batch_idx: usize,
Expand All @@ -106,7 +107,7 @@ impl LeftPrecomputedJoinConditions {
let mut results = Vec::with_capacity(self.conditions.len());

// Select rows from the right batch.
let selected_right = right.select(right_row_sel.clone());
let selected_right = right.select_old(right_row_sel.clone());

for condition in &self.conditions {
let mut left_precomputed = condition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ impl GlobalHashTable {
}

/// Probe the table.
#[allow(deprecated)]
pub fn probe(
&self,
right: &Batch,
Expand Down Expand Up @@ -222,10 +223,10 @@ impl GlobalHashTable {

// Get the left columns for this batch.
let left_batch = self.batches.get(batch_idx).expect("batch to exist");
let left_cols = left_batch.select(Arc::new(left_row_sel)).into_arrays();
let left_cols = left_batch.select_old(Arc::new(left_row_sel)).into_arrays();

// Get the right.
let right_cols = right.select(Arc::new(right_row_sel)).into_arrays();
let right_cols = right.select_old(Arc::new(right_row_sel)).into_arrays();

// Create final batch.
let batch = Batch::try_from_arrays(left_cols.into_iter().chain(right_cols))?;
Expand Down
1 change: 1 addition & 0 deletions crates/rayexec_execution/src/execution/operators/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl ExecutableOperator for PhysicalLimit {
})
}

#[allow(deprecated)]
fn poll_push(
&self,
cx: &mut Context,
Expand Down
5 changes: 3 additions & 2 deletions crates/rayexec_execution/src/execution/operators/nl_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ impl ExecutableOperator for PhysicalNestedLoopJoin {

/// Generate a cross product of two batches, applying an optional filter to the
/// result.
#[allow(deprecated)]
fn cross_join(
left_batch_idx: usize,
left: &Batch,
Expand All @@ -435,7 +436,7 @@ fn cross_join(
let selection = SelectionVector::repeated(right.num_rows(), left_idx);

// Columns from the left, one row repeated.
let left_columns = left.select(Arc::new(selection)).into_arrays();
let left_columns = left.select_old(Arc::new(selection)).into_arrays();
// Columns from the right, all rows.
let right_columns = right.clone().into_arrays();

Expand All @@ -444,7 +445,7 @@ fn cross_join(
// If we have a filter, apply it to the output batch.
if let Some(filter_expr) = &filter_expr {
let selection = Arc::new(filter_expr.select(&output)?);
output = output.select(selection.clone());
output = output.select_old(selection.clone());

// If we're left joining, compute indices in the left batch that we
// visited.
Expand Down
Loading

0 comments on commit ac8a38c

Please sign in to comment.