From 3f5f663b748bfe52d26118a7eb0b073cd0f6b0be Mon Sep 17 00:00:00 2001 From: Joshua Oladele Date: Thu, 14 Nov 2024 14:50:13 +0100 Subject: [PATCH] refactor(publisher): remove `PublishError` in favor of `anyhow::Result` This commit removes PublishError in favor of bubbling pending unactionable errors to std when there is an error publishing any subject/payload. Given the focus of the Publish process, only dependent libraries or actionable errors should have an error signature.We can safely do this since it's all application code and we will be relying on our tracing for stack traces to implement certain actions after analyzing the errors. This makes our code more concise for the domain solution it solves. --- crates/fuel-streams-publisher/src/blocks.rs | 4 +- crates/fuel-streams-publisher/src/inputs.rs | 4 +- crates/fuel-streams-publisher/src/logs.rs | 4 +- crates/fuel-streams-publisher/src/outputs.rs | 4 +- crates/fuel-streams-publisher/src/packets.rs | 21 +----- .../fuel-streams-publisher/src/publisher.rs | 68 ++++--------------- crates/fuel-streams-publisher/src/receipts.rs | 4 +- .../src/transactions.rs | 6 +- crates/fuel-streams-publisher/src/utxos.rs | 4 +- 9 files changed, 31 insertions(+), 88 deletions(-) diff --git a/crates/fuel-streams-publisher/src/blocks.rs b/crates/fuel-streams-publisher/src/blocks.rs index e6374e5d..354d5351 100644 --- a/crates/fuel-streams-publisher/src/blocks.rs +++ b/crates/fuel-streams-publisher/src/blocks.rs @@ -3,13 +3,13 @@ use std::sync::Arc; use fuel_streams_core::prelude::*; use tokio::task::JoinHandle; -use crate::packets::{PublishError, PublishOpts, PublishPacket}; +use crate::packets::{PublishOpts, PublishPacket}; pub fn publish_task( block: &Block, stream: Arc>, opts: &Arc, -) -> JoinHandle> { +) -> JoinHandle> { let block_height = block.header().consensus().height.into(); let block_producer = (*opts.block_producer).clone(); let packet = PublishPacket::new( diff --git a/crates/fuel-streams-publisher/src/inputs.rs b/crates/fuel-streams-publisher/src/inputs.rs index 663eb07f..44a295db 100644 --- a/crates/fuel-streams-publisher/src/inputs.rs +++ b/crates/fuel-streams-publisher/src/inputs.rs @@ -15,7 +15,7 @@ use tokio::task::JoinHandle; use crate::{ identifiers::{Identifier, IdsExtractable, PacketIdBuilder}, - packets::{PublishError, PublishOpts, PublishPacket}, + packets::{PublishOpts, PublishPacket}, }; pub fn publish_tasks( @@ -23,7 +23,7 @@ pub fn publish_tasks( tx_id: &Bytes32, stream: &Stream, opts: &Arc, -) -> Vec>> { +) -> Vec>> { let packets: Vec> = tx .inputs() .par_iter() diff --git a/crates/fuel-streams-publisher/src/logs.rs b/crates/fuel-streams-publisher/src/logs.rs index f06e66ae..28286588 100644 --- a/crates/fuel-streams-publisher/src/logs.rs +++ b/crates/fuel-streams-publisher/src/logs.rs @@ -5,14 +5,14 @@ use fuel_streams_core::prelude::*; use rayon::prelude::*; use tokio::task::JoinHandle; -use crate::packets::{PublishError, PublishOpts, PublishPacket}; +use crate::packets::{PublishOpts, PublishPacket}; pub fn publish_tasks( tx_id: &Bytes32, stream: &Stream, opts: &Arc, receipts: &Vec, -) -> Vec>> { +) -> Vec>> { let block_height = (*opts.block_height).clone(); let packets: Vec> = receipts .par_iter() diff --git a/crates/fuel-streams-publisher/src/outputs.rs b/crates/fuel-streams-publisher/src/outputs.rs index 5da7e18d..9109b3bd 100644 --- a/crates/fuel-streams-publisher/src/outputs.rs +++ b/crates/fuel-streams-publisher/src/outputs.rs @@ -7,7 +7,7 @@ use tokio::task::JoinHandle; use crate::{ identifiers::{Identifier, IdsExtractable, PacketIdBuilder}, - packets::{PublishError, PublishOpts, PublishPacket}, + packets::{PublishOpts, PublishPacket}, }; pub fn publish_tasks( @@ -15,7 +15,7 @@ pub fn publish_tasks( tx_id: &Bytes32, stream: &Stream, opts: &Arc, -) -> Vec>> { +) -> Vec>> { let packets: Vec> = tx .outputs() .par_iter() diff --git a/crates/fuel-streams-publisher/src/packets.rs b/crates/fuel-streams-publisher/src/packets.rs index bbb6e3ae..ebc01318 100644 --- a/crates/fuel-streams-publisher/src/packets.rs +++ b/crates/fuel-streams-publisher/src/packets.rs @@ -1,21 +1,10 @@ use std::sync::Arc; use fuel_streams_core::prelude::*; -use thiserror::Error; use tokio::{sync::Semaphore, task::JoinHandle}; use crate::telemetry::Telemetry; -#[derive(Error, Debug)] -pub enum PublishError { - #[error("Failed to publish to stream: {0}")] - StreamPublish(String), - #[error("Semaphore acquisition failed: {0}")] - Semaphore(#[from] tokio::sync::AcquireError), - #[error("Unknown error: {0}")] - Unknown(String), -} - #[derive(Clone)] pub struct PublishOpts { pub semaphore: Arc, @@ -43,7 +32,7 @@ impl PublishPacket { &self, stream: Arc>, opts: &Arc, - ) -> JoinHandle> { + ) -> JoinHandle> { let stream = Arc::clone(&stream); let opts = Arc::clone(opts); let payload = Arc::clone(&self.payload); @@ -52,11 +41,7 @@ impl PublishPacket { let wildcard = self.subject.wildcard(); tokio::spawn(async move { - let _permit = opts - .semaphore - .acquire() - .await - .map_err(PublishError::Semaphore)?; + let _permit = opts.semaphore.acquire().await?; match stream.publish(&*subject, &payload).await { Ok(published_data_size) => { @@ -82,7 +67,7 @@ impl PublishPacket { &e.to_string(), ); - Err(PublishError::StreamPublish(e.to_string())) + anyhow::bail!(e.to_string()) } } }) diff --git a/crates/fuel-streams-publisher/src/publisher.rs b/crates/fuel-streams-publisher/src/publisher.rs index af90c7cf..ae555d96 100644 --- a/crates/fuel-streams-publisher/src/publisher.rs +++ b/crates/fuel-streams-publisher/src/publisher.rs @@ -6,14 +6,12 @@ use fuel_core_importer::ImporterResult; use fuel_core_types::fuel_tx::Output; use fuel_streams::types::Log; use fuel_streams_core::prelude::*; -use futures::{stream::FuturesUnordered, StreamExt}; -use rayon::prelude::*; -use thiserror::Error; +use futures::{future::try_join_all, stream::FuturesUnordered}; use tokio::sync::{broadcast::error::RecvError, Semaphore}; use crate::{ blocks, - packets::{PublishError, PublishOpts}, + packets::PublishOpts, publisher_shutdown::{ShutdownToken, GRACEFUL_SHUTDOWN_TIMEOUT}, telemetry::Telemetry, transactions, @@ -97,14 +95,6 @@ impl Streams { } } -#[derive(Error, Debug)] -pub enum PublishBlockError { - #[error("Task execution error: {0}")] - TaskExecution(String), - #[error("Publish error: {0}")] - Publish(#[from] PublishError), -} - #[allow(dead_code)] #[derive(Clone)] pub struct Publisher { @@ -291,6 +281,7 @@ impl Publisher { block_producer: &Address, ) -> anyhow::Result<()> { let start_time = std::time::Instant::now(); + let fuel_core = &*self.fuel_core; let semaphore = Arc::new(Semaphore::new(*PUBLISHER_MAX_THREADS)); let chain_id = Arc::new(*self.fuel_core.chain_id()); @@ -308,7 +299,7 @@ impl Publisher { telemetry: self.telemetry.clone(), }); - let tasks = + let publish_tasks = transactions::publish_all_tasks(txs, streams, opts, fuel_core) .into_iter() .chain(std::iter::once(blocks::publish_task( @@ -318,48 +309,15 @@ impl Publisher { ))) .collect::>(); - let errors: Vec = tasks - .filter_map(|res| self.handle_task_result(res)) - .collect() - .await; - - if !errors.is_empty() { - let error_message = errors - .par_iter() - .map(|e| e.to_string()) - .collect::>() - .join(", "); - - Err(anyhow::anyhow!( - "Errors occurred during block publishing: {}", - error_message - )) - } else { - let elapsed = start_time.elapsed(); - tracing::info!( - "Published streams for BlockHeight: {} in {:?}", - *block_height, - elapsed - ); - - Ok(()) - } - } + try_join_all(publish_tasks).await?; - async fn handle_task_result( - &self, - result: Result, tokio::task::JoinError>, - ) -> Option { - match result { - Ok(Ok(())) => None, - Ok(Err(publish_error)) => { - tracing::error!("Publish error: {:?}", publish_error); - Some(PublishBlockError::Publish(publish_error)) - } - Err(join_error) => { - tracing::error!("Task join error: {:?}", join_error); - Some(PublishBlockError::TaskExecution(join_error.to_string())) - } - } + let elapsed = start_time.elapsed(); + tracing::info!( + "Published streams for BlockHeight: {} in {:?}", + *block_height, + elapsed + ); + + Ok(()) } } diff --git a/crates/fuel-streams-publisher/src/receipts.rs b/crates/fuel-streams-publisher/src/receipts.rs index 2deca6d7..cde0c8a6 100644 --- a/crates/fuel-streams-publisher/src/receipts.rs +++ b/crates/fuel-streams-publisher/src/receipts.rs @@ -7,7 +7,7 @@ use tokio::task::JoinHandle; use crate::{ identifiers::{Identifier, IdsExtractable, PacketIdBuilder}, - packets::{PublishError, PublishOpts, PublishPacket}, + packets::{PublishOpts, PublishPacket}, }; pub fn publish_tasks( @@ -16,7 +16,7 @@ pub fn publish_tasks( stream: &Stream, opts: &Arc, receipts: &Vec, -) -> Vec>> { +) -> Vec>> { let packets: Vec> = receipts .par_iter() .enumerate() diff --git a/crates/fuel-streams-publisher/src/transactions.rs b/crates/fuel-streams-publisher/src/transactions.rs index 75a53d8d..4a105443 100644 --- a/crates/fuel-streams-publisher/src/transactions.rs +++ b/crates/fuel-streams-publisher/src/transactions.rs @@ -14,7 +14,7 @@ use crate::{ inputs::publish_tasks as publish_inputs, logs::publish_tasks as publish_logs, outputs::publish_tasks as publish_outputs, - packets::{PublishError, PublishOpts, PublishPacket}, + packets::{PublishOpts, PublishPacket}, receipts::publish_tasks as publish_receipts, sha256, utxos::publish_tasks as publish_utxos, @@ -27,7 +27,7 @@ pub fn publish_all_tasks( streams: Streams, opts: &Arc, fuel_core: &dyn FuelCoreLike, -) -> Vec>> { +) -> Vec>> { let offline_db = fuel_core.database().off_chain().latest_view().unwrap(); transactions @@ -84,7 +84,7 @@ fn publish_tasks( stream: &Stream, opts: &Arc, receipts: &Vec, -) -> Vec>> { +) -> Vec>> { let block_height = &opts.block_height; packets_from_tx(tx_item, tx_id, tx_status, block_height, receipts) .iter() diff --git a/crates/fuel-streams-publisher/src/utxos.rs b/crates/fuel-streams-publisher/src/utxos.rs index 751f35ea..18c7e6fa 100644 --- a/crates/fuel-streams-publisher/src/utxos.rs +++ b/crates/fuel-streams-publisher/src/utxos.rs @@ -7,7 +7,7 @@ use rayon::prelude::*; use tokio::task::JoinHandle; use crate::{ - packets::{PublishError, PublishOpts, PublishPacket}, + packets::{PublishOpts, PublishPacket}, FuelCoreLike, }; @@ -17,7 +17,7 @@ pub fn publish_tasks( stream: &Stream, opts: &Arc, fuel_core: &dyn FuelCoreLike, -) -> Vec>> { +) -> Vec>> { let packets: Vec<(UtxosSubject, Utxo)> = tx .inputs() .par_iter()