diff --git a/crates/fuel-streams-publisher/src/blocks.rs b/crates/fuel-streams-publisher/src/blocks.rs index e6374e5..354d535 100644 --- a/crates/fuel-streams-publisher/src/blocks.rs +++ b/crates/fuel-streams-publisher/src/blocks.rs @@ -3,13 +3,13 @@ use std::sync::Arc; use fuel_streams_core::prelude::*; use tokio::task::JoinHandle; -use crate::packets::{PublishError, PublishOpts, PublishPacket}; +use crate::packets::{PublishOpts, PublishPacket}; pub fn publish_task( block: &Block, stream: Arc>, opts: &Arc, -) -> JoinHandle> { +) -> JoinHandle> { let block_height = block.header().consensus().height.into(); let block_producer = (*opts.block_producer).clone(); let packet = PublishPacket::new( diff --git a/crates/fuel-streams-publisher/src/inputs.rs b/crates/fuel-streams-publisher/src/inputs.rs index 663eb07..44a295d 100644 --- a/crates/fuel-streams-publisher/src/inputs.rs +++ b/crates/fuel-streams-publisher/src/inputs.rs @@ -15,7 +15,7 @@ use tokio::task::JoinHandle; use crate::{ identifiers::{Identifier, IdsExtractable, PacketIdBuilder}, - packets::{PublishError, PublishOpts, PublishPacket}, + packets::{PublishOpts, PublishPacket}, }; pub fn publish_tasks( @@ -23,7 +23,7 @@ pub fn publish_tasks( tx_id: &Bytes32, stream: &Stream, opts: &Arc, -) -> Vec>> { +) -> Vec>> { let packets: Vec> = tx .inputs() .par_iter() diff --git a/crates/fuel-streams-publisher/src/logs.rs b/crates/fuel-streams-publisher/src/logs.rs index f06e66a..2828658 100644 --- a/crates/fuel-streams-publisher/src/logs.rs +++ b/crates/fuel-streams-publisher/src/logs.rs @@ -5,14 +5,14 @@ use fuel_streams_core::prelude::*; use rayon::prelude::*; use tokio::task::JoinHandle; -use crate::packets::{PublishError, PublishOpts, PublishPacket}; +use crate::packets::{PublishOpts, PublishPacket}; pub fn publish_tasks( tx_id: &Bytes32, stream: &Stream, opts: &Arc, receipts: &Vec, -) -> Vec>> { +) -> Vec>> { let block_height = (*opts.block_height).clone(); let packets: Vec> = receipts .par_iter() diff --git a/crates/fuel-streams-publisher/src/outputs.rs b/crates/fuel-streams-publisher/src/outputs.rs index 5da7e18..9109b3b 100644 --- a/crates/fuel-streams-publisher/src/outputs.rs +++ b/crates/fuel-streams-publisher/src/outputs.rs @@ -7,7 +7,7 @@ use tokio::task::JoinHandle; use crate::{ identifiers::{Identifier, IdsExtractable, PacketIdBuilder}, - packets::{PublishError, PublishOpts, PublishPacket}, + packets::{PublishOpts, PublishPacket}, }; pub fn publish_tasks( @@ -15,7 +15,7 @@ pub fn publish_tasks( tx_id: &Bytes32, stream: &Stream, opts: &Arc, -) -> Vec>> { +) -> Vec>> { let packets: Vec> = tx .outputs() .par_iter() diff --git a/crates/fuel-streams-publisher/src/packets.rs b/crates/fuel-streams-publisher/src/packets.rs index bbb6e3a..ebc0131 100644 --- a/crates/fuel-streams-publisher/src/packets.rs +++ b/crates/fuel-streams-publisher/src/packets.rs @@ -1,21 +1,10 @@ use std::sync::Arc; use fuel_streams_core::prelude::*; -use thiserror::Error; use tokio::{sync::Semaphore, task::JoinHandle}; use crate::telemetry::Telemetry; -#[derive(Error, Debug)] -pub enum PublishError { - #[error("Failed to publish to stream: {0}")] - StreamPublish(String), - #[error("Semaphore acquisition failed: {0}")] - Semaphore(#[from] tokio::sync::AcquireError), - #[error("Unknown error: {0}")] - Unknown(String), -} - #[derive(Clone)] pub struct PublishOpts { pub semaphore: Arc, @@ -43,7 +32,7 @@ impl PublishPacket { &self, stream: Arc>, opts: &Arc, - ) -> JoinHandle> { + ) -> JoinHandle> { let stream = Arc::clone(&stream); let opts = Arc::clone(opts); let payload = Arc::clone(&self.payload); @@ -52,11 +41,7 @@ impl PublishPacket { let wildcard = self.subject.wildcard(); tokio::spawn(async move { - let _permit = opts - .semaphore - .acquire() - .await - .map_err(PublishError::Semaphore)?; + let _permit = opts.semaphore.acquire().await?; match stream.publish(&*subject, &payload).await { Ok(published_data_size) => { @@ -82,7 +67,7 @@ impl PublishPacket { &e.to_string(), ); - Err(PublishError::StreamPublish(e.to_string())) + anyhow::bail!(e.to_string()) } } }) diff --git a/crates/fuel-streams-publisher/src/publisher.rs b/crates/fuel-streams-publisher/src/publisher.rs index af90c7c..ae555d9 100644 --- a/crates/fuel-streams-publisher/src/publisher.rs +++ b/crates/fuel-streams-publisher/src/publisher.rs @@ -6,14 +6,12 @@ use fuel_core_importer::ImporterResult; use fuel_core_types::fuel_tx::Output; use fuel_streams::types::Log; use fuel_streams_core::prelude::*; -use futures::{stream::FuturesUnordered, StreamExt}; -use rayon::prelude::*; -use thiserror::Error; +use futures::{future::try_join_all, stream::FuturesUnordered}; use tokio::sync::{broadcast::error::RecvError, Semaphore}; use crate::{ blocks, - packets::{PublishError, PublishOpts}, + packets::PublishOpts, publisher_shutdown::{ShutdownToken, GRACEFUL_SHUTDOWN_TIMEOUT}, telemetry::Telemetry, transactions, @@ -97,14 +95,6 @@ impl Streams { } } -#[derive(Error, Debug)] -pub enum PublishBlockError { - #[error("Task execution error: {0}")] - TaskExecution(String), - #[error("Publish error: {0}")] - Publish(#[from] PublishError), -} - #[allow(dead_code)] #[derive(Clone)] pub struct Publisher { @@ -291,6 +281,7 @@ impl Publisher { block_producer: &Address, ) -> anyhow::Result<()> { let start_time = std::time::Instant::now(); + let fuel_core = &*self.fuel_core; let semaphore = Arc::new(Semaphore::new(*PUBLISHER_MAX_THREADS)); let chain_id = Arc::new(*self.fuel_core.chain_id()); @@ -308,7 +299,7 @@ impl Publisher { telemetry: self.telemetry.clone(), }); - let tasks = + let publish_tasks = transactions::publish_all_tasks(txs, streams, opts, fuel_core) .into_iter() .chain(std::iter::once(blocks::publish_task( @@ -318,48 +309,15 @@ impl Publisher { ))) .collect::>(); - let errors: Vec = tasks - .filter_map(|res| self.handle_task_result(res)) - .collect() - .await; - - if !errors.is_empty() { - let error_message = errors - .par_iter() - .map(|e| e.to_string()) - .collect::>() - .join(", "); - - Err(anyhow::anyhow!( - "Errors occurred during block publishing: {}", - error_message - )) - } else { - let elapsed = start_time.elapsed(); - tracing::info!( - "Published streams for BlockHeight: {} in {:?}", - *block_height, - elapsed - ); - - Ok(()) - } - } + try_join_all(publish_tasks).await?; - async fn handle_task_result( - &self, - result: Result, tokio::task::JoinError>, - ) -> Option { - match result { - Ok(Ok(())) => None, - Ok(Err(publish_error)) => { - tracing::error!("Publish error: {:?}", publish_error); - Some(PublishBlockError::Publish(publish_error)) - } - Err(join_error) => { - tracing::error!("Task join error: {:?}", join_error); - Some(PublishBlockError::TaskExecution(join_error.to_string())) - } - } + let elapsed = start_time.elapsed(); + tracing::info!( + "Published streams for BlockHeight: {} in {:?}", + *block_height, + elapsed + ); + + Ok(()) } } diff --git a/crates/fuel-streams-publisher/src/receipts.rs b/crates/fuel-streams-publisher/src/receipts.rs index 2deca6d..cde0c8a 100644 --- a/crates/fuel-streams-publisher/src/receipts.rs +++ b/crates/fuel-streams-publisher/src/receipts.rs @@ -7,7 +7,7 @@ use tokio::task::JoinHandle; use crate::{ identifiers::{Identifier, IdsExtractable, PacketIdBuilder}, - packets::{PublishError, PublishOpts, PublishPacket}, + packets::{PublishOpts, PublishPacket}, }; pub fn publish_tasks( @@ -16,7 +16,7 @@ pub fn publish_tasks( stream: &Stream, opts: &Arc, receipts: &Vec, -) -> Vec>> { +) -> Vec>> { let packets: Vec> = receipts .par_iter() .enumerate() diff --git a/crates/fuel-streams-publisher/src/transactions.rs b/crates/fuel-streams-publisher/src/transactions.rs index 75a53d8..4a10544 100644 --- a/crates/fuel-streams-publisher/src/transactions.rs +++ b/crates/fuel-streams-publisher/src/transactions.rs @@ -14,7 +14,7 @@ use crate::{ inputs::publish_tasks as publish_inputs, logs::publish_tasks as publish_logs, outputs::publish_tasks as publish_outputs, - packets::{PublishError, PublishOpts, PublishPacket}, + packets::{PublishOpts, PublishPacket}, receipts::publish_tasks as publish_receipts, sha256, utxos::publish_tasks as publish_utxos, @@ -27,7 +27,7 @@ pub fn publish_all_tasks( streams: Streams, opts: &Arc, fuel_core: &dyn FuelCoreLike, -) -> Vec>> { +) -> Vec>> { let offline_db = fuel_core.database().off_chain().latest_view().unwrap(); transactions @@ -84,7 +84,7 @@ fn publish_tasks( stream: &Stream, opts: &Arc, receipts: &Vec, -) -> Vec>> { +) -> Vec>> { let block_height = &opts.block_height; packets_from_tx(tx_item, tx_id, tx_status, block_height, receipts) .iter() diff --git a/crates/fuel-streams-publisher/src/utxos.rs b/crates/fuel-streams-publisher/src/utxos.rs index 751f35e..18c7e6f 100644 --- a/crates/fuel-streams-publisher/src/utxos.rs +++ b/crates/fuel-streams-publisher/src/utxos.rs @@ -7,7 +7,7 @@ use rayon::prelude::*; use tokio::task::JoinHandle; use crate::{ - packets::{PublishError, PublishOpts, PublishPacket}, + packets::{PublishOpts, PublishPacket}, FuelCoreLike, }; @@ -17,7 +17,7 @@ pub fn publish_tasks( stream: &Stream, opts: &Arc, fuel_core: &dyn FuelCoreLike, -) -> Vec>> { +) -> Vec>> { let packets: Vec<(UtxosSubject, Utxo)> = tx .inputs() .par_iter()