Skip to content

Commit

Permalink
add capacity to batch
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr committed Jan 9, 2025
1 parent 17a5000 commit fab1cb2
Show file tree
Hide file tree
Showing 32 changed files with 185 additions and 121 deletions.
2 changes: 1 addition & 1 deletion crates/docgen/src/markdown_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ mod tests {

#[test]
fn simple() {
let batch = Batch::try_new([
let batch = Batch::try_from_arrays([
Array::from_iter([1, 2, 3]),
Array::from_iter(["cat", "dog", "mouse"]),
])
Expand Down
2 changes: 1 addition & 1 deletion crates/rayexec_csv/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ impl AsyncCsvStream {
arrs.push(arr);
}

Batch::try_new(arrs)
Batch::try_from_arrays(arrs)
}

fn build_boolean(
Expand Down
70 changes: 41 additions & 29 deletions crates/rayexec_execution/src/arrays/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,46 @@ impl<B> Array<B>
where
B: BufferManager,
{
/// Create a new array with the given capacity.
///
/// This will take care of initalizing the primary and secondary data
/// buffers depending on the type.
pub fn try_new(manager: &Arc<B>, datatype: DataType, capacity: usize) -> Result<Self> {
let buffer = array_buffer_for_datatype(manager, &datatype, capacity)?;
let validity = Validity::new_all_valid(capacity);

Ok(Array {
datatype,
selection2: None,
validity2: None,
data2: ArrayData2::UntypedNull(UntypedNullStorage(capacity)),
next: Some(ArrayNextInner {
validity,
data: ArrayData::owned(buffer),
}),
})
}

// TODO: Remove
pub(crate) fn next(&self) -> &ArrayNextInner<B> {
self.next.as_ref().unwrap()
self.next.as_ref().expect("next to be set")
}

// TODO: Remove
pub(crate) fn next_mut(&mut self) -> &mut ArrayNextInner<B> {
self.next.as_mut().unwrap()
self.next.as_mut().expect("next to be set")
}

pub fn capacity(&self) -> usize {
self.next.as_ref().unwrap().data.primary_capacity()
if let Some(next) = &self.next {
return next.data.primary_capacity();
}

// TODO: Remove, just using to not break things completely yet.
match self.selection2.as_ref().map(|v| v.as_ref()) {
Some(v) => v.num_rows(),
None => self.data2.len(),
}
}

pub fn datatype(&self) -> &DataType {
Expand Down Expand Up @@ -251,26 +279,6 @@ where
}

impl Array {
/// Create a new array with the given capacity.
///
/// This will take care of initalizing the primary and secondary data
/// buffers depending on the type.
pub fn new(datatype: DataType, capacity: usize) -> Result<Self> {
let buffer = array_buffer_for_datatype(&datatype, capacity)?;
let validity = Validity::new_all_valid(capacity);

Ok(Array {
datatype,
selection2: None,
validity2: None,
data2: ArrayData2::UntypedNull(UntypedNullStorage(capacity)),
next: Some(ArrayNextInner {
validity,
data: ArrayData::owned(buffer),
}),
})
}

pub fn new_untyped_null_array(len: usize) -> Self {
// Note that we're adding a bitmap here even though the data already
// returns NULL. This allows the executors (especially for aggregates)
Expand Down Expand Up @@ -1107,12 +1115,14 @@ impl From<ListStorage> for ArrayData2 {
}

/// Create a new array buffer for a datatype.
fn array_buffer_for_datatype(
fn array_buffer_for_datatype<B>(
manager: &Arc<B>,
datatype: &DataType,
capacity: usize,
) -> Result<ArrayBuffer<NopBufferManager>> {
let manager = &Arc::new(NopBufferManager);

) -> Result<ArrayBuffer<B>>
where
B: BufferManager,
{
let buffer = match datatype.physical_type()? {
PhysicalType::UntypedNull => {
ArrayBuffer::with_primary_capacity::<PhysicalUntypedNull>(manager, capacity)?
Expand Down Expand Up @@ -1173,7 +1183,7 @@ fn array_buffer_for_datatype(
}
};

let child = Array::new(inner_type, capacity)?;
let child = Array::try_new(manager, inner_type, capacity)?;

let mut buffer = ArrayBuffer::with_primary_capacity::<PhysicalList>(manager, capacity)?;
buffer.put_secondary_buffer(SecondaryBuffer::List(ListBuffer::new(child)));
Expand All @@ -1200,7 +1210,9 @@ macro_rules! impl_primitive_from_iter {
) -> Result<Self, Self::Error> {
let iter = iter.into_iter();

let mut array = Array::new(DataType::$typ_variant, iter.len())?;
let manager = Arc::new(NopBufferManager);

let mut array = Array::try_new(&manager, DataType::$typ_variant, iter.len())?;
let slice = array
.next
.as_mut()
Expand Down
116 changes: 80 additions & 36 deletions crates/rayexec_execution/src/arrays/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,95 @@ use crate::arrays::selection::SelectionVector;
/// A batch of same-length arrays.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
/// Arrays that make up this batch.
arrays: Vec<Array>,

/// Number of rows in this batch. Needed to allow for a batch that has no
/// columns but a non-zero number of rows.
num_rows: usize,
/// Arrays making up the batch.
///
/// All arrays must have the same capacity (underlying length).
pub(crate) arrays: Vec<Array>,
/// Number of logical rows in the batch.
///
/// Equal to or less than capacity when batch contains at least one array.
/// If the batch contains no arrays, number of rows can be arbitarily set.
///
/// This allows "resizing" batches without needed to resize the underlying
/// arrays, allowing for buffer reuse.
pub(crate) num_rows: usize,
/// Capacity (in number of rows) of the batch.
///
/// This should match the capacity of the arrays. If there are zero arrays
/// in the batch, this should be zero.
pub(crate) capacity: usize,
}

impl Batch {
pub const fn empty() -> Self {
Batch {
arrays: Vec::new(),
num_rows: 0,
capacity: 0,
}
}

pub fn empty_with_num_rows(num_rows: usize) -> Self {
Batch {
arrays: Vec::new(),
num_rows,
capacity: 0,
}
}

/// Create a new batch from some number of arrays.
///
/// All arrays should have the same logical length.
///
/// The initial number of rows the batch will report will equal the capacity
/// of the arrays. `set_num_rows` should be used if the logical number of
/// rows is less than capacity.
pub fn try_from_arrays(arrays: impl IntoIterator<Item = Array>) -> Result<Self> {
let arrays: Vec<_> = arrays.into_iter().collect();
let capacity = match arrays.first() {
Some(arr) => arr.capacity(),
None => {
return Ok(Batch {
arrays: Vec::new(),
num_rows: 0,
capacity: 0,
})
}
};

for array in &arrays {
if array.capacity() != capacity {
return Err(RayexecError::new(
"Attempted to create batch from arrays with different capacities",
)
.with_field("expected", capacity)
.with_field("got", array.capacity()));
}
}

Ok(Batch {
arrays,
num_rows: capacity,
capacity,
})
}

pub fn num_rows(&self) -> usize {
self.num_rows
}

/// Sets the logical number of rows for the batch.
///
/// Errors if `rows` is greater than the capacity of the batch.
pub fn set_num_rows(&mut self, rows: usize) -> Result<()> {
if rows > self.capacity {
return Err(RayexecError::new("Number of rows exceeds capacity")
.with_field("capacity", self.capacity)
.with_field("requested_num_rows", rows));
}
self.num_rows = rows;

Ok(())
}

/// Concat multiple batches into one.
Expand Down Expand Up @@ -74,32 +142,7 @@ impl Batch {
working_arrays.clear();
}

Batch::try_new(output_cols)
}

/// Create a new batch from some number of arrays.
///
/// All arrays should have the same logical length.
pub fn try_new(cols: impl IntoIterator<Item = Array>) -> Result<Self> {
let cols: Vec<_> = cols.into_iter().collect();
let len = match cols.first() {
Some(arr) => arr.logical_len(),
None => return Ok(Self::empty()),
};

for (idx, col) in cols.iter().enumerate() {
if col.logical_len() != len {
return Err(RayexecError::new(format!(
"Expected column length to be {len}, got {}. Column idx: {idx}",
col.logical_len()
)));
}
}

Ok(Batch {
arrays: cols,
num_rows: len,
})
Batch::try_from_arrays(output_cols)
}

// TODO: Owned variant
Expand All @@ -112,17 +155,21 @@ impl Batch {
Batch {
arrays: cols,
num_rows: self.num_rows,
capacity: self.capacity,
}
}

// TODO: Remove
pub fn slice(&self, offset: usize, count: usize) -> Self {
let cols = self.arrays.iter().map(|c| c.slice(offset, count)).collect();
Batch {
arrays: cols,
num_rows: count,
capacity: count,
}
}

// TODO: Remove
/// Selects rows in the batch.
///
/// This accepts an Arc selection as it'll be cloned for each array in the
Expand All @@ -141,6 +188,7 @@ impl Batch {
Batch {
arrays: cols,
num_rows: selection.as_ref().num_rows(),
capacity: selection.as_ref().num_rows(),
}
}

Expand Down Expand Up @@ -180,10 +228,6 @@ impl Batch {
self.arrays.len()
}

pub fn num_rows(&self) -> usize {
self.num_rows
}

pub fn into_arrays(self) -> Vec<Array> {
self.arrays
}
Expand Down
Loading

0 comments on commit fab1cb2

Please sign in to comment.