diff --git a/src/controller.rs b/src/controller.rs index 9e9c3e7..14b69c1 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -1,7 +1,9 @@ use crate::chain::inclusion_claim_correct; use crate::config::{ChainInclusionMode, Config}; -use chrono::Utc; +use chrono::{DateTime, Utc}; use eyre::WrapErr; +use futures_ticker::Ticker; +use futures_util::StreamExt; use libp2p::PeerId; use sqlx::SqlitePool; use std::ops::Sub; @@ -34,6 +36,9 @@ pub enum SwarmCommand { channel: oneshot::Sender, }, SendOnchainMintFound(InclusionClaim), + Sync { + query: QueryOptions, + }, } /// Event types that may be received from the p2p swarm that need to be handled by the controller @@ -81,6 +86,7 @@ pub struct Controller { external_commands: mpsc::Receiver, store: PremintStorage, rules: RulesEngine, + sync_ticker: Ticker, config: Config, } @@ -94,12 +100,17 @@ impl Controller { store: PremintStorage, rules: RulesEngine, ) -> Self { + // sync every 60 minutes, also sync 30 seconds after startup (gives some time to connect to peers) + let sync_ticker = + Ticker::new_with_next(Duration::from_secs(60 * 60), Duration::from_secs(30)); + Self { swarm_command_sender, swarm_event_receiver, external_commands, store, rules, + sync_ticker, config, } } @@ -115,10 +126,32 @@ impl Controller { Some(event) = self.swarm_event_receiver.recv() => { self.handle_event(event).await; } + _ = self.sync_ticker.next() => { + self.do_sync().await; + } } } } + async fn do_sync(&self) { + let now = SystemTime::now(); + let then = now - Duration::from_secs(60 * 60 * self.config.sync_lookback_hours); + + let query = QueryOptions { + chain_id: None, + kind: None, + collection_address: None, + creator_address: None, + from: Some(DateTime::from(then)), + to: Some(DateTime::from(now)), + }; + + self.swarm_command_sender + .send(SwarmCommand::Sync { query }) + .await + .expect("Error sending sync command to swarm"); + } + pub async fn handle_event(&self, event: P2PEvent) { match event { P2PEvent::NetworkState(network_state) => { diff --git a/src/p2p.rs b/src/p2p.rs index a93b238..5747652 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -1,10 +1,6 @@ -use crate::config::Config; -use crate::controller::{P2PEvent, SwarmCommand}; -use crate::storage::QueryOptions; -use crate::types::{ - claims_topic_hashes, InclusionClaim, MintpoolNodeInfo, PeerInclusionClaim, Premint, - PremintName, PremintTypes, -}; +use std::hash::Hasher; +use std::time::Duration; + use eyre::WrapErr; use futures_ticker::Ticker; use libp2p::core::ConnectedPoint; @@ -20,12 +16,19 @@ use libp2p::swarm::{ConnectionId, NetworkBehaviour, NetworkInfo, SwarmEvent}; use libp2p::{ gossipsub, kad, noise, request_response, tcp, yamux, Multiaddr, PeerId, StreamProtocol, }; +use rand::prelude::SliceRandom; use serde::{Deserialize, Serialize}; use sha256::digest; -use std::hash::Hasher; -use std::time::Duration; use tokio::select; +use crate::config::Config; +use crate::controller::{P2PEvent, SwarmCommand}; +use crate::storage::QueryOptions; +use crate::types::{ + claims_topic_hashes, InclusionClaim, MintpoolNodeInfo, PeerInclusionClaim, Premint, + PremintName, PremintTypes, +}; + #[derive(NetworkBehaviour)] pub struct MintpoolBehaviour { gossipsub: gossipsub::Behaviour, @@ -219,6 +222,7 @@ impl SwarmController { tracing::error!("Error broadcasting claim: {:?}", err); } } + SwarmCommand::Sync { query } => self.do_sync(query).await, } } @@ -572,6 +576,28 @@ impl SwarmController { } } + async fn do_sync(&mut self, query: QueryOptions) { + // select random peer + let state = self.make_network_state(); + + let peer_id = state + .gossipsub_peers + .choose(&mut rand::thread_rng()) + .cloned(); + + if let Some(peer_id) = peer_id { + let id = self + .swarm + .behaviour_mut() + .request_response + .send_request(&peer_id, query); + + tracing::info!(request_id = id.to_string(), "sent sync request"); + } else { + tracing::info!("No peers to sync with"); + } + } + async fn handle_request_response_event( &mut self, event: request_response::Event,