Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add more batch methods #3400

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions crates/rayexec_execution/src/arrays/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,15 +772,6 @@ impl Array {
self.selection2 = Some(selection.into())
}

pub fn make_shared(&mut self) {
if let Some(validity) = &mut self.validity2 {
validity.make_shared();
}
if let Some(selection) = &mut self.selection2 {
selection.make_shared()
}
}

/// Updates this array's selection vector.
///
/// Takes into account any existing selection. This allows for repeated
Expand Down
157 changes: 151 additions & 6 deletions crates/rayexec_execution/src/arrays/batch.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::sync::Arc;

use rayexec_error::{RayexecError, Result};
use stdutil::iter::IntoExactSizeIterator;

use super::array::buffer_manager::NopBufferManager;
use super::datatype::DataType;
use crate::arrays::array::Array;
use crate::arrays::executor::scalar::concat_with_exact_total_len;
use crate::arrays::row::ScalarRow;
Expand Down Expand Up @@ -46,6 +49,28 @@ impl Batch {
}
}

/// Create a batch by initializing arrays for the given datatypes.
///
/// Each array will be initialized to hold `capacity` rows.
pub fn try_new(
datatypes: impl IntoExactSizeIterator<Item = DataType>,
capacity: usize,
) -> Result<Self> {
let datatypes = datatypes.into_iter();
let mut arrays = Vec::with_capacity(datatypes.len());

for datatype in datatypes {
let array = Array::try_new(&Arc::new(NopBufferManager), datatype, capacity)?;
arrays.push(array)
}

Ok(Batch {
arrays,
num_rows: 0,
capacity,
})
}

