Skip to content

Commit

Permalink
Add BPlusTreePageCodec
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Feb 7, 2024
1 parent 1a57b1b commit a57d620
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 8 deletions.
220 changes: 218 additions & 2 deletions bustubx/src/storage/codec/index_page.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,111 @@
use crate::storage::codec::{CommonCodec, DecodedData};
use crate::storage::index_page::BPlusTreePageType;
use crate::buffer::BUSTUBX_PAGE_SIZE;
use crate::catalog::SchemaRef;
use crate::storage::codec::{CommonCodec, DecodedData, RidCodec, TupleCodec};
use crate::storage::index_page::{
BPlusTreeInternalPage, BPlusTreeInternalPageHeader, BPlusTreeLeafPage, BPlusTreeLeafPageHeader,
BPlusTreePage, BPlusTreePageType,
};
use crate::{BustubxError, BustubxResult};

pub struct BPlusTreePageCodec;

impl BPlusTreePageCodec {
    /// Serializes a B+ tree page (leaf or internal) into exactly
    /// `BUSTUBX_PAGE_SIZE` bytes: the page header, then the key/value
    /// array, then zero padding up to the fixed page size.
    ///
    /// # Panics
    /// Panics if the encoded content exceeds `BUSTUBX_PAGE_SIZE`.
    pub fn encode(page: &BPlusTreePage) -> Vec<u8> {
        let mut bytes = match page {
            BPlusTreePage::Leaf(page) => {
                let mut bytes = BPlusTreeLeafPageHeaderCodec::encode(&page.header);
                for (tuple, rid) in page.array.iter() {
                    bytes.extend(TupleCodec::encode(tuple));
                    bytes.extend(RidCodec::encode(rid));
                }
                bytes
            }
            BPlusTreePage::Internal(page) => {
                let mut bytes = BPlusTreeInternalPageHeaderCodec::encode(&page.header);
                for (tuple, page_id) in page.array.iter() {
                    bytes.extend(TupleCodec::encode(tuple));
                    bytes.extend(CommonCodec::encode_u32(*page_id));
                }
                bytes
            }
        };
        // Zero-pad in place so every page occupies exactly BUSTUBX_PAGE_SIZE
        // bytes (resize avoids the temporary `vec![0; ..]` allocation).
        assert!(bytes.len() <= BUSTUBX_PAGE_SIZE);
        bytes.resize(BUSTUBX_PAGE_SIZE, 0);
        bytes
    }

    /// Deserializes a page previously produced by [`BPlusTreePageCodec::encode`].
    ///
    /// The returned offset is always `BUSTUBX_PAGE_SIZE`: trailing zero
    /// padding is consumed along with the content.
    ///
    /// # Errors
    /// Returns `BustubxError::Storage` if `bytes` is not exactly one page long,
    /// or if any header/tuple/rid fails to decode.
    pub fn decode(bytes: &[u8], schema: SchemaRef) -> BustubxResult<DecodedData<BPlusTreePage>> {
        if bytes.len() != BUSTUBX_PAGE_SIZE {
            return Err(BustubxError::Storage(format!(
                "Index page size should be {} but got {}",
                BUSTUBX_PAGE_SIZE,
                bytes.len()
            )));
        }
        let mut left_bytes = bytes;

        // Peek at the page type to dispatch; bytes are NOT consumed here
        // because the header codec below re-decodes the type as part of
        // the full header.
        let (page_type, _) = BPlusTreePageTypeCodec::decode(left_bytes)?;

        match page_type {
            BPlusTreePageType::LeafPage => {
                let (header, offset) = BPlusTreeLeafPageHeaderCodec::decode(left_bytes)?;
                left_bytes = &left_bytes[offset..];

                let mut array = Vec::with_capacity(header.current_size as usize);
                for _ in 0..header.current_size {
                    let (tuple, offset) = TupleCodec::decode(left_bytes, schema.clone())?;
                    left_bytes = &left_bytes[offset..];

                    let (rid, offset) = RidCodec::decode(left_bytes)?;
                    left_bytes = &left_bytes[offset..];

                    array.push((tuple, rid));
                }

                Ok((
                    BPlusTreePage::Leaf(BPlusTreeLeafPage {
                        schema,
                        header,
                        array,
                    }),
                    BUSTUBX_PAGE_SIZE,
                ))
            }
            BPlusTreePageType::InternalPage => {
                let (header, offset) = BPlusTreeInternalPageHeaderCodec::decode(left_bytes)?;
                left_bytes = &left_bytes[offset..];

                let mut array = Vec::with_capacity(header.current_size as usize);
                for _ in 0..header.current_size {
                    let (tuple, offset) = TupleCodec::decode(left_bytes, schema.clone())?;
                    left_bytes = &left_bytes[offset..];

                    let (page_id, offset) = CommonCodec::decode_u32(left_bytes)?;
                    left_bytes = &left_bytes[offset..];

                    array.push((tuple, page_id));
                }

                Ok((
                    BPlusTreePage::Internal(BPlusTreeInternalPage {
                        schema,
                        header,
                        array,
                    }),
                    BUSTUBX_PAGE_SIZE,
                ))
            }
        }
    }
}

