diff --git a/Cargo.lock b/Cargo.lock index c4f6a90..f622e01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3443,6 +3443,7 @@ dependencies = [ "quick-protobuf", "rand", "rw-stream-sink", + "serde", "smallvec", "thiserror", "tracing", @@ -3514,6 +3515,7 @@ dependencies = [ "quick-protobuf-codec 0.3.1", "rand", "regex", + "serde", "sha2", "smallvec", "tracing", @@ -3555,6 +3557,7 @@ dependencies = [ "multihash", "quick-protobuf", "rand", + "serde", "sha2", "thiserror", "tracing", @@ -3582,6 +3585,7 @@ dependencies = [ "quick-protobuf", "quick-protobuf-codec 0.3.1", "rand", + "serde", "sha2", "smallvec", "thiserror", @@ -4142,6 +4146,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "076d548d76a0e2a0d4ab471d0b1c36c577786dfc4471242035d97a12a735c492" dependencies = [ "core2", + "serde", "unsigned-varint 0.7.2", ] diff --git a/Cargo.toml b/Cargo.toml index 3e34a9e..ea59f88 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,8 @@ libp2p = { version = "0.53.2", features = [ "quic", "relay", "autonat", - "dcutr" + "dcutr", + "serde", ] } libp2p-relay-manager = "0.2.2" tokio = { version = "1.36.0", features = ["full"] } diff --git a/src/api/routes.rs b/src/api/routes.rs index d628338..03ee499 100644 --- a/src/api/routes.rs +++ b/src/api/routes.rs @@ -1,3 +1,14 @@ +use std::path::Display; + +use axum::extract::{Path, Query, State}; +use axum::http::StatusCode; +use axum::Json; +use const_hex::ToHexExt; +use itertools::Itertools; +use libp2p::autonat::NatStatus; +use serde::Serialize; +use sqlx::{Executor, Row}; + use crate::api::AppState; use crate::controller::ControllerCommands; use crate::p2p::NetworkState; @@ -5,11 +16,6 @@ use crate::rules::Results; use crate::storage; use crate::storage::{get_for_id_and_kind, QueryOptions}; use crate::types::{PremintName, PremintTypes}; -use axum::extract::{Path, Query, State}; -use axum::http::StatusCode; -use axum::Json; -use serde::Serialize; -use sqlx::{Executor, Row}; pub async fn list_all( State(state): State, @@ -185,7 +191,23 @@ pub struct NodeInfoResponse { pub num_peers: u64, pub dht_peers: Vec>, pub gossipsub_peers: Vec, - pub all_external_addresses: Vec>, + pub external_addresses: Vec, + pub providing: Vec, + pub listeners: Vec, + pub nat_status: String, +} + +trait StringOrHex { + fn to_string(&self) -> String; +} + +impl StringOrHex for Vec { + fn to_string(&self) -> String { + match String::from_utf8(self.clone()) { + Ok(value) => value, + Err(_) => self.encode_hex(), + } + } } impl From for NodeInfoResponse { @@ -195,24 +217,41 @@ impl From for NodeInfoResponse { network_info, dht_peers, gossipsub_peers, - all_external_addresses, + external_addresses, + listeners, + providing, + nat_status, .. } = state; + let dht_peers = dht_peers .into_iter() - .map(|peer| peer.iter().map(|p| p.to_string()).collect()) + .map(|peer| peer.iter().map(ToString::to_string).collect()) .collect(); - let gossipsub_peers = gossipsub_peers.into_iter().map(|p| p.to_string()).collect(); - let all_external_addresses = all_external_addresses - .into_iter() - .map(|peer| peer.into_iter().map(|p| p.to_string()).collect()) + let gossipsub_peers = gossipsub_peers.iter().map(ToString::to_string).collect(); + let external_addresses = external_addresses.iter().map(ToString::to_string).collect(); + let providing = providing + .iter() + .take(100) + .map(|record| record.key.to_vec().to_string()) .collect(); + let listeners = listeners.iter().map(ToString::to_string).collect(); + let nat_status = match nat_status { + NatStatus::Private => "Private", + NatStatus::Unknown => "Unknown", + NatStatus::Public(..) => "Public", + } + .to_string(); + Self { local_peer_id: local_peer_id.to_string(), num_peers: network_info.num_peers() as u64, dht_peers, gossipsub_peers, - all_external_addresses, + external_addresses, + providing, + listeners, + nat_status, } } } diff --git a/src/config.rs b/src/config.rs index d0881de..75afff3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -78,6 +78,9 @@ pub struct Config { #[envconfig(from = "SYNC_LOOKBACK_HOURS", default = "6")] pub sync_lookback_hours: u64, + + #[envconfig(from = "ENABLE_RELAY", default = "false")] + pub enable_relay: bool, } impl Config { @@ -104,6 +107,7 @@ impl Config { rate_limit_rps: 1, boot_nodes: BootNodes::None, sync_lookback_hours: 6, + enable_relay: false, } } } @@ -198,7 +202,7 @@ pub fn init() -> Config { #[cfg(test)] mod test { - use crate::config::{BootNodes, ChainInclusionMode}; + use crate::config::{BootNodes, ChainInclusionMode, Config}; use std::env; use std::str::FromStr; @@ -210,28 +214,9 @@ mod test { #[test] fn test_premint_names() { - let config = super::Config { - secret: "0x7948efac1e9dbfb77691541df857b3142ea88f5b75b37dfca506f1f1c5d659ee" - .to_string(), - peer_port: 7777, - connect_external: false, - db_url: None, - persist_state: false, - prune_minted_premints: false, - api_port: 0, - peer_limit: 1000, + let config = Config { supported_premint_types: "simple,zora_premint_v2".to_string(), - chain_inclusion_mode: ChainInclusionMode::Check, - supported_chain_ids: "7777777".to_string(), - trusted_peers: None, - node_id: None, - external_address: None, - interactive: false, - enable_rpc: true, - admin_api_secret: None, - rate_limit_rps: 1, - boot_nodes: BootNodes::Chain, - sync_lookback_hours: 0, + ..Config::test_default() }; let names = config.premint_names(); @@ -240,27 +225,8 @@ mod test { assert_eq!(names[1].0, "zora_premint_v2"); let config = super::Config { - secret: "0x7948efac1e9dbfb77691541df857b3142ea88f5b75b37dfca506f1f1c5d659ee" - .to_string(), - peer_port: 7777, - connect_external: false, - db_url: None, - persist_state: false, - prune_minted_premints: false, - api_port: 0, - peer_limit: 1000, supported_premint_types: "zora_premint_v2".to_string(), - chain_inclusion_mode: ChainInclusionMode::Check, - supported_chain_ids: "7777777".to_string(), - trusted_peers: None, - node_id: None, - external_address: None, - interactive: false, - enable_rpc: true, - admin_api_secret: None, - rate_limit_rps: 1, - boot_nodes: BootNodes::None, - sync_lookback_hours: 0, + ..Config::test_default() }; let names = config.premint_names(); diff --git a/src/lib.rs b/src/lib.rs index 57f01d6..2ca0d18 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ pub mod chain_list; pub mod config; pub mod controller; pub mod metrics; +pub mod multiaddr_ext; pub mod p2p; pub mod premints; pub mod rules; diff --git a/src/multiaddr_ext.rs b/src/multiaddr_ext.rs new file mode 100644 index 0000000..cbc60fd --- /dev/null +++ b/src/multiaddr_ext.rs @@ -0,0 +1,26 @@ +use libp2p::multiaddr::Protocol; +use libp2p::PeerId; + +pub trait MultiaddrExt { + // get the last peer id from this address + fn peer_id(&self) -> Option; + + fn is_relayed(&self) -> bool; +} + +impl MultiaddrExt for libp2p::Multiaddr { + fn peer_id(&self) -> Option { + let mut last = None; + self.iter().for_each(|component| { + if let Protocol::P2p(key) = component { + last = Some(key.clone()); + } + }); + + last + } + + fn is_relayed(&self) -> bool { + self.iter().any(|addr| matches!(addr, Protocol::P2pCircuit)) + } +} diff --git a/src/p2p.rs b/src/p2p.rs index 3d8afb4..37df7c9 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -1,24 +1,29 @@ +use std::borrow::Cow; use std::hash::Hasher; use std::time::Duration; use eyre::WrapErr; use futures_ticker::Ticker; +use itertools::Itertools; +use libp2p::autonat::NatStatus; use libp2p::core::ConnectedPoint; use libp2p::futures::StreamExt; use libp2p::gossipsub::Version; use libp2p::identify::Event; use libp2p::identity::Keypair; -use libp2p::kad::store::MemoryStore; +use libp2p::kad::store::{MemoryStore, RecordStore}; use libp2p::kad::GetProvidersOk::FoundProviders; -use libp2p::kad::{Addresses, QueryResult, RecordKey}; +use libp2p::kad::{Addresses, ProviderRecord, QueryResult, Record, RecordKey}; use libp2p::multiaddr::Protocol; use libp2p::request_response::{InboundRequestId, Message, ProtocolSupport, ResponseChannel}; -use libp2p::swarm::{ConnectionId, NetworkBehaviour, NetworkInfo, SwarmEvent}; +use libp2p::swarm::behaviour::toggle::Toggle; +use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; +use libp2p::swarm::DialError::DialPeerConditionFalse; +use libp2p::swarm::{ConnectionId, NetworkBehaviour, NetworkInfo, SwarmEvent, ToSwarm}; use libp2p::{ autonat, dcutr, gossipsub, kad, noise, relay, request_response, tcp, yamux, Multiaddr, PeerId, StreamProtocol, }; - use rand::prelude::SliceRandom; use serde::{Deserialize, Serialize}; use sha256::digest; @@ -26,6 +31,7 @@ use tokio::select; use crate::config::Config; use crate::controller::{P2PEvent, SwarmCommand}; +use crate::multiaddr_ext::MultiaddrExt; use crate::storage::QueryOptions; use crate::types::{ claims_topic_hashes, InclusionClaim, MintpoolNodeInfo, PeerInclusionClaim, Premint, @@ -39,7 +45,7 @@ pub struct MintpoolBehaviour { identify: libp2p::identify::Behaviour, ping: libp2p::ping::Behaviour, request_response: request_response::cbor::Behaviour, - relay: relay::Behaviour, + relay: Toggle, relay_client: relay::client::Behaviour, relay_manager: libp2p_relay_manager::Behaviour, autonat: autonat::Behaviour, @@ -64,7 +70,8 @@ impl SwarmController { command_receiver: tokio::sync::mpsc::Receiver, event_sender: tokio::sync::mpsc::Sender, ) -> Self { - let mut swarm = Self::make_swarm_controller(id_keys).expect("Invalid config for swarm"); + let mut swarm = Self::make_swarm_controller(id_keys, config.enable_relay) + .expect("Invalid config for swarm"); // add external address if configured config @@ -103,13 +110,16 @@ impl SwarmController { MintpoolNodeInfo { peer_id, addr } } - fn make_swarm_controller(id_keys: Keypair) -> eyre::Result> { + fn make_swarm_controller( + id_keys: Keypair, + enable_relay: bool, + ) -> eyre::Result> { let peer_id = id_keys.public().to_peer_id(); let public_key = id_keys.public(); let swarm = libp2p::SwarmBuilder::with_existing_identity(id_keys) .with_tokio() .with_tcp( - tcp::Config::default(), + tcp::Config::default().port_reuse(true).nodelay(true), noise::Config::new, yamux::Config::default, )? @@ -119,7 +129,6 @@ impl SwarmController { .with_behaviour(|key, client| { let mut b = kad::Behaviour::new(peer_id, MemoryStore::new(key.public().to_peer_id())); - b.set_mode(Some(kad::Mode::Server)); let gossipsub_config = gossipsub::ConfigBuilder::default() .heartbeat_interval(Duration::from_secs(10)) .validation_mode(gossipsub::ValidationMode::Strict) @@ -149,7 +158,11 @@ impl SwarmController { )], request_response::Config::default(), ), - relay: libp2p::relay::Behaviour::new(peer_id, Default::default()), + relay: Toggle::from(if enable_relay { + Some(relay::Behaviour::new(peer_id, Default::default())) + } else { + None + }), relay_client: client, relay_manager: libp2p_relay_manager::Behaviour::new( libp2p_relay_manager::Config { @@ -159,9 +172,9 @@ impl SwarmController { backoff: Duration::from_secs(15), }, ), - autonat: libp2p::autonat::Behaviour::new( + autonat: autonat::Behaviour::new( peer_id, - libp2p::autonat::Config { + autonat::Config { boot_delay: Duration::from_secs(15), only_global_ips: false, use_connected: true, @@ -179,10 +192,10 @@ impl SwarmController { /// Starts the swarm controller listening and runs the run_loop awaiting incoming actions pub async fn run(&mut self, port: u64, listen_ip: String) -> eyre::Result<()> { - self.swarm - .listen_on(format!("/ip4/{listen_ip}/tcp/{port}").parse()?)?; self.swarm .listen_on(format!("/ip4/{listen_ip}/udp/{port}/quic-v1").parse()?)?; + self.swarm + .listen_on(format!("/ip4/{listen_ip}/tcp/{port}").parse()?)?; let registry_topic = announce_topic(); self.swarm @@ -265,7 +278,13 @@ impl SwarmController { match event { SwarmEvent::NewListenAddr { address, .. } => { let pid = self.swarm.local_peer_id(); - let local_address = if address.iter().any(|p| p == Protocol::P2pCircuit) { + let local_address = if address.is_relayed() { + // if it's a relay address, let's assume it's an external address + + if !self.swarm.external_addresses().contains(&address) { + self.swarm.add_external_address(address.clone()); + } + address.to_string() } else { address.with(Protocol::P2p(*pid)).to_string() @@ -273,6 +292,34 @@ impl SwarmController { tracing::info!(local_address = local_address, "Started listening"); } + SwarmEvent::ExpiredListenAddr { address, .. } => { + tracing::info!(address = address.to_string(), "Expired listen address"); + + if address.is_relayed() { + self.swarm.remove_external_address(&address); + } + } + + SwarmEvent::ListenerClosed { + addresses, reason, .. + } => { + tracing::info!( + addresses = addresses + .iter() + .map(ToString::to_string) + .collect::>() + .join(", "), + reason = format!("{:?}", reason), + "Listener closed" + ); + + for address in addresses { + if address.is_relayed() { + self.swarm.remove_external_address(&address); + } + } + } + SwarmEvent::IncomingConnection { connection_id, local_addr, @@ -283,29 +330,10 @@ impl SwarmController { } SwarmEvent::ConnectionEstablished { peer_id, - endpoint, connection_id, .. } => { self.reject_connection_if_over_max(connection_id); - - match endpoint { - ConnectedPoint::Dialer { address, .. } => { - let addr = address; - self.swarm.add_external_address(addr.clone()); - let b = self.swarm.behaviour_mut(); - b.kad.add_address(&peer_id, addr.clone()); - tracing::info!("Dialed: {:?}", addr); - } - ConnectedPoint::Listener { - local_addr, - send_back_addr, - } => { - let addr = send_back_addr.with(Protocol::P2p(peer_id)); - tracing::info!("Was connected to by: {:?} local: {local_addr}", addr); - } - } - tracing::info!("Connection established with peer: {:?}", peer_id); tracing::info!(counter.connections = 1); } @@ -364,7 +392,13 @@ impl SwarmController { tracing::info!("Dialing: {:?}", peer_id) } + SwarmEvent::NewExternalAddrCandidate { address } => { + tracing::info!("New external address candidate: {:?}", address) + } + SwarmEvent::ExternalAddrConfirmed { address } => { + tracing::info!("External address confirmed: {:?}", address); + match self .swarm .behaviour_mut() @@ -386,7 +420,7 @@ impl SwarmController { SwarmEvent::Behaviour(MintpoolBehaviourEvent::Identify(event)) => match event { Event::Received { peer_id, info } => { - let is_relay = info.protocols.contains(&libp2p::relay::HOP_PROTOCOL_NAME); + let is_relay = info.protocols.contains(&relay::HOP_PROTOCOL_NAME); if is_relay { tracing::info!("Discovered relay peer: {:?}", info); @@ -579,7 +613,7 @@ impl SwarmController { } => match providers { FoundProviders { providers, .. } => { for peer in providers { - tracing::info!("Found provider: {:?}", peer); + tracing::trace!("Found provider: {:?}", peer); // lookup address in kad routing table let addresses = @@ -637,41 +671,49 @@ impl SwarmController { return; } - if state.all_external_addresses.contains(&address) && !self.local_mode { - tracing::warn!("Already connected to peer: {:?}", address); - return; - } + let opts = if let Some(peer_id) = address.peer_id() { + DialOpts::peer_id(peer_id) + .addresses(vec![address]) + .condition(PeerCondition::DisconnectedAndNotDialing) + .build() + } else { + DialOpts::from(address) + }; + + match self.swarm.dial(opts) { + Ok(_) | Err(DialPeerConditionFalse(PeerCondition::DisconnectedAndNotDialing)) => {} - if let Err(err) = self.swarm.dial(address) { - tracing::error!("Error dialing peer: {:?}", err); + Err(err) => { + tracing::error!("Error dialing peer: {:?}", err); + } } } fn make_network_state(&mut self) -> NetworkState { - let dht_peers: Vec<_> = self - .swarm - .behaviour_mut() + let external_addresses = self.swarm.external_addresses().cloned().collect(); + let local_peer_id = *self.swarm.local_peer_id(); + let network_info = self.swarm.network_info(); + let listeners = self.swarm.listeners().cloned().collect(); + + let b = self.swarm.behaviour_mut(); + let dht_peers: Vec<_> = b .kad .kbuckets() .flat_map(|x| x.iter().map(|x| x.node.value.clone()).collect::>()) .collect(); - - let my_id = *self.swarm.local_peer_id(); - - let gossipsub_peers = self - .swarm - .behaviour_mut() - .gossipsub - .all_mesh_peers() - .cloned() - .collect::>(); + let providing = b.kad.store_mut().provided().map(Cow::into_owned).collect(); + let gossipsub_peers = b.gossipsub.all_mesh_peers().cloned().collect::>(); + let nat_status = b.autonat.nat_status(); NetworkState { - local_peer_id: my_id, - network_info: self.swarm.network_info(), + local_peer_id, + network_info, dht_peers, gossipsub_peers, - all_external_addresses: self.swarm.external_addresses().cloned().collect(), + external_addresses, + providing, + listeners, + nat_status, } } @@ -748,10 +790,6 @@ impl SwarmController { match event { relay::client::Event::ReservationReqAccepted { relay_peer_id, .. } => { tracing::info!("Relay reservation request accepted: {:?}", relay_peer_id); - self.swarm - .behaviour_mut() - .kad - .start_providing(RecordKey::new(&"mintpool::gossip"))?; } other => { @@ -852,7 +890,10 @@ pub struct NetworkState { pub network_info: NetworkInfo, pub dht_peers: Vec, pub gossipsub_peers: Vec, - pub all_external_addresses: Vec, + pub external_addresses: Vec, + pub providing: Vec, + pub listeners: Vec, + pub nat_status: NatStatus, } #[derive(Debug, Serialize, Deserialize)]