Improve catalog code
lewiszlw committed Feb 16, 2024
1 parent 4ace6ad commit 2e95c37
Showing 8 changed files with 68 additions and 153 deletions.
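
In outline, the commit drops the `TableOid`/`IndexOid` aliases and the atomic OID counters, keys `Catalog::tables` and `Catalog::indexes` by name, and turns the table creation and lookup methods from `Option` into `BustubxResult`. A minimal, self-contained sketch of the resulting shape, using simplified stand-in types rather than the real bustubx `TableHeap`/`BufferPoolManager` machinery:

```rust
use std::collections::HashMap;

// Simplified stand-ins; the real bustubx types carry heaps, schemas, buffer pools, etc.
#[derive(Debug)]
pub enum BustubxError {
    Internal(String),
}
pub type BustubxResult<T> = Result<T, BustubxError>;

#[derive(Debug)]
pub struct TableInfo {
    pub name: String,
}

// Name-keyed catalog: no OIDs and no atomic counters to maintain.
pub struct Catalog {
    tables: HashMap<String, TableInfo>,
}

impl Catalog {
    pub fn new() -> Self {
        Self { tables: HashMap::new() }
    }

    // Creating a table that already exists is now a no-op; the entry is returned either way.
    pub fn create_table(&mut self, table_name: String) -> BustubxResult<&TableInfo> {
        if !self.tables.contains_key(&table_name) {
            self.tables
                .insert(table_name.clone(), TableInfo { name: table_name.clone() });
        }
        self.tables
            .get(&table_name)
            .ok_or(BustubxError::Internal("Failed to create table".to_string()))
    }

    // Lookups report their own failure instead of returning Option.
    pub fn get_table_by_name(&self, table_name: &str) -> BustubxResult<&TableInfo> {
        self.tables.get(table_name).ok_or(BustubxError::Internal(format!(
            "Not found the table {}",
            table_name
        )))
    }
}

fn main() -> BustubxResult<()> {
    let mut catalog = Catalog::new();
    catalog.create_table("t1".to_string())?;
    // Callers propagate lookup failures with `?` instead of unwrapping.
    println!("found {}", catalog.get_table_by_name("t1")?.name);
    Ok(())
}
```

Since no OID is minted any more, the `oid` fields on `TableInfo`/`IndexInfo`, the `get_*_by_oid` accessors, and the `table_names`/`index_names` lookup maps all disappear, which accounts for most of the 124 deletions in catalog.rs.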
178 changes: 54 additions & 124 deletions bustubx/src/catalog/catalog.rs
@@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::atomic::AtomicU32};
use std::collections::HashMap;

use crate::buffer::TABLE_HEAP_BUFFER_POOL_SIZE;
use crate::catalog::SchemaRef;
@@ -8,12 +8,9 @@ use crate::{
index::{BPlusTreeIndex, IndexMetadata},
TableHeap,
},
BustubxResult,
BustubxError, BustubxResult,
};

pub type TableOid = u32;
pub type IndexOid = u32;

pub static DEFAULT_CATALOG_NAME: &str = "bustubx";
pub static DEFAULT_SCHEMA_NAME: &str = "public";

@@ -23,7 +20,6 @@ pub struct TableInfo {
pub schema: SchemaRef,
pub name: String,
pub table: TableHeap,
pub oid: TableOid,
}

// Index metadata
@@ -32,89 +28,75 @@ pub struct IndexInfo {
pub name: String,
pub index: BPlusTreeIndex,
pub table_name: String,
pub oid: IndexOid,
}

pub struct Catalog {
pub tables: HashMap<TableOid, TableInfo>,
pub table_names: HashMap<String, TableOid>,
pub next_table_oid: AtomicU32,
pub indexes: HashMap<IndexOid, IndexInfo>,
// table_name -> index_name -> index_oid
pub index_names: HashMap<String, HashMap<String, IndexOid>>,
pub next_index_oid: AtomicU32,
pub tables: HashMap<String, TableInfo>,
pub indexes: HashMap<String, IndexInfo>,
pub buffer_pool_manager: BufferPoolManager,
}
impl Catalog {
pub fn new(buffer_pool_manager: BufferPoolManager) -> Self {
Self {
tables: HashMap::new(),
table_names: HashMap::new(),
next_table_oid: AtomicU32::new(0),
indexes: HashMap::new(),
index_names: HashMap::new(),
next_index_oid: AtomicU32::new(0),
buffer_pool_manager,
}
}

pub fn create_table(&mut self, table_name: String, schema: SchemaRef) -> Option<&TableInfo> {
if self.table_names.contains_key(&table_name) {
return None;
pub fn create_table(
&mut self,
table_name: String,
schema: SchemaRef,
) -> BustubxResult<&TableInfo> {
if !self.tables.contains_key(&table_name) {
// Each table gets its own buffer pool manager
let buffer_pool_manager = BufferPoolManager::new(
TABLE_HEAP_BUFFER_POOL_SIZE,
self.buffer_pool_manager.disk_manager.clone(),
);
let table_heap = TableHeap::try_new(schema.clone(), buffer_pool_manager)?;
let table_info = TableInfo {
schema,
name: table_name.clone(),
table: table_heap,
};

self.tables.insert(table_name.clone(), table_info);
}

// Each table gets its own buffer pool manager
let buffer_pool_manager = BufferPoolManager::new(
TABLE_HEAP_BUFFER_POOL_SIZE,
self.buffer_pool_manager.disk_manager.clone(),
);
let table_heap = TableHeap::try_new(schema.clone(), buffer_pool_manager).unwrap();
let table_oid = self
.next_table_oid
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let table_info = TableInfo {
schema,
name: table_name.clone(),
table: table_heap,
oid: table_oid,
};

self.tables.insert(table_oid, table_info);
self.table_names.insert(table_name.clone(), table_oid);
self.index_names.insert(table_name, HashMap::new());
self.tables.get(&table_oid)
self.tables
.get(&table_name)
.ok_or(BustubxError::Internal("Failed to create table".to_string()))
}

pub fn get_table_by_name(&self, table_name: &str) -> Option<&TableInfo> {
self.table_names
pub fn get_table_by_name(&self, table_name: &str) -> BustubxResult<&TableInfo> {
self.tables
.get(table_name)
.and_then(|oid| self.tables.get(oid))
}
pub fn get_mut_table_by_name(&mut self, table_name: &str) -> Option<&mut TableInfo> {
self.table_names
.get(table_name)
.and_then(|oid| self.tables.get_mut(oid))
.ok_or(BustubxError::Internal(format!(
"Not found the table {}",
table_name
)))
}

pub fn get_table_by_oid(&self, oid: TableOid) -> Option<&TableInfo> {
self.tables.get(&oid)
}

pub fn get_mut_table_by_oid(&mut self, table_oid: TableOid) -> Option<&mut TableInfo> {
self.tables.get_mut(&table_oid)
pub fn get_mut_table_by_name(&mut self, table_name: &str) -> BustubxResult<&mut TableInfo> {
self.tables
.get_mut(table_name)
.ok_or(BustubxError::Internal(format!(
"Not found the table {}",
table_name
)))
}

pub fn create_index(
&mut self,
index_name: String,
table_name: String,
key_attrs: Vec<usize>,
) -> &IndexInfo {
let table_info = self
.get_table_by_name(&table_name)
.expect("table not found");
) -> BustubxResult<&IndexInfo> {
let table_info = self.get_table_by_name(&table_name)?;
let tuple_schema = table_info.schema.clone();
let key_schema = tuple_schema.project(&key_attrs).unwrap();
let key_schema = tuple_schema.project(&key_attrs)?;

let index_metadata = IndexMetadata::new(
index_name.clone(),
@@ -130,51 +112,20 @@ impl Catalog {
// TODO compute leaf_max_size and internal_max_size
let b_plus_tree_index = BPlusTreeIndex::new(index_metadata, buffer_pool_manager, 10, 10);

let index_oid = self
.next_index_oid
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let index_info = IndexInfo {
key_schema,
name: index_name.clone(),
index: b_plus_tree_index,
table_name: table_name.clone(),
oid: index_oid,
};
self.indexes.insert(index_oid, index_info);
if self.index_names.contains_key(&table_name) {
self.index_names
.get_mut(&table_name)
.unwrap()
.insert(index_name, index_oid);
} else {
let mut index_names = HashMap::new();
index_names.insert(index_name, index_oid);
self.index_names.insert(table_name, index_names);
}
self.indexes.get(&index_oid).unwrap()
}

pub fn get_index_by_oid(&self, oid: IndexOid) -> Option<&IndexInfo> {
self.indexes.get(&oid)
self.indexes.insert(index_name.clone(), index_info);
self.indexes
.get(&index_name)
.ok_or(BustubxError::Internal("Failed to create table".to_string()))
}

pub fn get_index_by_name(&self, table_name: &str, index_name: &str) -> Option<&IndexInfo> {

[GitHub Actions / Test Suite warning on line 127 in bustubx/src/catalog/catalog.rs: unused variable: `table_name`]
self.index_names
.get(table_name)
.and_then(|index_names| index_names.get(index_name))
.and_then(|index_oid| self.indexes.get(index_oid))
}

pub fn get_table_indexes(&self, table_name: &str) -> Vec<&IndexInfo> {
self.index_names
.get(table_name)
.map(|index_names| {
index_names
.iter()
.map(|(_, index_oid)| self.indexes.get(index_oid).unwrap())
.collect()
})
.unwrap_or(vec![])
self.indexes.get(index_name)
}
}

@@ -208,7 +159,6 @@ mod tests {
.unwrap();
assert_eq!(table_info.name, table_name);
assert_eq!(table_info.schema, schema);
assert_eq!(table_info.oid, 0);

let table_name = "test_table2".to_string();
let schema = Arc::new(Schema::new(vec![
@@ -221,7 +171,6 @@
.unwrap();
assert_eq!(table_info.name, table_name);
assert_eq!(table_info.schema, schema);
assert_eq!(table_info.oid, 1);

let _ = remove_file(db_path);
}
@@ -260,18 +209,7 @@ mod tests {
assert_eq!(table_info.schema.column_count(), 3);

let table_info = catalog.get_table_by_name("test_table3");
assert!(table_info.is_none());

let table_info = catalog.get_table_by_oid(0).unwrap();
assert_eq!(table_info.name, table_name1);
assert_eq!(table_info.schema.column_count(), 3);

let table_info = catalog.get_table_by_oid(1).unwrap();
assert_eq!(table_info.name, table_name2);
assert_eq!(table_info.schema.column_count(), 3);

let table_info = catalog.get_table_by_oid(2);
assert!(table_info.is_none());
assert!(table_info.is_err());

let _ = remove_file(db_path);
}
@@ -295,7 +233,9 @@

let index_name1 = "test_index1".to_string();
let key_attrs = vec![0, 2];
let index_info = catalog.create_index(index_name1.clone(), table_name.clone(), key_attrs);
let index_info = catalog
.create_index(index_name1.clone(), table_name.clone(), key_attrs)
.unwrap();
assert_eq!(index_info.name, index_name1);
assert_eq!(index_info.table_name, table_name);
assert_eq!(index_info.key_schema.column_count(), 2);
@@ -323,11 +263,12 @@
.data_type,
DataType::Int32
);
assert_eq!(index_info.oid, 0);

let index_name2 = "test_index2".to_string();
let key_attrs = vec![1];
let index_info = catalog.create_index(index_name2.clone(), table_name.clone(), key_attrs);
let index_info = catalog
.create_index(index_name2.clone(), table_name.clone(), key_attrs)
.unwrap();
assert_eq!(index_info.name, index_name2);
assert_eq!(index_info.table_name, table_name);
assert_eq!(index_info.key_schema.column_count(), 1);
@@ -343,23 +284,12 @@
.data_type,
DataType::Int16
);
assert_eq!(index_info.oid, 1);

let index_info = catalog.get_index_by_name(table_name.as_str(), index_name1.as_str());
assert!(index_info.is_some());
let index_info = index_info.unwrap();
assert_eq!(index_info.name, index_name1);

let index_info = catalog.get_index_by_oid(1);
assert!(index_info.is_some());
let index_info = index_info.unwrap();
assert_eq!(index_info.name, index_name2);

let table_indexes = catalog.get_table_indexes(table_name.as_str());
assert_eq!(table_indexes.len(), 2);
assert!(table_indexes[0].name == index_name1 || table_indexes[0].name == index_name2);
assert!(table_indexes[1].name == index_name1 || table_indexes[1].name == index_name2);

let _ = remove_file(db_path);
}
}
4 changes: 1 addition & 3 deletions bustubx/src/catalog/mod.rs
@@ -3,9 +3,7 @@ mod column;
mod data_type;
mod schema;

pub use catalog::{
Catalog, IndexInfo, IndexOid, TableInfo, TableOid, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
pub use catalog::{Catalog, IndexInfo, TableInfo, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};

[GitHub Actions / Test Suite warning on line 6 in bustubx/src/catalog/mod.rs: unused imports: `DEFAULT_CATALOG_NAME`, `DEFAULT_SCHEMA_NAME`, `IndexInfo`, `TableInfo`]
pub use column::{Column, ColumnRef};
pub use data_type::DataType;
pub use schema::{Schema, SchemaRef, EMPTY_SCHEMA_REF, INSERT_OUTPUT_SCHEMA_REF};
2 changes: 1 addition & 1 deletion bustubx/src/execution/physical_plan/create_table.rs
@@ -19,7 +19,7 @@ impl VolcanoExecutor for PhysicalCreateTable {
context.catalog.create_table(
self.table.table().to_string(),
Arc::new(self.schema.clone()),
);
)?;
Ok(None)
}
fn output_schema(&self) -> SchemaRef {
3 changes: 1 addition & 2 deletions bustubx/src/execution/physical_plan/insert.rs
@@ -78,8 +78,7 @@ impl VolcanoExecutor for PhysicalInsert {
// TODO update index if needed
let table_heap = &mut context
.catalog
.get_mut_table_by_name(self.table.table())
.unwrap()
.get_mut_table_by_name(self.table.table())?
.table;
let tuple_meta = TupleMeta {
insert_txn_id: 0,
10 changes: 2 additions & 8 deletions bustubx/src/execution/physical_plan/seq_scan.rs
@@ -30,21 +30,15 @@ impl PhysicalSeqScan {
impl VolcanoExecutor for PhysicalSeqScan {
fn init(&self, context: &mut ExecutionContext) -> BustubxResult<()> {
debug!("init table scan executor");
let table_info = context
.catalog
.get_mut_table_by_name(self.table.table())
.unwrap();
let table_info = context.catalog.get_mut_table_by_name(self.table.table())?;
let inited_iterator = table_info.table.iter(None, None);
let mut iterator = self.iterator.lock().unwrap();
*iterator = inited_iterator;
Ok(())
}

fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
let table_info = context
.catalog
.get_mut_table_by_name(self.table.table())
.unwrap();
let table_info = context.catalog.get_mut_table_by_name(self.table.table())?;
let mut iterator = self.iterator.lock().unwrap();
let full_tuple = iterator.next(&mut table_info.table);
return Ok(full_tuple.map(|t| t.1));
8 changes: 3 additions & 5 deletions bustubx/src/planner/logical_planner/plan_create_index.rs
@@ -25,11 +25,9 @@ impl<'a> LogicalPlanner<'a> {
let table_schema = self
.context
.catalog
.get_table_by_name(table.table())
.map_or(
Err(BustubxError::Plan(format!("table {} not found", table))),
|info| Ok(info.schema.clone()),
)?;
.get_table_by_name(table.table())?
.schema
.clone();
Ok(LogicalPlan::CreateIndex(CreateIndex {
index_name,
table,
8 changes: 3 additions & 5 deletions bustubx/src/planner/logical_planner/plan_insert.rs
@@ -17,11 +17,9 @@ impl<'a> LogicalPlanner<'a> {
let table_schema = self
.context
.catalog
.get_table_by_name(table.table())
.map_or(
Err(BustubxError::Plan(format!("table {} not found", table))),
|info| Ok(info.schema.clone()),
)?;
.get_table_by_name(table.table())?
.schema
.clone();

let projected_schema = if columns_ident.is_empty() {
table_schema.clone()
8 changes: 3 additions & 5 deletions bustubx/src/planner/logical_planner/plan_set_expr.rs
@@ -210,11 +210,9 @@ impl LogicalPlanner<'_> {
let schema = self
.context
.catalog
.get_table_by_name(table_ref.table())
.map_or(
Err(BustubxError::Plan(format!("table {} not found", table_ref))),
|info| Ok(info.schema.clone()),
)?;
.get_table_by_name(table_ref.table())?
.schema
.clone();
Ok(LogicalPlan::TableScan(TableScan {
table_ref,
table_schema: schema,
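The planner and executor changes above all follow one pattern: because `get_table_by_name` / `get_mut_table_by_name` now return `BustubxResult`, the hand-rolled `map_or(Err(BustubxError::Plan(...)), |info| Ok(...))` conversion and the `.unwrap()` calls collapse into a single `?`. A small stand-alone sketch of the before/after, again with hypothetical stand-in types rather than the real planner context:

```rust
use std::collections::HashMap;

#[derive(Debug)]
pub enum BustubxError {
    Plan(String),
    Internal(String),
}
pub type BustubxResult<T> = Result<T, BustubxError>;

// Stand-in for the real Schema type.
#[derive(Clone, Debug)]
pub struct Schema;

pub struct TableInfo {
    pub schema: Schema,
}

pub struct Catalog {
    tables: HashMap<String, TableInfo>,
}

impl Catalog {
    // Old style: Option forces every call site to construct its own error.
    fn get_table_opt(&self, name: &str) -> Option<&TableInfo> {
        self.tables.get(name)
    }

    // New style: the catalog reports the missing table itself.
    fn get_table(&self, name: &str) -> BustubxResult<&TableInfo> {
        self.tables
            .get(name)
            .ok_or_else(|| BustubxError::Internal(format!("Not found the table {}", name)))
    }
}

// Before: the planner mapped the Option into a Plan error by hand.
fn table_schema_old(catalog: &Catalog, name: &str) -> BustubxResult<Schema> {
    catalog.get_table_opt(name).map_or(
        Err(BustubxError::Plan(format!("table {} not found", name))),
        |info| Ok(info.schema.clone()),
    )
}

// After: a single `?` propagates the catalog's own error.
fn table_schema_new(catalog: &Catalog, name: &str) -> BustubxResult<Schema> {
    Ok(catalog.get_table(name)?.schema.clone())
}

fn main() {
    let mut tables = HashMap::new();
    tables.insert("t1".to_string(), TableInfo { schema: Schema });
    let catalog = Catalog { tables };
    assert!(table_schema_new(&catalog, "t1").is_ok());
    assert!(table_schema_old(&catalog, "missing").is_err());
    assert!(table_schema_new(&catalog, "missing").is_err());
}
```

The same `?` propagation shows up in the executor changes (create_table.rs, insert.rs, seq_scan.rs), where the previous `.unwrap()` calls are removed.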
