Skip to content

Commit

Permalink
refactor(network): rename BlockHeaderDBExecutor to DBExecutor (#1977)
Browse files Browse the repository at this point in the history
  • Loading branch information
asmaastarkware authored May 15, 2024
1 parent fab814d commit 72895c9
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 22 deletions.
12 changes: 6 additions & 6 deletions crates/papyrus_network/src/db_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,10 @@ impl DBExecutorError {
}
}

/// Db executor is a stream of queries. Each result is marks the end of a query fulfillment.
/// DBExecutorTrait is a stream of queries. Each result is marks the end of a query fulfillment.
/// A query can either succeed (and return Ok(QueryId)) or fail (and return Err(DBExecutorError)).
/// The stream is never exhausted, and it is the responsibility of the user to poll it.
pub trait DBExecutor: Stream<Item = Result<QueryId, DBExecutorError>> + Unpin {
pub trait DBExecutorTrait: Stream<Item = Result<QueryId, DBExecutorError>> + Unpin {
// TODO: add writer functionality
fn register_query(
&mut self,
Expand All @@ -196,19 +196,19 @@ pub trait DBExecutor: Stream<Item = Result<QueryId, DBExecutorError>> + Unpin {
}

// TODO: currently this executor returns only block headers and signatures.
pub struct BlockHeaderDBExecutor {
pub struct DBExecutor {
next_query_id: usize,
storage_reader: StorageReader,
query_execution_set: FuturesUnordered<JoinHandle<Result<QueryId, DBExecutorError>>>,
}

impl BlockHeaderDBExecutor {
impl DBExecutor {
pub fn new(storage_reader: StorageReader) -> Self {
Self { next_query_id: 0, storage_reader, query_execution_set: FuturesUnordered::new() }
}
}

impl DBExecutor for BlockHeaderDBExecutor {
impl DBExecutorTrait for DBExecutor {
fn register_query(
&mut self,
query: InternalQuery,
Expand Down Expand Up @@ -266,7 +266,7 @@ impl DBExecutor for BlockHeaderDBExecutor {
}
}

impl Stream for BlockHeaderDBExecutor {
impl Stream for DBExecutor {
type Item = Result<QueryId, DBExecutorError>;

fn poll_next(
Expand Down
20 changes: 13 additions & 7 deletions crates/papyrus_network/src/db_executor/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@ use starknet_api::block::{BlockHash, BlockHeader, BlockNumber, BlockSignature};
use starknet_api::state::ThinStateDiff;

use super::Data::BlockHeaderAndSignature;
use crate::db_executor::{DBExecutor, DBExecutorError, Data, MockFetchBlockDataFromDb, QueryId};
use crate::db_executor::{
DBExecutorError,
DBExecutorTrait,
Data,
MockFetchBlockDataFromDb,
QueryId,
};
use crate::{BlockHashOrNumber, DataType, Direction, InternalQuery};
const BUFFER_SIZE: usize = 10;

#[tokio::test]
async fn header_db_executor_can_register_and_run_a_query() {
let ((storage_reader, mut storage_writer), _temp_dir) = get_test_storage();
let mut db_executor = super::BlockHeaderDBExecutor::new(storage_reader);
let mut db_executor = super::DBExecutor::new(storage_reader);

// put some data in the storage.
const NUM_OF_BLOCKS: u64 = 10;
Expand Down Expand Up @@ -107,7 +113,7 @@ async fn header_db_executor_start_block_given_by_hash() {
.unwrap()
.block_hash;

let mut db_executor = super::BlockHeaderDBExecutor::new(storage_reader);
let mut db_executor = super::DBExecutor::new(storage_reader);

// register a query.
let (sender, receiver) = futures::channel::mpsc::channel(BUFFER_SIZE);
Expand Down Expand Up @@ -137,7 +143,7 @@ async fn header_db_executor_start_block_given_by_hash() {
#[tokio::test]
async fn header_db_executor_query_of_missing_block() {
let ((storage_reader, mut storage_writer), _temp_dir) = get_test_storage();
let mut db_executor = super::BlockHeaderDBExecutor::new(storage_reader);
let mut db_executor = super::DBExecutor::new(storage_reader);

const NUM_OF_BLOCKS: u64 = 15;
insert_to_storage_test_blocks_up_to(NUM_OF_BLOCKS, &mut storage_writer);
Expand Down Expand Up @@ -180,7 +186,7 @@ async fn header_db_executor_query_of_missing_block() {
#[test]
fn header_db_executor_stream_pending_with_no_query() {
let ((storage_reader, _), _temp_dir) = get_test_storage();
let mut db_executor = super::BlockHeaderDBExecutor::new(storage_reader);
let mut db_executor = super::DBExecutor::new(storage_reader);

// poll without registering a query.
assert!(poll_fn(|cx| db_executor.poll_next_unpin(cx)).now_or_never().is_none());
Expand All @@ -189,7 +195,7 @@ fn header_db_executor_stream_pending_with_no_query() {
#[tokio::test]
async fn header_db_executor_can_receive_queries_after_stream_is_exhausted() {
let ((storage_reader, mut storage_writer), _temp_dir) = get_test_storage();
let mut db_executor = super::BlockHeaderDBExecutor::new(storage_reader);
let mut db_executor = super::DBExecutor::new(storage_reader);

const NUM_OF_BLOCKS: u64 = 10;
insert_to_storage_test_blocks_up_to(NUM_OF_BLOCKS, &mut storage_writer);
Expand Down Expand Up @@ -228,7 +234,7 @@ async fn header_db_executor_can_receive_queries_after_stream_is_exhausted() {
#[tokio::test]
async fn header_db_executor_drop_receiver_before_query_is_done() {
let ((storage_reader, mut storage_writer), _temp_dir) = get_test_storage();
let mut db_executor = super::BlockHeaderDBExecutor::new(storage_reader);
let mut db_executor = super::DBExecutor::new(storage_reader);

const NUM_OF_BLOCKS: u64 = 10;
insert_to_storage_test_blocks_up_to(NUM_OF_BLOCKS, &mut storage_writer);
Expand Down
11 changes: 5 additions & 6 deletions crates/papyrus_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tracing::{debug, error, info, trace};
use self::swarm_trait::SwarmTrait;
use crate::bin_utils::build_swarm;
use crate::converters::{Router, RouterError};
use crate::db_executor::{self, BlockHeaderDBExecutor, DBExecutor, Data, QueryId};
use crate::db_executor::{self, DBExecutor, DBExecutorTrait, Data, QueryId};
use crate::mixed_behaviour::{self, BridgedBehaviour};
use crate::streamed_bytes::{self, InboundSessionId, OutboundSessionId, SessionId};
use crate::{DataType, NetworkConfig, Protocol, Query, ResponseReceivers};
Expand All @@ -33,7 +33,7 @@ pub enum NetworkError {
DialError(#[from] libp2p::swarm::DialError),
}

pub struct GenericNetworkManager<DBExecutorT: DBExecutor, SwarmT: SwarmTrait> {
pub struct GenericNetworkManager<DBExecutorT: DBExecutorTrait, SwarmT: SwarmTrait> {
swarm: SwarmT,
db_executor: DBExecutorT,
header_buffer_size: usize,
Expand All @@ -46,7 +46,7 @@ pub struct GenericNetworkManager<DBExecutorT: DBExecutor, SwarmT: SwarmTrait> {
num_active_outbound_sessions: usize,
}

impl<DBExecutorT: DBExecutor, SwarmT: SwarmTrait> GenericNetworkManager<DBExecutorT, SwarmT> {
impl<DBExecutorT: DBExecutorTrait, SwarmT: SwarmTrait> GenericNetworkManager<DBExecutorT, SwarmT> {
pub async fn run(mut self) -> Result<(), NetworkError> {
loop {
tokio::select! {
Expand Down Expand Up @@ -364,8 +364,7 @@ impl<DBExecutorT: DBExecutor, SwarmT: SwarmTrait> GenericNetworkManager<DBExecut
}
}

pub type NetworkManager =
GenericNetworkManager<BlockHeaderDBExecutor, Swarm<mixed_behaviour::MixedBehaviour>>;
pub type NetworkManager = GenericNetworkManager<DBExecutor, Swarm<mixed_behaviour::MixedBehaviour>>;

impl NetworkManager {
pub fn new(config: NetworkConfig, storage_reader: StorageReader) -> Self {
Expand Down Expand Up @@ -397,7 +396,7 @@ impl NetworkManager {
)
});

let db_executor = BlockHeaderDBExecutor::new(storage_reader);
let db_executor = DBExecutor::new(storage_reader);
Self::generic_new(swarm, db_executor, header_buffer_size)
}

Expand Down
6 changes: 3 additions & 3 deletions crates/papyrus_network/src/network_manager/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use super::swarm_trait::{Event, SwarmTrait};
use super::GenericNetworkManager;
use crate::db_executor::{
poll_query_execution_set,
DBExecutor,
DBExecutorError,
DBExecutorTrait,
Data,
FetchBlockDataFromDb,
QueryId,
Expand Down Expand Up @@ -193,8 +193,8 @@ impl Stream for MockDBExecutor {
}
}

impl DBExecutor for MockDBExecutor {
// TODO(shahak): Consider fixing code duplication with BlockHeaderDBExecutor.
impl DBExecutorTrait for MockDBExecutor {
// TODO(shahak): Consider fixing code duplication with DBExecutor.
fn register_query(
&mut self,
query: InternalQuery,
Expand Down

0 comments on commit 72895c9

Please sign in to comment.