Skip to content

Commit

Permalink
remove eval2
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr committed Jan 10, 2025
1 parent 4d0ad3c commit b1501b8
Show file tree
Hide file tree
Showing 12 changed files with 21 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,7 @@ impl IntermediatePipelineBuildState<'_> {
.context("Failed to plan expressions for values")?;
let arrs = exprs
.into_iter()
.map(|expr| {
let arr = expr.eval(&dummy_batch)?;
Ok(arr.into_owned())
})
.map(|expr| expr.eval(&dummy_batch))
.collect::<Result<Vec<_>>>()?;
row_arrs.push(arrs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl LeftPrecomputedJoinConditions {
pub fn precompute_for_left_batch(&mut self, left: &Batch) -> Result<()> {
for condition in &mut self.conditions {
let precomputed = condition.left.eval(left)?;
condition.left_precomputed.push(precomputed.into_owned())
condition.left_precomputed.push(precomputed)
}

Ok(())
Expand Down Expand Up @@ -127,7 +127,7 @@ impl LeftPrecomputedJoinConditions {
let result = condition
.function
.function_impl
.execute2(&[&left_precomputed, right_arr.as_ref()])?;
.execute2(&[&left_precomputed, &right_arr])?;

results.push(result);
}
Expand Down
5 changes: 1 addition & 4 deletions crates/rayexec_execution/src/execution/operators/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ impl StatelessOperation for ProjectOperation {
let arrs = self
.exprs
.iter()
.map(|expr| {
let arr = expr.eval(&batch)?;
Ok(arr.into_owned())
})
.map(|expr| expr.eval(&batch))
.collect::<Result<Vec<_>>>()?;

Batch::try_from_arrays(arrs)
Expand Down
10 changes: 2 additions & 8 deletions crates/rayexec_execution/src/execution/operators/table_inout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,7 @@ impl ExecutableOperator for PhysicalTableInOut {
let inputs = self
.function_inputs
.iter()
.map(|expr| {
let arr = expr.eval(&batch)?;
Ok(arr.into_owned())
})
.map(|expr| expr.eval(&batch))
.collect::<Result<Vec<_>>>()?;

let inputs = Batch::try_from_arrays(inputs)?;
Expand All @@ -115,10 +112,7 @@ impl ExecutableOperator for PhysicalTableInOut {
let additional_outputs = self
.projected_outputs
.iter()
.map(|expr| {
let arr = expr.eval(&batch)?;
Ok(arr.into_owned())
})
.map(|expr| expr.eval(&batch))
.collect::<Result<Vec<_>>>()?;

state.additional_outputs = additional_outputs;
Expand Down
4 changes: 2 additions & 2 deletions crates/rayexec_execution/src/execution/operators/unnest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ impl ExecutableOperator for PhysicalUnnest {

// Compute inputs. These will be stored until we've processed all rows.
for (col_idx, expr) in self.project_expressions.iter().enumerate() {
state.project_inputs[col_idx] = expr.eval(&batch)?.into_owned();
state.project_inputs[col_idx] = expr.eval(&batch)?;
}

for (col_idx, expr) in self.unnest_expressions.iter().enumerate() {
state.unnest_inputs[col_idx] = expr.eval(&batch)?.into_owned();
state.unnest_inputs[col_idx] = expr.eval(&batch)?;
}

state.input_num_rows = batch.num_rows();
Expand Down
70 changes: 0 additions & 70 deletions crates/rayexec_execution/src/expr/physical/case_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,76 +177,6 @@ impl PhysicalCaseExpr {

Ok(())
}

pub fn eval2<'a>(&self, batch: &'a Batch) -> Result<Cow<'a, Array>> {
let mut arrays = Vec::new();
let mut indices: Vec<(usize, usize)> = (0..batch.num_rows()).map(|_| (0, 0)).collect();

// Track remaining rows we need to evaluate.
//
// True bits are rows we still need to consider.
let mut remaining = Bitmap::new_with_all_true(batch.num_rows());

let mut trues_sel = SelectionVector::with_capacity(batch.num_rows());

for case in &self.cases {
// Generate selection from remaining bitmap.
let selection = Arc::new(SelectionVector::from_iter(remaining.index_iter()));

// Get batch with only remaining rows that we should consider.
let selected_batch = batch.select(selection.clone());

// Execute 'when'.
let selected = case.when.eval(&selected_batch)?;

// Determine which rows should be executed for 'then', and which we
// need to fall through on.
SelectExecutor::select(&selected, &mut trues_sel)?;

// Select rows in batch to execute on based on 'trues'.
let execute_batch = selected_batch.select(Arc::new(trues_sel.clone()));
let output = case.then.eval(&execute_batch)?;

// Store array for later interleaving.
let array_idx = arrays.len();
arrays.push(output.into_owned());

// Figure out mapping from the 'trues' selection to the original row
// index.
//
// The selection vector locations should index into the full-length
// selection vector to get the original row index.
for (array_row_idx, selected_row_idx) in trues_sel.iter_locations().enumerate() {
// Final output row.
let output_row_idx = selection.get(selected_row_idx);
indices[output_row_idx] = (array_idx, array_row_idx);

// Update bitmap, this row was handled.
remaining.set_unchecked(output_row_idx, false);
}
}

// Do all remaining rows.
if remaining.count_trues() != 0 {
let selection = Arc::new(SelectionVector::from_iter(remaining.index_iter()));
let remaining_batch = batch.select(selection.clone());

let output = self.else_expr.eval(&remaining_batch)?;
let array_idx = arrays.len();
arrays.push(output.into_owned());

// Update indices.
for (array_row_idx, output_row_idx) in selection.iter_locations().enumerate() {
indices[output_row_idx] = (array_idx, array_row_idx);
}
}

// Interleave.
let refs: Vec<_> = arrays.iter().collect();
let arr = interleave(&refs, &indices)?;

Ok(Cow::Owned(arr))
}
}

impl fmt::Display for PhysicalCaseExpr {
Expand Down
7 changes: 0 additions & 7 deletions crates/rayexec_execution/src/expr/physical/cast_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,6 @@ impl PhysicalCastExpr {

Ok(())
}

pub fn eval2<'a>(&self, batch: &'a Batch) -> Result<Cow<'a, Array>> {
let input = self.expr.eval(batch)?;
unimplemented!()
// let out = cast_array(input.as_ref(), self.to.clone(), CastFailBehavior::Error)?;
// Ok(Cow::Owned(out))
}
}

impl fmt::Display for PhysicalCastExpr {
Expand Down
12 changes: 0 additions & 12 deletions crates/rayexec_execution/src/expr/physical/column_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,6 @@ impl PhysicalColumnExpr {

Ok(())
}

pub fn eval2<'a>(&self, batch: &'a Batch) -> Result<Cow<'a, Array>> {
let col = batch.array(self.idx).ok_or_else(|| {
RayexecError::new(format!(
"Tried to get column at index {} in a batch with {} columns",
self.idx,
batch.arrays().len()
))
})?;

Ok(Cow::Borrowed(col))
}
}

