Skip to content

Commit

Permalink
Merge branch 'master' into feature/wrong-receipt-encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
xgreenx authored Jul 27, 2023
2 parents dc48643 + 678f6ae commit 412fb4d
Show file tree
Hide file tree
Showing 18 changed files with 793 additions and 671 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ hyper = { version = "0.14.26" }
rand = "0.8"
parking_lot = "0.12"
tokio = { version = "1.27", default-features = false }
tokio-rayon = "2.1.0"
tokio-stream = "0.1"
tracing = "0.1"
thiserror = "1.0"
Expand Down
6 changes: 5 additions & 1 deletion crates/fuel-core/src/graphql_api/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,16 @@ pub trait DatabaseChain {
fn base_chain_height(&self) -> StorageResult<DaBlockHeight>;
}

#[async_trait]
pub trait TxPoolPort: Send + Sync {
fn transaction(&self, id: TxId) -> Option<Transaction>;

fn submission_time(&self, id: TxId) -> Option<Tai64>;

fn insert(&self, txs: Vec<Arc<Transaction>>) -> Vec<anyhow::Result<InsertionResult>>;
async fn insert(
&self,
txs: Vec<Arc<Transaction>>,
) -> Vec<anyhow::Result<InsertionResult>>;

fn tx_update_subscribe(
&self,
Expand Down
3 changes: 2 additions & 1 deletion crates/fuel-core/src/p2p_test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,14 +394,15 @@ impl Node {
}

/// Insert the test transactions into the node's transaction pool.
pub fn insert_txs(&self) -> HashMap<Bytes32, Transaction> {
pub async fn insert_txs(&self) -> HashMap<Bytes32, Transaction> {
let mut expected = HashMap::new();
for tx in &self.test_txs {
let tx_result = self
.node
.shared
.txpool
.insert(vec![Arc::new(tx.clone())])
.await
.pop()
.unwrap()
.unwrap();
Expand Down
6 changes: 4 additions & 2 deletions crates/fuel-core/src/schema/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,10 @@ impl TxMutation {
let txpool = ctx.data_unchecked::<TxPool>();
let config = ctx.data_unchecked::<Config>();
let tx = FuelTx::from_bytes(&tx.0)?;
// TODO: use spawn_blocking here

let _: Vec<_> = txpool
.insert(vec![Arc::new(tx.clone())])
.await
.into_iter()
.try_collect()?;
let id = tx.id(&config.transaction_parameters.chain_id);
Expand Down Expand Up @@ -331,9 +332,10 @@ impl TxStatusSubscription {
let tx = FuelTx::from_bytes(&tx.0)?;
let tx_id = tx.id(&config.transaction_parameters.chain_id);
let subscription = txpool.tx_update_subscribe(tx_id).await;
// TODO: use spawn_blocking here

let _: Vec<_> = txpool
.insert(vec![Arc::new(tx)])
.await
.into_iter()
.try_collect()?;

Expand Down
8 changes: 6 additions & 2 deletions crates/fuel-core/src/service/adapters/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ impl DatabaseChain for Database {

impl DatabasePort for Database {}

#[async_trait]
impl TxPoolPort for TxPoolAdapter {
fn transaction(&self, id: TxId) -> Option<Transaction> {
self.service
Expand All @@ -217,8 +218,11 @@ impl TxPoolPort for TxPoolAdapter {
.map(|info| Tai64::from_unix(info.submitted_time().as_secs() as i64))
}

fn insert(&self, txs: Vec<Arc<Transaction>>) -> Vec<anyhow::Result<InsertionResult>> {
self.service.insert(txs)
async fn insert(
&self,
txs: Vec<Arc<Transaction>>,
) -> Vec<anyhow::Result<InsertionResult>> {
self.service.insert(txs).await
}

fn tx_update_subscribe(&self, id: TxId) -> BoxFuture<BoxStream<TxStatusMessage>> {
Expand Down
7 changes: 4 additions & 3 deletions crates/fuel-core/src/service/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ use super::*;

impl FuelService {
/// Submit a transaction to the txpool.
pub fn submit(&self, tx: Transaction) -> anyhow::Result<InsertionResult> {
pub async fn submit(&self, tx: Transaction) -> anyhow::Result<InsertionResult> {
let results: Vec<_> = self
.shared
.txpool
.insert(vec![Arc::new(tx)])
.await
.into_iter()
.collect::<Result<_, _>>()?;
results
Expand All @@ -49,7 +50,7 @@ impl FuelService {
.transaction_parameters
.chain_id);
let stream = self.transaction_status_change(id).await;
self.submit(tx)?;
self.submit(tx).await?;
Ok(stream)
}

Expand All @@ -68,7 +69,7 @@ impl FuelService {
futures::future::ready(!matches!(status, Ok(TransactionStatus::Submitted(_))))
});
futures::pin_mut!(stream);
self.submit(tx)?;
self.submit(tx).await?;
stream
.next()
.await
Expand Down
2 changes: 1 addition & 1 deletion crates/services/producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fuel-core-storage = { workspace = true }
fuel-core-types = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-rayon = "2"
tokio-rayon = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
Expand Down
2 changes: 2 additions & 0 deletions crates/services/txpool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ fuel-core-metrics = { workspace = true }
fuel-core-services = { workspace = true }
fuel-core-storage = { workspace = true }
fuel-core-types = { workspace = true }
futures = { workspace = true }
parking_lot = { workspace = true }
tokio = { workspace = true, default-features = false, features = ["sync"] }
tokio-rayon = { workspace = true }
tokio-stream = { workspace = true }
tracing = { workspace = true }

Expand Down
115 changes: 90 additions & 25 deletions crates/services/txpool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ use crate::{
TxPoolDb,
},
transaction_selector::select_transactions,
txpool::{
check_single_tx,
check_transactions,
},
Config,
Error as TxPoolError,
TxInfo,
TxPool,
};

use fuel_core_services::{
stream::BoxStream,
RunnableService,
Expand Down Expand Up @@ -45,6 +50,7 @@ use fuel_core_types::{
},
tai64::Tai64,
};

use parking_lot::Mutex as ParkingMutex;
use std::sync::Arc;
use tokio::{
Expand Down Expand Up @@ -114,15 +120,19 @@ pub struct SharedState<P2P, DB> {
txpool: Arc<ParkingMutex<TxPool<DB>>>,
p2p: Arc<P2P>,
consensus_params: ConsensusParameters,
db: DB,
config: Config,
}

impl<P2P, DB> Clone for SharedState<P2P, DB> {
impl<P2P, DB: Clone> Clone for SharedState<P2P, DB> {
fn clone(&self) -> Self {
Self {
tx_status_sender: self.tx_status_sender.clone(),
txpool: self.txpool.clone(),
p2p: self.p2p.clone(),
consensus_params: self.consensus_params,
db: self.db.clone(),
config: self.config.clone(),
}
}
}
Expand All @@ -138,7 +148,7 @@ pub struct Task<P2P, DB> {
impl<P2P, DB> RunnableService for Task<P2P, DB>
where
P2P: PeerToPeer<GossipedTransaction = TransactionGossipData> + Send + Sync,
DB: TxPoolDb,
DB: TxPoolDb + Clone,
{
const NAME: &'static str = "TxPool";

Expand Down Expand Up @@ -205,28 +215,45 @@ where
new_transaction = self.gossiped_tx_stream.next() => {
if let Some(GossipData { data: Some(tx), message_id, peer_id }) = new_transaction {
let id = tx.id(&self.shared.consensus_params.chain_id);
let txs = vec!(Arc::new(tx));
let mut result = tracing::info_span!("Received tx via gossip", %id)
.in_scope(|| {
self.shared.txpool.lock().insert(
&self.shared.tx_status_sender,
&txs
)
});

if let Some(acceptance) = match result.pop() {
Some(Ok(_)) => {
Some(GossipsubMessageAcceptance::Accept)
},
Some(Err(_)) => {
Some(GossipsubMessageAcceptance::Reject)
let current_height = self.shared.db.current_block_height()?;

// verify tx
let checked_tx = check_single_tx(tx, current_height, &self.shared.config).await;

let acceptance = match checked_tx {
Ok(tx) => {
let txs = vec![tx];

// insert tx
let mut result = tracing::info_span!("Received tx via gossip", %id)
.in_scope(|| {
self.shared.txpool.lock().insert(
&self.shared.tx_status_sender,
txs
)
});

match result.pop() {
Some(Ok(_)) => {
GossipsubMessageAcceptance::Accept
},
Some(Err(_)) => {
GossipsubMessageAcceptance::Reject
}
_ => GossipsubMessageAcceptance::Ignore
}
}
_ => None
} {
Err(_) => {
GossipsubMessageAcceptance::Reject
}
};

if acceptance != GossipsubMessageAcceptance::Ignore {
let message_info = GossipsubMessageInfo {
message_id,
peer_id,
};

let _ = self.shared.p2p.notify_gossip_transaction_validity(message_info, acceptance);
}

Expand Down Expand Up @@ -313,13 +340,36 @@ where
DB: TxPoolDb,
{
#[tracing::instrument(name = "insert_submitted_txn", skip_all)]
pub fn insert(
pub async fn insert(
&self,
txs: Vec<Arc<Transaction>>,
) -> Vec<anyhow::Result<InsertionResult>> {
let insert = { self.txpool.lock().insert(&self.tx_status_sender, &txs) };
// verify txs
let block_height = self.db.current_block_height();
let current_height = match block_height {
Ok(val) => val,
Err(e) => return vec![Err(e.into())],
};

let checked_txs = check_transactions(&txs, current_height, &self.config).await;

let mut valid_txs = vec![];

let checked_txs: Vec<_> = checked_txs
.into_iter()
.map(|tx_check| match tx_check {
Ok(tx) => {
valid_txs.push(tx);
None
}
Err(err) => Some(err),
})
.collect();

// insert txs
let insertion = { self.txpool.lock().insert(&self.tx_status_sender, valid_txs) };

for (ret, tx) in insert.iter().zip(txs.into_iter()) {
for (ret, tx) in insertion.iter().zip(txs.into_iter()) {
match ret {
Ok(_) => {
let result = self.p2p.broadcast_transaction(tx.clone());
Expand All @@ -334,7 +384,20 @@ where
Err(_) => {}
}
}
insert

let mut insertion = insertion.into_iter();

checked_txs
.into_iter()
.map(|check_result| match check_result {
None => insertion.next().unwrap_or_else(|| {
unreachable!(
"the number of inserted txs matches the number of `None` results"
)
}),
Some(err) => Err(err),
})
.collect()
}
}

Expand Down Expand Up @@ -373,7 +436,7 @@ pub fn new_service<P2P, Importer, DB>(
where
Importer: BlockImporter,
P2P: PeerToPeer<GossipedTransaction = TransactionGossipData> + 'static,
DB: TxPoolDb + 'static,
DB: TxPoolDb + Clone + 'static,
{
let p2p = Arc::new(p2p);
let gossiped_tx_stream = p2p.gossiped_transaction_events();
Expand All @@ -382,7 +445,7 @@ where
ttl_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
let consensus_params = config.chain_config.transaction_parameters;
let number_of_active_subscription = config.number_of_active_subscription;
let txpool = Arc::new(ParkingMutex::new(TxPool::new(config, db)));
let txpool = Arc::new(ParkingMutex::new(TxPool::new(config.clone(), db.clone())));
let task = Task {
gossiped_tx_stream,
committed_block_stream,
Expand All @@ -391,6 +454,8 @@ where
txpool,
p2p,
consensus_params,
db,
config,
},
ttl_timer,
};
Expand Down
Loading

0 comments on commit 412fb4d

Please sign in to comment.