Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
zsluedem committed Jul 28, 2023
1 parent 3ff0a38 commit bcf4d43
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 68 deletions.
1 change: 1 addition & 0 deletions crates/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ silius-primitives = { path = "../primitives" }
silius-uopool = {path = "../uopool"}
discv5 = {version = "0.3.0", features = ["libp2p"]}
tokio = { workspace = true }
tracing = { workspace = true }

[dependencies.libp2p]
version = "0.51.3"
Expand Down
20 changes: 11 additions & 9 deletions crates/p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
use crate::config::Config;
use crate::discovery::{self, Discovery};
use crate::enr::{build_enr, CombineKeyPubExt};
use crate::enr::build_enr;
use crate::gossipsub::Gossipsub;
use crate::reqrep::protocol::SUPPORTED_PROTOCOL;
use crate::reqrep::{BundlerCodec, BundlerRequestResponse, Request, Response};
use discv5::enr::CombinedKey;
use libp2p::gossipsub;
use libp2p::gossipsub::ConfigBuilder;
use libp2p::request_response;
use libp2p::swarm::NetworkBehaviour;

#[derive(NetworkBehaviour)]
#[behaviour(out_event = "Event", event_process = false)]
pub struct Behaviour {
gossipsub: Gossipsub,
reqrep: BundlerRequestResponse,
discv5: Discovery,
pub gossipsub: Gossipsub,
pub reqrep: BundlerRequestResponse,
pub discv5: Discovery,
}

