Skip to content

Commit

Permalink
Use freelist to reuse deallocated pages
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Feb 19, 2024
1 parent b0c41ed commit 4968187
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 152 deletions.
44 changes: 29 additions & 15 deletions bustubx/src/buffer/buffer_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,29 +215,43 @@ mod tests {

let disk_manager = DiskManager::try_new(&temp_path).unwrap();
let mut buffer_pool_manager = BufferPoolManager::new(3, Arc::new(disk_manager));
let page = buffer_pool_manager.new_page().unwrap().clone();
assert_eq!(page.read().unwrap().page_id, 1);
let page1 = buffer_pool_manager.new_page().unwrap().clone();
let page1_id = page1.read().unwrap().page_id;
assert_eq!(
buffer_pool_manager.pool[0].read().unwrap().page_id,
page.read().unwrap().page_id
page1_id,
);
assert_eq!(
buffer_pool_manager.page_table[&page.read().unwrap().page_id],
buffer_pool_manager.page_table[&page1.read().unwrap().page_id],
0
);
assert_eq!(buffer_pool_manager.free_list.len(), 2);
assert_eq!(buffer_pool_manager.replacer.size(), 0);

let page = buffer_pool_manager.new_page().unwrap();
assert_eq!(page.read().unwrap().page_id, 2);
let page = buffer_pool_manager.new_page().unwrap();
assert_eq!(page.read().unwrap().page_id, 3);
let page = buffer_pool_manager.new_page();
assert!(page.is_err());
let page2 = buffer_pool_manager.new_page().unwrap();
let page2_id = page2.read().unwrap().page_id;
assert_eq!(
buffer_pool_manager.pool[1].read().unwrap().page_id,
page2_id,
);

buffer_pool_manager.unpin_page(1, false).unwrap();
let page = buffer_pool_manager.new_page().unwrap();
assert_eq!(page.read().unwrap().page_id, 4);
let page3 = buffer_pool_manager.new_page().unwrap();
let page3_id = page3.read().unwrap().page_id;
assert_eq!(
buffer_pool_manager.pool[2].read().unwrap().page_id,
page3_id,
);

let page4 = buffer_pool_manager.new_page();
assert!(page4.is_err());

buffer_pool_manager.unpin_page(page1_id, false).unwrap();
let page5 = buffer_pool_manager.new_page().unwrap();
let page5_id = page5.read().unwrap().page_id;
assert_eq!(
buffer_pool_manager.pool[0].read().unwrap().page_id,
page5_id,
);
}

#[test]
Expand All @@ -254,9 +268,9 @@ mod tests {
let page = buffer_pool_manager.new_page();
assert!(page.is_err());

buffer_pool_manager.unpin_page(1, true).unwrap();
buffer_pool_manager.unpin_page(2, true).unwrap();
let page = buffer_pool_manager.new_page().unwrap();
assert_eq!(page.read().unwrap().page_id, 4);
assert_eq!(page.read().unwrap().page_id, 5);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion bustubx/src/buffer/page.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub type PageId = u32;

pub const INVALID_PAGE_ID: PageId = u32::MAX;
pub const INVALID_PAGE_ID: PageId = 0;
pub const BUSTUBX_PAGE_SIZE: usize = 4096;

#[derive(Debug, Clone)]
Expand Down
9 changes: 5 additions & 4 deletions bustubx/src/storage/codec/freelist_page.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::buffer::BUSTUBX_PAGE_SIZE;
use crate::storage::codec::{CommonCodec, DecodedData};
use crate::storage::{FreelistPage, FreelistPageHeader};
use crate::BustubxResult;
Expand Down Expand Up @@ -45,6 +46,9 @@ impl FreelistPageCodec {
for i in 0..page.header.current_size {
bytes.extend(CommonCodec::encode_u32(page.array[i as usize]))
}
// make sure length of bytes is BUSTUBX_PAGE_SIZE
assert!(bytes.len() <= BUSTUBX_PAGE_SIZE);
bytes.extend(vec![0; BUSTUBX_PAGE_SIZE - bytes.len()]);
bytes
}

Expand All @@ -61,10 +65,7 @@ impl FreelistPageCodec {
array.push(page_id);
}

Ok((
FreelistPage { header, array },
bytes.len() - left_bytes.len(),
))
Ok((FreelistPage { header, array }, BUSTUBX_PAGE_SIZE))
}
}

