Implement TablePageCodec

lewiszlw committed Feb 6, 2024
1 parent 0cbf757 commit 1f53975
Showing 3 changed files with 185 additions and 74 deletions.
130 changes: 118 additions & 12 deletions bustubx/src/storage/codec/table_page.rs
@@ -1,31 +1,137 @@
 use crate::buffer::BUSTUBX_PAGE_SIZE;
 use crate::catalog::SchemaRef;
 use crate::storage::codec::{CommonCodec, DecodedData};
-use crate::storage::table_page::{TABLE_PAGE_HEADER_SIZE, TABLE_PAGE_TUPLE_INFO_SIZE};
+use crate::storage::table_page::{TablePageHeader, TupleInfo};
 use crate::storage::{TablePage, TupleMeta};
 use crate::BustubxResult;
 
 pub struct TablePageCodec;
 
 impl TablePageCodec {
     pub fn encode(page: &TablePage) -> Vec<u8> {
-        let mut header_bytes = Vec::new();
-        header_bytes.extend(CommonCodec::encode_u32(page.next_page_id));
-        header_bytes.extend(CommonCodec::encode_u16(page.num_tuples));
-        header_bytes.extend(CommonCodec::encode_u16(page.num_deleted_tuples));
-
-        todo!()
+        let header_bytes = TablePageHeaderCodec::encode(&page.header);
+        let mut all_bytes = page.data.clone();
+        all_bytes[0..header_bytes.len()].copy_from_slice(&header_bytes);
+        all_bytes.to_vec()
     }
 
     pub fn decode(bytes: &[u8], schema: SchemaRef) -> BustubxResult<DecodedData<TablePage>> {
-        todo!()
+        let (header, offset) = TablePageHeaderCodec::decode(bytes)?;
+        let mut data = [0u8; BUSTUBX_PAGE_SIZE];
+        data.copy_from_slice(&bytes[0..BUSTUBX_PAGE_SIZE]);
+        Ok((
+            TablePage {
+                schema,
+                header,
+                data,
+            },
+            BUSTUBX_PAGE_SIZE,
+        ))
     }
 }
 
-pub struct TupleMetaCodec;
+pub struct TablePageHeaderCodec;
 
+impl TablePageHeaderCodec {
+    pub fn encode(header: &TablePageHeader) -> Vec<u8> {
+        let mut bytes = Vec::new();
+        bytes.extend(CommonCodec::encode_u32(header.next_page_id));
+        bytes.extend(CommonCodec::encode_u16(header.num_tuples));
+        bytes.extend(CommonCodec::encode_u16(header.num_deleted_tuples));
+        for tuple_info in header.tuple_infos.iter() {
+            bytes.extend(CommonCodec::encode_u16(tuple_info.offset));
+            bytes.extend(CommonCodec::encode_u16(tuple_info.size));
+            bytes.extend(CommonCodec::encode_u32(tuple_info.meta.insert_txn_id));
+            bytes.extend(CommonCodec::encode_u32(tuple_info.meta.delete_txn_id));
+            bytes.extend(CommonCodec::encode_bool(tuple_info.meta.is_deleted));
+        }
+        bytes
+    }
+
+    pub fn decode(bytes: &[u8]) -> BustubxResult<DecodedData<TablePageHeader>> {
+        let mut left_bytes = bytes;
+
+        let (next_page_id, offset) = CommonCodec::decode_u32(left_bytes)?;
+        left_bytes = &left_bytes[offset..];
+
+        let (num_tuples, offset) = CommonCodec::decode_u16(left_bytes)?;
+        left_bytes = &left_bytes[offset..];
+
+        let (num_deleted_tuples, offset) = CommonCodec::decode_u16(left_bytes)?;
+        left_bytes = &left_bytes[offset..];
+
+        let mut tuple_infos = vec![];
+        for _ in 0..num_tuples {
+            let (tuple_offset, offset) = CommonCodec::decode_u16(left_bytes)?;
+            left_bytes = &left_bytes[offset..];
+            let (size, offset) = CommonCodec::decode_u16(left_bytes)?;
+            left_bytes = &left_bytes[offset..];
+            let (insert_txn_id, offset) = CommonCodec::decode_u32(left_bytes)?;
+            left_bytes = &left_bytes[offset..];
+            let (delete_txn_id, offset) = CommonCodec::decode_u32(left_bytes)?;
+            left_bytes = &left_bytes[offset..];
+            let (is_deleted, offset) = CommonCodec::decode_bool(left_bytes)?;
+            left_bytes = &left_bytes[offset..];
+            tuple_infos.push(TupleInfo {
+                offset: tuple_offset,
+                size,
+                meta: TupleMeta {
+                    insert_txn_id,
+                    delete_txn_id,
+                    is_deleted,
+                },
+            });
+        }
+        Ok((
+            TablePageHeader {
+                next_page_id,
+                num_tuples,
+                num_deleted_tuples,
+                tuple_infos,
+            },
+            bytes.len() - left_bytes.len(),
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::buffer::INVALID_PAGE_ID;
+    use crate::catalog::{Column, DataType, Schema};
+    use crate::storage::codec::table_page::TablePageHeaderCodec;
+    use crate::storage::codec::TablePageCodec;
+    use crate::storage::{TablePage, TupleMeta};
+    use crate::Tuple;
+    use std::sync::Arc;
+
+    #[test]
+    fn table_page_codec() {
+        let schema = Arc::new(Schema::new(vec![
+            Column::new("a".to_string(), DataType::Int8, true),
+            Column::new("b".to_string(), DataType::Int32, true),
+        ]));
+        let tuple1 = Tuple::new(schema.clone(), vec![1i8.into(), 1i32.into()]);
+        let tuple1_meta = TupleMeta {
+            insert_txn_id: 1,
+            delete_txn_id: 2,
+            is_deleted: false,
+        };
+        let tuple2 = Tuple::new(schema.clone(), vec![2i8.into(), 2i32.into()]);
+        let tuple2_meta = TupleMeta {
+            insert_txn_id: 3,
+            delete_txn_id: 4,
+            is_deleted: true,
+        };
+
+        let mut table_page = TablePage::new(schema.clone(), INVALID_PAGE_ID);
+        table_page.insert_tuple(&tuple1_meta, &tuple1);
+        table_page.insert_tuple(&tuple2_meta, &tuple2);
+
-impl TupleMetaCodec {
-    pub fn encode(meta: &TupleMeta) -> Vec<u8> {
-        todo!()
+        let (new_page, _) =
+            TablePageCodec::decode(&TablePageCodec::encode(&table_page), schema.clone()).unwrap();
+        assert_eq!(new_page.schema, table_page.schema);
+        assert_eq!(new_page.header, table_page.header);
+        let header_size = TablePageHeaderCodec::encode(&table_page.header).len();
+        assert_eq!(new_page.data[header_size..], table_page.data[header_size..]);
     }
 }
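
Note on the layout: TablePageHeaderCodec serializes next_page_id (u32), num_tuples (u16) and num_deleted_tuples (u16), followed by one record per tuple: offset (u16), size (u16), insert_txn_id (u32), delete_txn_id (u32) and is_deleted (bool). CommonCodec is not part of this commit, so the sketch below is a standalone illustration of the same cursor-style decoding under one plausible assumption: fixed-width little-endian encodings (4 bytes per u32, 2 per u16, 1 per bool), which would give a header of 8 + 13 * num_tuples bytes.

// Standalone sketch of the decode pattern used by TablePageHeaderCodec:
// each decoder returns (value, bytes_consumed) and the caller advances a
// slice cursor. Widths and endianness are assumptions, not CommonCodec's
// confirmed behavior.
fn decode_u32(bytes: &[u8]) -> Result<(u32, usize), String> {
    let arr: [u8; 4] = bytes
        .get(0..4)
        .ok_or("unexpected end of input")?
        .try_into()
        .unwrap();
    Ok((u32::from_le_bytes(arr), 4))
}

fn decode_u16(bytes: &[u8]) -> Result<(u16, usize), String> {
    let arr: [u8; 2] = bytes
        .get(0..2)
        .ok_or("unexpected end of input")?
        .try_into()
        .unwrap();
    Ok((u16::from_le_bytes(arr), 2))
}

fn main() -> Result<(), String> {
    // Fixed header prefix: u32 + u16 + u16 = 8 bytes (little-endian here).
    let bytes = [7u8, 0, 0, 0, 2, 0, 0, 0];
    let mut left = &bytes[..];

    let (next_page_id, n) = decode_u32(left)?;
    left = &left[n..];
    let (num_tuples, n) = decode_u16(left)?;
    left = &left[n..];
    let (num_deleted_tuples, n) = decode_u16(left)?;
    left = &left[n..];

    assert_eq!((next_page_id, num_tuples, num_deleted_tuples), (7, 2, 0));
    // The consumed-byte count is reported the same way decode() reports its offset.
    assert_eq!(bytes.len() - left.len(), 8);
    Ok(())
}

Under those assumed widths, the two-tuple page built in the test above would carry a 34-byte header (8 + 2 * 13), which is the prefix region the test's final assertion excludes when comparing data.
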
16 changes: 8 additions & 8 deletions bustubx/src/storage/table_heap.rs
@@ -63,7 +63,7 @@ impl TableHeap
         // if there's no tuple in the page, and we can't insert the tuple,
         // then this tuple is too large.
         assert!(
-            last_table_page.num_tuples > 0,
+            last_table_page.header.num_tuples > 0,
             "tuple is too large, cannot insert"
         );
 
@@ -77,7 +77,7 @@ impl TableHeap
         next_page.data = next_table_page.to_bytes();
 
         // Update and release the previous page
-        last_table_page.next_page_id = next_page_id;
+        last_table_page.header.next_page_id = next_page_id;
         self.buffer_pool_manager
             .write_page(last_page_id, last_table_page.to_bytes());
         self.buffer_pool_manager.unpin_page(last_page_id, true);
@@ -139,7 +139,7 @@ impl TableHeap
         let table_page = TablePage::from_bytes(self.schema.clone(), &page.data);
         self.buffer_pool_manager
             .unpin_page(self.first_page_id, false);
-        if table_page.num_tuples == 0 {
+        if table_page.header.num_tuples == 0 {
             // TODO: skip deleted tuples
             return None;
         } else {
@@ -159,21 +159,21 @@ impl TableHeap
             return next_rid;
         }
 
-        if table_page.next_page_id == INVALID_PAGE_ID {
+        if table_page.header.next_page_id == INVALID_PAGE_ID {
             return None;
         }
         let next_page = self
             .buffer_pool_manager
-            .fetch_page_mut(table_page.next_page_id)
+            .fetch_page_mut(table_page.header.next_page_id)
             .expect("Can not fetch page");
         let next_table_page = TablePage::from_bytes(self.schema.clone(), &next_page.data);
         self.buffer_pool_manager
-            .unpin_page(table_page.next_page_id, false);
-        if next_table_page.num_tuples == 0 {
+            .unpin_page(table_page.header.next_page_id, false);
+        if next_table_page.header.num_tuples == 0 {
             // TODO: skip deleted tuples
             return None;
         } else {
-            return Some(Rid::new(table_page.next_page_id, 0));
+            return Some(Rid::new(table_page.header.next_page_id, 0));
         }
     }
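
The table_heap.rs changes are mechanical (the same fields are now reached through the new header struct), but they show the on-disk linkage the codec has to preserve: table pages form a singly linked list through header.next_page_id, terminated by INVALID_PAGE_ID. Below is a generalized, self-contained sketch of that chain walk; the Header stand-in, the dense page-id indexing, and the sentinel value are illustrative assumptions rather than bustubx's definitions, and the committed code above only checks a single hop rather than walking the whole chain.

// Simplified sketch of the next_page_id chain used by TableHeap.
const INVALID_PAGE_ID: u32 = u32::MAX; // assumed sentinel; not shown in this diff

struct Header {
    next_page_id: u32,
    num_tuples: u16,
}

// Follow the chain from `first`, returning the first page that holds tuples.
fn first_page_with_tuples(pages: &[Header], first: u32) -> Option<u32> {
    let mut page_id = first;
    while page_id != INVALID_PAGE_ID {
        let header = &pages[page_id as usize];
        if header.num_tuples > 0 {
            return Some(page_id);
        }
        page_id = header.next_page_id;
    }
    None
}

fn main() {
    // Page 0 is empty and links to page 1, which holds two tuples.
    let pages = vec![
        Header { next_page_id: 1, num_tuples: 0 },
        Header { next_page_id: INVALID_PAGE_ID, num_tuples: 2 },
    ];
    assert_eq!(first_page_with_tuples(&pages, 0), Some(1));
}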
