Skip to content

Commit

Permalink
Implement initial meta page
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Feb 18, 2024
1 parent 4872691 commit d9a9fab
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 69 deletions.
71 changes: 42 additions & 29 deletions bustubx/src/buffer/buffer_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ impl BufferPoolManager {
pub fn new_page(&mut self) -> BustubxResult<Arc<RwLock<Page>>> {
// 缓冲池已满且无可替换的页
if self.free_list.is_empty() && self.replacer.size() == 0 {
return Err(BustubxError::Storage("Failed to new page".to_string()));
return Err(BustubxError::Storage(
"Cannot new page because buffer pool is full and no page to evict".to_string(),
));
}

// 分配一个frame
Expand Down Expand Up @@ -213,7 +215,7 @@ mod tests {
let disk_manager = DiskManager::try_new(&db_path).unwrap();
let mut buffer_pool_manager = BufferPoolManager::new(3, Arc::new(disk_manager));
let page = buffer_pool_manager.new_page().unwrap().clone();
assert_eq!(page.read().unwrap().page_id, 0);
assert_eq!(page.read().unwrap().page_id, 1);
assert_eq!(
buffer_pool_manager.pool[0].read().unwrap().page_id,
page.read().unwrap().page_id
Expand All @@ -225,16 +227,16 @@ mod tests {
assert_eq!(buffer_pool_manager.free_list.len(), 2);
assert_eq!(buffer_pool_manager.replacer.size(), 0);

let page = buffer_pool_manager.new_page().unwrap();
assert_eq!(page.read().unwrap().page_id, 1);
let page = buffer_pool_manager.new_page().unwrap();
assert_eq!(page.read().unwrap().page_id, 2);
let page = buffer_pool_manager.new_page().unwrap();
assert_eq!(page.read().unwrap().page_id, 3);
let page = buffer_pool_manager.new_page();
assert!(page.is_err());

buffer_pool_manager.unpin_page(0, false).unwrap();
buffer_pool_manager.unpin_page(1, false).unwrap();
let page = buffer_pool_manager.new_page().unwrap();
assert_eq!(page.read().unwrap().page_id, 3);
assert_eq!(page.read().unwrap().page_id, 4);

let _ = remove_file(db_path);
}
Expand All @@ -253,9 +255,9 @@ mod tests {
let page = buffer_pool_manager.new_page();
assert!(page.is_err());

buffer_pool_manager.unpin_page(0, true).unwrap();
buffer_pool_manager.unpin_page(1, true).unwrap();
let page = buffer_pool_manager.new_page().unwrap();
assert_eq!(page.read().unwrap().page_id, 3);
assert_eq!(page.read().unwrap().page_id, 4);

let _ = remove_file(db_path);
}
Expand All @@ -268,20 +270,26 @@ mod tests {
let disk_manager = DiskManager::try_new(&db_path).unwrap();
let mut buffer_pool_manager = BufferPoolManager::new(3, Arc::new(disk_manager));

let page = buffer_pool_manager.new_page().unwrap();
buffer_pool_manager.unpin_page(0, true).unwrap();
let page = buffer_pool_manager.new_page().unwrap();
buffer_pool_manager.unpin_page(1, false).unwrap();
let page = buffer_pool_manager.new_page().unwrap();
buffer_pool_manager.unpin_page(2, false).unwrap();
let page1 = buffer_pool_manager.new_page().unwrap();
let page1_id = page1.read().unwrap().page_id;
buffer_pool_manager.unpin_page(page1_id, true).unwrap();

let page = buffer_pool_manager.fetch_page(0).unwrap();
assert_eq!(page.read().unwrap().page_id, 0);
buffer_pool_manager.unpin_page(0, false).unwrap();
let page2 = buffer_pool_manager.new_page().unwrap();
let page2_id = page2.read().unwrap().page_id;
buffer_pool_manager.unpin_page(page2_id, false).unwrap();

let page3 = buffer_pool_manager.new_page().unwrap();
let page3_id = page3.read().unwrap().page_id;
buffer_pool_manager.unpin_page(page3_id, false).unwrap();

let page = buffer_pool_manager.fetch_page(page1_id).unwrap();
assert_eq!(page.read().unwrap().page_id, page1_id);
buffer_pool_manager.unpin_page(page1_id, false).unwrap();

let page = buffer_pool_manager.fetch_page(page2_id).unwrap();
assert_eq!(page.read().unwrap().page_id, page2_id);
buffer_pool_manager.unpin_page(page2_id, false).unwrap();

let page = buffer_pool_manager.fetch_page(1).unwrap();
assert_eq!(page.read().unwrap().page_id, 1);
buffer_pool_manager.unpin_page(1, false).unwrap();
assert_eq!(buffer_pool_manager.replacer.size(), 3);

let _ = remove_file(db_path);
Expand All @@ -295,22 +303,27 @@ mod tests {
let disk_manager = DiskManager::try_new(&db_path).unwrap();
let mut buffer_pool_manager = BufferPoolManager::new(3, Arc::new(disk_manager));

let page_id = buffer_pool_manager.new_page().unwrap();
buffer_pool_manager.unpin_page(0, true).unwrap();
let page_id = buffer_pool_manager.new_page().unwrap();
buffer_pool_manager.unpin_page(1, true).unwrap();
let page_id = buffer_pool_manager.new_page().unwrap();
buffer_pool_manager.unpin_page(2, false).unwrap();
let page1 = buffer_pool_manager.new_page().unwrap();
let page1_id = page1.read().unwrap().page_id;
buffer_pool_manager.unpin_page(page1_id, true).unwrap();

let page2 = buffer_pool_manager.new_page().unwrap();
let page2_id = page2.read().unwrap().page_id;
buffer_pool_manager.unpin_page(page2_id, true).unwrap();

let res = buffer_pool_manager.delete_page(0).unwrap();
let page3 = buffer_pool_manager.new_page().unwrap();
let page3_id = page3.read().unwrap().page_id;
buffer_pool_manager.unpin_page(page3_id, false).unwrap();

let res = buffer_pool_manager.delete_page(page1_id).unwrap();
assert!(res);
assert_eq!(buffer_pool_manager.pool.len(), 3);
assert_eq!(buffer_pool_manager.free_list.len(), 1);
assert_eq!(buffer_pool_manager.replacer.size(), 2);
assert_eq!(buffer_pool_manager.page_table.len(), 2);

let page = buffer_pool_manager.fetch_page(1).unwrap();
assert_eq!(page.read().unwrap().page_id, 1);
let page = buffer_pool_manager.fetch_page(page1_id).unwrap();
assert_eq!(page.read().unwrap().page_id, page1_id);

let _ = remove_file(db_path);
}
Expand Down
31 changes: 31 additions & 0 deletions bustubx/src/storage/codec/meta_page.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use crate::storage::codec::{CommonCodec, DecodedData};
use crate::storage::meta_page::MetaPage;
use crate::BustubxResult;

pub struct MetaPageCodec;

impl MetaPageCodec {
pub fn encode(page: &MetaPage) -> Vec<u8> {
let mut bytes = Vec::new();
bytes.extend(CommonCodec::encode_u32(page.major_version));
bytes.extend(CommonCodec::encode_u32(page.minor_version));
bytes
}

pub fn decode(bytes: &[u8]) -> BustubxResult<DecodedData<MetaPage>> {
let mut left_bytes = bytes;

let (major_version, offset) = CommonCodec::decode_u32(left_bytes)?;
left_bytes = &left_bytes[offset..];
let (minor_version, offset) = CommonCodec::decode_u32(left_bytes)?;
left_bytes = &left_bytes[offset..];

Ok((
MetaPage {
major_version,
minor_version,
},
bytes.len() - left_bytes.len(),
))
}
}
2 changes: 2 additions & 0 deletions bustubx/src/storage/codec/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod common;
mod index_page;
mod meta_page;
mod scalar;
mod table_page;
mod tuple;
Expand All @@ -8,6 +9,7 @@ pub use common::CommonCodec;
pub use index_page::{
BPlusTreeInternalPageCodec, BPlusTreeLeafPageCodec, BPlusTreePageCodec, BPlusTreePageTypeCodec,

Check warning on line 10 in bustubx/src/storage/codec/mod.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `BPlusTreePageTypeCodec`
};
pub use meta_page::MetaPageCodec;
pub use scalar::ScalarValueCodec;
pub use table_page::{
RidCodec, TablePageCodec, TablePageHeaderCodec, TablePageHeaderTupleInfoCodec,
Expand Down
54 changes: 39 additions & 15 deletions bustubx/src/storage/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,59 @@ use tracing::info;
use crate::error::{BustubxError, BustubxResult};

use crate::buffer::{PageId, BUSTUBX_PAGE_SIZE};
use crate::storage::codec::MetaPageCodec;
use crate::storage::meta_page::MetaPage;
use crate::storage::{EMPTY_META_PAGE, META_PAGE_SIZE};

Check warning on line 14 in bustubx/src/storage/disk_manager.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `EMPTY_META_PAGE`

static EMPTY_PAGE: [u8; BUSTUBX_PAGE_SIZE] = [0; BUSTUBX_PAGE_SIZE];

#[derive(Debug)]
pub struct DiskManager {
next_page_id: AtomicU32,
db_file: Mutex<File>,
meta: MetaPage,
}

impl DiskManager {
pub fn try_new(db_path: impl AsRef<Path>) -> BustubxResult<Self> {
// Create a file if not exists
let db_file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(db_path)?;
let (db_file, meta) = if db_path.as_ref().exists() {
let mut db_file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(db_path)?;
let mut buf = vec![0; *META_PAGE_SIZE];
db_file.read_exact(&mut buf)?;
let (meta_page, _) = MetaPageCodec::decode(&buf)?;
(db_file, meta_page)
} else {
let mut db_file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(db_path)?;
let meta_page = MetaPage::try_new()?;
db_file.write(&MetaPageCodec::encode(&meta_page))?;
(db_file, meta_page)
};

// calculate next page id
let db_file_len = db_file.metadata()?.len();
if db_file_len % BUSTUBX_PAGE_SIZE as u64 != 0 {
if (db_file_len - *META_PAGE_SIZE as u64) % BUSTUBX_PAGE_SIZE as u64 != 0 {
return Err(BustubxError::Internal(format!(
"db file size not a multiple of {}",
BUSTUBX_PAGE_SIZE
"db file size not a multiple of {} + meta page size {}",
BUSTUBX_PAGE_SIZE, *META_PAGE_SIZE,
)));
}
let next_page_id = (db_file_len / BUSTUBX_PAGE_SIZE as u64) as PageId;
let next_page_id =
(((db_file_len - *META_PAGE_SIZE as u64) / BUSTUBX_PAGE_SIZE as u64) + 1) as PageId;
info!("Initialized disk_manager next_page_id: {}", next_page_id);

Ok(Self {
next_page_id: AtomicU32::new(next_page_id),
// Use a mutex to wrap the file handle to ensure that only one thread
// can access the file at the same time among multiple threads.
db_file: Mutex::new(db_file),
meta,
})
}

Expand All @@ -52,7 +71,7 @@ impl DiskManager {

// set offset and read page data
guard.seek(std::io::SeekFrom::Start(
(page_id as usize * BUSTUBX_PAGE_SIZE) as u64,
(*META_PAGE_SIZE + (page_id - 1) as usize * BUSTUBX_PAGE_SIZE) as u64,
))?;
// Read buf.len() bytes of data from the file, and store the data in the buf array.
guard.read_exact(&mut buf)?;
Expand Down Expand Up @@ -102,7 +121,7 @@ impl DiskManager {
) -> BustubxResult<()> {
// Seek to the start of the page in the database file and write the data.
guard.seek(std::io::SeekFrom::Start(
(page_id as usize * BUSTUBX_PAGE_SIZE) as u64,
(*META_PAGE_SIZE + (page_id - 1) as usize * BUSTUBX_PAGE_SIZE) as u64,
))?;
guard.write_all(data)?;
guard.flush()?;
Expand All @@ -119,6 +138,8 @@ impl DiskManager {
#[cfg(test)]
mod tests {
use crate::buffer::BUSTUBX_PAGE_SIZE;
use crate::storage::codec::MetaPageCodec;
use crate::storage::EMPTY_META_PAGE;
use tempfile::TempDir;

#[test]
Expand All @@ -129,22 +150,25 @@ mod tests {
let disk_manager = super::DiskManager::try_new(&temp_path).unwrap();

let page_id1 = disk_manager.allocate_page().unwrap();
assert_eq!(page_id1, 0);
assert_eq!(page_id1, 1);
let mut page1 = vec![1, 2, 3];
page1.extend(vec![0; BUSTUBX_PAGE_SIZE - 3]);
disk_manager.write_page(page_id1, &page1).unwrap();
let page = disk_manager.read_page(page_id1).unwrap();
assert_eq!(page, page1.as_slice());

let page_id2 = disk_manager.allocate_page().unwrap();
assert_eq!(page_id2, 1);
assert_eq!(page_id2, 2);
let mut page2 = vec![0; BUSTUBX_PAGE_SIZE - 3];
page2.extend(vec![4, 5, 6]);
disk_manager.write_page(page_id2, &page2).unwrap();
let page = disk_manager.read_page(page_id2).unwrap();
assert_eq!(page, page2.as_slice());

let db_file_len = disk_manager.db_file_len().unwrap();
assert_eq!(db_file_len as usize, BUSTUBX_PAGE_SIZE * 2);
assert_eq!(
db_file_len as usize,
BUSTUBX_PAGE_SIZE * 2 + MetaPageCodec::encode(&EMPTY_META_PAGE).len()
);
}
}
Loading

0 comments on commit d9a9fab

Please sign in to comment.