Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(publisher): Remove PublishError in favor of anyhow::Result #312

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/fuel-streams-publisher/src/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use std::sync::Arc;
use fuel_streams_core::prelude::*;
use tokio::task::JoinHandle;

use crate::packets::{PublishError, PublishOpts, PublishPacket};
use crate::packets::{PublishOpts, PublishPacket};

pub fn publish_task(
block: &Block<Transaction>,
stream: Arc<Stream<Block>>,
opts: &Arc<PublishOpts>,
) -> JoinHandle<Result<(), PublishError>> {
) -> JoinHandle<anyhow::Result<()>> {
let block_height = block.header().consensus().height.into();
let block_producer = (*opts.block_producer).clone();
let packet = PublishPacket::new(
Expand Down
4 changes: 2 additions & 2 deletions crates/fuel-streams-publisher/src/inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ use tokio::task::JoinHandle;

use crate::{
identifiers::{Identifier, IdsExtractable, PacketIdBuilder},
packets::{PublishError, PublishOpts, PublishPacket},
packets::{PublishOpts, PublishPacket},
};

pub fn publish_tasks(
tx: &Transaction,
tx_id: &Bytes32,
stream: &Stream<Input>,
opts: &Arc<PublishOpts>,
) -> Vec<JoinHandle<Result<(), PublishError>>> {
) -> Vec<JoinHandle<anyhow::Result<()>>> {
let packets: Vec<PublishPacket<Input>> = tx
.inputs()
.par_iter()
Expand Down
4 changes: 2 additions & 2 deletions crates/fuel-streams-publisher/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use fuel_streams_core::prelude::*;
use rayon::prelude::*;
use tokio::task::JoinHandle;

use crate::packets::{PublishError, PublishOpts, PublishPacket};
use crate::packets::{PublishOpts, PublishPacket};

pub fn publish_tasks(
tx_id: &Bytes32,
stream: &Stream<Log>,
opts: &Arc<PublishOpts>,
receipts: &Vec<Receipt>,
) -> Vec<JoinHandle<Result<(), PublishError>>> {
) -> Vec<JoinHandle<anyhow::Result<()>>> {
let block_height = (*opts.block_height).clone();
let packets: Vec<PublishPacket<Log>> = receipts
.par_iter()
Expand Down
4 changes: 2 additions & 2 deletions crates/fuel-streams-publisher/src/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ use tokio::task::JoinHandle;

use crate::{
identifiers::{Identifier, IdsExtractable, PacketIdBuilder},
packets::{PublishError, PublishOpts, PublishPacket},
packets::{PublishOpts, PublishPacket},
};

pub fn publish_tasks(
tx: &Transaction,
tx_id: &Bytes32,
stream: &Stream<Output>,
opts: &Arc<PublishOpts>,
) -> Vec<JoinHandle<Result<(), PublishError>>> {
) -> Vec<JoinHandle<anyhow::Result<()>>> {
let packets: Vec<PublishPacket<Output>> = tx
.outputs()
.par_iter()
Expand Down
21 changes: 3 additions & 18 deletions crates/fuel-streams-publisher/src/packets.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,10 @@
use std::sync::Arc;

use fuel_streams_core::prelude::*;
use thiserror::Error;
use tokio::{sync::Semaphore, task::JoinHandle};

use crate::telemetry::Telemetry;

#[derive(Error, Debug)]
pub enum PublishError {
#[error("Failed to publish to stream: {0}")]
StreamPublish(String),
#[error("Semaphore acquisition failed: {0}")]
Semaphore(#[from] tokio::sync::AcquireError),
#[error("Unknown error: {0}")]
Unknown(String),
}

#[derive(Clone)]
pub struct PublishOpts {
pub semaphore: Arc<Semaphore>,
Expand Down Expand Up @@ -43,7 +32,7 @@ impl<T: Streamable + 'static> PublishPacket<T> {
&self,
stream: Arc<Stream<T>>,
opts: &Arc<PublishOpts>,
) -> JoinHandle<Result<(), PublishError>> {
) -> JoinHandle<anyhow::Result<()>> {
let stream = Arc::clone(&stream);
let opts = Arc::clone(opts);
let payload = Arc::clone(&self.payload);
Expand All @@ -52,11 +41,7 @@ impl<T: Streamable + 'static> PublishPacket<T> {
let wildcard = self.subject.wildcard();

tokio::spawn(async move {
let _permit = opts
.semaphore
.acquire()
.await
.map_err(PublishError::Semaphore)?;
let _permit = opts.semaphore.acquire().await?;

match stream.publish(&*subject, &payload).await {
Ok(published_data_size) => {
Expand All @@ -82,7 +67,7 @@ impl<T: Streamable + 'static> PublishPacket<T> {
&e.to_string(),
);

Err(PublishError::StreamPublish(e.to_string()))
anyhow::bail!(e.to_string())
}
}
})
Expand Down
68 changes: 13 additions & 55 deletions crates/fuel-streams-publisher/src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ use fuel_core_importer::ImporterResult;
use fuel_core_types::fuel_tx::Output;
use fuel_streams::types::Log;
use fuel_streams_core::prelude::*;
use futures::{stream::FuturesUnordered, StreamExt};
use rayon::prelude::*;
use thiserror::Error;
use futures::{future::try_join_all, stream::FuturesUnordered};
use tokio::sync::{broadcast::error::RecvError, Semaphore};

use crate::{
blocks,
packets::{PublishError, PublishOpts},
packets::PublishOpts,
publisher_shutdown::{ShutdownToken, GRACEFUL_SHUTDOWN_TIMEOUT},
telemetry::Telemetry,
transactions,
Expand Down Expand Up @@ -97,14 +95,6 @@ impl Streams {
}
}

#[derive(Error, Debug)]
pub enum PublishBlockError {
#[error("Task execution error: {0}")]
TaskExecution(String),
#[error("Publish error: {0}")]
Publish(#[from] PublishError),
}

#[allow(dead_code)]
#[derive(Clone)]
pub struct Publisher {
Expand Down Expand Up @@ -291,6 +281,7 @@ impl Publisher {
block_producer: &Address,
) -> anyhow::Result<()> {
let start_time = std::time::Instant::now();

let fuel_core = &*self.fuel_core;
let semaphore = Arc::new(Semaphore::new(*PUBLISHER_MAX_THREADS));
let chain_id = Arc::new(*self.fuel_core.chain_id());
Expand All @@ -308,7 +299,7 @@ impl Publisher {
telemetry: self.telemetry.clone(),
});

let tasks =
let publish_tasks =
transactions::publish_all_tasks(txs, streams, opts, fuel_core)
.into_iter()
.chain(std::iter::once(blocks::publish_task(
Expand All @@ -318,48 +309,15 @@ impl Publisher {
)))
.collect::<FuturesUnordered<_>>();

let errors: Vec<PublishBlockError> = tasks
.filter_map(|res| self.handle_task_result(res))
.collect()
.await;

if !errors.is_empty() {
let error_message = errors
.par_iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ");

Err(anyhow::anyhow!(
"Errors occurred during block publishing: {}",
error_message
))
} else {
let elapsed = start_time.elapsed();
tracing::info!(
"Published streams for BlockHeight: {} in {:?}",
*block_height,
elapsed
);

Ok(())
}
}
try_join_all(publish_tasks).await?;

async fn handle_task_result(
&self,
result: Result<Result<(), PublishError>, tokio::task::JoinError>,
) -> Option<PublishBlockError> {
match result {
Ok(Ok(())) => None,
Ok(Err(publish_error)) => {
tracing::error!("Publish error: {:?}", publish_error);
Some(PublishBlockError::Publish(publish_error))
}
Err(join_error) => {
tracing::error!("Task join error: {:?}", join_error);
Some(PublishBlockError::TaskExecution(join_error.to_string()))
}
}
let elapsed = start_time.elapsed();
tracing::info!(
"Published streams for BlockHeight: {} in {:?}",
*block_height,
elapsed
);

Ok(())
}
}
4 changes: 2 additions & 2 deletions crates/fuel-streams-publisher/src/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::task::JoinHandle;

use crate::{
identifiers::{Identifier, IdsExtractable, PacketIdBuilder},
packets::{PublishError, PublishOpts, PublishPacket},
packets::{PublishOpts, PublishPacket},
};

pub fn publish_tasks(
Expand All @@ -16,7 +16,7 @@ pub fn publish_tasks(
stream: &Stream<Receipt>,
opts: &Arc<PublishOpts>,
receipts: &Vec<Receipt>,
) -> Vec<JoinHandle<Result<(), PublishError>>> {
) -> Vec<JoinHandle<anyhow::Result<()>>> {
let packets: Vec<PublishPacket<Receipt>> = receipts
.par_iter()
.enumerate()
Expand Down
6 changes: 3 additions & 3 deletions crates/fuel-streams-publisher/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
inputs::publish_tasks as publish_inputs,
logs::publish_tasks as publish_logs,
outputs::publish_tasks as publish_outputs,
packets::{PublishError, PublishOpts, PublishPacket},
packets::{PublishOpts, PublishPacket},
receipts::publish_tasks as publish_receipts,
sha256,
utxos::publish_tasks as publish_utxos,
Expand All @@ -27,7 +27,7 @@ pub fn publish_all_tasks(
streams: Streams,
opts: &Arc<PublishOpts>,
fuel_core: &dyn FuelCoreLike,
) -> Vec<JoinHandle<Result<(), PublishError>>> {
) -> Vec<JoinHandle<anyhow::Result<()>>> {
let offline_db = fuel_core.database().off_chain().latest_view().unwrap();

transactions
Expand Down Expand Up @@ -84,7 +84,7 @@ fn publish_tasks(
stream: &Stream<Transaction>,
opts: &Arc<PublishOpts>,
receipts: &Vec<Receipt>,
) -> Vec<JoinHandle<Result<(), PublishError>>> {
) -> Vec<JoinHandle<anyhow::Result<()>>> {
let block_height = &opts.block_height;
packets_from_tx(tx_item, tx_id, tx_status, block_height, receipts)
.iter()
Expand Down
4 changes: 2 additions & 2 deletions crates/fuel-streams-publisher/src/utxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rayon::prelude::*;
use tokio::task::JoinHandle;

use crate::{
packets::{PublishError, PublishOpts, PublishPacket},
packets::{PublishOpts, PublishPacket},
FuelCoreLike,
};

Expand All @@ -17,7 +17,7 @@ pub fn publish_tasks(
stream: &Stream<Utxo>,
opts: &Arc<PublishOpts>,
fuel_core: &dyn FuelCoreLike,
) -> Vec<JoinHandle<Result<(), PublishError>>> {
) -> Vec<JoinHandle<anyhow::Result<()>>> {
let packets: Vec<(UtxosSubject, Utxo)> = tx
.inputs()
.par_iter()
Expand Down
Loading