/// Create a new batch from some number of arrays.
///
/// All arrays should have the same logical length.
Expand Down Expand Up @@ -91,19 +116,82 @@ impl Batch {
///
/// Errors if `rows` is greater than the capacity of the batch.
pub fn set_num_rows(&mut self, rows: usize) -> Result<()> {
if rows > self.capacity {
return Err(RayexecError::new("Number of rows exceeds capacity")
.with_field("capacity", self.capacity)
.with_field("requested_num_rows", rows));
}
// TODO: Need to solidify what capacity should be with dictionaries.
// if rows > self.capacity {
// return Err(RayexecError::new("Number of rows exceeds capacity")
// .with_field("capacity", self.capacity)
// .with_field("requested_num_rows", rows));
// }
self.num_rows = rows;

Ok(())
}

/// Selects rows from the batch based on `selection`.
pub fn select(&mut self, selection: &[usize]) -> Result<()> {
for arr in &mut self.arrays {
arr.select(&Arc::new(NopBufferManager), selection.iter().copied())?;
}
self.set_num_rows(selection.len())?;

Ok(())
}

/// Reset all arrays in the batch for writes.
pub fn reset_for_write(&mut self) -> Result<()> {
for arr in &mut self.arrays {
arr.reset_for_write(&Arc::new(NopBufferManager))?;
}
Ok(())
}

/// Copy rows from this batch to another batch.
///
/// `mapping` provides (from, to) pairs for how to copy the rows.
pub fn copy_rows(&self, mapping: &[(usize, usize)], dest: &mut Self) -> Result<()> {
if self.arrays.len() != dest.arrays.len() {
return Err(RayexecError::new(
"Attempted to copy rows to another batch with invalid number of columns",
));
}

for (from, to) in self.arrays.iter().zip(dest.arrays.iter_mut()) {
from.copy_rows(mapping.iter().copied(), to)?;
}

Ok(())
}

/// Appends a batch to the end of self.
///
/// Errors if this batch doesn't have enough capacity to append the other
/// batch.
pub fn append(&mut self, other: &Batch) -> Result<()> {
if self.num_rows() + other.num_rows() > self.capacity {
return Err(
RayexecError::new("Batch doesn't have sufficient capacity for append")
.with_field("self_rows", self.num_rows())
.with_field("other_rows", other.num_rows())
.with_field("self_capacity", self.capacity),
);
}

for (from, to) in other.arrays.iter().zip(self.arrays.iter_mut()) {
// [0..batch_num_rows) => [self_row_count..)
let mapping =
(0..other.num_rows()).zip(self.num_rows..(self.num_rows + other.num_rows()));
from.copy_rows(mapping, to)?;
}

self.num_rows += other.num_rows;

Ok(())
}

/// Concat multiple batches into one.
///
/// Batches are requried to have the same logical schemas.
#[deprecated]
pub fn concat(batches: &[Batch]) -> Result<Self> {
let num_cols = match batches.first() {
Some(batch) => batch.num_arrays(),
Expand Down Expand Up @@ -146,6 +234,7 @@ impl Batch {
}

// TODO: Owned variant
#[deprecated]
pub fn project(&self, indices: &[usize]) -> Self {
let cols = indices
.iter()
Expand All @@ -160,6 +249,7 @@ impl Batch {
}

// TODO: Remove
#[deprecated]
pub fn slice(&self, offset: usize, count: usize) -> Self {
let cols = self.arrays.iter().map(|c| c.slice(offset, count)).collect();
Batch {
Expand All @@ -174,7 +264,8 @@ impl Batch {
///
/// This accepts an Arc selection as it'll be cloned for each array in the
/// batch.
pub fn select(&self, selection: Arc<SelectionVector>) -> Batch {
#[deprecated]
pub fn select_old(&self, selection: Arc<SelectionVector>) -> Batch {
let cols = self
.arrays
.iter()
Expand All @@ -193,6 +284,7 @@ impl Batch {
}

/// Get the row at some index.
#[deprecated]
pub fn row(&self, idx: usize) -> Option<ScalarRow> {
if idx >= self.num_rows {
return None;
Expand Down Expand Up @@ -231,4 +323,57 @@ impl Batch {
pub fn into_arrays(self) -> Vec<Array> {
self.arrays
}

/// Helper for returning a pretty formatted table for the batch.
///
/// This should only be used during debugging.
#[cfg(debug_assertions)]
#[allow(unused)]
pub fn debug_table(&self) -> super::format::pretty::table::PrettyTable {
use crate::arrays::field::{Field, Schema};
use crate::arrays::format::pretty::table::PrettyTable;

let schema =
Schema::new(self.arrays.iter().enumerate().map(|(idx, array)| {
Field::new(format!("array{idx}"), array.datatype().clone(), true)
}));

PrettyTable::try_new(&schema, &[self], 100, None)
.expect("to be able to create pretty table")
}
}

#[cfg(test)]
mod tests {
use stdutil::iter::TryFromExactSizeIterator;

use super::*;
use crate::arrays::testutil::assert_batches_eq;

#[test]
fn append_batch_simple() {
let mut batch = Batch::try_new([DataType::Int32, DataType::Utf8], 1024).unwrap();

let append1 = Batch::try_from_arrays([
Array::try_from_iter([1, 2, 3]).unwrap(),
Array::try_from_iter(["a", "b", "c"]).unwrap(),
])
.unwrap();
batch.append(&append1).unwrap();

let append2 = Batch::try_from_arrays([
Array::try_from_iter([4, 5, 6]).unwrap(),
Array::try_from_iter(["d", "e", "f"]).unwrap(),
])
.unwrap();
batch.append(&append2).unwrap();

let expected = Batch::try_from_arrays([
Array::try_from_iter([1, 2, 3, 4, 5, 6]).unwrap(),
Array::try_from_iter(["a", "b", "c", "d", "e", "f"]).unwrap(),
])
.unwrap();

assert_batches_eq(&expected, &batch);
}
}
15 changes: 9 additions & 6 deletions crates/rayexec_execution/src/arrays/format/pretty/table.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::borrow::Borrow;
use std::collections::HashMap;
use std::fmt::{self, Write as _};
use std::ops::Range;
Expand Down Expand Up @@ -38,9 +39,9 @@ pub struct PrettyTable {

impl PrettyTable {
/// Try to create a new pretty-formatted table.
pub fn try_new(
pub fn try_new<B: Borrow<Batch>>(
schema: &Schema,
batches: &[Batch],
batches: &[B],
max_width: usize,
max_rows: Option<usize>,
) -> Result<Self> {
Expand Down Expand Up @@ -86,6 +87,7 @@ impl PrettyTable {
// to help determine the size of the columns.
let samples = match batches.first() {
Some(batch) => batch
.borrow()
.arrays()
.iter()
.map(|col| ColumnValues::try_from_array(col, Some(0..NUM_VALS_FOR_AVG), None))
Expand Down Expand Up @@ -148,7 +150,7 @@ impl PrettyTable {
column_widths.insert(elide_index(&column_widths), 1);
}

let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
let total_rows: usize = batches.iter().map(|b| b.borrow().num_rows()).sum();
let max_rows = max_rows.unwrap_or(DEFAULT_MAX_ROWS);

let (mut head_rows, mut tail_rows) = if total_rows > max_rows {
Expand Down Expand Up @@ -190,7 +192,8 @@ impl PrettyTable {
break;
}

let (vals, num_rows) = Self::column_values_for_batch(batch, &format, 0..head_rows)?;
let (vals, num_rows) =
Self::column_values_for_batch(batch.borrow(), &format, 0..head_rows)?;
head.push(PrettyValues::new(
col_alignments.clone(),
column_widths.clone(),
Expand All @@ -206,13 +209,13 @@ impl PrettyTable {
break;
}

let num_rows = batch.num_rows();
let num_rows = batch.borrow().num_rows();
let range = if tail_rows >= num_rows {
0..num_rows
} else {
(num_rows - tail_rows)..num_rows
};
let (vals, num_rows) = Self::column_values_for_batch(batch, &format, range)?;
let (vals, num_rows) = Self::column_values_for_batch(batch.borrow(), &format, range)?;
tail.push(PrettyValues::new(
col_alignments.clone(),
column_widths.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::logical::operator::Node;
use crate::storage::table_storage::Projections;

impl IntermediatePipelineBuildState<'_> {
#[allow(deprecated)]
pub fn plan_scan(&mut self, id_gen: &mut PipelineIdGen, scan: Node<LogicalScan>) -> Result<()> {
let location = scan.location;

Expand Down Expand Up @@ -73,6 +74,7 @@ impl IntermediatePipelineBuildState<'_> {
Ok(())
}

#[allow(deprecated)]
fn create_batches_for_row_values(
&self,
projections: Projections,
Expand Down
3 changes: 2 additions & 1 deletion crates/rayexec_execution/src/execution/operators/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ impl FilterOperation {
}

impl StatelessOperation for FilterOperation {
#[allow(deprecated)]
fn execute(&self, batch: Batch) -> Result<Batch> {
let selection = self.predicate.select(&batch)?;
let batch = batch.select(Arc::new(selection)); // TODO: Select mut
let batch = batch.select_old(Arc::new(selection)); // TODO: Select mut

Ok(batch)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl LeftPrecomputedJoinConditions {
///
/// The output is the (left, right) selection vectors to use for the final
/// output batch.
#[allow(deprecated)]
pub fn compute_selection_for_probe(
&self,
left_batch_idx: usize,
Expand All @@ -106,7 +107,7 @@ impl LeftPrecomputedJoinConditions {
let mut results = Vec::with_capacity(self.conditions.len());

// Select rows from the right batch.
let selected_right = right.select(right_row_sel.clone());
let selected_right = right.select_old(right_row_sel.clone());

for condition in &self.conditions {
let mut left_precomputed = condition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ impl GlobalHashTable {
}

/// Probe the table.
#[allow(deprecated)]
pub fn probe(
&self,
right: &Batch,
Expand Down Expand Up @@ -222,10 +223,10 @@ impl GlobalHashTable {

// Get the left columns for this batch.
let left_batch = self.batches.get(batch_idx).expect("batch to exist");
let left_cols = left_batch.select(Arc::new(left_row_sel)).into_arrays();
let left_cols = left_batch.select_old(Arc::new(left_row_sel)).into_arrays();

// Get the right.
let right_cols = right.select(Arc::new(right_row_sel)).into_arrays();
let right_cols = right.select_old(Arc::new(right_row_sel)).into_arrays();

// Create final batch.
let batch = Batch::try_from_arrays(left_cols.into_iter().chain(right_cols))?;
Expand Down
1 change: 1 addition & 0 deletions crates/rayexec_execution/src/execution/operators/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl ExecutableOperator for PhysicalLimit {
})
}

#[allow(deprecated)]
fn poll_push(
&self,
cx: &mut Context,
Expand Down
5 changes: 3 additions & 2 deletions crates/rayexec_execution/src/execution/operators/nl_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ impl ExecutableOperator for PhysicalNestedLoopJoin {

/// Generate a cross product of two batches, applying an optional filter to the
/// result.
#[allow(deprecated)]
fn cross_join(
left_batch_idx: usize,
left: &Batch,
Expand All @@ -435,7 +436,7 @@ fn cross_join(
let selection = SelectionVector::repeated(right.num_rows(), left_idx);

// Columns from the left, one row repeated.
let left_columns = left.select(Arc::new(selection)).into_arrays();
let left_columns = left.select_old(Arc::new(selection)).into_arrays();
// Columns from the right, all rows.
let right_columns = right.clone().into_arrays();

Expand All @@ -444,7 +445,7 @@ fn cross_join(
// If we have a filter, apply it to the output batch.
if let Some(filter_expr) = &filter_expr {
let selection = Arc::new(filter_expr.select(&output)?);
output = output.select(selection.clone());
output = output.select_old(selection.clone());

// If we're left joining, compute indices in the left batch that we
// visited.
Expand Down
Loading
Loading