From 72895c9547d6850eac36613f7064abbb4a8589e5 Mon Sep 17 00:00:00 2001 From: asmaa-starkware <163830216+asmaastarkware@users.noreply.github.com> Date: Wed, 15 May 2024 13:32:08 +0300 Subject: [PATCH] refactor(network): rename BlockHeaderDBExecutor to DBExecutor (#1977) --- crates/papyrus_network/src/db_executor/mod.rs | 12 +++++------ .../papyrus_network/src/db_executor/test.rs | 20 ++++++++++++------- .../src/network_manager/mod.rs | 11 +++++----- .../src/network_manager/test.rs | 6 +++--- 4 files changed, 27 insertions(+), 22 deletions(-) diff --git a/crates/papyrus_network/src/db_executor/mod.rs b/crates/papyrus_network/src/db_executor/mod.rs index 511664cf95..b5ce3a7af6 100644 --- a/crates/papyrus_network/src/db_executor/mod.rs +++ b/crates/papyrus_network/src/db_executor/mod.rs @@ -182,10 +182,10 @@ impl DBExecutorError { } } -/// Db executor is a stream of queries. Each result is marks the end of a query fulfillment. +/// DBExecutorTrait is a stream of queries. Each result marks the end of a query fulfillment. /// A query can either succeed (and return Ok(QueryId)) or fail (and return Err(DBExecutorError)). /// The stream is never exhausted, and it is the responsibility of the user to poll it. -pub trait DBExecutor: Stream> + Unpin { +pub trait DBExecutorTrait: Stream> + Unpin { // TODO: add writer functionality fn register_query( &mut self, } // TODO: currently this executor returns only block headers and signatures. 
-pub struct BlockHeaderDBExecutor { +pub struct DBExecutor { next_query_id: usize, storage_reader: StorageReader, query_execution_set: FuturesUnordered>>, } -impl BlockHeaderDBExecutor { +impl DBExecutor { pub fn new(storage_reader: StorageReader) -> Self { Self { next_query_id: 0, storage_reader, query_execution_set: FuturesUnordered::new() } } } -impl DBExecutor for BlockHeaderDBExecutor { +impl DBExecutorTrait for DBExecutor { fn register_query( &mut self, query: InternalQuery, @@ -266,7 +266,7 @@ impl DBExecutor for BlockHeaderDBExecutor { } } -impl Stream for BlockHeaderDBExecutor { +impl Stream for DBExecutor { type Item = Result; fn poll_next( diff --git a/crates/papyrus_network/src/db_executor/test.rs b/crates/papyrus_network/src/db_executor/test.rs index e0f584fb21..0a25fb1ec5 100644 --- a/crates/papyrus_network/src/db_executor/test.rs +++ b/crates/papyrus_network/src/db_executor/test.rs @@ -14,14 +14,20 @@ use starknet_api::block::{BlockHash, BlockHeader, BlockNumber, BlockSignature}; use starknet_api::state::ThinStateDiff; use super::Data::BlockHeaderAndSignature; -use crate::db_executor::{DBExecutor, DBExecutorError, Data, MockFetchBlockDataFromDb, QueryId}; +use crate::db_executor::{ + DBExecutorError, + DBExecutorTrait, + Data, + MockFetchBlockDataFromDb, + QueryId, +}; use crate::{BlockHashOrNumber, DataType, Direction, InternalQuery}; const BUFFER_SIZE: usize = 10; #[tokio::test] async fn header_db_executor_can_register_and_run_a_query() { let ((storage_reader, mut storage_writer), _temp_dir) = get_test_storage(); - let mut db_executor = super::BlockHeaderDBExecutor::new(storage_reader); + let mut db_executor = super::DBExecutor::new(storage_reader); // put some data in the storage. 
const NUM_OF_BLOCKS: u64 = 10; @@ -107,7 +113,7 @@ async fn header_db_executor_start_block_given_by_hash() { .unwrap() .block_hash; - let mut db_executor = super::BlockHeaderDBExecutor::new(storage_reader); + let mut db_executor = super::DBExecutor::new(storage_reader); // register a query. let (sender, receiver) = futures::channel::mpsc::channel(BUFFER_SIZE); @@ -137,7 +143,7 @@ async fn header_db_executor_start_block_given_by_hash() { #[tokio::test] async fn header_db_executor_query_of_missing_block() { let ((storage_reader, mut storage_writer), _temp_dir) = get_test_storage(); - let mut db_executor = super::BlockHeaderDBExecutor::new(storage_reader); + let mut db_executor = super::DBExecutor::new(storage_reader); const NUM_OF_BLOCKS: u64 = 15; insert_to_storage_test_blocks_up_to(NUM_OF_BLOCKS, &mut storage_writer); @@ -180,7 +186,7 @@ async fn header_db_executor_query_of_missing_block() { #[test] fn header_db_executor_stream_pending_with_no_query() { let ((storage_reader, _), _temp_dir) = get_test_storage(); - let mut db_executor = super::BlockHeaderDBExecutor::new(storage_reader); + let mut db_executor = super::DBExecutor::new(storage_reader); // poll without registering a query. 
assert!(poll_fn(|cx| db_executor.poll_next_unpin(cx)).now_or_never().is_none()); @@ -189,7 +195,7 @@ fn header_db_executor_stream_pending_with_no_query() { #[tokio::test] async fn header_db_executor_can_receive_queries_after_stream_is_exhausted() { let ((storage_reader, mut storage_writer), _temp_dir) = get_test_storage(); - let mut db_executor = super::BlockHeaderDBExecutor::new(storage_reader); + let mut db_executor = super::DBExecutor::new(storage_reader); const NUM_OF_BLOCKS: u64 = 10; insert_to_storage_test_blocks_up_to(NUM_OF_BLOCKS, &mut storage_writer); @@ -228,7 +234,7 @@ async fn header_db_executor_can_receive_queries_after_stream_is_exhausted() { #[tokio::test] async fn header_db_executor_drop_receiver_before_query_is_done() { let ((storage_reader, mut storage_writer), _temp_dir) = get_test_storage(); - let mut db_executor = super::BlockHeaderDBExecutor::new(storage_reader); + let mut db_executor = super::DBExecutor::new(storage_reader); const NUM_OF_BLOCKS: u64 = 10; insert_to_storage_test_blocks_up_to(NUM_OF_BLOCKS, &mut storage_writer); diff --git a/crates/papyrus_network/src/network_manager/mod.rs b/crates/papyrus_network/src/network_manager/mod.rs index 8469b040ce..05def4f5f6 100644 --- a/crates/papyrus_network/src/network_manager/mod.rs +++ b/crates/papyrus_network/src/network_manager/mod.rs @@ -19,7 +19,7 @@ use tracing::{debug, error, info, trace}; use self::swarm_trait::SwarmTrait; use crate::bin_utils::build_swarm; use crate::converters::{Router, RouterError}; -use crate::db_executor::{self, BlockHeaderDBExecutor, DBExecutor, Data, QueryId}; +use crate::db_executor::{self, DBExecutor, DBExecutorTrait, Data, QueryId}; use crate::mixed_behaviour::{self, BridgedBehaviour}; use crate::streamed_bytes::{self, InboundSessionId, OutboundSessionId, SessionId}; use crate::{DataType, NetworkConfig, Protocol, Query, ResponseReceivers}; @@ -33,7 +33,7 @@ pub enum NetworkError { DialError(#[from] libp2p::swarm::DialError), } -pub struct GenericNetworkManager 
{ +pub struct GenericNetworkManager { swarm: SwarmT, db_executor: DBExecutorT, header_buffer_size: usize, @@ -46,7 +46,7 @@ pub struct GenericNetworkManager { num_active_outbound_sessions: usize, } -impl GenericNetworkManager { +impl GenericNetworkManager { pub async fn run(mut self) -> Result<(), NetworkError> { loop { tokio::select! { @@ -364,8 +364,7 @@ impl GenericNetworkManager>; +pub type NetworkManager = GenericNetworkManager>; impl NetworkManager { pub fn new(config: NetworkConfig, storage_reader: StorageReader) -> Self { @@ -397,7 +396,7 @@ impl NetworkManager { ) }); - let db_executor = BlockHeaderDBExecutor::new(storage_reader); + let db_executor = DBExecutor::new(storage_reader); Self::generic_new(swarm, db_executor, header_buffer_size) } diff --git a/crates/papyrus_network/src/network_manager/test.rs b/crates/papyrus_network/src/network_manager/test.rs index a047f4c555..a4b0a7b536 100644 --- a/crates/papyrus_network/src/network_manager/test.rs +++ b/crates/papyrus_network/src/network_manager/test.rs @@ -25,8 +25,8 @@ use super::swarm_trait::{Event, SwarmTrait}; use super::GenericNetworkManager; use crate::db_executor::{ poll_query_execution_set, - DBExecutor, DBExecutorError, + DBExecutorTrait, Data, FetchBlockDataFromDb, QueryId, @@ -193,8 +193,8 @@ impl Stream for MockDBExecutor { } } -impl DBExecutor for MockDBExecutor { - // TODO(shahak): Consider fixing code duplication with BlockHeaderDBExecutor. +impl DBExecutorTrait for MockDBExecutor { + // TODO(shahak): Consider fixing code duplication with DBExecutor. fn register_query( &mut self, query: InternalQuery,