Skip to content

Commit

Permalink
Add Display for physical plan
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Jan 30, 2024
1 parent a7a6f4a commit ad9b2d8
Show file tree
Hide file tree
Showing 19 changed files with 152 additions and 39 deletions.
2 changes: 1 addition & 1 deletion bustubx/src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::catalog::SchemaRef;
use crate::{catalog::Catalog, planner::physical_plan::PhysicalPlan, storage::Tuple};

pub trait VolcanoExecutor {
fn init(&self, context: &mut ExecutionContext);
fn init(&self, context: &mut ExecutionContext) {}
fn next(&self, context: &mut ExecutionContext) -> Option<Tuple>;
fn output_schema(&self) -> SchemaRef;
}
Expand Down
30 changes: 24 additions & 6 deletions bustubx/src/expression/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
use crate::catalog::DataType;
use crate::catalog::Schema;
use crate::common::ScalarValue;
use crate::error::BustubxResult;
use crate::storage::Tuple;

mod alias;
mod binary;
mod column;
Expand All @@ -14,6 +8,13 @@ pub use binary::{BinaryExpr, BinaryOp};
pub use column::ColumnExpr;
pub use literal::Literal;

use crate::catalog::DataType;
use crate::catalog::Schema;
use crate::common::ScalarValue;
use crate::storage::Tuple;
use crate::BustubxError;
use crate::BustubxResult;

pub trait ExprTrait {
/// Get the data type of this expression, given the schema of the input
fn data_type(&self, input_schema: &Schema) -> BustubxResult<DataType>;
Expand Down Expand Up @@ -53,3 +54,20 @@ impl ExprTrait for Expr {
}
}
}

impl TryFrom<&sqlparser::ast::Expr> for Expr {
type Error = BustubxError;

fn try_from(value: &sqlparser::ast::Expr) -> Result<Self, Self::Error> {
match value {
sqlparser::ast::Expr::Value(value) => todo!(),
sqlparser::ast::Expr::BinaryOp { left, op, right } => todo!(),
sqlparser::ast::Expr::Identifier(ident) => todo!(),
sqlparser::ast::Expr::CompoundIdentifier(idents) => todo!(),
_ => Err(BustubxError::NotSupport(format!(
"sqlparser expr not supported: {}",
value
))),
}
}
}
6 changes: 3 additions & 3 deletions bustubx/src/planner/logical_planner/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod bind_create_table;
mod bind_insert;
mod bind_select;
mod logical_planner;
mod plan_create_index;
mod plan_create_table;
mod plan_insert;
mod plan_select;

pub use logical_planner::{LogicalPlanner, PlannerContext};
9 changes: 6 additions & 3 deletions bustubx/src/planner/physical_plan/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ pub struct PhysicalCreateIndex {
}

impl VolcanoExecutor for PhysicalCreateIndex {
fn init(&self, context: &mut ExecutionContext) {
println!("init create index executor");
}
fn next(&self, context: &mut ExecutionContext) -> Option<Tuple> {
context.catalog.create_index(
self.index_name.clone(),
Expand All @@ -33,3 +30,9 @@ impl VolcanoExecutor for PhysicalCreateIndex {
))
}
}

impl std::fmt::Display for PhysicalCreateIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
todo!()
}
}
9 changes: 6 additions & 3 deletions bustubx/src/planner/physical_plan/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ pub struct PhysicalCreateTable {
}

impl VolcanoExecutor for PhysicalCreateTable {
fn init(&self, context: &mut ExecutionContext) {
println!("init create table executor");
}
fn next(&self, context: &mut ExecutionContext) -> Option<Tuple> {
context
.catalog
Expand All @@ -26,3 +23,9 @@ impl VolcanoExecutor for PhysicalCreateTable {
Arc::new(self.schema.clone())
}
}

impl std::fmt::Display for PhysicalCreateTable {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
todo!()
}
}
23 changes: 23 additions & 0 deletions bustubx/src/planner/physical_plan/dummy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use crate::catalog::{Schema, SchemaRef};
use crate::execution::{ExecutionContext, VolcanoExecutor};
use crate::Tuple;
use std::sync::Arc;

#[derive(Debug)]
pub struct Dummy;

impl VolcanoExecutor for Dummy {
fn next(&self, context: &mut ExecutionContext) -> Option<Tuple> {
None
}

fn output_schema(&self) -> SchemaRef {
Arc::new(Schema::empty())
}
}

impl std::fmt::Display for Dummy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Dummy")
}
}
7 changes: 7 additions & 0 deletions bustubx/src/planner/physical_plan/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ impl VolcanoExecutor for PhysicalFilter {
println!("init filter executor");
self.input.init(context);
}

