-
Notifications
You must be signed in to change notification settings - Fork 105
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
test(ethexe): Introduce two-step Service
run
#4311
base: master
Are you sure you want to change the base?
Changes from all commits
230bd94
7cdd9af
8a2d291
53c4517
6e450c4
3174f94
a465d72
49b8e2b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -32,7 +32,7 @@ use ethexe_common::{ | |||||
use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; | ||||||
use ethexe_ethereum::{primitives::U256, router::RouterQuery}; | ||||||
use ethexe_network::{db_sync, NetworkReceiverEvent}; | ||||||
use ethexe_observer::{RequestBlockData, RequestEvent}; | ||||||
use ethexe_observer::{EventsStreamProducer, RequestBlockData, RequestEvent}; | ||||||
use ethexe_processor::{LocalOutcome, ProcessorConfig}; | ||||||
use ethexe_sequencer::agro::AggregatedCommitments; | ||||||
use ethexe_signer::{Digest, PublicKey, Signature, Signer}; | ||||||
|
@@ -51,7 +51,9 @@ use utils::*; | |||||
/// ethexe service. | ||||||
pub struct Service { | ||||||
db: Database, | ||||||
observer: ethexe_observer::Observer, | ||||||
// Option used to enable "taking" observer from it when calling | ||||||
// Service methods by `self`. Contract: always `Some`. | ||||||
observer: Option<ethexe_observer::Observer>, | ||||||
query: ethexe_observer::Query, | ||||||
router_query: RouterQuery, | ||||||
processor: ethexe_processor::Processor, | ||||||
|
@@ -201,7 +203,7 @@ impl Service { | |||||
Ok(Self { | ||||||
db, | ||||||
network, | ||||||
observer, | ||||||
observer: Some(observer), | ||||||
query, | ||||||
router_query, | ||||||
processor, | ||||||
|
@@ -240,7 +242,7 @@ impl Service { | |||||
) -> Self { | ||||||
Self { | ||||||
db, | ||||||
observer, | ||||||
observer: Some(observer), | ||||||
query, | ||||||
router_query, | ||||||
processor, | ||||||
|
@@ -418,30 +420,46 @@ impl Service { | |||||
} | ||||||
} | ||||||
|
||||||
async fn run_inner(self) -> Result<()> { | ||||||
#[cfg(test)] | ||||||
pub async fn pending_run(self) -> ServicePendingRun { | ||||||
ServicePendingRun::new(self).await | ||||||
} | ||||||
|
||||||
pub async fn run(mut self) -> Result<()> { | ||||||
let Some(observer) = self.observer.take() else { | ||||||
unreachable!("Contract invalidation; qed.") | ||||||
}; | ||||||
|
||||||
if let Some(metrics_service) = self.metrics_service.take() { | ||||||
tokio::spawn(metrics_service.run( | ||||||
observer.get_status_receiver(), | ||||||
self.sequencer.as_mut().map(|s| s.get_status_receiver()), | ||||||
)); | ||||||
} | ||||||
let events_stream_producer = observer.events_stream_producer(); | ||||||
|
||||||
self.run_inner(events_stream_producer).await.map_err(|err| { | ||||||
log::error!("Service finished work with error: {:?}", err); | ||||||
err | ||||||
}) | ||||||
} | ||||||
|
||||||
async fn run_inner(self, events_stream_producer: EventsStreamProducer) -> Result<()> { | ||||||
let Service { | ||||||
db, | ||||||
network, | ||||||
mut observer, | ||||||
mut sequencer, | ||||||
mut query, | ||||||
mut router_query, | ||||||
mut processor, | ||||||
mut sequencer, | ||||||
signer: _signer, | ||||||
mut validator, | ||||||
metrics_service, | ||||||
rpc, | ||||||
block_time, | ||||||
signer: _signer, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
.. | ||||||
} = self; | ||||||
|
||||||
if let Some(metrics_service) = metrics_service { | ||||||
tokio::spawn(metrics_service.run( | ||||||
observer.get_status_receiver(), | ||||||
sequencer.as_mut().map(|s| s.get_status_receiver()), | ||||||
)); | ||||||
} | ||||||
|
||||||
let observer_events = observer.request_events(); | ||||||
let observer_events = events_stream_producer.request_events(); | ||||||
futures::pin_mut!(observer_events); | ||||||
|
||||||
let (mut network_sender, mut network_receiver, mut network_handle) = | ||||||
|
@@ -577,13 +595,6 @@ impl Service { | |||||
Ok(()) | ||||||
} | ||||||
|
||||||
pub async fn run(self) -> Result<()> { | ||||||
self.run_inner().await.map_err(|err| { | ||||||
log::error!("Service finished work with error: {:?}", err); | ||||||
err | ||||||
}) | ||||||
} | ||||||
|
||||||
async fn post_process_commitments( | ||||||
code_commitments: Vec<CodeCommitment>, | ||||||
block_commitments: Vec<BlockCommitment>, | ||||||
|
@@ -844,6 +855,62 @@ impl Service { | |||||
} | ||||||
} | ||||||
|
||||||
/// The type was introduced as a solution to the issue#4099. | ||||||
/// | ||||||
/// Basically, it splits the events stream creation into two steps: | ||||||
/// 1) blocks subscription and 2) actually obtaining events in the loop. | ||||||
/// | ||||||
/// Usually blocks subscription is done within `async_stream::stream!` in which | ||||||
/// the events obtaining loop is actually defined. This is a default design and | ||||||
/// it's too slow as subscription to blocks is scheduled when the async stream is | ||||||
/// first time polled. | ||||||
#[cfg(test)] | ||||||
pub struct ServicePendingRun { | ||||||
service: Service, | ||||||
events_stream_producer: EventsStreamProducer, | ||||||
} | ||||||
|
||||||
#[cfg(test)] | ||||||
impl ServicePendingRun { | ||||||
async fn new(mut service: Service) -> Self { | ||||||
let Some(observer) = service.observer.take() else { | ||||||
unreachable!("Contract invalidation; qed.") | ||||||
}; | ||||||
|
||||||
if let Some(metrics_service) = service.metrics_service.take() { | ||||||
tokio::spawn(metrics_service.run( | ||||||
observer.get_status_receiver(), | ||||||
service.sequencer.as_mut().map(|s| s.get_status_receiver()), | ||||||
)); | ||||||
} | ||||||
|
||||||
let events_stream_producer = observer | ||||||
.events_stream_producer() | ||||||
.with_blocks_subscribed_first() | ||||||
.await; | ||||||
|
||||||
Self { | ||||||
service, | ||||||
events_stream_producer, | ||||||
} | ||||||
} | ||||||
|
||||||
pub async fn complete_run(self) -> Result<()> { | ||||||
let Self { | ||||||
service, | ||||||
events_stream_producer, | ||||||
} = self; | ||||||
|
||||||
service | ||||||
.run_inner(events_stream_producer) | ||||||
.await | ||||||
.map_err(|err| { | ||||||
log::error!("Service finished work with error: {:?}", err); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
err | ||||||
}) | ||||||
} | ||||||
} | ||||||
|
||||||
mod utils { | ||||||
use super::*; | ||||||
|
||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -297,7 +297,9 @@ impl<T: Transport + Clone, N: Network> TryGetReceipt<T, N> for PendingTransactio | |
Err(err) => err, | ||
}; | ||
|
||
for _ in 0..3 { | ||
log::trace!("Failed to get transaction receipt for {tx_hash}. Retrying..."); | ||
for n in 0..25 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's ok for tests, but bad for production |
||
log::trace!("Attempt {n}. Error - {err}"); | ||
match err { | ||
PendingTransactionError::TransportError(RpcError::NullResp) => {} | ||
_ => break, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.