Skip to content

Commit

Permalink
chore: refactor cached_view into own module
Browse files Browse the repository at this point in the history
  • Loading branch information
rymnc committed Oct 15, 2024
1 parent b6bbf61 commit 009dc14
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 84 deletions.
2 changes: 2 additions & 0 deletions crates/services/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub mod request_response;
pub mod service;
mod utils;

mod cached_view;

pub use gossipsub::config as gossipsub_config;
pub use heartbeat::Config;

Expand Down
87 changes: 3 additions & 84 deletions crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
cached_view::CachedView,
codecs::postcard::PostcardCodec,
config::{
Config,
Expand Down Expand Up @@ -27,12 +28,7 @@ use crate::{
},
};
use anyhow::anyhow;
use dashmap::DashMap;
use fuel_core_metrics::p2p_metrics::{
increment_p2p_req_res_cache_hits,
increment_p2p_req_res_cache_misses,
set_blocks_requested,
};
use fuel_core_metrics::p2p_metrics::set_blocks_requested;
use fuel_core_services::{
stream::BoxStream,
AsyncProcessor,
Expand All @@ -43,10 +39,7 @@ use fuel_core_services::{
SyncProcessor,
TraceErr,
};
use fuel_core_storage::{
transactional::AtomicView,
Result as StorageResult,
};
use fuel_core_storage::transactional::AtomicView;
use fuel_core_types::{
blockchain::SealedBlockHeader,
fuel_tx::{
Expand Down Expand Up @@ -498,80 +491,6 @@ impl<P: TaskP2PService, V: AtomicView, B: Broadcast, T> Task<P, V, B, T> {
}
}

struct CachedView {
sealed_block_headers: DashMap<Range<u32>, Vec<SealedBlockHeader>>,
transactions_on_blocks: DashMap<Range<u32>, Vec<Transactions>>,
metrics: bool,
}

impl CachedView {
fn new(metrics: bool) -> Self {
Self {
sealed_block_headers: DashMap::new(),
transactions_on_blocks: DashMap::new(),
metrics,
}
}

fn clear(&self) {
self.sealed_block_headers.clear();
self.transactions_on_blocks.clear();
}

fn update_metrics<U>(&self, update_fn: U)
where
U: FnOnce(),
{
if self.metrics {
update_fn()
}
}

fn get_sealed_headers<V>(
&self,
view: &V,
block_height_range: Range<u32>,
) -> StorageResult<Option<Vec<SealedBlockHeader>>>
where
V: P2pDb,
{
if let Some(headers) = self.sealed_block_headers.get(&block_height_range) {
self.update_metrics(increment_p2p_req_res_cache_hits);
Ok(Some(headers.clone()))
} else {
self.update_metrics(increment_p2p_req_res_cache_misses);
let headers = view.get_sealed_headers(block_height_range.clone())?;
if let Some(headers) = &headers {
self.sealed_block_headers
.insert(block_height_range, headers.clone());
}
Ok(headers)
}
}

fn get_transactions<V>(
&self,
view: &V,
block_height_range: Range<u32>,
) -> StorageResult<Option<Vec<Transactions>>>
where
V: P2pDb,
{
if let Some(transactions) = self.transactions_on_blocks.get(&block_height_range) {
self.update_metrics(increment_p2p_req_res_cache_hits);
Ok(Some(transactions.clone()))
} else {
self.update_metrics(increment_p2p_req_res_cache_misses);
let transactions = view.get_transactions(block_height_range.clone())?;
if let Some(transactions) = &transactions {
self.transactions_on_blocks
.insert(block_height_range, transactions.clone());
}
Ok(transactions)
}
}
}

impl<P, V, B, T> Task<P, V, B, T>
where
P: TaskP2PService + 'static,
Expand Down

0 comments on commit 009dc14

Please sign in to comment.