From 5b03fa0a45a2aa13a9e6ff02471afc455a896557 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Tue, 15 Oct 2024 21:02:05 +0530 Subject: [PATCH] chore: refactor cached_view into own module --- crates/services/p2p/src/cached_view.rs | 278 +++++++++++++++++++++++++ crates/services/p2p/src/lib.rs | 2 + crates/services/p2p/src/service.rs | 87 +------- 3 files changed, 283 insertions(+), 84 deletions(-) create mode 100644 crates/services/p2p/src/cached_view.rs diff --git a/crates/services/p2p/src/cached_view.rs b/crates/services/p2p/src/cached_view.rs new file mode 100644 index 0000000000..fb6831c43b --- /dev/null +++ b/crates/services/p2p/src/cached_view.rs @@ -0,0 +1,278 @@ +use crate::ports::P2pDb; +use dashmap::DashMap; +use fuel_core_metrics::p2p_metrics::{ + increment_p2p_req_res_cache_hits, + increment_p2p_req_res_cache_misses, +}; +use fuel_core_storage::Result as StorageResult; +use fuel_core_types::{ + blockchain::SealedBlockHeader, + services::p2p::Transactions, +}; +use std::ops::Range; + +pub struct CachedView { + sealed_block_headers: DashMap, Vec>, + transactions_on_blocks: DashMap, Vec>, + metrics: bool, +} + +impl CachedView { + pub fn new(metrics: bool) -> Self { + Self { + sealed_block_headers: DashMap::new(), + transactions_on_blocks: DashMap::new(), + metrics, + } + } + + pub fn clear(&self) { + self.sealed_block_headers.clear(); + self.transactions_on_blocks.clear(); + } + + fn update_metrics(&self, update_fn: U) + where + U: FnOnce(), + { + if self.metrics { + update_fn() + } + } + + pub(crate) fn get_sealed_headers( + &self, + view: &V, + block_height_range: Range, + ) -> StorageResult>> + where + V: P2pDb, + { + if let Some(headers) = self.sealed_block_headers.get(&block_height_range) { + self.update_metrics(increment_p2p_req_res_cache_hits); + Ok(Some(headers.clone())) + } else { + self.update_metrics(increment_p2p_req_res_cache_misses); + let headers = view.get_sealed_headers(block_height_range.clone())?; + if let Some(headers) = &headers { + self.sealed_block_headers + .insert(block_height_range, headers.clone()); + } + Ok(headers) + } + } + + pub(crate) fn get_transactions( + &self, + view: &V, + block_height_range: Range, + ) -> StorageResult>> + where + V: P2pDb, + { + if let Some(transactions) = self.transactions_on_blocks.get(&block_height_range) { + self.update_metrics(increment_p2p_req_res_cache_hits); + Ok(Some(transactions.clone())) + } else { + self.update_metrics(increment_p2p_req_res_cache_misses); + let transactions = view.get_transactions(block_height_range.clone())?; + if let Some(transactions) = &transactions { + self.transactions_on_blocks + .insert(block_height_range, transactions.clone()); + } + Ok(transactions) + } + } +} + +#[allow(non_snake_case)] +#[cfg(test)] +mod tests { + use super::*; + use fuel_core_types::blockchain::consensus::Genesis; + use std::sync::Arc; + use tokio::sync::Notify; + + struct FakeDb { + sender: Arc, + values: bool, + } + + #[inline] + fn default_sealed_headers(range: Range) -> Vec { + vec![SealedBlockHeader::default(); range.len()] + } + + #[inline] + fn default_transactions(range: Range) -> Vec { + vec![Transactions::default(); range.len()] + } + + impl P2pDb for FakeDb { + fn get_sealed_headers( + &self, + range: Range, + ) -> StorageResult>> { + self.sender.notify_waiters(); + if !self.values { + return Ok(None); + } + let headers = default_sealed_headers(range); + Ok(Some(headers)) + } + + fn get_transactions( + &self, + range: Range, + ) -> StorageResult>> { + self.sender.notify_waiters(); + if !self.values { + return Ok(None); + } + let transactions = default_transactions(range); + Ok(Some(transactions)) + } + + fn get_genesis(&self) -> StorageResult { + self.sender.notify_waiters(); + Ok(Genesis::default()) + } + } + + #[tokio::test] + async fn cached_view__get_sealed_headers__cache_hit() { + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: true, + }; + let cached_view = CachedView::new(false); + + let block_height_range = 0..10; + let sealed_headers = vec![SealedBlockHeader::default()]; + cached_view + .sealed_block_headers + .insert(block_height_range.clone(), sealed_headers.clone()); + + let result = cached_view + .get_sealed_headers(&db, block_height_range.clone()) + .unwrap(); + assert_eq!(result, Some(sealed_headers)); + } + + #[tokio::test] + async fn cached_view__get_sealed_headers__cache_miss() { + // given + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: true, + }; + let cached_view = CachedView::new(false); + + // when + let notified = sender.notified(); + let block_height_range = 0..10; + let sealed_headers = default_sealed_headers(block_height_range.clone()); + let result = cached_view + .get_sealed_headers(&db, block_height_range.clone()) + .unwrap(); + + // then + notified.await; + assert_eq!(result, Some(sealed_headers)); + } + + #[tokio::test] + async fn cached_view__when_response_is_none__get_sealed_headers__cache_miss() { + // given + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: false, + }; + let cached_view = CachedView::new(false); + + // when + let notified = sender.notified(); + let block_height_range = 0..10; + let result = cached_view + .get_sealed_headers(&db, block_height_range.clone()) + .unwrap(); + + // then + notified.await; + assert!(result.is_none()); + } + + #[tokio::test] + async fn cached_view__get_transactions__cache_hit() { + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: true, + }; + let cached_view = CachedView::new(false); + + let block_height_range = 0..10; + let transactions = default_transactions(block_height_range.clone()); + cached_view + .transactions_on_blocks + .insert(block_height_range.clone(), transactions.clone()); + + let result = cached_view + .get_transactions(&db, block_height_range.clone()) + .unwrap(); + + for (expected, actual) in transactions.iter().zip(result.unwrap().iter()) { + assert_eq!(expected.0, actual.0); + } + } + + #[tokio::test] + async fn cached_view__get_transactions__cache_miss() { + // given + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: true, + }; + let cached_view = CachedView::new(false); + + // when + let notified = sender.notified(); + let block_height_range = 0..10; + let transactions = default_transactions(block_height_range.clone()); + let result = cached_view + .get_transactions(&db, block_height_range.clone()) + .unwrap(); + + // then + notified.await; + for (expected, actual) in transactions.iter().zip(result.unwrap().iter()) { + assert_eq!(expected.0, actual.0); + } + } + + #[tokio::test] + async fn cached_view__when_response_is_none__get_transactions__cache_miss() { + // given + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: false, + }; + let cached_view = CachedView::new(false); + + // when + let notified = sender.notified(); + let block_height_range = 0..10; + let result = cached_view + .get_transactions(&db, block_height_range.clone()) + .unwrap(); + + // then + notified.await; + assert!(result.is_none()); + } +} diff --git a/crates/services/p2p/src/lib.rs b/crates/services/p2p/src/lib.rs index fbd82c2545..375eeb7351 100644 --- a/crates/services/p2p/src/lib.rs +++ b/crates/services/p2p/src/lib.rs @@ -16,6 +16,8 @@ pub mod request_response; pub mod service; mod utils; +mod cached_view; + pub use gossipsub::config as gossipsub_config; pub use heartbeat::Config; diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 6fdd1b087a..89f097f9ec 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -1,4 +1,5 @@ use crate::{ + cached_view::CachedView, codecs::postcard::PostcardCodec, config::{ Config, @@ -27,12 +28,7 @@ use crate::{ }, }; use anyhow::anyhow; -use dashmap::DashMap; -use fuel_core_metrics::p2p_metrics::{ - increment_p2p_req_res_cache_hits, - increment_p2p_req_res_cache_misses, - set_blocks_requested, -}; +use fuel_core_metrics::p2p_metrics::set_blocks_requested; use fuel_core_services::{ stream::BoxStream, AsyncProcessor, @@ -43,10 +39,7 @@ use fuel_core_services::{ SyncProcessor, TraceErr, }; -use fuel_core_storage::{ - transactional::AtomicView, - Result as StorageResult, -}; +use fuel_core_storage::transactional::AtomicView; use fuel_core_types::{ blockchain::SealedBlockHeader, fuel_tx::{ @@ -498,80 +491,6 @@ impl Task { } } -struct CachedView { - sealed_block_headers: DashMap, Vec>, - transactions_on_blocks: DashMap, Vec>, - metrics: bool, -} - -impl CachedView { - fn new(metrics: bool) -> Self { - Self { - sealed_block_headers: DashMap::new(), - transactions_on_blocks: DashMap::new(), - metrics, - } - } - - fn clear(&self) { - self.sealed_block_headers.clear(); - self.transactions_on_blocks.clear(); - } - - fn update_metrics(&self, update_fn: U) - where - U: FnOnce(), - { - if self.metrics { - update_fn() - } - } - - fn get_sealed_headers( - &self, - view: &V, - block_height_range: Range, - ) -> StorageResult>> - where - V: P2pDb, - { - if let Some(headers) = self.sealed_block_headers.get(&block_height_range) { - self.update_metrics(increment_p2p_req_res_cache_hits); - Ok(Some(headers.clone())) - } else { - self.update_metrics(increment_p2p_req_res_cache_misses); - let headers = view.get_sealed_headers(block_height_range.clone())?; - if let Some(headers) = &headers { - self.sealed_block_headers - .insert(block_height_range, headers.clone()); - } - Ok(headers) - } - } - - fn get_transactions( - &self, - view: &V, - block_height_range: Range, - ) -> StorageResult>> - where - V: P2pDb, - { - if let Some(transactions) = self.transactions_on_blocks.get(&block_height_range) { - self.update_metrics(increment_p2p_req_res_cache_hits); - Ok(Some(transactions.clone())) - } else { - self.update_metrics(increment_p2p_req_res_cache_misses); - let transactions = view.get_transactions(block_height_range.clone())?; - if let Some(transactions) = &transactions { - self.transactions_on_blocks - .insert(block_height_range, transactions.clone()); - } - Ok(transactions) - } - } -} - impl Task where P: TaskP2PService + 'static,