Expand Down
8 changes: 2 additions & 6 deletions bustubx/src/storage/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,10 @@ mod tuple;

pub use common::CommonCodec;
pub use freelist_page::{FreelistPageCodec, FreelistPageHeaderCodec};
pub use index_page::{
BPlusTreeInternalPageCodec, BPlusTreeLeafPageCodec, BPlusTreePageCodec, BPlusTreePageTypeCodec,
};
pub use index_page::*;
pub use meta_page::MetaPageCodec;
pub use scalar::ScalarValueCodec;
pub use table_page::{
RidCodec, TablePageCodec, TablePageHeaderCodec, TablePageHeaderTupleInfoCodec,
};
pub use table_page::*;
pub use tuple::TupleCodec;

// data + consumed offset
Expand Down
139 changes: 116 additions & 23 deletions bustubx/src/storage/disk_manager.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fs::File;
use std::path::Path;
use std::sync::RwLock;
use std::{
io::{Read, Seek, Write},
sync::{atomic::AtomicU32, Mutex, MutexGuard},
Expand All @@ -8,18 +9,18 @@ use tracing::info;

use crate::error::{BustubxError, BustubxResult};

use crate::buffer::{PageId, BUSTUBX_PAGE_SIZE};
use crate::storage::codec::MetaPageCodec;
use crate::buffer::{PageId, BUSTUBX_PAGE_SIZE, INVALID_PAGE_ID};
use crate::storage::codec::{FreelistPageCodec, MetaPageCodec};
use crate::storage::meta_page::MetaPage;
use crate::storage::META_PAGE_SIZE;
use crate::storage::{disk_manager, FreelistPage, META_PAGE_SIZE};

Check warning on line 15 in bustubx/src/storage/disk_manager.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `disk_manager`

static EMPTY_PAGE: [u8; BUSTUBX_PAGE_SIZE] = [0; BUSTUBX_PAGE_SIZE];

#[derive(Debug)]
pub struct DiskManager {
next_page_id: AtomicU32,
db_file: Mutex<File>,
meta: MetaPage,
meta: RwLock<MetaPage>,
}

impl DiskManager {
Expand Down Expand Up @@ -56,13 +57,20 @@ impl DiskManager {
(((db_file_len - *META_PAGE_SIZE as u64) / BUSTUBX_PAGE_SIZE as u64) + 1) as PageId;
info!("Initialized disk_manager next_page_id: {}", next_page_id);

Ok(Self {
let disk_manager = Self {
next_page_id: AtomicU32::new(next_page_id),
// Use a mutex to wrap the file handle to ensure that only one thread
// can access the file at the same time among multiple threads.
db_file: Mutex::new(db_file),
meta,
})
meta: RwLock::new(meta),
};

// new page for freelist
let freelist_page_id = disk_manager.allocate_page()?;
disk_manager.meta.write().unwrap().freelist_page_id = freelist_page_id;
disk_manager.write_meta_page()?;

Ok(disk_manager)
}

pub fn read_page(&self, page_id: PageId) -> BustubxResult<[u8; BUSTUBX_PAGE_SIZE]> {
Expand Down Expand Up @@ -90,28 +98,96 @@ impl DiskManager {
Self::write_page_internal(&mut guard, page_id, data)
}

// TODO: manage page allocation with a bitmap
pub fn allocate_page(&self) -> BustubxResult<PageId> {
let mut guard = self.db_file.lock().unwrap();
if let Some(page_id) = self.freelist_pop()? {
Ok(page_id)
} else {
let mut guard = self.db_file.lock().unwrap();

// fetch current value and increment page id
let page_id = self
.next_page_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
// fetch current value and increment page id
let page_id = self
.next_page_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);

// Write an empty page (all zeros) to the allocated page.
Self::write_page_internal(&mut guard, page_id, &EMPTY_PAGE)?;
// Write an empty page (all zeros) to the allocated page.
Self::write_page_internal(&mut guard, page_id, &EMPTY_PAGE)?;

Ok(page_id)
Ok(page_id)
}
}

pub fn deallocate_page(&self, page_id: PageId) -> BustubxResult<()> {
// TODO: reuse the page id or the freed space
let mut guard = self.db_file.lock().unwrap();

// Write an empty page (all zeros) to the deallocated page.
// But this page is not deallocated, only data will be written with null or zeros.
Self::write_page_internal(&mut guard, page_id, &EMPTY_PAGE)
let mut guard = self.db_file.lock().unwrap();
Self::write_page_internal(&mut guard, page_id, &EMPTY_PAGE)?;
drop(guard);

self.freelist_push(page_id)?;
Ok(())
}

/// Records `page_id` in the on-disk freelist so that it can be handed out again.
///
/// Starting from the `freelist_page_id` stored in the meta page, the chain of
/// freelist pages is followed through each header's `next_page_id` until a page
/// that is not full is found; `page_id` is appended there and the page is
/// written back. When the chain ends (`INVALID_PAGE_ID`), a new freelist page is
/// allocated, initialised on disk and linked from the previously visited page.
///
/// NOTE(review): when the chain is empty from the very start, the newly
/// allocated page is not recorded in the meta page here — presumably a head
/// page always exists by the time this is called; confirm against the constructor.
fn freelist_push(&self, page_id: PageId) -> BustubxResult<()> {
    let mut prev_id = INVALID_PAGE_ID;
    let mut cursor_id = self.meta.read().unwrap().freelist_page_id;
    loop {
        let mut page = if cursor_id != INVALID_PAGE_ID {
            let (existing, _) = FreelistPageCodec::decode(&self.read_page(cursor_id)?)?;
            existing
        } else {
            // Ran off the end of the chain: create and persist an empty freelist page.
            cursor_id = self.allocate_page()?;
            let fresh = FreelistPage::new();
            self.write_page(cursor_id, &FreelistPageCodec::encode(&fresh))?;

            // Hook the new page onto the page visited just before it, if any.
            if prev_id != INVALID_PAGE_ID {
                let (mut prev_page, _) = FreelistPageCodec::decode(&self.read_page(prev_id)?)?;
                prev_page.header.next_page_id = cursor_id;
                self.write_page(prev_id, &FreelistPageCodec::encode(&prev_page))?;
            }

            fresh
        };

        if !page.is_full() {
            page.push(page_id);
            // Persist the updated freelist page before reporting success.
            self.write_page(cursor_id, &FreelistPageCodec::encode(&page))?;
            return Ok(());
        }

        // This page has no room left; move on to its successor.
        prev_id = cursor_id;
        cursor_id = page.header.next_page_id;
    }
}

/// Takes one reusable page id out of the on-disk freelist, if any is recorded.
///
/// The chain of freelist pages is walked from the meta page's
/// `freelist_page_id`. The first page that yields an entry is written back
/// without that entry and the id is returned. `Ok(None)` is returned once the
/// chain ends (`INVALID_PAGE_ID`) without any page yielding an entry.
fn freelist_pop(&self) -> BustubxResult<Option<PageId>> {
    let mut cursor_id = self.meta.read().unwrap().freelist_page_id;
    while cursor_id != INVALID_PAGE_ID {
        let (mut page, _) = FreelistPageCodec::decode(&self.read_page(cursor_id)?)?;
        match page.pop() {
            Some(reusable_id) => {
                // Persist the shrunken page so the id is not handed out twice.
                self.write_page(cursor_id, &FreelistPageCodec::encode(&page))?;
                return Ok(Some(reusable_id));
            }
            // Nothing stored here; continue with the next page in the chain.
            None => cursor_id = page.header.next_page_id,
        }
    }
    Ok(None)
}

fn write_meta_page(&self) -> BustubxResult<()> {
let mut guard = self.db_file.lock().unwrap();
guard.write_all(&MetaPageCodec::encode(&self.meta.read().unwrap()))?;
guard.flush()?;
Ok(())
}

fn write_page_internal(
Expand Down Expand Up @@ -150,15 +226,15 @@ mod tests {
let disk_manager = super::DiskManager::try_new(&temp_path).unwrap();

let page_id1 = disk_manager.allocate_page().unwrap();
assert_eq!(page_id1, 1);
assert_eq!(page_id1, 2);
let mut page1 = vec![1, 2, 3];
page1.extend(vec![0; BUSTUBX_PAGE_SIZE - 3]);
disk_manager.write_page(page_id1, &page1).unwrap();
let page = disk_manager.read_page(page_id1).unwrap();
assert_eq!(page, page1.as_slice());

let page_id2 = disk_manager.allocate_page().unwrap();
assert_eq!(page_id2, 2);
assert_eq!(page_id2, 3);
let mut page2 = vec![0; BUSTUBX_PAGE_SIZE - 3];
page2.extend(vec![4, 5, 6]);
disk_manager.write_page(page_id2, &page2).unwrap();
Expand All @@ -168,7 +244,24 @@ mod tests {
let db_file_len = disk_manager.db_file_len().unwrap();
assert_eq!(
db_file_len as usize,
BUSTUBX_PAGE_SIZE * 2 + MetaPageCodec::encode(&EMPTY_META_PAGE).len()
BUSTUBX_PAGE_SIZE * 3 + MetaPageCodec::encode(&EMPTY_META_PAGE).len()
);
}

#[test]
pub fn test_disk_manager_freelist() {
    // A page that has been deallocated must be the next one handed out, and
    // pages that are still live must never be handed out again.
    let temp_dir = TempDir::new().unwrap();
    let temp_path = temp_dir.path().join("test.db");

    let disk_manager = super::DiskManager::try_new(&temp_path).unwrap();

    let page_id1 = disk_manager.allocate_page().unwrap();
    let page_id2 = disk_manager.allocate_page().unwrap();
    let page_id3 = disk_manager.allocate_page().unwrap();

    // Fresh allocations are pairwise distinct.
    assert_ne!(page_id1, page_id2);
    assert_ne!(page_id1, page_id3);
    assert_ne!(page_id2, page_id3);

    disk_manager.deallocate_page(page_id1).unwrap();

    // The freed page is reused rather than a live one being handed out.
    let page_id4 = disk_manager.allocate_page().unwrap();
    assert_eq!(page_id1, page_id4);
    assert_ne!(page_id4, page_id2);
    assert_ne!(page_id4, page_id3);
}
}
30 changes: 30 additions & 0 deletions bustubx/src/storage/freelist_page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,33 @@ pub struct FreelistPageHeader {
pub current_size: u32,
pub max_size: u32,
}

impl FreelistPage {
    /// Creates an empty freelist page: no successor (`INVALID_PAGE_ID`), zero
    /// stored entries, and a capacity taken from `FREELIST_PAGE_MAX_SIZE`.
    pub fn new() -> Self {
        let header = FreelistPageHeader {
            next_page_id: INVALID_PAGE_ID,
            current_size: 0,
            max_size: FREELIST_PAGE_MAX_SIZE.clone() as u32,
        };
        Self {
            header,
            array: Vec::new(),
        }
    }

    /// Returns `true` once the header's `current_size` has reached `max_size`.
    pub fn is_full(&self) -> bool {
        self.header.max_size <= self.header.current_size
    }

    /// Appends `page_id` to the stored ids and bumps the header's entry count.
    ///
    /// No capacity check is performed here; callers are expected to consult
    /// [`FreelistPage::is_full`] first.
    pub fn push(&mut self, page_id: PageId) {
        self.array.push(page_id);
        self.header.current_size += 1;
    }

    /// Removes and returns the most recently pushed page id, decrementing the
    /// header's entry count; returns `None` when no ids are stored.
    pub fn pop(&mut self) -> Option<PageId> {
        let popped = self.array.pop()?;
        self.header.current_size -= 1;
        Some(popped)
    }
}
Loading

0 comments on commit 4968187

Please sign in to comment.