diff --git a/bin/silius/Cargo.toml b/bin/silius/Cargo.toml index a0139f75..c56a58d5 100644 --- a/bin/silius/Cargo.toml +++ b/bin/silius/Cargo.toml @@ -22,6 +22,7 @@ log = "0.4.19" pin-utils = "0.1" silius-bundler = { path = "../../crates/bundler" } silius-grpc = { path = "../../crates/grpc" } +silius-p2p = { path = "../../crates/p2p" } silius-primitives = { path = "../../crates/primitives" } silius-rpc = { path = "../../crates/rpc" } tokio = { workspace = true } diff --git a/bin/silius/src/cli.rs b/bin/silius/src/cli.rs index c3d964d3..0921c36c 100644 --- a/bin/silius/src/cli.rs +++ b/bin/silius/src/cli.rs @@ -2,8 +2,9 @@ use crate::utils::{parse_address, parse_u256, parse_uopool_mode}; use clap::Parser; use ethers::types::{Address, U256}; use expanded_pathbuf::ExpandedPathBuf; +use silius_p2p::config::{Config, ListenAddr}; use silius_primitives::UoPoolMode; -use std::net::SocketAddr; +use std::net::{Ipv4Addr, SocketAddr}; #[derive(Clone, Debug, Parser)] pub struct UoPoolServiceOpts { @@ -110,6 +111,46 @@ impl RpcServiceOpts { } } +#[derive(Clone, Debug, Parser, PartialEq)] +pub struct P2POpts { + /// enable p2p + #[clap(long)] + pub enable_p2p: bool, + + /// Sets the p2p listen address. + #[clap(long, default_value = "0.0.0.0")] + pub p2p_listen_address: Ipv4Addr, + + /// The ipv4 address to broadcast to peers about which address we are listening on. + #[clap(long)] + pub p2p_broadcast_address: Ipv4Addr, + + /// The udp4 port to broadcast to peers in order to reach back for discovery. + #[clap(long, default_value = "4337")] + pub udp4_port: u16, + + #[clap(long, default_value = "4337")] + pub tcp4_port: u16, +} + +impl P2POpts { + pub fn to_config(&self) -> Config { + Config { + listen_addr: silius_p2p::config::ListenAddress::Ipv4(ListenAddr { + addr: self.p2p_listen_address, + udp_port: self.udp4_port, + tcp_port: self.tcp4_port, + }), + ipv4_addr: Some(self.p2p_broadcast_address), + ipv6_addr: None, + enr_udp4_port: Some(self.udp4_port), + enr_tcp4_port: Some(self.tcp4_port), + enr_udp6_port: None, + enr_tcp6_port: None, + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -118,6 +159,32 @@ mod tests { str::FromStr, }; + #[test] + fn p2p_opts() { + let args = vec![ + "p2popts", + "--enable-p2p", + "--p2p-listen-address", + "0.0.0.0", + "--p2p-broadcast-address", + "127.0.0.1", + "--tcp4-port", + "4337", + "--udp4-port", + "4337", + ]; + assert_eq!( + P2POpts { + enable_p2p: true, + p2p_listen_address: Ipv4Addr::new(0, 0, 0, 0), + p2p_broadcast_address: Ipv4Addr::new(127, 0, 0, 1), + tcp4_port: 4337, + udp4_port: 4337 + }, + P2POpts::try_parse_from(args).unwrap() + ) + } + #[test] fn bundler_opts() { let args = vec![ diff --git a/bin/silius/src/silius-uopool.rs b/bin/silius/src/silius-uopool.rs index 33b9540a..e1a1e06b 100644 --- a/bin/silius/src/silius-uopool.rs +++ b/bin/silius/src/silius-uopool.rs @@ -5,7 +5,7 @@ use ethers::{ types::{Address, U256}, }; use silius::{ - cli::UoPoolServiceOpts, + cli::{P2POpts, UoPoolServiceOpts}, utils::{parse_address, parse_u256, unwrap_path_or_home}, }; use silius_grpc::uopool_service_run; @@ -22,8 +22,8 @@ pub struct Opt { #[clap(flatten)] pub uopool_opts: UoPoolServiceOpts, - #[clap(long, value_delimiter=',', value_parser=parse_address)] - pub entry_points: Vec
, + #[clap(long, value_parser=parse_address)] + pub entry_points: Address, #[clap(long, default_value= "dev", value_parser = SUPPORTED_CHAINS)] pub chain: Option, @@ -34,6 +34,9 @@ pub struct Opt { #[clap(long, value_parser=parse_u256)] pub max_verification_gas: U256, + + #[clap(flatten)] + pub p2p_opts: P2POpts, } #[tokio::main] @@ -78,6 +81,8 @@ async fn main() -> Result<()> { opt.uopool_opts.min_priority_fee_per_gas, opt.uopool_opts.whitelist, opt.uopool_opts.uo_pool_mode, + opt.enable_p2p, + opt.p2p_opts.to_config(), ) .await?; diff --git a/bin/silius/src/silius.rs b/bin/silius/src/silius.rs index 0656f9f0..b72ffae4 100644 --- a/bin/silius/src/silius.rs +++ b/bin/silius/src/silius.rs @@ -6,7 +6,7 @@ use ethers::{ }; use expanded_pathbuf::ExpandedPathBuf; use silius::{ - cli::{BundlerServiceOpts, RpcServiceOpts, UoPoolServiceOpts}, + cli::{BundlerServiceOpts, P2POpts, RpcServiceOpts, UoPoolServiceOpts}, utils::{parse_address, parse_u256, run_until_ctrl_c, unwrap_path_or_home}, }; use silius_grpc::{ @@ -32,12 +32,15 @@ pub struct Opt { #[clap(long)] pub mnemonic_file: ExpandedPathBuf, - #[clap(long, value_delimiter=',', value_parser=parse_address)] - pub entry_points: Vec
, + #[clap(long, value_parser=parse_address)] + pub entry_points: Address, #[clap(long)] pub no_uopool: bool, + #[clap(flatten)] + pub p2p_opts: P2POpts, + #[clap(flatten)] pub uopool_opts: UoPoolServiceOpts, @@ -129,6 +132,8 @@ fn main() -> Result<()> { opt.uopool_opts.min_priority_fee_per_gas, opt.uopool_opts.whitelist, opt.uopool_opts.uo_pool_mode, + opt.p2p_opts.enable_p2p, + opt.p2p_opts.to_config() ) .await?; info!( @@ -149,7 +154,7 @@ fn main() -> Result<()> { bundler_service_run( opt.bundler_opts.bundler_grpc_listen_address, wallet, - opt.entry_points, + vec![opt.entry_points], opt.eth_client_address.clone(), chain, opt.bundler_opts.beneficiary, diff --git a/crates/grpc/Cargo.toml b/crates/grpc/Cargo.toml index a2be058d..4cd42d5e 100644 --- a/crates/grpc/Cargo.toml +++ b/crates/grpc/Cargo.toml @@ -17,6 +17,8 @@ async-trait = { workspace = true } dashmap = "5.4.0" ethers = { workspace = true } expanded-pathbuf = { workspace = true } +futures = "0.3.28" +libp2p-identity = "0.2.3" parking_lot = { workspace = true } prost = "0.11" serde_json = { workspace = true } @@ -24,6 +26,7 @@ silius-bundler = { path = "../bundler" } silius-contracts = { path = "../contracts" } silius-primitives = { path = "../primitives" } silius-uopool = { path = "../uopool" } +silius-p2p = { path = "../p2p" } tokio = { workspace = true } tonic = { version = "0.8", default-features = false, features = [ "codegen", diff --git a/crates/grpc/src/network.rs b/crates/grpc/src/network.rs new file mode 100644 index 00000000..0ce9261c --- /dev/null +++ b/crates/grpc/src/network.rs @@ -0,0 +1,31 @@ +use ethers::providers::Middleware; +use silius_p2p::network::Network; +use silius_primitives::reputation::ReputationEntry; +use silius_uopool::{Mempool, Reputation, VecCh, VecUo}; + +use crate::builder::UoPoolBuilder; + +/// The Integrator is for the integrations between p2p network and the uopool +pub struct NetworkIntegrator +where + M: Middleware + Clone + 'static, + P: Mempool + Send + Sync, + R: Reputation, Error = E> + Send + Sync, +{ + network: Network, + uopool_builder: UoPoolBuilder, +} + +impl NetworkIntegrator +where + M: Middleware + Clone + 'static, + P: Mempool + Send + Sync, + R: Reputation, Error = E> + Send + Sync, +{ + pub fn new(network: Network, uopool_builder: UoPoolBuilder) -> Self { + Self { + network, + uopool_builder, + } + } +} diff --git a/crates/grpc/src/uopool.rs b/crates/grpc/src/uopool.rs index 87ed08d2..59aae113 100644 --- a/crates/grpc/src/uopool.rs +++ b/crates/grpc/src/uopool.rs @@ -12,8 +12,14 @@ use ethers::{ types::{Address, U256}, }; use expanded_pathbuf::ExpandedPathBuf; +use futures::channel::mpsc::{unbounded, UnboundedSender}; +use futures::StreamExt; +use libp2p_identity::Keypair; use silius_contracts::entry_point::EntryPointErr; +use silius_p2p::config::Config; +use silius_p2p::network::Network; use silius_primitives::reputation::ReputationEntry; +use silius_primitives::UserOperation; use silius_primitives::{uopool::AddError, Chain, UoPoolMode}; use silius_uopool::{ init_env, DBError, DatabaseMempool, DatabaseReputation, Mempool, VecCh, VecUo, WriteMap, @@ -25,11 +31,14 @@ use silius_uopool::{ use std::fmt::{Debug, Display}; use std::{net::SocketAddr, sync::Arc, time::Duration}; use tonic::{Code, Request, Response, Status}; -use tracing::{info, warn}; +use tracing::{error, info, warn}; pub const MAX_UOS_PER_UNSTAKED_SENDER: usize = 4; pub const GAS_INCREASE_PERC: u64 = 10; +pub const DB_FOLDER_NAME: &str = "db"; +pub const DISCOVERY_SECRET_FILE_NAME: &str = "discovery-secret"; + type StandardUserPool = UserOperationPool, P, R, E>; @@ -41,6 +50,8 @@ where { pub uo_pools: Arc>>, pub chain: Chain, + // It would be None if p2p is not enabled + pub publish_sd: Option>, } impl UoPoolService @@ -50,8 +61,16 @@ where R: Reputation, Error = E> + Send + Sync, E: Debug + Display, { - pub fn new(uo_pools: Arc>>, chain: Chain) -> Self { - Self { uo_pools, chain } + pub fn new( + uo_pools: Arc>>, + chain: Chain, + publish_sd: Option>, + ) -> Self { + Self { + uo_pools, + chain, + publish_sd, + } } fn get_uo_pool(&self, ep: &Address) -> tonic::Result> { @@ -95,7 +114,12 @@ where } } }; - + match self.publish_sd { + Some(ref sd) => sd + .unbounded_send(uo.clone()) + .expect("Failed to send user operation to publish channel"), + None => (), + }; let mut uo_pool = self.get_uo_pool(&ep)?; match uo_pool.add_user_operation(uo, res).await { @@ -348,7 +372,7 @@ where pub async fn uopool_service_run( grpc_listen_address: SocketAddr, datadir: ExpandedPathBuf, - eps: Vec
, + ep: Address, eth_client: Arc>, chain: Chain, max_verification_gas: U256, @@ -357,6 +381,8 @@ pub async fn uopool_service_run( min_priority_fee_per_gas: U256, whitelist: Vec
, uo_pool_mode: UoPoolMode, + p2p_enabled: bool, + config: Config, ) -> Result<()> { tokio::spawn(async move { let mut builder = tonic::transport::Server::builder(); @@ -371,34 +397,89 @@ pub async fn uopool_service_run( >, >::new()); - let env = Arc::new(init_env::(datadir.join("db")).expect("Init mdbx failed")); + let env = + Arc::new(init_env::(datadir.join(DB_FOLDER_NAME)).expect("Init mdbx failed")); env.create_tables() .expect("Create mdbx database tables failed"); - for ep in eps { - let id = mempool_id(&ep, &U256::from(chain.id())); - let builder = UoPoolBuilder::new( - uo_pool_mode == UoPoolMode::Unsafe, - eth_client.clone(), - ep, + let id = mempool_id(&ep, &U256::from(chain.id())); + let uo_builder = UoPoolBuilder::new( + uo_pool_mode == UoPoolMode::Unsafe, + eth_client.clone(), + ep, + chain, + max_verification_gas, + min_stake, + min_unstake_delay, + min_priority_fee_per_gas, + whitelist.clone(), + DatabaseMempool::new(env.clone()), + DatabaseReputation::new(env.clone()), + ); + + let waiting_to_pub_sd = if p2p_enabled { + let (waiting_to_pub_sd, waiting_to_pub_rv) = unbounded(); + let (p2p_userop_sd, mut p2p_userop_rv) = unbounded::(); + let mut uo_pool = uo_builder.uo_pool(); + + // spawn a task which would consume the userop received from p2p network + tokio::spawn(async move { + while let Some(user_op) = p2p_userop_rv.next().await { + match uo_pool.validate_user_operation(&user_op).await { + Ok(validate_result) => { + match uo_pool.add_user_operation(user_op, validate_result).await { + Ok(_) => {} + Err(e) => error!("Failed to add user operation: {:?} from p2p", e), + } + } + Err(e) => error!( + "Failed to validate user operation {user_op:?}: {:?} from p2p", + e + ), + } + } + }); + + let discovery_secret_file = datadir.join(DISCOVERY_SECRET_FILE_NAME); + let discovery_secret = if discovery_secret_file.exists() { + let content = + std::fs::read(discovery_secret_file).expect("discovery secret file currupted"); + Keypair::from_protobuf_encoding(&content).expect("discovery secret file currupted") + } else { + let keypair = Keypair::generate_secp256k1(); + std::fs::write( + discovery_secret_file, + keypair + .to_protobuf_encoding() + .expect("discovery secret encode failed"), + ) + .expect("write discoveray secret file failed"); + keypair + }; + let p2p_network = Network::new( + discovery_secret, + config, chain, - max_verification_gas, - min_stake, - min_unstake_delay, - min_priority_fee_per_gas, - whitelist.clone(), - DatabaseMempool::new(env.clone()), - DatabaseReputation::new(env.clone()), + ep, + waiting_to_pub_rv, + p2p_userop_sd, ); - m_map.insert(id, builder); - } + + Some(waiting_to_pub_sd) + } else { + None + }; + + m_map.insert(id, uo_builder); let svc = uo_pool_server::UoPoolServer::new(UoPoolService::< Provider, DatabaseMempool, DatabaseReputation, DBError, - >::new(m_map.clone(), chain)); + >::new( + m_map.clone(), chain, waiting_to_pub_sd + )); tokio::spawn(async move { loop {