Skip to content

Commit

Permalink
Improve catalog api
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Feb 18, 2024
1 parent 2e95c37 commit 4872691
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 92 deletions.
101 changes: 37 additions & 64 deletions bustubx/src/catalog/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;

use crate::buffer::TABLE_HEAP_BUFFER_POOL_SIZE;
use crate::catalog::SchemaRef;
use crate::common::TableReference;
use crate::{
buffer::BufferPoolManager,
storage::{
Expand Down Expand Up @@ -35,6 +36,7 @@ pub struct Catalog {
pub indexes: HashMap<String, IndexInfo>,
pub buffer_pool_manager: BufferPoolManager,
}

impl Catalog {
pub fn new(buffer_pool_manager: BufferPoolManager) -> Self {
Self {
Expand All @@ -46,10 +48,10 @@ impl Catalog {

pub fn create_table(
&mut self,
table_name: String,
table_ref: TableReference,
schema: SchemaRef,
) -> BustubxResult<&TableInfo> {
if !self.tables.contains_key(&table_name) {
if !self.tables.contains_key(table_ref.table()) {
// 一个table对应一个buffer pool manager
let buffer_pool_manager = BufferPoolManager::new(
TABLE_HEAP_BUFFER_POOL_SIZE,
Expand All @@ -58,49 +60,50 @@ impl Catalog {
let table_heap = TableHeap::try_new(schema.clone(), buffer_pool_manager)?;
let table_info = TableInfo {
schema,
name: table_name.clone(),
name: table_ref.table().to_string(),
table: table_heap,
};

self.tables.insert(table_name.clone(), table_info);
self.tables
.insert(table_ref.table().to_string(), table_info);
}

self.tables
.get(&table_name)
.get(table_ref.table())
.ok_or(BustubxError::Internal("Failed to create table".to_string()))
}

pub fn get_table_by_name(&self, table_name: &str) -> BustubxResult<&TableInfo> {
pub fn table(&self, table_ref: &TableReference) -> BustubxResult<&TableInfo> {
self.tables
.get(table_name)
.get(table_ref.table())
.ok_or(BustubxError::Internal(format!(
"Not found the table {}",
table_name
table_ref
)))
}

pub fn get_mut_table_by_name(&mut self, table_name: &str) -> BustubxResult<&mut TableInfo> {
pub fn table_mut(&mut self, table_ref: &TableReference) -> BustubxResult<&mut TableInfo> {
self.tables
.get_mut(table_name)
.get_mut(table_ref.table())
.ok_or(BustubxError::Internal(format!(
"Not found the table {}",
table_name
table_ref
)))
}

pub fn create_index(
&mut self,
index_name: String,
table_name: String,
table_ref: &TableReference,
key_attrs: Vec<usize>,
) -> BustubxResult<&IndexInfo> {
let table_info = self.get_table_by_name(&table_name)?;
let table_info = self.table(table_ref)?;
let tuple_schema = table_info.schema.clone();
let key_schema = tuple_schema.project(&key_attrs)?;

let index_metadata = IndexMetadata::new(
index_name.clone(),
table_name.clone(),
table_ref.table().to_string(),
tuple_schema.clone(),
key_attrs,
);
Expand All @@ -116,15 +119,15 @@ impl Catalog {
key_schema,
name: index_name.clone(),
index: b_plus_tree_index,
table_name: table_name.clone(),
table_name: table_ref.table().to_string(),
};
self.indexes.insert(index_name.clone(), index_info);
self.indexes
.get(&index_name)
.ok_or(BustubxError::Internal("Failed to create table".to_string()))
}

pub fn get_index_by_name(&self, table_name: &str, index_name: &str) -> Option<&IndexInfo> {
pub fn get_index_by_name(&self, index_name: &str) -> Option<&IndexInfo> {
self.indexes.get(index_name)
}
}
Expand All @@ -133,6 +136,7 @@ impl Catalog {
mod tests {
use std::{fs::remove_file, sync::Arc};

use crate::common::TableReference;
use crate::{
buffer::BufferPoolManager,
catalog::{Column, DataType, Schema},
Expand All @@ -148,69 +152,38 @@ mod tests {
let buffer_pool_manager = BufferPoolManager::new(1000, Arc::new(disk_manager));
let mut catalog = super::Catalog::new(buffer_pool_manager);

let table_name = "test_table1".to_string();
let table_ref1 = TableReference::bare("test_table1".to_string());
let schema = Arc::new(Schema::new(vec![
Column::new("a".to_string(), DataType::Int8, true),
Column::new("b".to_string(), DataType::Int16, true),
Column::new("c".to_string(), DataType::Int32, true),
]));
let table_info = catalog
.create_table(table_name.clone(), schema.clone())
.create_table(table_ref1.clone(), schema.clone())
.unwrap();
assert_eq!(table_info.name, table_name);
assert_eq!(table_info.name, table_ref1.table());
assert_eq!(table_info.schema, schema);

let table_name = "test_table2".to_string();
let table_ref2 = TableReference::bare("test_table2".to_string());
let schema = Arc::new(Schema::new(vec![
Column::new("d".to_string(), DataType::Int32, true),
Column::new("e".to_string(), DataType::Int16, true),
Column::new("f".to_string(), DataType::Int8, true),
]));
let table_info = catalog
.create_table(table_name.clone(), schema.clone())
.create_table(table_ref2.clone(), schema.clone())
.unwrap();
assert_eq!(table_info.name, table_name);
assert_eq!(table_info.name, table_ref2.table());
assert_eq!(table_info.schema, schema);

let _ = remove_file(db_path);
}

#[test]
pub fn test_catalog_get_table() {
let db_path = "./test_catalog_get_table.db";
let _ = remove_file(db_path);

let disk_manager = DiskManager::try_new(&db_path).unwrap();
let buffer_pool_manager = BufferPoolManager::new(1000, Arc::new(disk_manager));
let mut catalog = super::Catalog::new(buffer_pool_manager);

let table_name1 = "test_table1".to_string();
let schema = Arc::new(Schema::new(vec![
Column::new("a".to_string(), DataType::Int8, true),
Column::new("b".to_string(), DataType::Int16, true),
Column::new("c".to_string(), DataType::Int32, true),
]));
let _ = catalog.create_table(table_name1.clone(), schema);

let table_name2 = "test_table2".to_string();
let schema = Arc::new(Schema::new(vec![
Column::new("d".to_string(), DataType::Int32, true),
Column::new("e".to_string(), DataType::Int16, true),
Column::new("f".to_string(), DataType::Int8, true),
]));
let _ = catalog.create_table(table_name2.clone(), schema);

let table_info = catalog.get_table_by_name(&table_name1).unwrap();
assert_eq!(table_info.name, table_name1);
let table_info = catalog.table(&table_ref1).unwrap();
assert_eq!(table_info.name, table_ref1.table());
assert_eq!(table_info.schema.column_count(), 3);

let table_info = catalog.get_table_by_name(&table_name2).unwrap();
assert_eq!(table_info.name, table_name2);
let table_info = catalog.table(&table_ref2).unwrap();
assert_eq!(table_info.name, table_ref2.table());
assert_eq!(table_info.schema.column_count(), 3);

let table_info = catalog.get_table_by_name("test_table3");
assert!(table_info.is_err());

let _ = remove_file(db_path);
}

Expand All @@ -223,21 +196,21 @@ mod tests {
let buffer_pool_manager = BufferPoolManager::new(1000, Arc::new(disk_manager));
let mut catalog = super::Catalog::new(buffer_pool_manager);

let table_name = "test_table1".to_string();
let table_ref = TableReference::bare("test_table1".to_string());
let schema = Arc::new(Schema::new(vec![
Column::new("a".to_string(), DataType::Int8, true),
Column::new("b".to_string(), DataType::Int16, true),
Column::new("c".to_string(), DataType::Int32, true),
]));
let _ = catalog.create_table(table_name.clone(), schema);
let _ = catalog.create_table(table_ref.clone(), schema);

let index_name1 = "test_index1".to_string();
let key_attrs = vec![0, 2];
let index_info = catalog
.create_index(index_name1.clone(), table_name.clone(), key_attrs)
.create_index(index_name1.clone(), &table_ref, key_attrs)
.unwrap();
assert_eq!(index_info.name, index_name1);
assert_eq!(index_info.table_name, table_name);
assert_eq!(index_info.table_name, table_ref.table());
assert_eq!(index_info.key_schema.column_count(), 2);
assert_eq!(
index_info.key_schema.column_with_index(0).unwrap().name,
Expand Down Expand Up @@ -267,10 +240,10 @@ mod tests {
let index_name2 = "test_index2".to_string();
let key_attrs = vec![1];
let index_info = catalog
.create_index(index_name2.clone(), table_name.clone(), key_attrs)
.create_index(index_name2.clone(), &table_ref, key_attrs)
.unwrap();
assert_eq!(index_info.name, index_name2);
assert_eq!(index_info.table_name, table_name);
assert_eq!(index_info.table_name, table_ref.table());
assert_eq!(index_info.key_schema.column_count(), 1);
assert_eq!(
index_info.key_schema.column_with_index(0).unwrap().name,
Expand All @@ -285,7 +258,7 @@ mod tests {
DataType::Int16
);

let index_info = catalog.get_index_by_name(table_name.as_str(), index_name1.as_str());
let index_info = catalog.get_index_by_name(index_name1.as_str());
assert!(index_info.is_some());
let index_info = index_info.unwrap();
assert_eq!(index_info.name, index_name1);
Expand Down
7 changes: 3 additions & 4 deletions bustubx/src/execution/physical_plan/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ pub struct PhysicalCreateTable {

impl VolcanoExecutor for PhysicalCreateTable {
fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
context.catalog.create_table(
self.table.table().to_string(),
Arc::new(self.schema.clone()),
)?;
context
.catalog
.create_table(self.table.clone(), Arc::new(self.schema.clone()))?;
Ok(None)
}
fn output_schema(&self) -> SchemaRef {
Expand Down
5 changes: 1 addition & 4 deletions bustubx/src/execution/physical_plan/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,7 @@ impl VolcanoExecutor for PhysicalInsert {
};

// TODO update index if needed
let table_heap = &mut context
.catalog
.get_mut_table_by_name(self.table.table())?
.table;
let table_heap = &mut context.catalog.table_mut(&self.table)?.table;
let tuple_meta = TupleMeta {
insert_txn_id: 0,
delete_txn_id: 0,
Expand Down
4 changes: 2 additions & 2 deletions bustubx/src/execution/physical_plan/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ impl PhysicalSeqScan {
impl VolcanoExecutor for PhysicalSeqScan {
fn init(&self, context: &mut ExecutionContext) -> BustubxResult<()> {
debug!("init table scan executor");
let table_info = context.catalog.get_mut_table_by_name(self.table.table())?;
let table_info = context.catalog.table_mut(&self.table)?;
let inited_iterator = table_info.table.iter(None, None);
let mut iterator = self.iterator.lock().unwrap();
*iterator = inited_iterator;
Ok(())
}

fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
let table_info = context.catalog.get_mut_table_by_name(self.table.table())?;
let table_info = context.catalog.table_mut(&self.table)?;
let mut iterator = self.iterator.lock().unwrap();
let full_tuple = iterator.next(&mut table_info.table);
return Ok(full_tuple.map(|t| t.1));
Expand Down
7 changes: 1 addition & 6 deletions bustubx/src/planner/logical_planner/plan_create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,7 @@ impl<'a> LogicalPlanner<'a> {
let col_expr = self.bind_order_by_expr(&col)?;
columns_expr.push(col_expr);
}
let table_schema = self
.context
.catalog
.get_table_by_name(table.table())?
.schema
.clone();
let table_schema = self.context.catalog.table(&table)?.schema.clone();
Ok(LogicalPlan::CreateIndex(CreateIndex {
index_name,
table,
Expand Down
7 changes: 1 addition & 6 deletions bustubx/src/planner/logical_planner/plan_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,7 @@ impl<'a> LogicalPlanner<'a> {
) -> BustubxResult<LogicalPlan> {
let values = self.plan_set_expr(source.body.as_ref())?;
let table = self.bind_table_name(table_name)?;
let table_schema = self
.context
.catalog
.get_table_by_name(table.table())?
.schema
.clone();
let table_schema = self.context.catalog.table(&table)?.schema.clone();

let projected_schema = if columns_ident.is_empty() {
table_schema.clone()
Expand Down
7 changes: 1 addition & 6 deletions bustubx/src/planner/logical_planner/plan_set_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,7 @@ impl LogicalPlanner<'_> {
// TODO handle alias
let table_ref = self.bind_table_name(name)?;
// TODO get schema by full table name
let schema = self
.context
.catalog
.get_table_by_name(table_ref.table())?
.schema
.clone();
let schema = self.context.catalog.table(&table_ref)?.schema.clone();
Ok(LogicalPlan::TableScan(TableScan {
table_ref,
table_schema: schema,
Expand Down

0 comments on commit 4872691

Please sign in to comment.