Skip to content

Commit

Permalink
Refactor buffer pool
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Feb 15, 2024
1 parent f2cce5c commit 67b1e00
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 178 deletions.
159 changes: 64 additions & 95 deletions bustubx/src/buffer/buffer_pool.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::RwLock;
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
Expand All @@ -6,6 +7,7 @@ use std::{
use crate::buffer::page::{Page, PageId};
use crate::buffer::BUSTUBX_PAGE_SIZE;
use crate::storage::DiskManager;
use crate::{BustubxError, BustubxResult};

use super::replacer::LRUKReplacer;

Expand All @@ -15,7 +17,7 @@ pub const TABLE_HEAP_BUFFER_POOL_SIZE: usize = 100;

#[derive(Debug)]
pub struct BufferPoolManager {
pool: Vec<Page>,
pool: Vec<Arc<RwLock<Page>>>,
// LRU-K置换算法
pub replacer: LRUKReplacer,
pub disk_manager: Arc<DiskManager>,
Expand All @@ -33,7 +35,7 @@ impl BufferPoolManager {
free_list.push_back(i as FrameId);
}
Self {
pool: vec![Page::new(0); num_pages],
pool: vec![Arc::new(RwLock::new(Page::new(0))); num_pages],
replacer: LRUKReplacer::new(num_pages, 2),
disk_manager,
page_table: HashMap::new(),
Expand All @@ -43,10 +45,10 @@ impl BufferPoolManager {
}

// 从缓冲池创建一个新页
pub fn new_page(&mut self) -> Option<&mut Page> {
pub fn new_page(&mut self) -> BustubxResult<Arc<RwLock<Page>>> {
// 缓冲池已满且无可替换的页
if self.free_list.is_empty() && self.replacer.size() == 0 {
return None;
return Err(BustubxError::Storage("Failed to new page".to_string()));
}

// 分配一个frame
Expand All @@ -56,16 +58,17 @@ impl BufferPoolManager {
} else {
// 无空闲frame,从缓冲池中替换一个页
if let Some(frame_id) = self.replacer.evict() {
let evicted_page = &self.pool[frame_id as usize];
let evicted_page_id = evicted_page.page_id;
let evicted_page = self.pool[frame_id as usize].clone();
let evicted_page_id = evicted_page.read().unwrap().page_id;
// 如果页被修改过,则将其写回磁盘
if evicted_page.is_dirty {
let is_dirty = evicted_page.read().unwrap().is_dirty;
if is_dirty {
self.flush_page(evicted_page_id);
}
self.page_table.remove(&evicted_page_id);
frame_id
} else {
return None;
return Err(BustubxError::Storage("Failed to evict page".to_string()));
}
};

Expand All @@ -74,129 +77,92 @@ impl BufferPoolManager {
self.page_table.insert(new_page_id, frame_id);
let mut new_page = Page::new(new_page_id);
new_page.pin_count = 1;
self.pool[frame_id as usize] = new_page;
self.pool[frame_id as usize] = Arc::new(RwLock::new(new_page));

self.replacer.record_access(frame_id);
self.replacer.set_evictable(frame_id, false);

return Some(&mut self.pool[frame_id as usize]);
Ok(self.pool[frame_id as usize].clone())
}

pub fn fetch_page(&mut self, page_id: PageId) -> Option<&Page> {
pub fn fetch_page(&mut self, page_id: PageId) -> BustubxResult<Arc<RwLock<Page>>> {
if self.page_table.contains_key(&page_id) {
let frame_id = self.page_table[&page_id];
let page = &mut self.pool[frame_id as usize];
page.pin_count += 1;
let page = self.pool[frame_id as usize].clone();
page.write().unwrap().pin_count += 1;
self.replacer.set_evictable(frame_id, false);
return Some(page);
return Ok(page);
} else {
// 分配一个frame
let frame_id = if !self.free_list.is_empty() {
self.free_list.pop_front().unwrap()
} else {
if let Some(frame_id) = self.replacer.evict() {
let evicted_page = &self.pool[frame_id as usize];
let evicted_page_id = evicted_page.page_id;
if evicted_page.is_dirty {
let evicted_page = self.pool[frame_id as usize].clone();
let evicted_page_id = evicted_page.read().unwrap().page_id;
if evicted_page.read().unwrap().is_dirty {
self.flush_page(evicted_page_id);
}
self.page_table.remove(&evicted_page_id);
frame_id
} else {
return None;
return Err(BustubxError::Storage("Failed to evict page".to_string()));
}
};
// 从磁盘读取页
self.page_table.insert(page_id, frame_id);
let mut new_page = Page::new(page_id);
new_page.pin_count = 1;
new_page.data = self.disk_manager.read_page(page_id).unwrap();
self.pool[frame_id as usize] = new_page;
self.pool[frame_id as usize] = Arc::new(RwLock::new(new_page));

self.replacer.record_access(frame_id);
self.replacer.set_evictable(frame_id, false);

return Some(&self.pool[frame_id as usize]);
}
}

// 从缓冲池中获取指定页
pub fn fetch_page_mut(&mut self, page_id: PageId) -> Option<&mut Page> {
if self.page_table.contains_key(&page_id) {
let frame_id = self.page_table[&page_id];
let page = &mut self.pool[frame_id as usize];
page.pin_count += 1;
self.replacer.set_evictable(frame_id, false);
return Some(page);
} else {
// 分配一个frame
let frame_id = if !self.free_list.is_empty() {
self.free_list.pop_front().unwrap()
} else {
if let Some(frame_id) = self.replacer.evict() {
let evicted_page = &self.pool[frame_id as usize];
let evicted_page_id = evicted_page.page_id;
if evicted_page.is_dirty {
self.flush_page(evicted_page_id);
}
self.page_table.remove(&evicted_page_id);
frame_id
} else {
return None;
}
};
// 从磁盘读取页
self.page_table.insert(page_id, frame_id);
let mut new_page = Page::new(page_id);
new_page.pin_count = 1;
new_page.data = self.disk_manager.read_page(page_id).unwrap();
self.pool[frame_id as usize] = new_page;

self.replacer.record_access(frame_id);
self.replacer.set_evictable(frame_id, false);

return Some(&mut self.pool[frame_id as usize]);
Ok(self.pool[frame_id as usize].clone())
}
}

pub fn write_page(&mut self, page_id: PageId, data: [u8; BUSTUBX_PAGE_SIZE]) {
if self.page_table.contains_key(&page_id) {
let frame_id = self.page_table[&page_id];
let page = &mut self.pool[frame_id as usize];
page.data = data;
page.is_dirty = true;
let page = self.pool[frame_id as usize].clone();
page.write().unwrap().data = data;
page.write().unwrap().is_dirty = true;
}
}

// 从缓冲池中取消固定页
pub fn unpin_page(&mut self, page_id: PageId, is_dirty: bool) -> bool {
if self.page_table.contains_key(&page_id) {
let frame_id = self.page_table[&page_id];
let page = &mut self.pool[frame_id as usize];
if page.pin_count == 0 {
let page = self.pool[frame_id as usize].clone();
if page.read().unwrap().pin_count == 0 {
return false;
}
page.pin_count -= 1;
page.is_dirty |= is_dirty;
if page.pin_count == 0 {
page.write().unwrap().pin_count -= 1;
page.write().unwrap().is_dirty |= is_dirty;
if page.read().unwrap().pin_count == 0 {
self.replacer.set_evictable(frame_id, true);
}
return true;
true
} else {
return false;
false
}
}

// 将缓冲池中指定页写回磁盘
pub fn flush_page(&mut self, page_id: PageId) -> bool {
if self.page_table.contains_key(&page_id) {
let frame_id = self.page_table[&page_id];
let page = &mut self.pool[frame_id as usize];
self.disk_manager.write_page(page_id, &page.data).unwrap();
page.is_dirty = false;
return true;
let page = self.pool[frame_id as usize].clone();
self.disk_manager
.write_page(page_id, &page.read().unwrap().data)
.unwrap();
page.write().unwrap().is_dirty = false;
true
} else {
return false;
false
}
}

Expand All @@ -214,14 +180,14 @@ impl BufferPoolManager {
return true;
}
let frame_id = self.page_table[&page_id];
let page = &mut self.pool[frame_id as usize];
if page.pin_count > 0 {
let page = self.pool[frame_id as usize].clone();
if page.read().unwrap().pin_count > 0 {
// 页被固定,无法删除
return false;
}

// 从缓冲池中删除
page.destroy();
page.write().unwrap().destroy();
self.page_table.remove(&page_id);
self.free_list.push_back(frame_id);
self.replacer.remove(frame_id);
Expand All @@ -245,22 +211,28 @@ mod tests {
let disk_manager = DiskManager::try_new(&db_path).unwrap();
let mut buffer_pool_manager = BufferPoolManager::new(3, Arc::new(disk_manager));
let page = buffer_pool_manager.new_page().unwrap().clone();
assert_eq!(page.page_id, 0);
assert_eq!(buffer_pool_manager.pool[0].page_id, page.page_id);
assert_eq!(buffer_pool_manager.page_table[&page.page_id], 0);
assert_eq!(page.read().unwrap().page_id, 0);
assert_eq!(
buffer_pool_manager.pool[0].read().unwrap().page_id,
page.read().unwrap().page_id
);
assert_eq!(
buffer_pool_manager.page_table[&page.read().unwrap().page_id],
0
);
assert_eq!(buffer_pool_manager.free_list.len(), 2);
assert_eq!(buffer_pool_manager.replacer.size(), 0);

let page = buffer_pool_manager.new_page().unwrap();
assert_eq!(page.page_id, 1);
assert_eq!(page.read().unwrap().page_id, 1);
let page = buffer_pool_manager.new_page().unwrap();
assert_eq!(page.page_id, 2);
assert_eq!(page.read().unwrap().page_id, 2);
let page = buffer_pool_manager.new_page();
assert!(page.is_none());
assert!(page.is_err());

buffer_pool_manager.unpin_page(0, false);
let page = buffer_pool_manager.new_page().unwrap();
assert_eq!(page.page_id, 3);
assert_eq!(page.read().unwrap().page_id, 3);

let _ = remove_file(db_path);
}
Expand All @@ -277,11 +249,11 @@ mod tests {
let page = buffer_pool_manager.new_page().unwrap();
let page = buffer_pool_manager.new_page().unwrap();
let page = buffer_pool_manager.new_page();
assert!(page.is_none());
assert!(page.is_err());

buffer_pool_manager.unpin_page(0, true);
let page = buffer_pool_manager.new_page().unwrap();
assert_eq!(page.page_id, 3);
assert_eq!(page.read().unwrap().page_id, 3);

let _ = remove_file(db_path);
}
Expand All @@ -301,14 +273,12 @@ mod tests {
let page = buffer_pool_manager.new_page().unwrap();
buffer_pool_manager.unpin_page(2, false);

let page = buffer_pool_manager.fetch_page_mut(0);
assert!(page.is_some());
assert_eq!(page.unwrap().page_id, 0);
let page = buffer_pool_manager.fetch_page(0).unwrap();
assert_eq!(page.read().unwrap().page_id, 0);
buffer_pool_manager.unpin_page(0, false);

let page = buffer_pool_manager.fetch_page(1);
assert!(page.is_some());
assert_eq!(page.unwrap().page_id, 1);
let page = buffer_pool_manager.fetch_page(1).unwrap();
assert_eq!(page.read().unwrap().page_id, 1);
buffer_pool_manager.unpin_page(1, false);
assert_eq!(buffer_pool_manager.replacer.size(), 3);

Expand Down Expand Up @@ -337,9 +307,8 @@ mod tests {
assert_eq!(buffer_pool_manager.replacer.size(), 2);
assert_eq!(buffer_pool_manager.page_table.len(), 2);

let page = buffer_pool_manager.fetch_page_mut(1);
assert!(page.is_some());
assert_eq!(page.unwrap().page_id, 1);
let page = buffer_pool_manager.fetch_page(1).unwrap();
assert_eq!(page.read().unwrap().page_id, 1);

let _ = remove_file(db_path);
}
Expand Down
2 changes: 1 addition & 1 deletion bustubx/src/storage/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl DiskManager {

pub fn write_page(&self, page_id: PageId, data: &[u8]) -> BustubxResult<()> {
if data.len() != BUSTUBX_PAGE_SIZE {
return Err(BustubxError::Internal(format!(
return Err(BustubxError::Storage(format!(
"Page size is not {}",
BUSTUBX_PAGE_SIZE
)));
Expand Down
Loading

0 comments on commit 67b1e00

Please sign in to comment.