fn next(&self, context: &mut ExecutionContext) -> Option<Tuple> {
loop {
let next_tuple = self.input.next(context);
Expand All @@ -44,3 +45,9 @@ impl VolcanoExecutor for PhysicalFilter {
self.input.output_schema()
}
}

impl std::fmt::Display for PhysicalFilter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
todo!()
}
}
6 changes: 6 additions & 0 deletions bustubx/src/planner/physical_plan/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,9 @@ impl VolcanoExecutor for PhysicalInsert {
)]))
}
}

impl std::fmt::Display for PhysicalInsert {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
todo!()
}
}
6 changes: 6 additions & 0 deletions bustubx/src/planner/physical_plan/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,9 @@ impl VolcanoExecutor for PhysicalLimit {
self.input.output_schema()
}
}

impl std::fmt::Display for PhysicalLimit {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
todo!()
}
}
43 changes: 30 additions & 13 deletions bustubx/src/planner/physical_plan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
use std::sync::Arc;

use crate::catalog::SchemaRef;
use crate::{
catalog::Schema,
execution::{ExecutionContext, VolcanoExecutor},
storage::Tuple,
};

mod create_index;
mod create_table;
mod dummy;
mod filter;
mod insert;
mod limit;
Expand All @@ -20,6 +12,7 @@ mod values;

pub use create_index::PhysicalCreateIndex;
pub use create_table::PhysicalCreateTable;
pub use dummy::Dummy;
pub use filter::PhysicalFilter;
pub use insert::PhysicalInsert;
pub use limit::PhysicalLimit;
Expand All @@ -29,9 +22,15 @@ pub use seq_scan::PhysicalSeqScan;
pub use sort::PhysicalSort;
pub use values::PhysicalValues;

use crate::catalog::SchemaRef;
use crate::{
execution::{ExecutionContext, VolcanoExecutor},
storage::Tuple,
};