impl fmt::Display for PhysicalColumnExpr {
Expand Down
5 changes: 0 additions & 5 deletions crates/rayexec_execution/src/expr/physical/literal_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ impl PhysicalLiteralExpr {

Ok(())
}

pub fn eval2<'a>(&self, batch: &'a Batch) -> Result<Cow<'a, Array>> {
let arr = self.literal.as_array(batch.num_rows())?;
Ok(Cow::Owned(arr))
}
}

impl fmt::Display for PhysicalLiteralExpr {
Expand Down
20 changes: 11 additions & 9 deletions crates/rayexec_execution/src/expr/physical/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::fmt;
use case_expr::PhysicalCaseExpr;
use cast_expr::PhysicalCastExpr;
use column_expr::PhysicalColumnExpr;
use evaluator::ExpressionState;
use evaluator::{ExpressionEvaluator, ExpressionState};
use literal_expr::PhysicalLiteralExpr;
use rayexec_error::{not_implemented, OptionExt, Result};
use scalar_function_expr::PhysicalScalarFunctionExpr;
Expand Down Expand Up @@ -57,14 +57,16 @@ impl PhysicalScalarExpression {
}
}

pub fn eval<'a>(&self, batch: &'a Batch) -> Result<Cow<'a, Array>> {
match self {
Self::Case(e) => e.eval2(batch),
Self::Cast(e) => e.eval2(batch),
Self::Column(e) => e.eval2(batch),
Self::Literal(e) => e.eval2(batch),
Self::ScalarFunction(e) => e.eval2(batch),
}
// TODO: Remove, needs to happen after operator revamp.
pub fn eval(&self, batch: &Batch) -> Result<Array> {
unimplemented!()
// match self {
// Self::Case(e) => e.eval2(batch),
// Self::Cast(e) => e.eval2(batch),
// Self::Column(e) => e.eval2(batch),
// Self::Literal(e) => e.eval2(batch),
// Self::ScalarFunction(e) => e.eval2(batch),
// }
}

/// Produce a selection vector for the batch using this expression.
Expand Down
23 changes: 0 additions & 23 deletions crates/rayexec_execution/src/expr/physical/scalar_function_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,29 +65,6 @@ impl PhysicalScalarFunctionExpr {

Ok(())
}

pub fn eval2<'a>(&self, batch: &'a Batch) -> Result<Cow<'a, Array>> {
let inputs = self
.inputs
.iter()
.map(|input| input.eval(batch))
.collect::<Result<Vec<_>>>()?;

let refs: Vec<_> = inputs.iter().map(|a| a.as_ref()).collect(); // Can I not?
let mut out = self.function.function_impl.execute2(&refs)?;

// If function is provided no input, it's expected to return an
// array of length 1. We extend the array here so that it's the
// same size as the rest.
//
// TODO: Could just extend the selection vector too.
if refs.is_empty() {
let scalar = out.logical_value(0)?;
out = scalar.as_array(batch.num_rows())?;
}

Ok(Cow::Owned(out))
}
}

impl fmt::Display for PhysicalScalarFunctionExpr {
Expand Down
6 changes: 2 additions & 4 deletions crates/rayexec_execution/src/functions/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl Hash for PlannedScalarFunction {
}

pub trait ScalarFunctionImpl: Debug + Sync + Send + DynClone {
fn execute2(&self, inputs: &[&Array]) -> Result<Array> {
fn execute2(&self, _inputs: &[&Array]) -> Result<Array> {
unimplemented!()
}

Expand All @@ -118,9 +118,7 @@ pub trait ScalarFunctionImpl: Debug + Sync + Send + DynClone {
///
/// The batch's `selection` method should be called to determine which rows
/// should be looked at during function eval.
fn execute(&self, input: &Batch, output: &mut Array) -> Result<()> {
unimplemented!()
}
fn execute(&self, input: &Batch, output: &mut Array) -> Result<()>;
}

impl Clone for Box<dyn ScalarFunctionImpl> {
Expand Down

0 comments on commit b1501b8

Please sign in to comment.