Add litep2p network protocol benches #6455

Merged · 4 commits · Nov 13, 2024
8 changes: 8 additions & 0 deletions prdoc/pr_6455.prdoc
@@ -0,0 +1,8 @@
+title: Add litep2p network protocol benches
+doc:
+  - audience: Node Dev
+    description: |-
+      Adds networking protocol benchmarks with litep2p backend
+crates:
+  - name: sc-network
+    validate: false
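
The diff below makes the existing notifications benchmark generic over the network backend, so the same scenario runs on both the libp2p-based `NetworkWorker` and `Litep2pNetworkBackend`. A minimal sketch of that dispatch pattern, reusing the trait bounds that appear in the diff (the helper name `bench_backend` and its empty body are illustrative only, not the actual benchmark code):

    use sc_network::{Litep2pNetworkBackend, NetworkBackend, NetworkWorker};
    use sc_network_common::ExHashT;
    use sp_core::H256;
    use sp_runtime::traits::Block as BlockT;

    // Illustrative only: any scenario written against the `NetworkBackend`
    // trait can be driven by either backend via a turbofish at the call site.
    async fn bench_backend<B, H, N>(size: usize, limit: usize)
    where
        B: BlockT<Hash = H256> + 'static,
        H: ExHashT,
        N: NetworkBackend<B, H>,
    {
        // ... spawn two workers with `N::new(...)` and exchange `limit`
        // notifications of `size` bytes, as `run_serially` does in the diff ...
        let _ = (size, limit);
    }

    // Call sites then look like (see `run_benchmark` in the diff):
    //   bench_backend::<runtime::Block, runtime::Hash, NetworkWorker<_, _>>(size, limit).await;
    //   bench_backend::<runtime::Block, runtime::Hash, Litep2pNetworkBackend>(size, limit).await;

The benches should run through the usual Criterion workflow, presumably something like `cargo bench -p sc-network --bench notifications_protocol` (command inferred from the crate and bench file names, not stated in the PR).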
114 changes: 86 additions & 28 deletions substrate/client/network/benches/notifications_protocol.rs
@@ -22,15 +22,17 @@ use criterion::{
 };
 use sc_network::{
     config::{
-        FullNetworkConfiguration, MultiaddrWithPeerId, NetworkConfiguration, NonDefaultSetConfig,
-        NonReservedPeerMode, NotificationHandshake, Params, ProtocolId, Role, SetConfig,
+        FullNetworkConfiguration, MultiaddrWithPeerId, NetworkConfiguration, NonReservedPeerMode,
+        NotificationHandshake, Params, ProtocolId, Role, SetConfig,
     },
     service::traits::NotificationEvent,
-    NetworkWorker, NotificationMetrics, NotificationService, Roles,
+    Litep2pNetworkBackend, NetworkBackend, NetworkWorker, NotificationMetrics, NotificationService,
+    Roles,
 };
-use sc_network_common::sync::message::BlockAnnouncesHandshake;
+use sc_network_common::{sync::message::BlockAnnouncesHandshake, ExHashT};
 use sc_network_types::build_multiaddr;
-use sp_runtime::traits::Zero;
+use sp_core::H256;
+use sp_runtime::traits::{Block as BlockT, Zero};
 use std::{
     net::{IpAddr, Ipv4Addr, TcpListener},
     str::FromStr,
@@ -61,12 +63,20 @@ fn get_listen_address() -> sc_network::Multiaddr {
     build_multiaddr!(Ip4(ip), Tcp(port))
 }
 
-pub fn create_network_worker(
+fn create_network_worker<B, H, N>(
     listen_addr: sc_network::Multiaddr,
-) -> (NetworkWorker<runtime::Block, runtime::Hash>, Box<dyn NotificationService>) {
+) -> (N, Box<dyn NotificationService>)
+where
+    B: BlockT<Hash = H256> + 'static,
+    H: ExHashT,
+    N: NetworkBackend<B, H>,
+{
     let role = Role::Full;
+    let mut net_conf = NetworkConfiguration::new_local();
+    net_conf.listen_addresses = vec![listen_addr];
+    let network_config = FullNetworkConfiguration::<B, H, N>::new(&net_conf, None);
     let genesis_hash = runtime::Hash::zero();
-    let (block_announce_config, notification_service) = NonDefaultSetConfig::new(
+    let (block_announce_config, notification_service) = N::notification_config(
         "/block-announces/1".into(),
         vec!["/bench-notifications-protocol/block-announces/1".into()],
         MAX_SIZE,
@@ -82,21 +92,17 @@ pub fn create_network_worker(
             reserved_nodes: vec![],
             non_reserved_mode: NonReservedPeerMode::Accept,
         },
+        NotificationMetrics::new(None),
+        network_config.peer_store_handle(),
     );
-    let mut net_conf = NetworkConfiguration::new_local();
-    net_conf.listen_addresses = vec![listen_addr];
-    let worker = NetworkWorker::<runtime::Block, runtime::Hash>::new(Params::<
-        runtime::Block,
-        runtime::Hash,
-        NetworkWorker<_, _>,
-    > {
+    let worker = N::new(Params::<B, H, N> {
         block_announce_config,
         role,
         executor: Box::new(|f| {
             tokio::spawn(f);
         }),
         genesis_hash,
-        network_config: FullNetworkConfiguration::new(&net_conf, None),
+        network_config,
         protocol_id: ProtocolId::from("bench-protocol-name"),
         fork_id: None,
         metrics_registry: None,
@@ -108,14 +114,21 @@ pub fn create_network_worker(
     (worker, notification_service)
 }
 
-async fn run_serially(size: usize, limit: usize) {
+async fn run_serially<B, H, N>(size: usize, limit: usize)
+where
+    B: BlockT<Hash = H256> + 'static,
+    H: ExHashT,
+    N: NetworkBackend<B, H>,
+{
     let listen_address1 = get_listen_address();
     let listen_address2 = get_listen_address();
-    let (worker1, mut notification_service1) = create_network_worker(listen_address1);
-    let (worker2, mut notification_service2) = create_network_worker(listen_address2.clone());
-    let peer_id2: sc_network::PeerId = (*worker2.local_peer_id()).into();
+    let (worker1, mut notification_service1) = create_network_worker::<B, H, N>(listen_address1);
+    let (worker2, mut notification_service2) =
+        create_network_worker::<B, H, N>(listen_address2.clone());
+    let peer_id2: sc_network::PeerId = worker2.network_service().local_peer_id().into();
 
     worker1
+        .network_service()
         .add_reserved_peer(MultiaddrWithPeerId { multiaddr: listen_address2, peer_id: peer_id2 })
         .unwrap();
 
@@ -124,24 +137,33 @@ async fn run_serially(size: usize, limit: usize) {
     let (tx, rx) = async_channel::bounded(10);
 
     let network1 = tokio::spawn(async move {
+        let mut sent_counter = 0;
         tokio::pin!(network1_run);
         loop {
             tokio::select! {
                 _ = &mut network1_run => {},
                 event = notification_service1.next_event() => {
                     match event {
                         Some(NotificationEvent::NotificationStreamOpened { .. }) => {
+                            sent_counter += 1;
                             notification_service1
                                 .send_async_notification(&peer_id2, vec![0; size])
                                 .await
                                 .unwrap();
                         },
+                        Some(NotificationEvent::NotificationStreamClosed { .. }) => {
+                            if sent_counter >= limit {
+                                break;
+                            }
+                            panic!("Unexpected stream closure {:?}", event);
+                        }
                         event => panic!("Unexpected event {:?}", event),
                     };
                 },
                 message = rx.recv() => {
                     match message {
                         Ok(Some(_)) => {
+                            sent_counter += 1;
                             notification_service1
                                 .send_async_notification(&peer_id2, vec![0; size])
                                 .await
@@ -185,14 +207,21 @@ async fn run_serially(size: usize, limit: usize) {
     let _ = tokio::join!(network1, network2);
 }
 
-async fn run_with_backpressure(size: usize, limit: usize) {
+async fn run_with_backpressure<B, H, N>(size: usize, limit: usize)
+where
+    B: BlockT<Hash = H256> + 'static,
+    H: ExHashT,
+    N: NetworkBackend<B, H>,
+{
     let listen_address1 = get_listen_address();
     let listen_address2 = get_listen_address();
-    let (worker1, mut notification_service1) = create_network_worker(listen_address1);
-    let (worker2, mut notification_service2) = create_network_worker(listen_address2.clone());
-    let peer_id2: sc_network::PeerId = (*worker2.local_peer_id()).into();
+    let (worker1, mut notification_service1) = create_network_worker::<B, H, N>(listen_address1);
+    let (worker2, mut notification_service2) =
+        create_network_worker::<B, H, N>(listen_address2.clone());
+    let peer_id2: sc_network::PeerId = worker2.network_service().local_peer_id().into();
 
     worker1
+        .network_service()
         .add_reserved_peer(MultiaddrWithPeerId { multiaddr: listen_address2, peer_id: peer_id2 })
         .unwrap();
 
@@ -265,18 +294,47 @@ fn run_benchmark(c: &mut Criterion) {
     for &(exponent, label) in EXPONENTS.iter() {
         let size = 2usize.pow(exponent);
         group.throughput(Throughput::Bytes(NOTIFICATIONS as u64 * size as u64));
+
+        group.bench_with_input(
+            BenchmarkId::new("libp2p/serially", label),
+            &(size, NOTIFICATIONS),
+            |b, &(size, limit)| {
+                b.to_async(&rt).iter(|| {
+                    run_serially::<runtime::Block, runtime::Hash, NetworkWorker<_, _>>(size, limit)
+                });
+            },
+        );
+        group.bench_with_input(
+            BenchmarkId::new("litep2p/serially", label),
+            &(size, NOTIFICATIONS),
+            |b, &(size, limit)| {
+                b.to_async(&rt).iter(|| {
+                    run_serially::<runtime::Block, runtime::Hash, Litep2pNetworkBackend>(
+                        size, limit,
+                    )
+                });
+            },
+        );
         group.bench_with_input(
-            BenchmarkId::new("consistently", label),
+            BenchmarkId::new("libp2p/with_backpressure", label),
             &(size, NOTIFICATIONS),
             |b, &(size, limit)| {
-                b.to_async(&rt).iter(|| run_serially(size, limit));
+                b.to_async(&rt).iter(|| {
+                    run_with_backpressure::<runtime::Block, runtime::Hash, NetworkWorker<_, _>>(
+                        size, limit,
+                    )
+                });
             },
         );
         group.bench_with_input(
-            BenchmarkId::new("with_backpressure", label),
+            BenchmarkId::new("litep2p/with_backpressure", label),
             &(size, NOTIFICATIONS),
             |b, &(size, limit)| {
-                b.to_async(&rt).iter(|| run_with_backpressure(size, limit));
+                b.to_async(&rt).iter(|| {
+                    run_with_backpressure::<runtime::Block, runtime::Hash, Litep2pNetworkBackend>(
+                        size, limit,
+                    )
+                });
             },
         );
     }
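
For completeness, a Criterion bench file like the one above is conventionally registered with `criterion_group!`/`criterion_main!`. The actual wiring sits in the collapsed tail of the diff; the sketch below shows only the standard shape, with a stub standing in for the real `run_benchmark`:

    use criterion::{criterion_group, criterion_main, Criterion};

    // Stub standing in for the real `run_benchmark` shown in the diff above.
    fn run_benchmark(_c: &mut Criterion) {}

    // Conventional Criterion registration; the exact group configuration used
    // by the PR lives in the collapsed portion of the diff.
    criterion_group!(benches, run_benchmark);
    criterion_main!(benches);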