Skip to content

Commit

Permalink
VolcanoExecutor init and next method return BustubxResult
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Jan 31, 2024
1 parent d9f2a6a commit f009dc7
Show file tree
Hide file tree
Showing 16 changed files with 93 additions and 78 deletions.
2 changes: 1 addition & 1 deletion bustubx/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Database {
let mut execution_engine = ExecutionEngine {
context: execution_ctx,
};
let tuples = execution_engine.execute(Arc::new(physical_plan));
let tuples = execution_engine.execute(Arc::new(physical_plan))?;
println!("execution result: {:?}", tuples);
Ok(tuples)
}
Expand Down
18 changes: 11 additions & 7 deletions bustubx/src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ use std::sync::Arc;
use tracing::span;

Check warning on line 3 in bustubx/src/execution/mod.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `tracing::span`

use crate::catalog::SchemaRef;
use crate::{catalog::Catalog, planner::physical_plan::PhysicalPlan, storage::Tuple};
use crate::{
catalog::Catalog, planner::physical_plan::PhysicalPlan, storage::Tuple, BustubxResult,
};

pub trait VolcanoExecutor {
fn init(&self, context: &mut ExecutionContext) {}
fn next(&self, context: &mut ExecutionContext) -> Option<Tuple>;
fn init(&self, context: &mut ExecutionContext) -> BustubxResult<()> {
Ok(())
}
fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>>;
fn output_schema(&self) -> SchemaRef;
}

