Skip to content

Commit

Permalink
Merge branch 'master' into dento/fix-1334
Browse files Browse the repository at this point in the history
  • Loading branch information
xgreenx authored Oct 3, 2023
2 parents 14bc8ff + 5d1a9a3 commit 01faab2
Show file tree
Hide file tree
Showing 22 changed files with 1,086 additions and 665 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Description of the upcoming release here.

### Changed

- [#1349](https://github.com/FuelLabs/fuel-core/pull/1349): Updated peer-to-peer transactions API to support multiple blocks in a single request, and updated block synchronization to request multiple blocks based on the configured range of headers.
- [#1380](https://github.com/FuelLabs/fuel-core/pull/1380): Add preliminary, hard-coded config values for heartbeat peer reputation, removing `todo`.
- [#1377](https://github.com/FuelLabs/fuel-core/pull/1377): Remove `DiscoveryEvent` and use `KademliaEvent` directly in `DiscoveryBehavior`.
- [#1366](https://github.com/FuelLabs/fuel-core/pull/1366): Improve caching during docker builds in CI by replacing gha
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ anyhow = "1.0"
async-trait = "0.1"
cynic = { version = "2.2.1", features = ["http-reqwest"] }
clap = "4.1"
derive_more = { version = "0.99" }
hyper = { version = "0.14.26" }
rand = "0.8"
parking_lot = "0.12"
Expand Down
2 changes: 1 addition & 1 deletion crates/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ description = "Tx client and schema specification."
[dependencies]
anyhow = { workspace = true }
cynic = { workspace = true }
derive_more = { version = "0.99" }
derive_more = { workspace = true }
eventsource-client = { version = "0.10.2", optional = true }
fuel-core-types = { workspace = true, features = ["serde"] }
futures = { workspace = true, optional = true }
Expand Down
23 changes: 16 additions & 7 deletions crates/fuel-core/src/database/sealed_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use fuel_core_types::{
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::Transactions,
};
use std::ops::Range;

Expand Down Expand Up @@ -127,12 +127,21 @@ impl Database {
}
}

pub fn get_transactions_on_block(
pub fn get_transactions_on_blocks(
&self,
block_id: &BlockId,
) -> StorageResult<Option<Vec<Transaction>>> {
Ok(self
.get_sealed_block_by_id(block_id)?
.map(|Sealed { entity: block, .. }| block.into_inner().1))
block_height_range: Range<u32>,
) -> StorageResult<Option<Vec<Transactions>>> {
let transactions = block_height_range
.into_iter()
.map(BlockHeight::from)
.map(|block_height| {
let transactions = self
.get_sealed_block_by_height(&block_height)?
.map(|Sealed { entity: block, .. }| block.into_inner().1)
.map(Transactions);
Ok(transactions)
})
.collect::<StorageResult<_>>()?;
Ok(transactions)
}
}
9 changes: 4 additions & 5 deletions crates/fuel-core/src/service/adapters/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ use fuel_core_services::stream::BoxStream;
use fuel_core_storage::Result as StorageResult;
use fuel_core_types::{
blockchain::{
primitives::BlockId,
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::Transactions,
};
use std::ops::Range;

Expand Down Expand Up @@ -41,9 +40,9 @@ impl P2pDb for Database {

fn get_transactions(
&self,
block_id: &BlockId,
) -> StorageResult<Option<Vec<Transaction>>> {
self.get_transactions_on_block(block_id)
block_height_range: Range<u32>,
) -> StorageResult<Option<Vec<Transactions>>> {
self.get_transactions_on_blocks(block_height_range)
}
}

Expand Down
43 changes: 19 additions & 24 deletions crates/fuel-core/src/service/adapters/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,10 @@ use fuel_core_sync::ports::{
};
use fuel_core_types::{
blockchain::{
primitives::{
BlockId,
DaBlockHeight,
},
primitives::DaBlockHeight,
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::{
peer_reputation::{
Expand All @@ -28,6 +24,7 @@ use fuel_core_types::{
},
PeerId,
SourcePeer,
Transactions,
},
};
use std::ops::Range;
Expand All @@ -50,43 +47,41 @@ impl PeerToPeerPort for P2PAdapter {

async fn get_sealed_block_headers(
&self,
block_range_height: Range<u32>,
block_height_range: Range<u32>,
) -> anyhow::Result<SourcePeer<Option<Vec<SealedBlockHeader>>>> {
if let Some(service) = &self.service {
let (peer_id, headers) =
service.get_sealed_block_headers(block_range_height).await?;
let sourced_headers = SourcePeer {
peer_id: peer_id.into(),
data: headers,
};
Ok(sourced_headers)
let result = if let Some(service) = &self.service {
service.get_sealed_block_headers(block_height_range).await
} else {
Err(anyhow::anyhow!("No P2P service available"))
};
match result {
Ok((peer_id, headers)) => {
let peer_id: PeerId = peer_id.into();
let headers = peer_id.bind(headers);
Ok(headers)
}
Err(err) => Err(err),
}
}

async fn get_transactions(
&self,
block: SourcePeer<BlockId>,
) -> anyhow::Result<Option<Vec<Transaction>>> {
range: SourcePeer<Range<u32>>,
) -> anyhow::Result<Option<Vec<Transactions>>> {
let SourcePeer {
peer_id,
data: block,
} = block;
data: range,
} = range;
if let Some(service) = &self.service {
service
.get_transactions_from_peer(peer_id.into(), block)
.get_transactions_from_peer(peer_id.into(), range)
.await
} else {
Err(anyhow::anyhow!("No P2P service available"))
}
}

async fn report_peer(
&self,
peer: PeerId,
report: PeerReportReason,
) -> anyhow::Result<()> {
fn report_peer(&self, peer: PeerId, report: PeerReportReason) -> anyhow::Result<()> {
if let Some(service) = &self.service {
let service_name = "Sync";
let new_report = self.process_report(report);
Expand Down
5 changes: 2 additions & 3 deletions crates/services/p2p/src/codecs/postcard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,12 @@ impl ProtocolName for MessageExchangePostcardProtocol {

#[cfg(test)]
mod tests {
use fuel_core_types::blockchain::primitives::BlockId;

use super::*;

#[test]
fn test_request_size_fits() {
let m = RequestMessage::Transactions(BlockId::default());
let arbitrary_range = 2..6;
let m = RequestMessage::Transactions(arbitrary_range);
assert!(postcard::to_stdvec(&m).unwrap().len() <= MAX_REQUEST_SIZE);
}
}
23 changes: 14 additions & 9 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,15 +696,17 @@ mod tests {
BlockHeader,
PartialBlockHeader,
},
primitives::BlockId,
SealedBlock,
SealedBlockHeader,
},
fuel_tx::{
Transaction,
TransactionBuilder,
},
services::p2p::GossipsubMessageAcceptance,
services::p2p::{
GossipsubMessageAcceptance,
Transactions,
},
};
use futures::{
future::join_all,
Expand Down Expand Up @@ -1557,7 +1559,7 @@ mod tests {
tokio::select! {
message_sent = rx_test_end.recv() => {
// we received a signal to end the test
assert!(message_sent.unwrap(), "Receuved incorrect or missing missing messsage");
assert!(message_sent.unwrap(), "Received incorrect or missing message");
break;
}
node_a_event = node_a.next_event() => {
Expand Down Expand Up @@ -1604,7 +1606,7 @@ mod tests {
}
});
}
RequestMessage::Transactions(_) => {
RequestMessage::Transactions(_range) => {
let (tx_orchestrator, rx_orchestrator) = oneshot::channel();
assert!(node_a.send_request_msg(None, request_msg.clone(), ResponseChannelItem::Transactions(tx_orchestrator)).is_ok());
let tx_test_end = tx_test_end.clone();
Expand All @@ -1613,7 +1615,8 @@ mod tests {
let response_message = rx_orchestrator.await;

if let Ok(Some(transactions)) = response_message {
let _ = tx_test_end.send(transactions.len() == 5).await;
let check = transactions.len() == 1 && transactions[0].0.len() == 5;
let _ = tx_test_end.send(check).await;
} else {
tracing::error!("Orchestrator failed to receive a message: {:?}", response_message);
let _ = tx_test_end.send(false).await;
Expand Down Expand Up @@ -1647,7 +1650,8 @@ mod tests {
let _ = node_b.send_response_msg(*request_id, OutboundResponse::SealedHeaders(Some(sealed_headers)));
}
RequestMessage::Transactions(_) => {
let transactions = (0..5).map(|_| Transaction::default_test_tx()).collect();
let txs = (0..5).map(|_| Transaction::default_test_tx()).collect();
let transactions = vec![Transactions(txs)];
let _ = node_b.send_response_msg(*request_id, OutboundResponse::Transactions(Some(Arc::new(transactions))));
}
}
Expand All @@ -1662,8 +1666,8 @@ mod tests {
#[tokio::test]
#[instrument]
async fn request_response_works_with_transactions() {
request_response_works_with(RequestMessage::Transactions(BlockId::default()))
.await
let arbitrary_range = 2..6;
request_response_works_with(RequestMessage::Transactions(arbitrary_range)).await
}

#[tokio::test]
Expand All @@ -1675,7 +1679,8 @@ mod tests {
#[tokio::test]
#[instrument]
async fn request_response_works_with_sealed_headers_range_inclusive() {
request_response_works_with(RequestMessage::SealedHeaders(2..6)).await
let arbitrary_range = 2..6;
request_response_works_with(RequestMessage::SealedHeaders(arbitrary_range)).await
}

#[tokio::test]
Expand Down
7 changes: 3 additions & 4 deletions crates/services/p2p/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ use fuel_core_services::stream::BoxStream;
use fuel_core_storage::Result as StorageResult;
use fuel_core_types::{
blockchain::{
primitives::BlockId,
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::Transactions,
};
use std::ops::Range;

Expand All @@ -29,8 +28,8 @@ pub trait P2pDb: Send + Sync {

fn get_transactions(
&self,
block_id: &BlockId,
) -> StorageResult<Option<Vec<Transaction>>>;
block_height_range: Range<u32>,
) -> StorageResult<Option<Vec<Transactions>>>;
}

pub trait BlockHeightImporter: Send + Sync {
Expand Down
18 changes: 6 additions & 12 deletions crates/services/p2p/src/request_response/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,17 @@ use std::{

use fuel_core_types::{
blockchain::{
primitives::BlockId,
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::Transactions,
};
use libp2p::PeerId;
use serde::{
Deserialize,
Serialize,
};
use serde_with::{
serde_as,
FromInto,
};
use thiserror::Error;
use tokio::sync::oneshot;

Expand All @@ -34,33 +29,32 @@ pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::<RequestMessage>(
// This `OutboundResponse` gets prepared to be sent over the wire in `NetworkResponse` format.
// The Peer that requested the message receives the response over the wire in `NetworkResponse` format.
// It then unpacks it into `ResponseMessage`.
// `ResponseChannelItem` is used to forward the data within `ResponseMessage` to the receving channel.
// `ResponseChannelItem` is used to forward the data within `ResponseMessage` to the receiving channel.
// Client Peer: `RequestMessage` (send request)
// Server Peer: `RequestMessage` (receive request) -> `OutboundResponse` -> `NetworkResponse` (send response)
// Client Peer: `NetworkResponse` (receive response) -> `ResponseMessage(data)` -> `ResponseChannelItem(channel, data)` (handle response)

#[serde_as]
#[derive(Serialize, Deserialize, Eq, PartialEq, Debug, Clone)]
pub enum RequestMessage {
Block(BlockHeight),
SealedHeaders(Range<u32>),
Transactions(#[serde_as(as = "FromInto<[u8; 32]>")] BlockId),
Transactions(Range<u32>),
}

/// Final Response Message that p2p service sends to the Orchestrator
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ResponseMessage {
SealedBlock(Box<Option<SealedBlock>>),
SealedHeaders(Option<Vec<SealedBlockHeader>>),
Transactions(Option<Vec<Transaction>>),
Transactions(Option<Vec<Transactions>>),
}

/// Holds oneshot channels for specific responses
#[derive(Debug)]
pub enum ResponseChannelItem {
Block(oneshot::Sender<Option<SealedBlock>>),
SealedHeaders(oneshot::Sender<(PeerId, Option<Vec<SealedBlockHeader>>)>),
Transactions(oneshot::Sender<Option<Vec<Transaction>>>),
Transactions(oneshot::Sender<Option<Vec<Transactions>>>),
}

/// Response that is sent over the wire
Expand All @@ -78,7 +72,7 @@ pub enum NetworkResponse {
pub enum OutboundResponse {
Block(Option<Arc<SealedBlock>>),
SealedHeaders(Option<Vec<SealedBlockHeader>>),
Transactions(Option<Arc<Vec<Transaction>>>),
Transactions(Option<Arc<Vec<Transactions>>>),
}

#[derive(Debug, Error)]
Expand Down
Loading

0 comments on commit 01faab2

Please sign in to comment.