pub struct BPlusTreePageTypeCodec;

impl BPlusTreePageTypeCodec {
Expand All @@ -21,3 +125,115 @@ impl BPlusTreePageTypeCodec {
}
}
}

pub struct BPlusTreeLeafPageHeaderCodec;

impl BPlusTreeLeafPageHeaderCodec {
    /// Serializes a leaf page header as: page type, current size,
    /// max size, next page id (4 bytes each).
    pub fn encode(header: &BPlusTreeLeafPageHeader) -> Vec<u8> {
        let mut buf = BPlusTreePageTypeCodec::encode(&header.page_type);
        buf.extend(CommonCodec::encode_u32(header.current_size));
        buf.extend(CommonCodec::encode_u32(header.max_size));
        buf.extend(CommonCodec::encode_u32(header.next_page_id));
        buf
    }

    /// Deserializes a leaf page header, returning it together with the
    /// total number of bytes consumed from `bytes`.
    pub fn decode(bytes: &[u8]) -> BustubxResult<DecodedData<BPlusTreeLeafPageHeader>> {
        // Track consumed bytes directly instead of shrinking a slice.
        let mut consumed = 0;

        let (page_type, len) = BPlusTreePageTypeCodec::decode(&bytes[consumed..])?;
        consumed += len;

        let (current_size, len) = CommonCodec::decode_u32(&bytes[consumed..])?;
        consumed += len;

        let (max_size, len) = CommonCodec::decode_u32(&bytes[consumed..])?;
        consumed += len;

        let (next_page_id, len) = CommonCodec::decode_u32(&bytes[consumed..])?;
        consumed += len;

        Ok((
            BPlusTreeLeafPageHeader {
                page_type,
                current_size,
                max_size,
                next_page_id,
            },
            consumed,
        ))
    }
}

pub struct BPlusTreeInternalPageHeaderCodec;

impl BPlusTreeInternalPageHeaderCodec {
    /// Serializes an internal page header as: page type, current size,
    /// max size (4 bytes each).
    pub fn encode(header: &BPlusTreeInternalPageHeader) -> Vec<u8> {
        let mut buf = BPlusTreePageTypeCodec::encode(&header.page_type);
        buf.extend(CommonCodec::encode_u32(header.current_size));
        buf.extend(CommonCodec::encode_u32(header.max_size));
        buf
    }

    /// Deserializes an internal page header, returning it together with
    /// the total number of bytes consumed from `bytes`.
    pub fn decode(bytes: &[u8]) -> BustubxResult<DecodedData<BPlusTreeInternalPageHeader>> {
        // Track consumed bytes directly instead of shrinking a slice.
        let mut consumed = 0;

        let (page_type, len) = BPlusTreePageTypeCodec::decode(&bytes[consumed..])?;
        consumed += len;

        let (current_size, len) = CommonCodec::decode_u32(&bytes[consumed..])?;
        consumed += len;

        let (max_size, len) = CommonCodec::decode_u32(&bytes[consumed..])?;
        consumed += len;

        Ok((
            BPlusTreeInternalPageHeader {
                page_type,
                current_size,
                max_size,
            },
            consumed,
        ))
    }
}

#[cfg(test)]
mod tests {
    use crate::catalog::{Column, DataType, Schema};
    use crate::common::rid::Rid;
    use crate::storage::codec::index_page::BPlusTreePageCodec;
    use crate::storage::index_page::{BPlusTreeInternalPage, BPlusTreeLeafPage, BPlusTreePage};
    use crate::Tuple;
    use std::sync::Arc;

    /// Encode `page`, decode the resulting bytes, and assert the
    /// round trip is lossless.
    fn check_roundtrip(page: &BPlusTreePage, schema: &Arc<Schema>) {
        let encoded = BPlusTreePageCodec::encode(page);
        let (decoded, _) = BPlusTreePageCodec::decode(&encoded, schema.clone()).unwrap();
        assert_eq!(&decoded, page);
    }