#[derive(Debug)]
pub enum PhysicalPlan {
Dummy,
Dummy(Dummy),
CreateTable(PhysicalCreateTable),
CreateIndex(PhysicalCreateIndex),
Project(PhysicalProject),
Expand All @@ -47,7 +46,7 @@ pub enum PhysicalPlan {
impl VolcanoExecutor for PhysicalPlan {
fn init(&self, context: &mut ExecutionContext) {
match self {
PhysicalPlan::Dummy => {}
PhysicalPlan::Dummy(op) => op.init(context),
PhysicalPlan::CreateTable(op) => op.init(context),
PhysicalPlan::CreateIndex(op) => op.init(context),
PhysicalPlan::Insert(op) => op.init(context),
Expand All @@ -63,7 +62,7 @@ impl VolcanoExecutor for PhysicalPlan {

fn next(&self, context: &mut ExecutionContext) -> Option<Tuple> {
match self {
PhysicalPlan::Dummy => None,
PhysicalPlan::Dummy(op) => op.next(context),
PhysicalPlan::CreateTable(op) => op.next(context),
PhysicalPlan::CreateIndex(op) => op.next(context),
PhysicalPlan::Insert(op) => op.next(context),
Expand All @@ -79,7 +78,7 @@ impl VolcanoExecutor for PhysicalPlan {

fn output_schema(&self) -> SchemaRef {
match self {
Self::Dummy => Arc::new(Schema::new(vec![])),
Self::Dummy(op) => op.output_schema(),
Self::CreateTable(op) => op.output_schema(),
Self::CreateIndex(op) => op.output_schema(),
Self::Insert(op) => op.output_schema(),
Expand All @@ -93,3 +92,21 @@ impl VolcanoExecutor for PhysicalPlan {
}
}
}

impl std::fmt::Display for PhysicalPlan {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Dummy(op) => write!(f, "{op}"),
Self::CreateTable(op) => write!(f, "{op}"),
Self::CreateIndex(op) => write!(f, "{op}"),
Self::Insert(op) => write!(f, "{op}"),
Self::Values(op) => write!(f, "{op}"),
Self::Project(op) => write!(f, "{op}"),
Self::Filter(op) => write!(f, "{op}"),
Self::TableScan(op) => write!(f, "{op}"),
Self::Limit(op) => write!(f, "{op}"),
Self::NestedLoopJoin(op) => write!(f, "{op}"),
Self::Sort(op) => write!(f, "{op}"),
}
}
}
6 changes: 6 additions & 0 deletions bustubx/src/planner/physical_plan/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,9 @@ impl VolcanoExecutor for PhysicalNestedLoopJoin {
)
}
}

impl std::fmt::Display for PhysicalNestedLoopJoin {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
todo!()
}
}
8 changes: 8 additions & 0 deletions bustubx/src/planner/physical_plan/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ pub struct PhysicalProject {
pub expressions: Vec<Expr>,
pub input: Arc<PhysicalPlan>,
}

impl VolcanoExecutor for PhysicalProject {
fn init(&self, context: &mut ExecutionContext) {
println!("init project executor");
self.input.init(context);
}

fn next(&self, context: &mut ExecutionContext) -> Option<Tuple> {
let next_tuple = self.input.next(context);
if next_tuple.is_none() {
Expand All @@ -37,3 +39,9 @@ impl VolcanoExecutor for PhysicalProject {
self.input.output_schema()
}
}

impl std::fmt::Display for PhysicalProject {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
todo!()
}
}
8 changes: 8 additions & 0 deletions bustubx/src/planner/physical_plan/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct PhysicalSeqScan {

iterator: Mutex<TableIterator>,
}

impl PhysicalSeqScan {
pub fn new(table_oid: TableOid, columns: Vec<ColumnRef>) -> Self {
PhysicalSeqScan {
Expand All @@ -23,6 +24,7 @@ impl PhysicalSeqScan {
}
}
}

impl VolcanoExecutor for PhysicalSeqScan {
fn init(&self, context: &mut ExecutionContext) {
println!("init table scan executor");
Expand Down Expand Up @@ -50,3 +52,9 @@ impl VolcanoExecutor for PhysicalSeqScan {
})
}
}

impl std::fmt::Display for PhysicalSeqScan {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
todo!()
}
}
6 changes: 6 additions & 0 deletions bustubx/src/planner/physical_plan/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,9 @@ impl VolcanoExecutor for PhysicalSort {
self.input.output_schema()
}
}

impl std::fmt::Display for PhysicalSort {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
todo!()
}
}
18 changes: 10 additions & 8 deletions bustubx/src/planner/physical_plan/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,16 @@ impl PhysicalValues {
}
}
impl VolcanoExecutor for PhysicalValues {
fn init(&self, context: &mut ExecutionContext) {
println!("init values executor");
self.cursor.store(0, std::sync::atomic::Ordering::SeqCst);
}
fn next(&self, context: &mut ExecutionContext) -> Option<Tuple> {
let cursor = self
.cursor
.fetch_add(1, std::sync::atomic::Ordering::SeqCst) as usize;
if cursor < self.tuples.len() {
return if cursor < self.tuples.len() {
let values = self.tuples[cursor].clone();
return Some(Tuple::new(self.output_schema(), values));
Some(Tuple::new(self.output_schema(), values))
} else {
return None;
}
None
};
}

fn output_schema(&self) -> SchemaRef {
Expand All @@ -48,3 +44,9 @@ impl VolcanoExecutor for PhysicalValues {
})
}
}

impl std::fmt::Display for PhysicalValues {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
todo!()
}
}
4 changes: 2 additions & 2 deletions bustubx/src/planner/physical_planner/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::sync::Arc;
use crate::planner::logical_plan::LogicalPlan;
use crate::planner::operator::LogicalOperator;

use crate::planner::physical_plan::PhysicalCreateIndex;
use crate::planner::physical_plan::PhysicalCreateTable;
use crate::planner::physical_plan::PhysicalFilter;
use crate::planner::physical_plan::PhysicalInsert;
Expand All @@ -14,6 +13,7 @@ use crate::planner::physical_plan::PhysicalProject;
use crate::planner::physical_plan::PhysicalSeqScan;
use crate::planner::physical_plan::PhysicalSort;
use crate::planner::physical_plan::PhysicalValues;
use crate::planner::physical_plan::{Dummy, PhysicalCreateIndex};

pub struct PhysicalPlanner;

Expand All @@ -30,7 +30,7 @@ impl PhysicalPlanner {

pub fn build_plan(logical_plan: Arc<LogicalPlan>) -> PhysicalPlan {
let plan = match logical_plan.operator {
LogicalOperator::Dummy => PhysicalPlan::Dummy,
LogicalOperator::Dummy => PhysicalPlan::Dummy(Dummy {}),
LogicalOperator::CreateTable(ref logic_create_table) => {
PhysicalPlan::CreateTable(PhysicalCreateTable::new(
logic_create_table.table_name.clone(),
Expand Down

0 comments on commit ad9b2d8

Please sign in to comment.