diff --git a/Cargo.lock b/Cargo.lock index cc3b08bb..c8c3abb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,7 @@ dependencies = [ "silius-uopool", "snap", "tokio", + "tracing", ] [[package]] diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml index 50759686..6177f240 100644 --- a/crates/p2p/Cargo.toml +++ b/crates/p2p/Cargo.toml @@ -22,6 +22,7 @@ silius-primitives = { path = "../primitives" } silius-uopool = {path = "../uopool"} discv5 = {version = "0.3.0", features = ["libp2p"]} tokio = { workspace = true } +tracing = { workspace = true } [dependencies.libp2p] version = "0.51.3" diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index 04de3158..a63cb6ae 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -1,30 +1,32 @@ use crate::config::Config; use crate::discovery::{self, Discovery}; -use crate::enr::{build_enr, CombineKeyPubExt}; +use crate::enr::build_enr; use crate::gossipsub::Gossipsub; use crate::reqrep::protocol::SUPPORTED_PROTOCOL; use crate::reqrep::{BundlerCodec, BundlerRequestResponse, Request, Response}; use discv5::enr::CombinedKey; use libp2p::gossipsub; +use libp2p::gossipsub::ConfigBuilder; use libp2p::request_response; use libp2p::swarm::NetworkBehaviour; #[derive(NetworkBehaviour)] #[behaviour(out_event = "Event", event_process = false)] pub struct Behaviour { - gossipsub: Gossipsub, - reqrep: BundlerRequestResponse, - discv5: Discovery, + pub gossipsub: Gossipsub, + pub reqrep: BundlerRequestResponse, + pub discv5: Discovery, } impl Behaviour { pub fn new(key: CombinedKey, config: Config) -> anyhow::Result { let enr = build_enr(&key, &config)?; - let gossipsub = Gossipsub::new( - gossipsub::MessageAuthenticity::Author(enr.public_key().to_peer_id()?), - gossipsub::Config::default(), - ) - .map_err(|e| anyhow::anyhow!(e))?; + let gossipsub_config = ConfigBuilder::default() + .validation_mode(gossipsub::ValidationMode::Anonymous) + .build() + .map_err(|e| anyhow::anyhow!(e))?; + let gossipsub = Gossipsub::new(gossipsub::MessageAuthenticity::Anonymous, gossipsub_config) + .map_err(|e| anyhow::anyhow!(e))?; let reqrep = BundlerRequestResponse::new( BundlerCodec {}, SUPPORTED_PROTOCOL.clone().into_iter(), diff --git a/crates/p2p/src/config.rs b/crates/p2p/src/config.rs index 942e9712..1e6b8011 100644 --- a/crates/p2p/src/config.rs +++ b/crates/p2p/src/config.rs @@ -1,10 +1,51 @@ use std::net::{Ipv4Addr, Ipv6Addr}; use discv5::ListenConfig; +use libp2p::{multiaddr::Protocol, Multiaddr}; const DEFAULT_UDP_PORT: u16 = 9000; +const DEFAULT_TCP_PORT: u16 = 9000; +#[derive(Clone, Debug)] +pub struct ListenAddr { + pub addr: Ip, + pub udp_port: u16, + pub tcp_port: u16, +} + +pub enum ListenAddress { + Ipv4(ListenAddr), + Ipv6(ListenAddr), + Dual(ListenAddr, ListenAddr), +} + +impl ListenAddress { + pub fn to_multi_addr(&self) -> Vec { + match self { + ListenAddress::Ipv4(v) => vec![Multiaddr::from(v.addr).with(Protocol::Tcp(v.tcp_port))], + ListenAddress::Ipv6(v) => vec![Multiaddr::from(v.addr).with(Protocol::Tcp(v.tcp_port))], + ListenAddress::Dual(ipv4, ipv6) => { + vec![ + Multiaddr::from(ipv4.addr).with(Protocol::Tcp(ipv4.tcp_port)), + Multiaddr::from(ipv6.addr).with(Protocol::Tcp(ipv6.tcp_port)), + ] + } + } + } +} + +impl Default for ListenAddress { + fn default() -> Self { + Self::Ipv4(ListenAddr { + addr: Ipv4Addr::UNSPECIFIED, + udp_port: DEFAULT_UDP_PORT, + tcp_port: DEFAULT_TCP_PORT, + }) + } +} pub struct Config { + pub listen_addr: ListenAddress, + /// The ipv4 address to broadcast to peers about which address we are listening on. pub ipv4_addr: Option, @@ -27,6 +68,11 @@ pub struct Config { impl Default for Config { fn default() -> Self { Self { + listen_addr: ListenAddress::Ipv4(ListenAddr { + addr: Ipv4Addr::UNSPECIFIED, + udp_port: DEFAULT_UDP_PORT, + tcp_port: DEFAULT_TCP_PORT, + }), ipv4_addr: Some(Ipv4Addr::UNSPECIFIED), ipv6_addr: None, enr_udp4_port: Some(DEFAULT_UDP_PORT), @@ -39,21 +85,20 @@ impl Default for Config { impl Config { pub fn to_listen_config(&self) -> ListenConfig { - match (self.ipv4_addr, self.ipv6_addr) { - (None, None) => todo!(), - (None, Some(ip)) => ListenConfig::Ipv6 { - ip, - port: self.enr_udp6_port.unwrap_or(DEFAULT_UDP_PORT), + match &self.listen_addr { + ListenAddress::Ipv4(v) => ListenConfig::Ipv4 { + ip: v.addr, + port: v.udp_port, }, - (Some(ip), None) => ListenConfig::Ipv4 { - ip, - port: self.enr_udp4_port.unwrap_or(DEFAULT_UDP_PORT), + ListenAddress::Ipv6(v) => ListenConfig::Ipv6 { + ip: v.addr, + port: v.udp_port, }, - (Some(ipv4), Some(ipv6)) => ListenConfig::DualStack { - ipv4, - ipv4_port: self.enr_udp4_port.unwrap_or(DEFAULT_UDP_PORT), - ipv6, - ipv6_port: self.enr_udp6_port.unwrap_or(DEFAULT_UDP_PORT), + ListenAddress::Dual(ipv4, ipv6) => ListenConfig::DualStack { + ipv4: ipv4.addr, + ipv4_port: ipv4.udp_port, + ipv6: ipv6.addr, + ipv6_port: ipv6.udp_port, }, } } diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 9511f2e8..361857a9 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -17,50 +17,7 @@ pub mod config; pub mod discovery; pub mod enr; pub mod gossipsub; +pub mod network; pub mod reqrep; - -struct TokioExecutor; -impl libp2p::swarm::Executor for TokioExecutor { - fn exec(&self, future: std::pin::Pin + Send>>) { - tokio::spawn(future); - } -} - -pub fn setup(private_key: Keypair) -> anyhow::Result> { - let transport = build_transport(private_key.clone())?; - let combine_key = keypair_to_combine(private_key)?; - let config = config::Config::default(); - let enr = enr::build_enr(&combine_key, &config)?; - let behaviour = Behaviour::new(combine_key, config)?; - let peer_id = enr.public_key().to_peer_id()?; - - Ok(SwarmBuilder::with_executor(transport, behaviour, peer_id, TokioExecutor).build()) -} - -pub fn build_transport(private_key: Keypair) -> std::io::Result> { - let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true)); - let transport = libp2p::dns::TokioDnsConfig::system(tcp)?; - - // mplex config - let mut mplex_config = libp2p::mplex::MplexConfig::new(); - mplex_config.set_max_buffer_size(256); - mplex_config.set_max_buffer_behaviour(libp2p::mplex::MaxBufferBehaviour::Block); - - // yamux config - let mut yamux_config = libp2p::yamux::Config::default(); - yamux_config.set_window_update_mode(libp2p::yamux::WindowUpdateMode::on_read()); - - // noise config - let noise_config = noise::Config::new(&private_key) - .expect("signing can fail only once during starting a node"); - // Authentication - Ok(transport - .upgrade(core::upgrade::Version::V1) - .authenticate(noise_config) - .multiplex(core::upgrade::SelectUpgrade::new( - yamux_config, - mplex_config, - )) - .timeout(Duration::from_secs(10)) - .boxed()) -} +#[cfg(test)] +mod tests; diff --git a/crates/p2p/src/network.rs b/crates/p2p/src/network.rs new file mode 100644 index 00000000..1d4ef559 --- /dev/null +++ b/crates/p2p/src/network.rs @@ -0,0 +1,92 @@ +use std::{ + task::{Context, Poll}, + time::Duration, +}; + +use discv5::enr::CombinedKey; +use libp2p::{ + core::{muxing::StreamMuxerBox, transport::Boxed, upgrade}, + futures::StreamExt, + identity::Keypair, + noise, + swarm::SwarmBuilder, + PeerId, Swarm, Transport, +}; +use tracing::info; + +use crate::{ + behaviour::Behaviour, + config::Config, + enr::{build_enr, keypair_to_combine, CombineKeyPubExt}, +}; + +struct TokioExecutor; +impl libp2p::swarm::Executor for TokioExecutor { + fn exec(&self, future: std::pin::Pin + Send>>) { + tokio::spawn(future); + } +} + +pub struct Network { + swarm: Swarm, +} + +impl Network { + pub fn new(key: Keypair, config: Config) -> anyhow::Result { + let transport = Self::build_transport(key.clone())?; + let combine_key = keypair_to_combine(key)?; + let enr = build_enr(&combine_key, &config)?; + let behaviour = Behaviour::new(combine_key, config)?; + let peer_id = enr.public_key().to_peer_id()?; + + let swarm = + SwarmBuilder::with_executor(transport, behaviour, peer_id, TokioExecutor).build(); + Ok(Self { swarm }) + } + + fn build_transport(private_key: Keypair) -> std::io::Result> { + let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true)); + let transport = libp2p::dns::TokioDnsConfig::system(tcp)?; + + // mplex config + let mut mplex_config = libp2p::mplex::MplexConfig::new(); + mplex_config.set_max_buffer_size(256); + mplex_config.set_max_buffer_behaviour(libp2p::mplex::MaxBufferBehaviour::Block); + + // yamux config + let mut yamux_config = libp2p::yamux::Config::default(); + yamux_config.set_window_update_mode(libp2p::yamux::WindowUpdateMode::on_read()); + + // noise config + let noise_config = noise::Config::new(&private_key) + .expect("signing can fail only once during starting a node"); + // Authentication + Ok(transport + .upgrade(upgrade::Version::V1) + .authenticate(noise_config) + .multiplex(upgrade::SelectUpgrade::new(yamux_config, mplex_config)) + .timeout(Duration::from_secs(10)) + .boxed()) + } + + pub fn poll_network(&mut self, cx: &mut Context) -> Poll<()> { + while let Poll::Ready(Some(swarm_event)) = self.swarm.poll_next_unpin(cx) { + info!("Swarm event: {:?}", swarm_event); + // match swarm_event{ + // libp2p::swarm::SwarmEvent::Behaviour(_) => todo!(), + // libp2p::swarm::SwarmEvent::ConnectionEstablished { peer_id, endpoint, num_established, concurrent_dial_errors, established_in } => todo!(), + // libp2p::swarm::SwarmEvent::ConnectionClosed { peer_id, endpoint, num_established, cause } => todo!(), + // libp2p::swarm::SwarmEvent::IncomingConnection { local_addr, send_back_addr } => todo!(), + // libp2p::swarm::SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error } => todo!(), + // libp2p::swarm::SwarmEvent::OutgoingConnectionError { peer_id, error } => todo!(), + // libp2p::swarm::SwarmEvent::BannedPeer { peer_id, endpoint } => todo!(), + // libp2p::swarm::SwarmEvent::NewListenAddr { listener_id, address } => todo!(), + // libp2p::swarm::SwarmEvent::ExpiredListenAddr { listener_id, address } => todo!(), + // libp2p::swarm::SwarmEvent::ListenerClosed { listener_id, addresses, reason } => todo!(), + // libp2p::swarm::SwarmEvent::ListenerError { listener_id, error } => todo!(), + // libp2p::swarm::SwarmEvent::Dialing(_) => todo!(), + // } + } + Poll::Pending + } +} diff --git a/crates/p2p/src/reqrep/protocol.rs b/crates/p2p/src/reqrep/protocol.rs index 4ea6e345..6b9f222f 100644 --- a/crates/p2p/src/reqrep/protocol.rs +++ b/crates/p2p/src/reqrep/protocol.rs @@ -117,3 +117,27 @@ impl ProtocolName for ProtocolId { self.protocol_id.as_bytes() } } + +#[cfg(test)] +mod test { + use super::SUPPORTED_PROTOCOL; + + #[test] + fn test_protoco() { + let protocols = SUPPORTED_PROTOCOL + .iter() + .map(|(protocol_id, _)| protocol_id.protocol_id.clone()) + .collect::>(); + assert_eq!( + protocols, + vec![ + "/account_abstraction/erc4337/req/status/1/ssz_snappy", + "/account_abstraction/erc4337/req/goodbye/1/ssz_snappy", + "/account_abstraction/erc4337/req/ping/1/ssz_snappy", + "/account_abstraction/erc4337/req/metadata/1/ssz_snappy", + "/account_abstraction/erc4337/req/pooled_user_ops_hashes/1/ssz_snappy", + "/account_abstraction/erc4337/req/pooled_user_ops_by_hash/1/ssz_snappy" + ] + ) + } +} diff --git a/crates/p2p/src/tests/mod.rs b/crates/p2p/src/tests/mod.rs new file mode 100644 index 00000000..28993e04 --- /dev/null +++ b/crates/p2p/src/tests/mod.rs @@ -0,0 +1,20 @@ +use std::net::TcpListener; +mod swarm; + +pub fn get_available_port() -> Option { + let unused_port: u16; + loop { + let socket_addr = std::net::SocketAddr::new(std::net::Ipv4Addr::LOCALHOST.into(), 0); + match TcpListener::bind(socket_addr) { + Ok(listener) => match listener.local_addr().map(|s| s.port()) { + Ok(p) => { + unused_port = p; + break; + } + Err(_) => {} + }, + Err(_) => {} + } + } + Some(unused_port) +} diff --git a/crates/p2p/src/tests/swarm.rs b/crates/p2p/src/tests/swarm.rs new file mode 100644 index 00000000..58dd2bac --- /dev/null +++ b/crates/p2p/src/tests/swarm.rs @@ -0,0 +1,60 @@ +use std::{net::Ipv4Addr, time::Duration}; + +use libp2p::{gossipsub::TopicHash, identity::Keypair}; + +use crate::{ + config::{Config, ListenAddr}, + setup, +}; + +use super::get_available_port; + +#[tokio::test] +async fn req_rep() -> anyhow::Result<()> { + let key1 = Keypair::generate_secp256k1(); + let config1 = Config { + listen_addr: crate::config::ListenAddress::Ipv4(ListenAddr { + addr: Ipv4Addr::LOCALHOST, + udp_port: get_available_port().unwrap(), + tcp_port: get_available_port().unwrap(), + }), + ipv4_addr: Some(Ipv4Addr::LOCALHOST), + ipv6_addr: None, + enr_udp4_port: get_available_port(), + enr_tcp4_port: None, + enr_udp6_port: None, + enr_tcp6_port: None, + }; + let peer1_listen_addr = config1.listen_addr.to_multi_addr().first().unwrap().clone(); + let mut peer1 = setup(key1, config1)?; + peer1.listen_on(peer1_listen_addr)?; + + let key2 = Keypair::generate_secp256k1(); + let config2 = Config { + listen_addr: crate::config::ListenAddress::Ipv4(ListenAddr { + addr: Ipv4Addr::LOCALHOST, + udp_port: get_available_port().unwrap(), + tcp_port: get_available_port().unwrap(), + }), + ipv4_addr: Some(Ipv4Addr::LOCALHOST), + ipv6_addr: None, + enr_udp4_port: get_available_port(), + enr_tcp4_port: None, + enr_udp6_port: None, + enr_tcp6_port: None, + }; + let peer2_listen_addr = config2.listen_addr.to_multi_addr().first().unwrap().clone(); + let mut peer2 = setup(key2, config2)?; + peer2.listen_on(peer2_listen_addr.clone())?; + + peer1.dial(peer2_listen_addr)?; + tokio::time::sleep(Duration::from_secs(2)).await; + + peer1 + .behaviour_mut() + .gossipsub + .publish(TopicHash::from_raw("test"), "hello world")?; + tokio::time::sleep(Duration::from_secs(10)).await; + + Ok(()) +}