impl Behaviour {
pub fn new(key: CombinedKey, config: Config) -> anyhow::Result<Self> {
let enr = build_enr(&key, &config)?;
let gossipsub = Gossipsub::new(
gossipsub::MessageAuthenticity::Author(enr.public_key().to_peer_id()?),
gossipsub::Config::default(),
)
.map_err(|e| anyhow::anyhow!(e))?;
let gossipsub_config = ConfigBuilder::default()
.validation_mode(gossipsub::ValidationMode::Anonymous)
.build()
.map_err(|e| anyhow::anyhow!(e))?;
let gossipsub = Gossipsub::new(gossipsub::MessageAuthenticity::Anonymous, gossipsub_config)
.map_err(|e| anyhow::anyhow!(e))?;
let reqrep = BundlerRequestResponse::new(
BundlerCodec {},
SUPPORTED_PROTOCOL.clone().into_iter(),
Expand Down
71 changes: 58 additions & 13 deletions crates/p2p/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,51 @@
use std::net::{Ipv4Addr, Ipv6Addr};

use discv5::ListenConfig;
use libp2p::{multiaddr::Protocol, Multiaddr};

const DEFAULT_UDP_PORT: u16 = 9000;
const DEFAULT_TCP_PORT: u16 = 9000;
#[derive(Clone, Debug)]
pub struct ListenAddr<Ip> {
pub addr: Ip,
pub udp_port: u16,
pub tcp_port: u16,
}

pub enum ListenAddress {
Ipv4(ListenAddr<Ipv4Addr>),
Ipv6(ListenAddr<Ipv6Addr>),
Dual(ListenAddr<Ipv4Addr>, ListenAddr<Ipv6Addr>),
}

impl ListenAddress {
pub fn to_multi_addr(&self) -> Vec<Multiaddr> {
match self {
ListenAddress::Ipv4(v) => vec![Multiaddr::from(v.addr).with(Protocol::Tcp(v.tcp_port))],
ListenAddress::Ipv6(v) => vec![Multiaddr::from(v.addr).with(Protocol::Tcp(v.tcp_port))],
ListenAddress::Dual(ipv4, ipv6) => {
vec![
Multiaddr::from(ipv4.addr).with(Protocol::Tcp(ipv4.tcp_port)),
Multiaddr::from(ipv6.addr).with(Protocol::Tcp(ipv6.tcp_port)),
]
}
}
}
}

impl Default for ListenAddress {
fn default() -> Self {
Self::Ipv4(ListenAddr {
addr: Ipv4Addr::UNSPECIFIED,
udp_port: DEFAULT_UDP_PORT,
tcp_port: DEFAULT_TCP_PORT,
})
}
}

pub struct Config {
pub listen_addr: ListenAddress,

/// The ipv4 address to broadcast to peers about which address we are listening on.
pub ipv4_addr: Option<Ipv4Addr>,

Expand All @@ -27,6 +68,11 @@ pub struct Config {
impl Default for Config {
fn default() -> Self {
Self {
listen_addr: ListenAddress::Ipv4(ListenAddr {
addr: Ipv4Addr::UNSPECIFIED,
udp_port: DEFAULT_UDP_PORT,
tcp_port: DEFAULT_TCP_PORT,
}),
ipv4_addr: Some(Ipv4Addr::UNSPECIFIED),
ipv6_addr: None,
enr_udp4_port: Some(DEFAULT_UDP_PORT),
Expand All @@ -39,21 +85,20 @@ impl Default for Config {

impl Config {
pub fn to_listen_config(&self) -> ListenConfig {
match (self.ipv4_addr, self.ipv6_addr) {
(None, None) => todo!(),
(None, Some(ip)) => ListenConfig::Ipv6 {
ip,
port: self.enr_udp6_port.unwrap_or(DEFAULT_UDP_PORT),
match &self.listen_addr {
ListenAddress::Ipv4(v) => ListenConfig::Ipv4 {
ip: v.addr,
port: v.udp_port,
},
(Some(ip), None) => ListenConfig::Ipv4 {
ip,
port: self.enr_udp4_port.unwrap_or(DEFAULT_UDP_PORT),
ListenAddress::Ipv6(v) => ListenConfig::Ipv6 {
ip: v.addr,
port: v.udp_port,
},
(Some(ipv4), Some(ipv6)) => ListenConfig::DualStack {
ipv4,
ipv4_port: self.enr_udp4_port.unwrap_or(DEFAULT_UDP_PORT),
ipv6,
ipv6_port: self.enr_udp6_port.unwrap_or(DEFAULT_UDP_PORT),
ListenAddress::Dual(ipv4, ipv6) => ListenConfig::DualStack {
ipv4: ipv4.addr,
ipv4_port: ipv4.udp_port,
ipv6: ipv6.addr,
ipv6_port: ipv6.udp_port,
},
}
}
Expand Down
49 changes: 3 additions & 46 deletions crates/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,50 +17,7 @@ pub mod config;
pub mod discovery;
pub mod enr;
pub mod gossipsub;
pub mod network;
pub mod reqrep;

struct TokioExecutor;
impl libp2p::swarm::Executor for TokioExecutor {
fn exec(&self, future: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>) {
tokio::spawn(future);
}
}

pub fn setup(private_key: Keypair) -> anyhow::Result<Swarm<behaviour::Behaviour>> {
let transport = build_transport(private_key.clone())?;
let combine_key = keypair_to_combine(private_key)?;
let config = config::Config::default();
let enr = enr::build_enr(&combine_key, &config)?;
let behaviour = Behaviour::new(combine_key, config)?;
let peer_id = enr.public_key().to_peer_id()?;

Ok(SwarmBuilder::with_executor(transport, behaviour, peer_id, TokioExecutor).build())
}

pub fn build_transport(private_key: Keypair) -> std::io::Result<Boxed<(PeerId, StreamMuxerBox)>> {
let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true));
let transport = libp2p::dns::TokioDnsConfig::system(tcp)?;

// mplex config
let mut mplex_config = libp2p::mplex::MplexConfig::new();
mplex_config.set_max_buffer_size(256);
mplex_config.set_max_buffer_behaviour(libp2p::mplex::MaxBufferBehaviour::Block);

// yamux config
let mut yamux_config = libp2p::yamux::Config::default();
yamux_config.set_window_update_mode(libp2p::yamux::WindowUpdateMode::on_read());

// noise config
let noise_config = noise::Config::new(&private_key)
.expect("signing can fail only once during starting a node");
// Authentication
Ok(transport
.upgrade(core::upgrade::Version::V1)
.authenticate(noise_config)
.multiplex(core::upgrade::SelectUpgrade::new(
yamux_config,
mplex_config,
))
.timeout(Duration::from_secs(10))
.boxed())
}
#[cfg(test)]
mod tests;
92 changes: 92 additions & 0 deletions crates/p2p/src/network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::{
task::{Context, Poll},
time::Duration,
};

use discv5::enr::CombinedKey;
use libp2p::{
core::{muxing::StreamMuxerBox, transport::Boxed, upgrade},
futures::StreamExt,
identity::Keypair,
noise,
swarm::SwarmBuilder,
PeerId, Swarm, Transport,
};
use tracing::info;

use crate::{
behaviour::Behaviour,
config::Config,
enr::{build_enr, keypair_to_combine, CombineKeyPubExt},
};

struct TokioExecutor;
impl libp2p::swarm::Executor for TokioExecutor {
fn exec(&self, future: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>) {
tokio::spawn(future);
}
}

pub struct Network {
swarm: Swarm<Behaviour>,
}

impl Network {
pub fn new(key: Keypair, config: Config) -> anyhow::Result<Self> {
let transport = Self::build_transport(key.clone())?;
let combine_key = keypair_to_combine(key)?;
let enr = build_enr(&combine_key, &config)?;
let behaviour = Behaviour::new(combine_key, config)?;
let peer_id = enr.public_key().to_peer_id()?;

let swarm =
SwarmBuilder::with_executor(transport, behaviour, peer_id, TokioExecutor).build();
Ok(Self { swarm })
}

fn build_transport(private_key: Keypair) -> std::io::Result<Boxed<(PeerId, StreamMuxerBox)>> {
let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true));
let transport = libp2p::dns::TokioDnsConfig::system(tcp)?;

// mplex config
let mut mplex_config = libp2p::mplex::MplexConfig::new();
mplex_config.set_max_buffer_size(256);
mplex_config.set_max_buffer_behaviour(libp2p::mplex::MaxBufferBehaviour::Block);

// yamux config
let mut yamux_config = libp2p::yamux::Config::default();
yamux_config.set_window_update_mode(libp2p::yamux::WindowUpdateMode::on_read());

// noise config
let noise_config = noise::Config::new(&private_key)
.expect("signing can fail only once during starting a node");
// Authentication
Ok(transport
.upgrade(upgrade::Version::V1)
.authenticate(noise_config)
.multiplex(upgrade::SelectUpgrade::new(yamux_config, mplex_config))
.timeout(Duration::from_secs(10))
.boxed())
}

pub fn poll_network(&mut self, cx: &mut Context) -> Poll<()> {
while let Poll::Ready(Some(swarm_event)) = self.swarm.poll_next_unpin(cx) {
info!("Swarm event: {:?}", swarm_event);
// match swarm_event{
// libp2p::swarm::SwarmEvent::Behaviour(_) => todo!(),
// libp2p::swarm::SwarmEvent::ConnectionEstablished { peer_id, endpoint, num_established, concurrent_dial_errors, established_in } => todo!(),
// libp2p::swarm::SwarmEvent::ConnectionClosed { peer_id, endpoint, num_established, cause } => todo!(),
// libp2p::swarm::SwarmEvent::IncomingConnection { local_addr, send_back_addr } => todo!(),
// libp2p::swarm::SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error } => todo!(),
// libp2p::swarm::SwarmEvent::OutgoingConnectionError { peer_id, error } => todo!(),
// libp2p::swarm::SwarmEvent::BannedPeer { peer_id, endpoint } => todo!(),
// libp2p::swarm::SwarmEvent::NewListenAddr { listener_id, address } => todo!(),
// libp2p::swarm::SwarmEvent::ExpiredListenAddr { listener_id, address } => todo!(),
// libp2p::swarm::SwarmEvent::ListenerClosed { listener_id, addresses, reason } => todo!(),
// libp2p::swarm::SwarmEvent::ListenerError { listener_id, error } => todo!(),
// libp2p::swarm::SwarmEvent::Dialing(_) => todo!(),
// }
}
Poll::Pending
}
}
24 changes: 24 additions & 0 deletions crates/p2p/src/reqrep/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,27 @@ impl ProtocolName for ProtocolId {
self.protocol_id.as_bytes()
}
}

#[cfg(test)]
mod test {
use super::SUPPORTED_PROTOCOL;

#[test]
fn test_protoco() {
let protocols = SUPPORTED_PROTOCOL
.iter()
.map(|(protocol_id, _)| protocol_id.protocol_id.clone())
.collect::<Vec<_>>();
assert_eq!(
protocols,
vec![
"/account_abstraction/erc4337/req/status/1/ssz_snappy",
"/account_abstraction/erc4337/req/goodbye/1/ssz_snappy",
"/account_abstraction/erc4337/req/ping/1/ssz_snappy",
"/account_abstraction/erc4337/req/metadata/1/ssz_snappy",
"/account_abstraction/erc4337/req/pooled_user_ops_hashes/1/ssz_snappy",
"/account_abstraction/erc4337/req/pooled_user_ops_by_hash/1/ssz_snappy"
]
)
}
}
20 changes: 20 additions & 0 deletions crates/p2p/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use std::net::TcpListener;
mod swarm;

pub fn get_available_port() -> Option<u16> {
let unused_port: u16;
loop {
let socket_addr = std::net::SocketAddr::new(std::net::Ipv4Addr::LOCALHOST.into(), 0);
match TcpListener::bind(socket_addr) {
Ok(listener) => match listener.local_addr().map(|s| s.port()) {
Ok(p) => {
unused_port = p;
break;
}
Err(_) => {}
},
Err(_) => {}
}
}
Some(unused_port)
}
Loading

0 comments on commit bcf4d43

Please sign in to comment.