From cec73ba26c1a328a777a68c1b51bead7373f1326 Mon Sep 17 00:00:00 2001 From: Erik Reppel Date: Thu, 25 Apr 2024 20:33:07 -0400 Subject: [PATCH 1/3] Start --- docs/OPERATION.md | 1 + src/config.rs | 3 +++ src/controller.rs | 27 ++++++++++++++++++++++++++- 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/docs/OPERATION.md b/docs/OPERATION.md index ca2ffa0..757b602 100644 --- a/docs/OPERATION.md +++ b/docs/OPERATION.md @@ -58,6 +58,7 @@ INTERACTIVE: bool (false) - If true, interactive repl will run ENABLE_RPC: bool (true) - If true, rpc will be used for rules evaluation ADMIN_API_SECRET: Option (None) - Secret key used to access admin api routes RATE_LIMIT_RPS: u32 (2) - Rate limit requests per second for the http api +SYNC_LOOKBACK_HOURS: u64 (6) - Number of hours to look back for syncing premints from another node ``` #### Logging diff --git a/src/config.rs b/src/config.rs index 0826f68..6a65ef7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -75,6 +75,9 @@ pub struct Config { // If set to chain, will check the Zora Network MintpoolTrusted nodes contract for boot nodes #[envconfig(from = "BOOT_NODES", default = "chain")] pub boot_nodes: BootNodes, + + #[envconfig(from = "SYNC_LOOKBACK_HOURS", default = "6")] + pub sync_lookback_hours: u64, } impl Config { diff --git a/src/controller.rs b/src/controller.rs index 97cdbd6..9881264 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -1,8 +1,11 @@ use crate::chain::inclusion_claim_correct; use crate::config::{ChainInclusionMode, Config}; +use chrono::Utc; use eyre::WrapErr; use libp2p::PeerId; use sqlx::SqlitePool; +use std::ops::Sub; +use std::time::{Duration, SystemTime}; use tokio::select; use tokio::sync::{mpsc, oneshot}; @@ -73,11 +76,12 @@ pub struct Controller { rules: RulesEngine, trusted_peers: Vec, inclusion_mode: ChainInclusionMode, + config: Config, } impl Controller { pub fn new( - config: &Config, + config: Config, swarm_command_sender: mpsc::Sender, swarm_event_receiver: mpsc::Receiver, external_commands: mpsc::Receiver, @@ -92,6 +96,7 @@ impl Controller { rules, trusted_peers: config.trusted_peers(), inclusion_mode: config.chain_inclusion_mode, + config, } } @@ -280,6 +285,26 @@ impl Controller { } } } + + /// temporary solution for full state from a known other node via their http api. + /// We should migrate to syncing based on peer_id & libp2p request_response + async fn api_sync(&self, api_url: String) -> eyre::Result<()> { + let seconds = self.config.sync_lookback_hours * 60 * 60; + let from_time = SystemTime::now().sub(Duration::from_secs(seconds)); + + let from_time = chrono::Utc::now().sub(chrono::Duration::hours( + self.config.sync_lookback_hours as i64, + )); + + let url = reqwest::Url::parse_with_params( + api_url.as_str(), + &[("from", serde_json::to_string(&from_time)?)], + )?; + + // reqwest::get() + + Ok(()) + } } #[derive(Clone)] From fe0b7b37b18e01fb9f34e0ab2a5b0dd53f194155 Mon Sep 17 00:00:00 2001 From: Erik Reppel Date: Thu, 25 Apr 2024 21:58:44 -0400 Subject: [PATCH 2/3] Implement P2P and controller layers for request / response based sync --- Cargo.lock | 32 ++++++++++++++ Cargo.toml | 2 + examples/extra_rules_and_routes.rs | 2 +- src/config.rs | 3 +- src/controller.rs | 21 +++++++++- src/main.rs | 2 +- src/p2p.rs | 67 +++++++++++++++++++++++++++++- src/run.rs | 10 ++--- src/storage.rs | 4 +- 9 files changed, 131 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 93a8a0d..f1424d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1286,6 +1286,15 @@ dependencies = [ "serde_json", ] +[[package]] +name = "cbor4ii" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59b4c883b9cc4757b061600d39001d4d0232bece4a3174696cf8f58a14db107d" +dependencies = [ + "serde", +] + [[package]] name = "cc" version = "1.0.89" @@ -2911,6 +2920,7 @@ dependencies = [ "libp2p-noise", "libp2p-ping", "libp2p-quic", + "libp2p-request-response", "libp2p-swarm", "libp2p-tcp", "libp2p-upnp", @@ -3198,6 +3208,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "libp2p-request-response" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e12823250fe0c45bdddea6eefa2be9a609aff1283ff4e1d8a294fdbb89572f6f" +dependencies = [ + "async-trait", + "cbor4ii", + "futures", + "futures-bounded", + "futures-timer", + "instant", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "rand", + "serde", + "smallvec", + "tracing", + "void", +] + [[package]] name = "libp2p-swarm" version = "0.44.1" diff --git a/Cargo.toml b/Cargo.toml index a2e9be9..4dc12d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,8 @@ libp2p = { version = "0.53.2", features = [ "identify", "ping", "dns", + "request-response", + "cbor" ] } tokio = { version = "1.36.0", features = ["full"] } eyre = "0.6.12" diff --git a/examples/extra_rules_and_routes.rs b/examples/extra_rules_and_routes.rs index e628292..924e01e 100644 --- a/examples/extra_rules_and_routes.rs +++ b/examples/extra_rules_and_routes.rs @@ -27,7 +27,7 @@ async fn main() -> eyre::Result<()> { rules.add_rule(metadata_rule!(only_odd_token_ids)); rules.add_rule(Box::new(MustStartWithA {})); - let ctl = mintpool::run::start_p2p_services(&config, rules).await?; + let ctl = mintpool::run::start_p2p_services(config.clone(), rules).await?; // Add some custom routes in addition to the defaults. You could also add middleware or anything else you can do with axum. let mut router = mintpool::api::router_with_defaults(&config); diff --git a/src/config.rs b/src/config.rs index 6a65ef7..3ad5d0a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,7 +7,7 @@ use rand::Rng; use crate::chain_list::CHAINS; use crate::types::PremintName; -#[derive(Envconfig, Debug)] +#[derive(Envconfig, Debug, Clone)] pub struct Config { // Used to derive an ed25519 keypair for node identity // Should be 32 bytes of random hex. @@ -103,6 +103,7 @@ impl Config { admin_api_secret: None, rate_limit_rps: 1, boot_nodes: BootNodes::None, + sync_lookback_hours: 6, } } } diff --git a/src/controller.rs b/src/controller.rs index 9881264..16cb434 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -11,7 +11,7 @@ use tokio::sync::{mpsc, oneshot}; use crate::p2p::NetworkState; use crate::rules::{Results, RulesEngine}; -use crate::storage::{PremintStorage, Reader, Writer}; +use crate::storage::{list_all_with_options, PremintStorage, QueryOptions, Reader, Writer}; use crate::types::{ InclusionClaim, MintpoolNodeInfo, PeerInclusionClaim, PremintName, PremintTypes, }; @@ -41,6 +41,13 @@ pub enum P2PEvent { NetworkState(NetworkState), PremintReceived(PremintTypes), MintSeenOnchain(PeerInclusionClaim), + SyncRequest { + query: QueryOptions, + channel: oneshot::Sender>>, + }, + SyncResponse { + premints: Vec, + }, } pub enum ControllerCommands { @@ -132,6 +139,18 @@ impl Controller { tracing::error!("Error handling onchain claim: {:?}", err); } } + P2PEvent::SyncRequest { query, channel } => { + let events = list_all_with_options(&self.store.db(), &query).await; + if let Err(Err(err)) = channel.send(events) { + tracing::error!("Error sending sync response: {:?}", err); + } + tracing::info!(histogram.sync_request_processed = 1); + } + P2PEvent::SyncResponse { premints } => { + for premint in premints { + let _ = self.validate_and_insert(premint).await; + } + } } } diff --git a/src/main.rs b/src/main.rs index 628ceff..528bbb4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,7 +18,7 @@ async fn main() -> eyre::Result<()> { let mut rules = RulesEngine::new(&config); rules.add_default_rules(); - let ctl = start_p2p_services(&config, rules).await?; + let ctl = start_p2p_services(config.clone(), rules).await?; let router = api::router_with_defaults(&config).merge(metrics_router); api::start_api(&config, ctl.clone(), router, true).await?; diff --git a/src/p2p.rs b/src/p2p.rs index ea7b488..192ad6a 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -1,5 +1,6 @@ use crate::config::Config; use crate::controller::{P2PEvent, SwarmCommand}; +use crate::storage::QueryOptions; use crate::types::{ claims_topic_hashes, InclusionClaim, MintpoolNodeInfo, PeerInclusionClaim, Premint, PremintName, PremintTypes, @@ -14,12 +15,17 @@ use libp2p::kad::store::MemoryStore; use libp2p::kad::GetProvidersOk::FoundProviders; use libp2p::kad::{Addresses, QueryResult, RecordKey}; use libp2p::multiaddr::Protocol; +use libp2p::request_response::{Message, ProtocolSupport}; use libp2p::swarm::{ConnectionId, NetworkBehaviour, NetworkInfo, SwarmEvent}; -use libp2p::{gossipsub, kad, noise, tcp, yamux, Multiaddr, PeerId}; +use libp2p::{ + gossipsub, kad, noise, request_response, tcp, yamux, Multiaddr, PeerId, StreamProtocol, +}; use sha256::digest; use std::hash::Hasher; use std::time::Duration; use tokio::select; +use tower::load::pending_requests; +use tower_http::request_id; #[derive(NetworkBehaviour)] pub struct MintpoolBehaviour { @@ -27,6 +33,7 @@ pub struct MintpoolBehaviour { kad: kad::Behaviour, identify: libp2p::identify::Behaviour, ping: libp2p::ping::Behaviour, + request_response: request_response::cbor::Behaviour>, } pub struct SwarmController { @@ -120,6 +127,13 @@ impl SwarmController { public_key, )), ping: libp2p::ping::Behaviour::new(libp2p::ping::Config::new()), + request_response: request_response::cbor::Behaviour::new( + [( + StreamProtocol::new("/mintpool-sync/1"), + ProtocolSupport::Full, + )], + request_response::Config::default(), + ), } })? .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60))) @@ -286,6 +300,10 @@ impl SwarmController { } } + SwarmEvent::Behaviour(MintpoolBehaviourEvent::RequestResponse(event)) => { + self.handle_request_response_event(event).await + } + SwarmEvent::Dialing { peer_id, .. } => { tracing::info!("Dialing: {:?}", peer_id) } @@ -549,6 +567,53 @@ impl SwarmController { all_external_addresses: self.swarm.external_addresses().cloned().collect(), } } + + async fn handle_request_response_event( + &mut self, + event: request_response::Event>, + ) -> eyre::Result<()> { + match event { + request_response::Event::Message { peer, message } => match message { + Message::Request { + request_id, + request, + channel, + } => { + tracing::info!( + request_id = request_id.to_string(), + "processing request for sync" + ); + let (snd, recv) = tokio::sync::oneshot::channel(); + self.event_sender + .send(P2PEvent::SyncRequest { + query: request, + channel: snd, + }) + .await?; + let result = recv.await??; + self.swarm + .behaviour_mut() + .request_response + .send_response(channel, result) + .map_err(|e| eyre::eyre!("Error sending response: {:?}", e))? + } + Message::Response { + request_id, + response, + } => { + tracing::info!( + request_id = request_id.to_string(), + "received response for sync" + ); + self.event_sender + .send(P2PEvent::SyncResponse { premints: response }) + .await?; + } + }, + other => tracing::info!("mintpool sync request/response event: {:?}", other), + } + Ok(()) + } } fn gossipsub_message_id(message: &gossipsub::Message) -> gossipsub::MessageId { diff --git a/src/run.rs b/src/run.rs index ebeea7a..2f13939 100644 --- a/src/run.rs +++ b/src/run.rs @@ -18,20 +18,20 @@ use crate::types::Premint; /// All interactions with the controller should be done through `ControllerInterface` for memory safety. /// Recommended to use this function when extending mintpool as a library, but if you're feeling bold you can reproduce what its doing. pub async fn start_p2p_services( - config: &Config, + config: Config, rules: RulesEngine, ) -> eyre::Result { - let id_keys = make_keypair(config) + let id_keys = make_keypair(&config) .expect("Failed to create keypair, node cannot start. Confirm secret is 32 bytes of hex (0x + 64 hex chars)"); let (event_send, event_recv) = tokio::sync::mpsc::channel(1024); let (swrm_cmd_send, swrm_recv) = tokio::sync::mpsc::channel(1024); let (ext_cmd_send, ext_cmd_recv) = tokio::sync::mpsc::channel(1024); - let store = PremintStorage::new(config).await; + let store = PremintStorage::new(&config).await; - let mut swarm_controller = SwarmController::new(id_keys, config, swrm_recv, event_send); + let mut swarm_controller = SwarmController::new(id_keys, &config, swrm_recv, event_send); let mut controller = Controller::new( - &config, + config.clone(), swrm_cmd_send, event_recv, ext_cmd_recv, diff --git a/src/storage.rs b/src/storage.rs index 25b0384..0f7630a 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -5,7 +5,7 @@ use alloy_primitives::Address; use async_trait::async_trait; use chrono::Utc; use eyre::WrapErr; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use sqlx::sqlite::SqliteConnectOptions; use sqlx::Row; use sqlx::{QueryBuilder, Sqlite, SqlitePool}; @@ -233,7 +233,7 @@ pub async fn list_all(db: &SqlitePool) -> eyre::Result> { Ok(premints) } -#[derive(Deserialize)] +#[derive(Deserialize, Serialize, Debug, Clone)] pub struct QueryOptions { pub chain_id: Option, pub kind: Option, From d933fe5cb80cb959c15dcf1f6a4e8e8db912d494 Mon Sep 17 00:00:00 2001 From: Erik Reppel Date: Thu, 25 Apr 2024 22:25:33 -0400 Subject: [PATCH 3/3] End progress for the night, controller and p2p changes are in, need entrypoint and testing --- src/config.rs | 2 + src/controller.rs | 35 ++++------------ src/p2p.rs | 99 ++++++++++++++++++++++++++++++++++----------- tests/api_test.rs | 2 +- tests/common/mod.rs | 2 +- tests/e2e_test.rs | 29 ++++++++----- 6 files changed, 107 insertions(+), 62 deletions(-) diff --git a/src/config.rs b/src/config.rs index 3ad5d0a..d0881de 100644 --- a/src/config.rs +++ b/src/config.rs @@ -231,6 +231,7 @@ mod test { admin_api_secret: None, rate_limit_rps: 1, boot_nodes: BootNodes::Chain, + sync_lookback_hours: 0, }; let names = config.premint_names(); @@ -259,6 +260,7 @@ mod test { admin_api_secret: None, rate_limit_rps: 1, boot_nodes: BootNodes::None, + sync_lookback_hours: 0, }; let names = config.premint_names(); diff --git a/src/controller.rs b/src/controller.rs index 16cb434..9e9c3e7 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -81,8 +81,7 @@ pub struct Controller { external_commands: mpsc::Receiver, store: PremintStorage, rules: RulesEngine, - trusted_peers: Vec, - inclusion_mode: ChainInclusionMode, + config: Config, } @@ -101,8 +100,6 @@ impl Controller { external_commands, store, rules, - trusted_peers: config.trusted_peers(), - inclusion_mode: config.chain_inclusion_mode, config, } } @@ -229,7 +226,7 @@ impl Controller { tracing::debug!("Marked as seen onchain {:?}", claim.clone()); } - if self.inclusion_mode == ChainInclusionMode::Check { + if self.config.chain_inclusion_mode == ChainInclusionMode::Check { if let Err(err) = self .swarm_command_sender .send(SwarmCommand::SendOnchainMintFound(claim)) @@ -263,7 +260,7 @@ impl Controller { } async fn handle_event_onchain_claim(&self, peer_claim: PeerInclusionClaim) -> eyre::Result<()> { - match self.inclusion_mode { + match self.config.chain_inclusion_mode { ChainInclusionMode::Check | ChainInclusionMode::Verify => { let claim = peer_claim.claim; let premint = self @@ -286,7 +283,11 @@ impl Controller { } } ChainInclusionMode::Trust => { - if self.trusted_peers.contains(&peer_claim.from_peer_id) { + if self + .config + .trusted_peers() + .contains(&peer_claim.from_peer_id) + { self.store .mark_seen_on_chain(peer_claim.claim.clone()) .await?; @@ -304,26 +305,6 @@ impl Controller { } } } - - /// temporary solution for full state from a known other node via their http api. - /// We should migrate to syncing based on peer_id & libp2p request_response - async fn api_sync(&self, api_url: String) -> eyre::Result<()> { - let seconds = self.config.sync_lookback_hours * 60 * 60; - let from_time = SystemTime::now().sub(Duration::from_secs(seconds)); - - let from_time = chrono::Utc::now().sub(chrono::Duration::hours( - self.config.sync_lookback_hours as i64, - )); - - let url = reqwest::Url::parse_with_params( - api_url.as_str(), - &[("from", serde_json::to_string(&from_time)?)], - )?; - - // reqwest::get() - - Ok(()) - } } #[derive(Clone)] diff --git a/src/p2p.rs b/src/p2p.rs index 192ad6a..a93b238 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -15,17 +15,16 @@ use libp2p::kad::store::MemoryStore; use libp2p::kad::GetProvidersOk::FoundProviders; use libp2p::kad::{Addresses, QueryResult, RecordKey}; use libp2p::multiaddr::Protocol; -use libp2p::request_response::{Message, ProtocolSupport}; +use libp2p::request_response::{InboundRequestId, Message, ProtocolSupport, ResponseChannel}; use libp2p::swarm::{ConnectionId, NetworkBehaviour, NetworkInfo, SwarmEvent}; use libp2p::{ gossipsub, kad, noise, request_response, tcp, yamux, Multiaddr, PeerId, StreamProtocol, }; +use serde::{Deserialize, Serialize}; use sha256::digest; use std::hash::Hasher; use std::time::Duration; use tokio::select; -use tower::load::pending_requests; -use tower_http::request_id; #[derive(NetworkBehaviour)] pub struct MintpoolBehaviour { @@ -33,7 +32,7 @@ pub struct MintpoolBehaviour { kad: kad::Behaviour, identify: libp2p::identify::Behaviour, ping: libp2p::ping::Behaviour, - request_response: request_response::cbor::Behaviour>, + request_response: request_response::cbor::Behaviour, } pub struct SwarmController { @@ -301,7 +300,12 @@ impl SwarmController { } SwarmEvent::Behaviour(MintpoolBehaviourEvent::RequestResponse(event)) => { - self.handle_request_response_event(event).await + match self.handle_request_response_event(event).await { + Ok(_) => {} + Err(err) => { + tracing::error!("Error handling request response event: {:?}", err); + } + } } SwarmEvent::Dialing { peer_id, .. } => { @@ -570,7 +574,7 @@ impl SwarmController { async fn handle_request_response_event( &mut self, - event: request_response::Event>, + event: request_response::Event, ) -> eyre::Result<()> { match event { request_response::Event::Message { peer, message } => match message { @@ -579,23 +583,12 @@ impl SwarmController { request, channel, } => { - tracing::info!( - request_id = request_id.to_string(), - "processing request for sync" - ); - let (snd, recv) = tokio::sync::oneshot::channel(); - self.event_sender - .send(P2PEvent::SyncRequest { - query: request, - channel: snd, - }) - .await?; - let result = recv.await??; + let resp = self.make_sync_response(request_id, request).await; self.swarm .behaviour_mut() .request_response - .send_response(channel, result) - .map_err(|e| eyre::eyre!("Error sending response: {:?}", e))? + .send_response(channel, resp) + .map_err(|e| eyre::eyre!("Error sending response: {:?}", e))?; } Message::Response { request_id, @@ -605,15 +598,69 @@ impl SwarmController { request_id = request_id.to_string(), "received response for sync" ); - self.event_sender - .send(P2PEvent::SyncResponse { premints: response }) - .await?; + match response { + SyncResponse::Premints(premints) => { + self.event_sender + .send(P2PEvent::SyncResponse { premints }) + .await?; + } + SyncResponse::Error(err) => { + tracing::error!( + request_id = request_id.to_string(), + error = err, + "error received to our sync request" + ); + } + } } }, other => tracing::info!("mintpool sync request/response event: {:?}", other), } Ok(()) } + + // Makes a Response for a request to sync from another node + async fn make_sync_response( + &mut self, + request_id: InboundRequestId, + request: QueryOptions, + ) -> SyncResponse { + tracing::info!( + request_id = request_id.to_string(), + "processing request for sync" + ); + match self.make_sync_response_query(request).await { + Ok(premints) => SyncResponse::Premints(premints), + Err(err) => { + tracing::error!( + request_id = request_id.to_string(), + error = err.to_string(), + "error processing sync request" + ); + SyncResponse::Error(err.to_string()) + } + } + } + + // inner function to make propagating errors that occur during query easier to work with + async fn make_sync_response_query( + &mut self, + request: QueryOptions, + ) -> eyre::Result> { + let (snd, recv) = tokio::sync::oneshot::channel(); + self.event_sender + .send(P2PEvent::SyncRequest { + query: request, + channel: snd, + }) + .await + .map_err(|_| eyre::eyre!("Controller error"))?; + let result = recv + .await + .map_err(|_| eyre::eyre!("Channel error"))? + .map_err(|_| eyre::eyre!("Query error"))?; + Ok(result) + } } fn gossipsub_message_id(message: &gossipsub::Message) -> gossipsub::MessageId { @@ -643,6 +690,12 @@ pub struct NetworkState { pub all_external_addresses: Vec, } +#[derive(Debug, Serialize, Deserialize)] +pub enum SyncResponse { + Premints(Vec), + Error(String), +} + fn announce_topic() -> gossipsub::IdentTopic { gossipsub::IdentTopic::new("mintpool::announce") } diff --git a/tests/api_test.rs b/tests/api_test.rs index afdf137..59742dd 100644 --- a/tests/api_test.rs +++ b/tests/api_test.rs @@ -29,7 +29,7 @@ mod api_test { async fn make_test_router(config: &Config) -> Router { let mut rules = RulesEngine::new(config); rules.add_default_rules(); - let ctl = start_p2p_services(config, rules).await.unwrap(); + let ctl = start_p2p_services(config.clone(), rules).await.unwrap(); let router = api::router_with_defaults(config); let state = AppState::from(config, ctl.clone()).await; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 03a06a0..f9a9f36 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -44,7 +44,7 @@ pub mod mintpool_build { let config = make_config(start_port + i, peer_limit); let ctl = mintpool::run::start_p2p_services( - &config, + config.clone(), RulesEngine::new_with_default_rules(&config), ) .await diff --git a/tests/e2e_test.rs b/tests/e2e_test.rs index 607d8a6..bcdb328 100644 --- a/tests/e2e_test.rs +++ b/tests/e2e_test.rs @@ -57,7 +57,7 @@ async fn test_zora_premint_v2_e2e() { // set this so CHAINS will use the anvil rpc rather than the one in chains.json env::set_var("CHAIN_7777777_RPC_WSS", anvil.ws_endpoint()); - let ctl = run::start_p2p_services(&config, RulesEngine::new_with_default_rules(&config)) + let ctl = run::start_p2p_services(config.clone(), RulesEngine::new_with_default_rules(&config)) .await .unwrap(); run::start_watch_chain::(&config, ctl.clone()).await; @@ -208,14 +208,20 @@ async fn test_verify_e2e() { // Start 3 nodes, one in check, one in verify, one in trust (trusts node 1) // ============================================================================================ - let ctl1 = run::start_p2p_services(&config1, RulesEngine::new_with_default_rules(&config1)) - .await - .unwrap(); + let ctl1 = run::start_p2p_services( + config1.clone(), + RulesEngine::new_with_default_rules(&config1), + ) + .await + .unwrap(); run::start_watch_chain::(&config1, ctl1.clone()).await; - let ctl2 = run::start_p2p_services(&config2, RulesEngine::new_with_default_rules(&config2)) - .await - .unwrap(); + let ctl2 = run::start_p2p_services( + config2.clone(), + RulesEngine::new_with_default_rules(&config2), + ) + .await + .unwrap(); let node_info = ctl1.get_node_info().await.unwrap(); @@ -225,9 +231,12 @@ async fn test_verify_e2e() { config3.chain_inclusion_mode = ChainInclusionMode::Trust; config3.trusted_peers = Some(node_info.peer_id.to_string()); - let ctl3 = run::start_p2p_services(&config3, RulesEngine::new_with_default_rules(&config3)) - .await - .unwrap(); + let ctl3 = run::start_p2p_services( + config3.clone(), + RulesEngine::new_with_default_rules(&config3), + ) + .await + .unwrap(); connect_all_to_first(vec![ctl1.clone(), ctl2.clone(), ctl3.clone()]).await;