From 230bd9479852310f23839a8b41e52e58d1112d86 Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Thu, 24 Oct 2024 18:38:14 +0300 Subject: [PATCH 1/6] Introduce two-step service run Add docs --- ethexe/cli/src/service.rs | 115 +++++++++++++++++++++++++------- ethexe/cli/src/tests.rs | 19 ++---- ethexe/observer/src/lib.rs | 2 +- ethexe/observer/src/observer.rs | 107 ++++++++++++++++++++++------- 4 files changed, 180 insertions(+), 63 deletions(-) diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index a418816637b..22645d63231 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -32,7 +32,7 @@ use ethexe_common::{ use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; use ethexe_ethereum::router::RouterQuery; use ethexe_network::NetworkReceiverEvent; -use ethexe_observer::{RequestBlockData, RequestEvent}; +use ethexe_observer::{EventsStreamProducer, RequestBlockData, RequestEvent}; use ethexe_processor::LocalOutcome; use ethexe_sequencer::agro::AggregatedCommitments; use ethexe_signer::{Digest, PublicKey, Signature, Signer}; @@ -51,7 +51,9 @@ use utils::*; /// ethexe service. pub struct Service { db: Database, - observer: ethexe_observer::Observer, + // Option used to enable "taking" observer from it when calling + // Service methods by `self`. Contract: always `Some`. + observer: Option, query: ethexe_observer::Query, processor: ethexe_processor::Processor, signer: ethexe_signer::Signer, @@ -184,7 +186,7 @@ impl Service { Ok(Self { db, network, - observer, + observer: Some(observer), query, processor, sequencer, @@ -221,7 +223,7 @@ impl Service { ) -> Self { Self { db, - observer, + observer: Some(observer), query, processor, signer, @@ -398,29 +400,45 @@ impl Service { } } - async fn run_inner(self) -> Result<()> { + #[cfg(test)] + pub async fn pending_run(self) -> ServicePendingRun { + ServicePendingRun::new(self).await + } + + pub async fn run(mut self) -> Result<()> { + let Some(observer) = self.observer.take() else { + unreachable!("Contract invalidation; qed.") + }; + + if let Some(metrics_service) = self.metrics_service.take() { + tokio::spawn(metrics_service.run( + observer.get_status_receiver(), + self.sequencer.as_mut().map(|s| s.get_status_receiver()), + )); + } + let events_stream_producer = observer.events_stream_producer(); + + self.run_inner(events_stream_producer).await.map_err(|err| { + log::error!("Service finished work with error: {:?}", err); + err + }) + } + + async fn run_inner(self, events_stream_producer: EventsStreamProducer) -> Result<()> { let Service { db, network, - mut observer, + mut sequencer, mut query, mut processor, - mut sequencer, - signer: _signer, mut validator, - metrics_service, rpc, block_time, + signer: _signer, + .. } = self; - if let Some(metrics_service) = metrics_service { - tokio::spawn(metrics_service.run( - observer.get_status_receiver(), - sequencer.as_mut().map(|s| s.get_status_receiver()), - )); - } - - let observer_events = observer.request_events(); + let observer_events = events_stream_producer.request_events(); futures::pin_mut!(observer_events); let (mut network_sender, mut network_receiver, mut network_handle) = @@ -540,13 +558,6 @@ impl Service { Ok(()) } - pub async fn run(self) -> Result<()> { - self.run_inner().await.map_err(|err| { - log::error!("Service finished work with error: {:?}", err); - err - }) - } - async fn post_process_commitments( code_commitments: Vec, block_commitments: Vec, @@ -783,6 +794,62 @@ impl Service { } } +/// The type was introduced as a solution to the issue#4099. +/// +/// Basically, it splits the events stream creation into two steps: +/// 1) blocks subscription and 2) actually obtaining events in the loop. +/// +/// Usually blocks subscription is done within `async_stream::stream!` in which +/// the events obtaining loop is actually defined. This is a default design and +/// it's too slow as subscription to blocks is scheduled when the async stream is +/// first time polled. +#[cfg(test)] +pub struct ServicePendingRun { + service: Service, + events_stream_producer: EventsStreamProducer, +} + +#[cfg(test)] +impl ServicePendingRun { + async fn new(mut service: Service) -> Self { + let Some(observer) = service.observer.take() else { + unreachable!("Contract invalidation; qed.") + }; + + if let Some(metrics_service) = service.metrics_service.take() { + tokio::spawn(metrics_service.run( + observer.get_status_receiver(), + service.sequencer.as_mut().map(|s| s.get_status_receiver()), + )); + } + + let events_stream_producer = observer + .events_stream_producer() + .with_blocks_subscribed_first() + .await; + + Self { + service, + events_stream_producer, + } + } + + pub async fn complete_run(self) -> Result<()> { + let Self { + service, + events_stream_producer, + } = self; + + service + .run_inner(events_stream_producer) + .await + .map_err(|err| { + log::error!("Service finished work with error: {:?}", err); + err + }) + } +} + mod utils { use super::*; diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index a15a9f1fda9..74f928715fc 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -496,10 +496,6 @@ async fn ping_reorg() { assert_eq!(res.program_id, ping_id); assert_eq!(res.reply_payload, b"PONG"); - // Await for service block with user reply handling - // TODO: this is for better logs reading only, should find a better solution #4099 - tokio::time::sleep(env.block_time).await; - log::info!("📗 Test after reverting to the program creation snapshot"); provider .anvil_revert(program_created_snapshot_id) @@ -541,10 +537,6 @@ async fn ping_reorg() { let res = send_message.wait_for().await.unwrap(); assert_eq!(res.program_id, ping_id); assert_eq!(res.reply_payload, b"PONG"); - - // Await for service block with user reply handling - // TODO: this is for better logs reading only, should find a better solution #4099 - tokio::time::sleep(Duration::from_secs(1)).await; } // Mine 150 blocks - send message - mine 150 blocks. @@ -862,7 +854,7 @@ mod utils { .expect("failed to create observer"); let (broadcaster, _events_stream) = { - let mut observer = observer.clone(); + let observer = observer.clone(); let (sender, mut receiver) = tokio::sync::broadcast::channel::(2048); let sender = Arc::new(Mutex::new(sender)); let cloned_sender = sender.clone(); @@ -870,7 +862,7 @@ mod utils { let (send_subscription_created, receive_subscription_created) = oneshot::channel::<()>(); let handle = task::spawn(async move { - let observer_events = observer.events_all(); + let observer_events = observer.events_stream_producer().events_all(); futures::pin_mut!(observer_events); send_subscription_created.send(()).unwrap(); @@ -1331,12 +1323,9 @@ mod utils { None, None, ); - - let handle = task::spawn(service.run()); + let service_pending_run = service.pending_run().await; + let handle = task::spawn(service_pending_run.complete_run()); self.running_service_handle = Some(handle); - - // Sleep to wait for the new service to start - tokio::time::sleep(Duration::from_secs(1)).await; } pub async fn stop_service(&mut self) { diff --git a/ethexe/observer/src/lib.rs b/ethexe/observer/src/lib.rs index 3a9e1a23e92..5a5524194eb 100644 --- a/ethexe/observer/src/lib.rs +++ b/ethexe/observer/src/lib.rs @@ -25,5 +25,5 @@ mod query; pub use blobs::{BlobReader, ConsensusLayerBlobReader, MockBlobReader}; pub use event::{BlockData, Event, RequestBlockData, RequestEvent, SimpleBlockData}; -pub use observer::{Observer, ObserverStatus}; +pub use observer::{EventsStreamProducer, Observer, ObserverStatus}; pub use query::Query; diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs index f4070aef6fb..87f5c295c4f 100644 --- a/ethexe/observer/src/observer.rs +++ b/ethexe/observer/src/observer.rs @@ -5,7 +5,8 @@ use crate::{ use alloy::{ primitives::Address as AlloyAddress, providers::{Provider, ProviderBuilder, RootProvider}, - rpc::types::eth::{Filter, Topic}, + pubsub::Subscription, + rpc::types::eth::{Block, Filter, Topic}, transports::BoxTransport, }; use anyhow::{anyhow, Result}; @@ -74,18 +75,71 @@ impl Observer { update_fn(&mut self.status); let _ = self.status_sender.send_replace(self.status); } + pub fn provider(&self) -> &ObserverProvider { &self.provider } - pub fn events_all(&mut self) -> impl Stream + '_ { + /// Returns events stream producer, which produces a stream for polling events + /// in observed blocks. + pub fn events_stream_producer(self) -> EventsStreamProducer { + EventsStreamProducer::new(self) + } + + async fn subscribe_blocks(&self) -> Result> { + self.provider.subscribe_blocks().await.map_err(Into::into) + } +} + +/// Events stream producer. +/// +/// Produces events stream that yields obtained from the observed chain blocks. +pub struct EventsStreamProducer { + observer: Observer, + maybe_subscription: Option>, +} + +impl EventsStreamProducer { + /// Creates an events stream producer from the `observer`. + pub fn new(observer: Observer) -> Self { + Self { + observer, + maybe_subscription: None, + } + } + + /// Subscribes to blocks and stores the subscription to be used + /// when events stream is created. + /// + /// For more info read `ethexe_cli::service::ServicePendingRun` docs. + pub async fn with_blocks_subscribed_first(self) -> Self { + let subscription = self + .observer + .subscribe_blocks() + .await + .expect("failed to subscribe to blocks"); + + Self { + observer: self.observer, + maybe_subscription: Some(subscription), + } + } + + /// Returs stream for polling events. + pub fn events_all(self) -> impl Stream + 'static { + let Self { + mut observer, + maybe_subscription, + } = self; async_stream::stream! { - let block_subscription = self - .provider - .subscribe_blocks() - .await - .expect("failed to subscribe to blocks"); - let mut block_stream = block_subscription.into_stream(); + let blocks_subscription = match maybe_subscription { + Some(blocks_subscription) => blocks_subscription, + None => observer + .subscribe_blocks() + .await + .expect("failed to subscribe to blocks") + }; + let mut block_stream = blocks_subscription.into_stream(); let mut futures = FuturesUnordered::new(); loop { @@ -103,7 +157,7 @@ impl Observer { let block_number = block.header.number; let block_timestamp = block.header.timestamp; - let events = match read_block_events(block_hash, &self.provider, self.router_address).await { + let events = match read_block_events(block_hash, &observer.provider, observer.router_address).await { Ok(events) => events, Err(err) => { log::error!("failed to read events: {err}"); @@ -118,7 +172,7 @@ impl Observer { if let BlockEvent::Router(RouterEvent::CodeValidationRequested { code_id, blob_tx_hash }) = event { codes_len += 1; - let blob_reader = self.blob_reader.clone(); + let blob_reader = observer.blob_reader.clone(); let code_id = *code_id; let blob_tx_hash = *blob_tx_hash; @@ -136,7 +190,7 @@ impl Observer { } } - self.update_status(|status| { + observer.update_status(|status| { status.eth_block_number = block_number; if codes_len > 0 { status.last_router_state = block_number; @@ -168,14 +222,21 @@ impl Observer { } } - pub fn request_events(&mut self) -> impl Stream + '_ { + /// Returs stream for polling request events. + pub fn request_events(self) -> impl Stream + 'static { + let Self { + mut observer, + maybe_subscription, + } = self; async_stream::stream! { - let block_subscription = self - .provider - .subscribe_blocks() - .await - .expect("failed to subscribe to blocks"); - let mut block_stream = block_subscription.into_stream(); + let blocks_subscription = match maybe_subscription { + Some(blocks_subscription) => blocks_subscription, + None => observer + .subscribe_blocks() + .await + .expect("failed to subscribe to blocks") + }; + let mut block_stream = blocks_subscription.into_stream(); let mut futures = FuturesUnordered::new(); loop { @@ -193,7 +254,7 @@ impl Observer { let block_number = block.header.number; let block_timestamp = block.header.timestamp; - let events = match read_block_request_events(block_hash, &self.provider, self.router_address).await { + let events = match read_block_request_events(block_hash, &observer.provider, observer.router_address).await { Ok(events) => events, Err(err) => { log::error!("failed to read events: {err}"); @@ -209,7 +270,7 @@ impl Observer { if let BlockRequestEvent::Router(RouterRequestEvent::CodeValidationRequested { code_id, blob_tx_hash }) = event { codes_len += 1; - let blob_reader = self.blob_reader.clone(); + let blob_reader = observer.blob_reader.clone(); let code_id = *code_id; let blob_tx_hash = *blob_tx_hash; @@ -227,7 +288,7 @@ impl Observer { } } - self.update_status(|status| { + observer.update_status(|status| { status.eth_block_number = block_number; if codes_len > 0 { status.last_router_state = block_number; @@ -563,11 +624,11 @@ mod tests { let (send_subscription_created, receive_subscription_created) = oneshot::channel::<()>(); let handle = task::spawn(async move { - let mut observer = Observer::new(ðereum_rpc, router_address, cloned_blob_reader) + let observer = Observer::new(ðereum_rpc, router_address, cloned_blob_reader) .await .expect("failed to create observer"); - let observer_events = observer.events_all(); + let observer_events = observer.events_stream_producer().events_all(); futures::pin_mut!(observer_events); send_subscription_created.send(()).unwrap(); From 7cdd9af7980d314349d4bb233035e4f3c3e71cbd Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Mon, 28 Oct 2024 17:01:10 +0300 Subject: [PATCH 2/6] Add macro to avoid copy-paste --- ethexe/observer/src/observer.rs | 308 ++++++++++++-------------------- 1 file changed, 115 insertions(+), 193 deletions(-) diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs index 87f5c295c4f..a9d0330b093 100644 --- a/ethexe/observer/src/observer.rs +++ b/ethexe/observer/src/observer.rs @@ -91,6 +91,105 @@ impl Observer { } } +macro_rules! define_event_stream_method { + ( + $method_name:ident, + $read_events_fn:ident, + $block_event_type:ty, + $router_event_type:ty, + $block_data_type:ty, + $event_type:ty + ) => { + pub fn $method_name(self) -> impl Stream + 'static { + let Self { + mut observer, + maybe_subscription, + } = self; + async_stream::stream! { + let blocks_subscription = match maybe_subscription { + Some(subscription) => subscription, + None => observer + .subscribe_blocks() + .await + .expect("failed to subscribe to blocks"), + }; + let mut block_stream = blocks_subscription.into_stream(); + let mut futures = FuturesUnordered::new(); + + loop { + tokio::select! { + block = block_stream.next() => { + let Some(block) = block else { + log::info!("Block stream ended"); + break; + }; + + log::trace!("Received block: {:?}", block.header.hash); + + let block_hash = (*block.header.hash).into(); + let parent_hash = (*block.header.parent_hash).into(); + let block_number = block.header.number; + let block_timestamp = block.header.timestamp; + + let events = match $read_events_fn(block_hash, &observer.provider, observer.router_address).await { + Ok(events) => events, + Err(err) => { + log::error!("failed to read events: {err}"); + continue; + } + }; + + let mut codes_len = 0; + + for event in events.iter() { + if let $block_event_type::Router($router_event_type::CodeValidationRequested { code_id, blob_tx_hash }) = event { + codes_len += 1; + + let blob_reader = observer.blob_reader.clone(); + let code_id = *code_id; + let blob_tx_hash = *blob_tx_hash; + + futures.push(async move { + let attempts = Some(3); + read_code_from_tx_hash(blob_reader, code_id, blob_tx_hash, attempts).await + }); + } + } + + observer.update_status(|status| { + status.eth_block_number = block_number; + if codes_len > 0 { + status.last_router_state = block_number; + } + status.pending_upload_code = codes_len as u64; + }); + + let block_data = $block_data_type { + hash: block_hash, + header: BlockHeader { + height: block_number as u32, + timestamp: block_timestamp, + parent_hash, + }, + events, + }; + + yield $event_type::Block(block_data); + }, + future = futures.next(), if !futures.is_empty() => { + match future { + Some(Ok((code_id, code))) => yield $event_type::CodeLoaded { code_id, code }, + Some(Err(err)) => log::error!("failed to handle upload code event: {err}"), + None => continue, + } + } + }; + } + } + } + } +} + /// Events stream producer. /// /// Produces events stream that yields obtained from the observed chain blocks. @@ -125,200 +224,23 @@ impl EventsStreamProducer { } } - /// Returs stream for polling events. - pub fn events_all(self) -> impl Stream + 'static { - let Self { - mut observer, - maybe_subscription, - } = self; - async_stream::stream! { - let blocks_subscription = match maybe_subscription { - Some(blocks_subscription) => blocks_subscription, - None => observer - .subscribe_blocks() - .await - .expect("failed to subscribe to blocks") - }; - let mut block_stream = blocks_subscription.into_stream(); - let mut futures = FuturesUnordered::new(); - - loop { - tokio::select! { - block = block_stream.next() => { - let Some(block) = block else { - log::info!("Block stream ended"); - break; - }; - - log::trace!("Received block: {:?}", block.header.hash); - - let block_hash = (*block.header.hash).into(); - let parent_hash = (*block.header.parent_hash).into(); - let block_number = block.header.number; - let block_timestamp = block.header.timestamp; - - let events = match read_block_events(block_hash, &observer.provider, observer.router_address).await { - Ok(events) => events, - Err(err) => { - log::error!("failed to read events: {err}"); - continue; - } - }; - - let mut codes_len = 0; - - // Create futures to load codes - for event in events.iter() { - if let BlockEvent::Router(RouterEvent::CodeValidationRequested { code_id, blob_tx_hash }) = event { - codes_len += 1; - - let blob_reader = observer.blob_reader.clone(); - - let code_id = *code_id; - let blob_tx_hash = *blob_tx_hash; - - futures.push(async move { - let attempts = Some(3); - - read_code_from_tx_hash( - blob_reader, - code_id, - blob_tx_hash, - attempts, - ).await - }); - } - } - - observer.update_status(|status| { - status.eth_block_number = block_number; - if codes_len > 0 { - status.last_router_state = block_number; - } - status.pending_upload_code = codes_len as u64; - }); - - let block_data = BlockData { - hash: block_hash, - header: BlockHeader { - height: block_number as u32, - timestamp: block_timestamp, - parent_hash, - }, - events, - }; - - yield Event::Block(block_data); - }, - future = futures.next(), if !futures.is_empty() => { - match future { - Some(Ok((code_id, code))) => yield Event::CodeLoaded { code_id, code }, - Some(Err(err)) => log::error!("failed to handle upload code event: {err}"), - None => continue, - } - } - }; - } - } - } - - /// Returs stream for polling request events. - pub fn request_events(self) -> impl Stream + 'static { - let Self { - mut observer, - maybe_subscription, - } = self; - async_stream::stream! { - let blocks_subscription = match maybe_subscription { - Some(blocks_subscription) => blocks_subscription, - None => observer - .subscribe_blocks() - .await - .expect("failed to subscribe to blocks") - }; - let mut block_stream = blocks_subscription.into_stream(); - let mut futures = FuturesUnordered::new(); - - loop { - tokio::select! { - block = block_stream.next() => { - let Some(block) = block else { - log::info!("Block stream ended"); - break; - }; - - log::trace!("Received block: {:?}", block.header.hash); - - let block_hash = (*block.header.hash).into(); - let parent_hash = (*block.header.parent_hash).into(); - let block_number = block.header.number; - let block_timestamp = block.header.timestamp; - - let events = match read_block_request_events(block_hash, &observer.provider, observer.router_address).await { - Ok(events) => events, - Err(err) => { - log::error!("failed to read events: {err}"); - continue; - } - }; - - let mut codes_len = 0; - - // Create futures to load codes - // TODO (breathx): remove me from here mb - for event in events.iter() { - if let BlockRequestEvent::Router(RouterRequestEvent::CodeValidationRequested { code_id, blob_tx_hash }) = event { - codes_len += 1; - - let blob_reader = observer.blob_reader.clone(); - - let code_id = *code_id; - let blob_tx_hash = *blob_tx_hash; - - futures.push(async move { - let attempts = Some(3); - - read_code_from_tx_hash( - blob_reader, - code_id, - blob_tx_hash, - attempts, - ).await - }); - } - } + define_event_stream_method!( + events_all, + read_block_events, + BlockEvent, + RouterEvent, + BlockData, + Event + ); - observer.update_status(|status| { - status.eth_block_number = block_number; - if codes_len > 0 { - status.last_router_state = block_number; - } - status.pending_upload_code = codes_len as u64; - }); - - let block_data = RequestBlockData { - hash: block_hash, - header: BlockHeader { - height: block_number as u32, - timestamp: block_timestamp, - parent_hash, - }, - events, - }; - - yield RequestEvent::Block(block_data); - }, - future = futures.next(), if !futures.is_empty() => { - match future { - Some(Ok((code_id, code))) => yield RequestEvent::CodeLoaded { code_id, code }, - Some(Err(err)) => log::error!("failed to handle upload code event: {err}"), - None => continue, - } - } - }; - } - } - } + define_event_stream_method!( + request_events, + read_block_request_events, + BlockRequestEvent, + RouterRequestEvent, + RequestBlockData, + RequestEvent + ); } pub(crate) async fn read_code_from_tx_hash( From 6e450c46487c1b2a372002ad7b667cb973a38186 Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Tue, 12 Nov 2024 12:26:55 +0300 Subject: [PATCH 3/6] Add timer --- ethexe/cli/src/tests.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index b7559dffe31..05b88103026 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -1334,6 +1334,9 @@ mod utils { let service_pending_run = service.pending_run().await; let handle = task::spawn(service_pending_run.complete_run()); self.running_service_handle = Some(handle); + + // Sleep to wait for the new service to start + tokio::time::sleep(Duration::from_secs(1)).await; } pub async fn stop_service(&mut self) { From 3174f9439943c60e94389f3d3f08cb0865aa7d19 Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Tue, 12 Nov 2024 21:42:11 +0300 Subject: [PATCH 4/6] Trigger CI From a465d72aa3ceae68f3f5a8705bf56829f96ca095 Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Tue, 12 Nov 2024 21:54:31 +0300 Subject: [PATCH 5/6] Remove timer, set block time, increase attempts amount --- ethexe/cli/src/tests.rs | 8 ++++---- ethexe/ethereum/src/lib.rs | 4 +++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index 05b88103026..06fb53b9117 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -824,7 +824,10 @@ mod utils { (rpc_url, None) } Err(_) => { - let anvil = Anvil::new().try_spawn().unwrap(); + let anvil = Anvil::new() + .block_time(block_time.as_secs()) + .try_spawn() + .unwrap(); log::info!("📍 Anvil started at {}", anvil.ws_endpoint()); (anvil.ws_endpoint(), Some(anvil)) } @@ -1334,9 +1337,6 @@ mod utils { let service_pending_run = service.pending_run().await; let handle = task::spawn(service_pending_run.complete_run()); self.running_service_handle = Some(handle); - - // Sleep to wait for the new service to start - tokio::time::sleep(Duration::from_secs(1)).await; } pub async fn stop_service(&mut self) { diff --git a/ethexe/ethereum/src/lib.rs b/ethexe/ethereum/src/lib.rs index 190fb01934e..5537926fc02 100644 --- a/ethexe/ethereum/src/lib.rs +++ b/ethexe/ethereum/src/lib.rs @@ -297,7 +297,9 @@ impl TryGetReceipt for PendingTransactio Err(err) => err, }; - for _ in 0..3 { + log::trace!("Failed to get transaction receipt for {tx_hash}. Retrying..."); + for n in 0..25 { + log::trace!("Attempt {n}. Error - {err}"); match err { PendingTransactionError::TransportError(RpcError::NullResp) => {} _ => break, From 49b8e2bd5c0b5ab6aea828be90be7e1d0d6cdd17 Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Tue, 12 Nov 2024 22:11:08 +0300 Subject: [PATCH 6/6] Add TODO --- ethexe/observer/src/observer.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs index a9d0330b093..3eabe32ea9c 100644 --- a/ethexe/observer/src/observer.rs +++ b/ethexe/observer/src/observer.rs @@ -108,6 +108,7 @@ macro_rules! define_event_stream_method { async_stream::stream! { let blocks_subscription = match maybe_subscription { Some(subscription) => subscription, + // TODO #4335: always subscribe blocks when Observer is created. None => observer .subscribe_blocks() .await