Expand All @@ -20,17 +24,17 @@ pub struct ExecutionEngine<'a> {
pub context: ExecutionContext<'a>,
}
impl ExecutionEngine<'_> {
pub fn execute(&mut self, plan: Arc<PhysicalPlan>) -> Vec<Tuple> {
plan.init(&mut self.context);
pub fn execute(&mut self, plan: Arc<PhysicalPlan>) -> BustubxResult<Vec<Tuple>> {
plan.init(&mut self.context)?;
let mut result = Vec::new();
loop {
let next_tuple = plan.next(&mut self.context);
let next_tuple = plan.next(&mut self.context)?;
if next_tuple.is_some() {
result.push(next_tuple.unwrap());
} else {
break;
}
}
result
Ok(result)
}
}
1 change: 0 additions & 1 deletion bustubx/src/planner/logical_planner/plan_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ impl<'a> LogicalPlanner<'a> {
}
records.push(record);
}
println!("LWZTEST columns: {:?}, records: {:?}", columns, records);
let values_node = LogicalPlan {
operator: LogicalOperator::new_values_operator(columns.clone(), records),
children: Vec::new(),
Expand Down
5 changes: 3 additions & 2 deletions bustubx/src/planner/physical_plan/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{
catalog::Schema,
execution::{ExecutionContext, VolcanoExecutor},
storage::Tuple,
BustubxResult,
};
use std::sync::Arc;

Expand All @@ -15,13 +16,13 @@ pub struct PhysicalCreateIndex {
}

impl VolcanoExecutor for PhysicalCreateIndex {
fn next(&self, context: &mut ExecutionContext) -> Option<Tuple> {
fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
context.catalog.create_index(
self.index_name.clone(),
self.table_name.clone(),
self.key_attrs.clone(),
);
None
Ok(None)
}
fn output_schema(&self) -> SchemaRef {
Arc::new(Schema::copy_schema(
Expand Down
5 changes: 3 additions & 2 deletions bustubx/src/planner/physical_plan/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{
catalog::Schema,
execution::{ExecutionContext, VolcanoExecutor},
storage::Tuple,
BustubxResult,
};
use std::sync::Arc;

Expand All @@ -13,11 +14,11 @@ pub struct PhysicalCreateTable {
}

impl VolcanoExecutor for PhysicalCreateTable {
fn next(&self, context: &mut ExecutionContext) -> Option<Tuple> {
fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
context
.catalog
.create_table(self.table_name.clone(), Arc::new(self.schema.clone()));
None
Ok(None)
}
fn output_schema(&self) -> SchemaRef {
Arc::new(self.schema.clone())
Expand Down
6 changes: 3 additions & 3 deletions bustubx/src/planner/physical_plan/dummy.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::catalog::{Schema, SchemaRef};
use crate::execution::{ExecutionContext, VolcanoExecutor};
use crate::Tuple;
use crate::{BustubxResult, Tuple};
use std::sync::Arc;

#[derive(Debug)]
pub struct Dummy;

impl VolcanoExecutor for Dummy {
fn next(&self, context: &mut ExecutionContext) -> Option<Tuple> {
None
fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
Ok(None)
}

fn output_schema(&self) -> SchemaRef {
Expand Down
15 changes: 8 additions & 7 deletions bustubx/src/planner/physical_plan/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
common::ScalarValue,
execution::{ExecutionContext, VolcanoExecutor},
storage::Tuple,
BustubxResult,
};

use super::PhysicalPlan;
Expand All @@ -17,22 +18,22 @@ pub struct PhysicalFilter {
}

impl VolcanoExecutor for PhysicalFilter {
fn init(&self, context: &mut ExecutionContext) {
fn init(&self, context: &mut ExecutionContext) -> BustubxResult<()> {
println!("init filter executor");
self.input.init(context);
self.input.init(context)
}

fn next(&self, context: &mut ExecutionContext) -> Option<Tuple> {
fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
loop {
let next_tuple = self.input.next(context);
let next_tuple = self.input.next(context)?;
if next_tuple.is_none() {
return None;
return Ok(None);
}
let tuple = next_tuple.unwrap();
let compare_res = self.predicate.evaluate(&tuple).unwrap();
let compare_res = self.predicate.evaluate(&tuple)?;
if let ScalarValue::Boolean(Some(v)) = compare_res {
if v {
return Some(tuple);
return Ok(Some(tuple));
}
} else {
panic!("filter predicate should be boolean")
Expand Down
15 changes: 8 additions & 7 deletions bustubx/src/planner/physical_plan/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
common::ScalarValue,
execution::{ExecutionContext, VolcanoExecutor},
storage::{Tuple, TupleMeta},
BustubxResult,
};

use super::PhysicalPlan;
Expand All @@ -29,27 +30,27 @@ impl PhysicalInsert {
}
}
impl VolcanoExecutor for PhysicalInsert {
fn init(&self, context: &mut ExecutionContext) {
fn init(&self, context: &mut ExecutionContext) -> BustubxResult<()> {
println!("init insert executor");
self.insert_rows
.store(0, std::sync::atomic::Ordering::SeqCst);
self.input.init(context);
self.input.init(context)
}
fn next(&self, context: &mut ExecutionContext) -> Option<Tuple> {
fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
loop {
let next_tuple = self.input.next(context);
let next_tuple = self.input.next(context)?;
if next_tuple.is_none() {
// only return insert_rows when input exhausted
if self.insert_rows.load(std::sync::atomic::Ordering::SeqCst) == 0 {
return None;
return Ok(None);
} else {
let insert_rows = self.insert_rows.load(std::sync::atomic::Ordering::SeqCst);
self.insert_rows
.store(0, std::sync::atomic::Ordering::SeqCst);
return Some(Tuple::new(
return Ok(Some(Tuple::new(
self.output_schema(),
vec![ScalarValue::Int32(Some(insert_rows as i32))],
));
)));
}
}

Expand Down
17 changes: 9 additions & 8 deletions bustubx/src/planner/physical_plan/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
catalog::Schema,
execution::{ExecutionContext, VolcanoExecutor},
storage::Tuple,
BustubxResult,
};

use super::PhysicalPlan;
Expand All @@ -28,16 +29,16 @@ impl PhysicalLimit {
}
}
impl VolcanoExecutor for PhysicalLimit {
fn init(&self, context: &mut ExecutionContext) {
fn init(&self, context: &mut ExecutionContext) -> BustubxResult<()> {
println!("init limit executor");
self.cursor.store(0, std::sync::atomic::Ordering::SeqCst);
self.input.init(context);
self.input.init(context)
}
fn next(&self, context: &mut ExecutionContext) -> Option<Tuple> {
fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
loop {
let next_tuple = self.input.next(context);
let next_tuple = self.input.next(context)?;
if next_tuple.is_none() {
return None;
return Ok(None);
}
let cursor = self
.cursor
Expand All @@ -49,12 +50,12 @@ impl VolcanoExecutor for PhysicalLimit {
if self.limit.is_some() {
let limit = self.limit.unwrap();
if (cursor as usize) < offset + limit {
return next_tuple;
return Ok(next_tuple);
} else {
return None;
return Ok(None);
}
} else {
return next_tuple;
return Ok(next_tuple);
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions bustubx/src/planner/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::catalog::SchemaRef;
use crate::{
execution::{ExecutionContext, VolcanoExecutor},
storage::Tuple,
BustubxResult,
};

#[derive(Debug)]
Expand All @@ -44,7 +45,7 @@ pub enum PhysicalPlan {
}

impl VolcanoExecutor for PhysicalPlan {
fn init(&self, context: &mut ExecutionContext) {
fn init(&self, context: &mut ExecutionContext) -> BustubxResult<()> {
match self {
PhysicalPlan::Dummy(op) => op.init(context),
PhysicalPlan::CreateTable(op) => op.init(context),
Expand All @@ -60,7 +61,7 @@ impl VolcanoExecutor for PhysicalPlan {
}
}

fn next(&self, context: &mut ExecutionContext) -> Option<Tuple> {
fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
match self {
PhysicalPlan::Dummy(op) => op.next(context),
PhysicalPlan::CreateTable(op) => op.next(context),
Expand Down
29 changes: 15 additions & 14 deletions bustubx/src/planner/physical_plan/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
execution::{ExecutionContext, VolcanoExecutor},
planner::table_ref::join::JoinType,
storage::Tuple,
BustubxResult,
};

use super::PhysicalPlan;
Expand Down Expand Up @@ -38,16 +39,16 @@ impl PhysicalNestedLoopJoin {
}
}
impl VolcanoExecutor for PhysicalNestedLoopJoin {
fn init(&self, context: &mut ExecutionContext) {
fn init(&self, context: &mut ExecutionContext) -> BustubxResult<()> {
println!("init nested loop join executor");
*self.left_tuple.lock().unwrap() = None;
self.left_input.init(context);
self.right_input.init(context);
self.left_input.init(context)?;
self.right_input.init(context)
}
fn next(&self, context: &mut ExecutionContext) -> Option<Tuple> {
fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
let left_tuple = self.left_tuple.lock().unwrap();
let mut left_next_tuple = if left_tuple.is_none() {
self.left_input.next(context)
self.left_input.next(context)?
} else {
Some(left_tuple.clone().unwrap())
};
Expand All @@ -57,7 +58,7 @@ impl VolcanoExecutor for PhysicalNestedLoopJoin {
while left_next_tuple.is_some() {
let left_tuple = left_next_tuple.clone().unwrap();

let mut right_next_tuple = self.right_input.next(context);
let mut right_next_tuple = self.right_input.next(context)?;
while right_next_tuple.is_some() {
let right_tuple = right_next_tuple.unwrap();

Expand All @@ -66,33 +67,33 @@ impl VolcanoExecutor for PhysicalNestedLoopJoin {
// save latest left_next_result before return
*self.left_tuple.lock().unwrap() = Some(left_tuple.clone());

return Some(Tuple::try_merge(vec![left_tuple, right_tuple]).unwrap());
return Ok(Some(Tuple::try_merge(vec![left_tuple, right_tuple])?));
} else {
let condition = self.condition.clone().unwrap();
let merged_tuple =
Tuple::try_merge(vec![left_tuple.clone(), right_tuple.clone()]).unwrap();
let evaluate_res = condition.evaluate(&merged_tuple).unwrap();
Tuple::try_merge(vec![left_tuple.clone(), right_tuple.clone()])?;
let evaluate_res = condition.evaluate(&merged_tuple)?;
// TODO support left/right join after null support added
if let ScalarValue::Boolean(Some(v)) = evaluate_res {
if v {
// save latest left_next_result before return
*self.left_tuple.lock().unwrap() = Some(left_tuple.clone());

return Some(Tuple::try_merge(vec![left_tuple, right_tuple]).unwrap());
return Ok(Some(Tuple::try_merge(vec![left_tuple, right_tuple])?));
}
} else {
panic!("nested loop join condition should be boolean")
}
}

right_next_tuple = self.right_input.next(context);
right_next_tuple = self.right_input.next(context)?;
}

// reset right executor
self.right_input.init(context);
left_next_tuple = self.left_input.next(context);
self.right_input.init(context)?;
left_next_tuple = self.left_input.next(context)?;
}
return None;
return Ok(None);
}

fn output_schema(&self) -> SchemaRef {
Expand Down
15 changes: 8 additions & 7 deletions bustubx/src/planner/physical_plan/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::expression::{Expr, ExprTrait};
use crate::{
execution::{ExecutionContext, VolcanoExecutor},
storage::Tuple,
BustubxResult,
};

use super::PhysicalPlan;
Expand All @@ -16,22 +17,22 @@ pub struct PhysicalProject {
}

impl VolcanoExecutor for PhysicalProject {
fn init(&self, context: &mut ExecutionContext) {
fn init(&self, context: &mut ExecutionContext) -> BustubxResult<()> {
println!("init project executor");
self.input.init(context);
self.input.init(context)
}

fn next(&self, context: &mut ExecutionContext) -> Option<Tuple> {
let next_tuple = self.input.next(context);
fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
let next_tuple = self.input.next(context)?;
if next_tuple.is_none() {
return None;
return Ok(None);
}
let next_tuple = next_tuple.unwrap();
let mut new_values = Vec::new();
for expr in &self.expressions {
new_values.push(expr.evaluate(&next_tuple).unwrap());
new_values.push(expr.evaluate(&next_tuple)?);
}
return Some(Tuple::new(self.output_schema(), new_values));
return Ok(Some(Tuple::new(self.output_schema(), new_values)));
}

fn output_schema(&self) -> SchemaRef {
Expand Down
Loading

0 comments on commit f009dc7

Please sign in to comment.