    #[test]
    fn index_page_codec() {
        let schema = Arc::new(Schema::new(vec![
            Column::new("a".to_string(), DataType::Int8, true),
            Column::new("b".to_string(), DataType::Int32, true),
        ]));
        let key1 = Tuple::new(schema.clone(), vec![1i8.into(), 1i32.into()]);
        let key2 = Tuple::new(schema.clone(), vec![2i8.into(), 2i32.into()]);

        // Leaf page: (tuple, rid) entries survive the round trip.
        let mut leaf = BPlusTreeLeafPage::new(schema.clone(), 100);
        leaf.insert(key1.clone(), Rid::new(1, 1), &schema);
        leaf.insert(key2.clone(), Rid::new(2, 2), &schema);
        check_roundtrip(&BPlusTreePage::Leaf(leaf), &schema);

        // Internal page: first entry uses the placeholder empty key.
        let mut internal = BPlusTreeInternalPage::new(schema.clone(), 100);
        internal.insert(Tuple::empty(schema.clone()), 1, &schema);
        internal.insert(key1, 2, &schema);
        internal.insert(key2, 3, &schema);
        check_roundtrip(&BPlusTreePage::Internal(internal), &schema);
    }
}
4 changes: 3 additions & 1 deletion bustubx/src/storage/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ mod tuple;
pub use common::CommonCodec;
pub use index_page::BPlusTreePageTypeCodec;
pub use scalar::ScalarValueCodec;
pub use table_page::{TablePageCodec, TablePageHeaderCodec, TablePageHeaderTupleInfoCodec};
pub use table_page::{
RidCodec, TablePageCodec, TablePageHeaderCodec, TablePageHeaderTupleInfoCodec,
};
pub use tuple::TupleCodec;

// data + consumed offset
Expand Down
24 changes: 24 additions & 0 deletions bustubx/src/storage/codec/table_page.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::buffer::BUSTUBX_PAGE_SIZE;
use crate::catalog::SchemaRef;
use crate::common::rid::Rid;
use crate::storage::codec::{CommonCodec, DecodedData};
use crate::storage::table_page::{TablePageHeader, TupleInfo};
use crate::storage::{TablePage, TupleMeta};
Expand Down Expand Up @@ -121,6 +122,29 @@ impl TablePageHeaderTupleInfoCodec {
}
}

pub struct RidCodec;

impl RidCodec {
    /// Serializes a `Rid` as its page id followed by its slot number
    /// (4 bytes each, via `CommonCodec`).
    pub fn encode(rid: &Rid) -> Vec<u8> {
        let mut buf = CommonCodec::encode_u32(rid.page_id);
        buf.extend(CommonCodec::encode_u32(rid.slot_num));
        buf
    }

    /// Deserializes a `Rid`, returning it together with the number of
    /// bytes consumed from `bytes`.
    pub fn decode(bytes: &[u8]) -> BustubxResult<DecodedData<Rid>> {
        // Track consumed bytes directly instead of shrinking a slice.
        let mut consumed = 0;

        let (page_id, len) = CommonCodec::decode_u32(&bytes[consumed..])?;
        consumed += len;

        let (slot_num, len) = CommonCodec::decode_u32(&bytes[consumed..])?;
        consumed += len;

        Ok((Rid::new(page_id, slot_num), consumed))
    }
}

#[cfg(test)]
mod tests {
use crate::buffer::INVALID_PAGE_ID;
Expand Down
10 changes: 5 additions & 5 deletions bustubx/src/storage/index_page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{catalog::Schema, common::rid::Rid};
// Internal page header: PageType (4) + CurrentSize (4) + MaxSize (4).
pub const INTERNAL_PAGE_HEADER_SIZE: usize = 4 + 4 + 4;
// Leaf page header: PageType (4) + CurrentSize (4) + MaxSize (4) + NextPageId (4).
pub const LEAF_PAGE_HEADER_SIZE: usize = 4 + 4 + 4 + 4;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum BPlusTreePage {
// B+树内部节点页
Internal(BPlusTreeInternalPage),
Expand Down Expand Up @@ -104,15 +104,15 @@ pub type LeafKV = (Tuple, Rid);
* | PageType (4) | CurrentSize (4) | MaxSize (4) |
* ----------------------------------------------------------------------------
*/
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct BPlusTreeInternalPage {
pub schema: SchemaRef,
pub header: BPlusTreeInternalPageHeader,
// 第一个key为空,n个key对应n+1个value
pub array: Vec<InternalKV>,
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct BPlusTreeInternalPageHeader {
pub page_type: BPlusTreePageType,
pub current_size: u32,
Expand Down Expand Up @@ -407,14 +407,14 @@ impl BPlusTreeInternalPage {
* | PageType (4) | CurrentSize (4) | MaxSize (4) | NextPageId (4)
* ---------------------------------------------------------------------
*/
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct BPlusTreeLeafPage {
pub schema: SchemaRef,
pub header: BPlusTreeLeafPageHeader,
pub array: Vec<LeafKV>,
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct BPlusTreeLeafPageHeader {
pub page_type: BPlusTreePageType,
pub current_size: u32,
Expand Down

0 comments on commit a57d620

Please sign in to comment.