Skip to content

Commit

Permalink
Apply TablePageCodec
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Feb 6, 2024
1 parent 1f53975 commit 2db2761
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 146 deletions.
47 changes: 32 additions & 15 deletions bustubx/src/storage/table_heap.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::buffer::{PageId, INVALID_PAGE_ID};
use crate::buffer::{PageId, BUSTUBX_PAGE_SIZE, INVALID_PAGE_ID};
use crate::catalog::SchemaRef;
use crate::storage::codec::TablePageCodec;
use crate::{buffer::BufferPoolManager, common::rid::Rid};

use super::{
Expand All @@ -23,7 +24,9 @@ impl TableHeap {
.expect("Can not new page for table heap");
let first_page_id = first_page.page_id;
let table_page = TablePage::new(schema.clone(), INVALID_PAGE_ID);
first_page.data = table_page.to_bytes();
let mut data = [0u8; BUSTUBX_PAGE_SIZE];
data.copy_from_slice(&TablePageCodec::encode(&table_page));
first_page.data = data;
buffer_pool_manager.unpin_page(first_page_id, true);

Self {
Expand Down Expand Up @@ -54,7 +57,8 @@ impl TableHeap {
.expect("Can not fetch last page");

// Loop until a suitable page is found for inserting the tuple
let mut last_table_page = TablePage::from_bytes(self.schema.clone(), &last_page.data);
let (mut last_table_page, _) =
TablePageCodec::decode(&last_page.data, self.schema.clone()).unwrap();
loop {
if last_table_page.get_next_tuple_offset(meta, tuple).is_some() {
break;
Expand All @@ -74,12 +78,17 @@ impl TableHeap {
.expect("cannot allocate page");
let next_page_id = next_page.page_id;
let next_table_page = TablePage::new(self.schema.clone(), INVALID_PAGE_ID);
next_page.data = next_table_page.to_bytes();
let mut data = [0u8; BUSTUBX_PAGE_SIZE];
data.copy_from_slice(&TablePageCodec::encode(&next_table_page));
next_page.data = data;

// Update and release the previous page
last_table_page.header.next_page_id = next_page_id;
self.buffer_pool_manager
.write_page(last_page_id, last_table_page.to_bytes());

let mut data = [0u8; BUSTUBX_PAGE_SIZE];
data.copy_from_slice(&TablePageCodec::encode(&last_table_page));

self.buffer_pool_manager.write_page(last_page_id, data);
self.buffer_pool_manager.unpin_page(last_page_id, true);

// Update last_page_id.
Expand All @@ -90,8 +99,11 @@ impl TableHeap {

// Insert the tuple into the chosen page
let slot_id = last_table_page.insert_tuple(meta, tuple);
self.buffer_pool_manager
.write_page(last_page_id, last_table_page.to_bytes());

let mut data = [0u8; BUSTUBX_PAGE_SIZE];
data.copy_from_slice(&TablePageCodec::encode(&last_table_page));

self.buffer_pool_manager.write_page(last_page_id, data);
self.buffer_pool_manager.unpin_page(last_page_id, true);

// Map the slot_id to a Rid and return
Expand All @@ -103,9 +115,13 @@ impl TableHeap {
.buffer_pool_manager
.fetch_page_mut(rid.page_id)
.expect("Can not fetch page");
let mut table_page = TablePage::from_bytes(self.schema.clone(), &page.data);
let (mut table_page, _) = TablePageCodec::decode(&page.data, self.schema.clone()).unwrap();
table_page.update_tuple_meta(meta, &rid);
page.data = table_page.to_bytes();

let mut data = [0u8; BUSTUBX_PAGE_SIZE];
data.copy_from_slice(&TablePageCodec::encode(&table_page));

page.data = data;
self.buffer_pool_manager.unpin_page(rid.page_id, true);
}

Expand All @@ -114,7 +130,7 @@ impl TableHeap {
.buffer_pool_manager
.fetch_page_mut(rid.page_id)
.expect("Can not fetch page");
let mut table_page = TablePage::from_bytes(self.schema.clone(), &page.data);
let (mut table_page, _) = TablePageCodec::decode(&page.data, self.schema.clone()).unwrap();
let result = table_page.get_tuple(&rid);
self.buffer_pool_manager.unpin_page(rid.page_id, false);
result
Expand All @@ -125,7 +141,7 @@ impl TableHeap {
.buffer_pool_manager
.fetch_page_mut(rid.page_id)
.expect("Can not fetch page");
let mut table_page = TablePage::from_bytes(self.schema.clone(), &page.data);
let (mut table_page, _) = TablePageCodec::decode(&page.data, self.schema.clone()).unwrap();
let result = table_page.get_tuple_meta(&rid);
self.buffer_pool_manager.unpin_page(rid.page_id, false);
result
Expand All @@ -136,7 +152,7 @@ impl TableHeap {
.buffer_pool_manager
.fetch_page_mut(self.first_page_id)
.expect("Can not fetch page");
let table_page = TablePage::from_bytes(self.schema.clone(), &page.data);
let (table_page, _) = TablePageCodec::decode(&page.data, self.schema.clone()).unwrap();
self.buffer_pool_manager
.unpin_page(self.first_page_id, false);
if table_page.header.num_tuples == 0 {
Expand All @@ -152,7 +168,7 @@ impl TableHeap {
.buffer_pool_manager
.fetch_page_mut(rid.page_id)
.expect("Can not fetch page");
let table_page = TablePage::from_bytes(self.schema.clone(), &page.data);
let (table_page, _) = TablePageCodec::decode(&page.data, self.schema.clone()).unwrap();
self.buffer_pool_manager.unpin_page(rid.page_id, false);
let next_rid = table_page.get_next_rid(&rid);
if next_rid.is_some() {
Expand All @@ -166,7 +182,8 @@ impl TableHeap {
.buffer_pool_manager
.fetch_page_mut(table_page.header.next_page_id)
.expect("Can not fetch page");
let next_table_page = TablePage::from_bytes(self.schema.clone(), &next_page.data);
let (next_table_page, _) =
TablePageCodec::decode(&next_page.data, self.schema.clone()).unwrap();
self.buffer_pool_manager
.unpin_page(table_page.header.next_page_id, false);
if next_table_page.header.num_tuples == 0 {
Expand Down
132 changes: 1 addition & 131 deletions bustubx/src/storage/table_page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,78 +154,6 @@ impl TablePage {

return Some(Rid::new(rid.page_id, tuple_id + 1));
}

/// Reconstruct an in-memory `TablePage` from its on-disk byte image.
///
/// Expected layout (big-endian fixed-width fields):
///   bytes 0..4  next_page_id (decoded via `CommonCodec`)
///   bytes 4..6  num_tuples
///   bytes 6..8  num_deleted_tuples
///   then `num_tuples` records of `TABLE_PAGE_TUPLE_INFO_SIZE` bytes each
///   (offset: u16, size: u16, insert_txn_id: u32, delete_txn_id: u32,
///    is_deleted flag: u32 where 1 means deleted).
///
/// NOTE(review): assumes `data.len() == BUSTUBX_PAGE_SIZE`; the trailing
/// `copy_from_slice` panics otherwise — confirm at call sites.
pub fn from_bytes(schema: SchemaRef, data: &[u8]) -> Self {
    // Small readers for the repeated fixed-width big-endian fields.
    let read_u16 = |at: usize| u16::from_be_bytes([data[at], data[at + 1]]);
    let read_u32 =
        |at: usize| u32::from_be_bytes([data[at], data[at + 1], data[at + 2], data[at + 3]]);

    let (next_page_id, _) = CommonCodec::decode_u32(data).unwrap();
    let mut page = Self::new(schema, next_page_id);
    page.header.num_tuples = read_u16(4);
    page.header.num_deleted_tuples = read_u16(6);

    // Rebuild the tuple directory, one fixed-size record per slot.
    for idx in 0..page.header.num_tuples as usize {
        let base = 8 + idx * TABLE_PAGE_TUPLE_INFO_SIZE;
        page.header.tuple_infos.push(TupleInfo {
            offset: read_u16(base),
            size: read_u16(base + 2),
            meta: TupleMeta {
                insert_txn_id: read_u32(base + 4),
                delete_txn_id: read_u32(base + 8),
                // Stored as a u32 flag: 1 = deleted, anything else = live.
                is_deleted: read_u32(base + 12) == 1,
            },
        });
    }

    // Keep the raw page image so tuple payloads can be sliced out later.
    page.data.copy_from_slice(data);
    page
}

/// Serialize this page (header + tuple directory) into a fresh
/// `BUSTUBX_PAGE_SIZE` buffer, then carry over the raw tuple payload
/// region from `self.data` verbatim.
pub fn to_bytes(&self) -> [u8; BUSTUBX_PAGE_SIZE] {
    let mut buf = [0; BUSTUBX_PAGE_SIZE];

    // Fixed header: next_page_id (u32) | num_tuples (u16) | num_deleted_tuples (u16).
    buf[0..4].copy_from_slice(CommonCodec::encode_u32(self.header.next_page_id).as_slice());
    buf[4..6].copy_from_slice(CommonCodec::encode_u16(self.header.num_tuples).as_slice());
    buf[6..8]
        .copy_from_slice(CommonCodec::encode_u16(self.header.num_deleted_tuples).as_slice());

    // Tuple directory: one fixed-size big-endian record per slot.
    let num = self.header.num_tuples as usize;
    for (i, info) in self.header.tuple_infos[..num].iter().enumerate() {
        let base = 8 + i * TABLE_PAGE_TUPLE_INFO_SIZE;
        buf[base..base + 2].copy_from_slice(&info.offset.to_be_bytes());
        buf[base + 2..base + 4].copy_from_slice(&info.size.to_be_bytes());
        buf[base + 4..base + 8].copy_from_slice(&info.meta.insert_txn_id.to_be_bytes());
        buf[base + 8..base + 12].copy_from_slice(&info.meta.delete_txn_id.to_be_bytes());
        // Deletion flag widened to u32 on disk: 1 = deleted, 0 = live.
        let deleted_flag: u32 = if info.meta.is_deleted { 1 } else { 0 };
        buf[base + 12..base + 16].copy_from_slice(&deleted_flag.to_be_bytes());
    }

    // Everything past header + directory is the tuple payload area; copy it
    // straight from the in-memory page image.
    let payload_start = TABLE_PAGE_HEADER_SIZE + num * TABLE_PAGE_TUPLE_INFO_SIZE;
    buf[payload_start..].copy_from_slice(&self.data[payload_start..]);
    buf
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
Expand All @@ -247,9 +175,8 @@ pub struct TupleInfo {
mod tests {
use crate::buffer::BUSTUBX_PAGE_SIZE;
use crate::catalog::{Column, DataType, Schema};
use crate::storage::codec::TupleCodec;
use crate::storage::codec::{TablePageCodec, TupleCodec};
use crate::storage::Tuple;
use std::alloc::handle_alloc_error;
use std::sync::Arc;

#[test]
Expand Down Expand Up @@ -325,61 +252,4 @@ mod tests {
assert_eq!(tuple_meta.delete_txn_id, 1);
assert_eq!(tuple_meta.insert_txn_id, 2);
}

#[test]
pub fn test_table_page_from_to_bytes() {
    // Round-trip test: insert three tuples, serialize with `to_bytes`,
    // decode with `from_bytes`, and check the header and tuple directory
    // survive intact. Tuples are laid out from the end of the page
    // backwards, so expected offsets count down from BUSTUBX_PAGE_SIZE.
    let schema = Arc::new(Schema::new(vec![
        Column::new("a".to_string(), DataType::Int8, false),
        Column::new("b".to_string(), DataType::Int16, false),
    ]));
    // next_page_id = 1 so the round-trip of that field is observable.
    let mut table_page = super::TablePage::new(schema.clone(), 1);
    let meta = super::TupleMeta {
        insert_txn_id: 0,
        delete_txn_id: 0,
        is_deleted: false,
    };

    // Slot ids are not asserted on, so discard them with `_` to avoid
    // unused-variable warnings; only the encoded sizes matter below.
    let tuple1 = Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]);
    let tuple1_size = TupleCodec::encode(&tuple1).len() as u16;
    let _ = table_page.insert_tuple(&meta, &tuple1);

    let tuple2 = Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]);
    let tuple2_size = TupleCodec::encode(&tuple2).len() as u16;
    let _ = table_page.insert_tuple(&meta, &tuple2);

    let tuple3 = Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]);
    let tuple3_size = TupleCodec::encode(&tuple3).len() as u16;
    let _ = table_page.insert_tuple(&meta, &tuple3);

    let bytes = table_page.to_bytes();
    let table_page2 = super::TablePage::from_bytes(schema.clone(), &bytes);
    assert_eq!(table_page2.header.next_page_id, 1);
    assert_eq!(table_page2.header.num_tuples, 3);
    assert_eq!(table_page2.header.num_deleted_tuples, 0);
    assert_eq!(table_page2.header.tuple_infos.len(), 3);

    assert_eq!(
        table_page2.header.tuple_infos[0].offset,
        BUSTUBX_PAGE_SIZE as u16 - tuple1_size
    );
    // Use the precomputed size for consistency with the other assertions
    // (the original re-encoded tuple1 here for no reason).
    assert_eq!(table_page2.header.tuple_infos[0].size, tuple1_size);
    assert_eq!(table_page2.header.tuple_infos[0].meta, meta);

    assert_eq!(
        table_page2.header.tuple_infos[1].offset,
        BUSTUBX_PAGE_SIZE as u16 - tuple1_size - tuple2_size
    );
    assert_eq!(table_page2.header.tuple_infos[1].size, tuple2_size);
    assert_eq!(table_page2.header.tuple_infos[1].meta, meta);

    assert_eq!(
        table_page2.header.tuple_infos[2].offset,
        BUSTUBX_PAGE_SIZE as u16 - tuple1_size - tuple2_size - tuple3_size
    );
    assert_eq!(table_page2.header.tuple_infos[2].size, tuple3_size);
    assert_eq!(table_page2.header.tuple_infos[2].meta, meta);
}
}

0 comments on commit 2db2761

Please sign in to comment.