Skip to content

Commit

Permalink
Update sync lookback hours and request-response behavior (#59)
Browse files Browse the repository at this point in the history
Move to using request / response for syncing node

- changes to p2p swarm controller & controller are in
- needs kicking off sync on start that uses `config.sync_lookback_hours` on bootnodes
- needs testing
- needs periodic call to resync from a random peer
- force syncing a specific peer via admin API
  • Loading branch information
erikreppel authored Apr 26, 2024
1 parent 6bcacfb commit f56e5c8
Show file tree
Hide file tree
Showing 13 changed files with 225 additions and 32 deletions.
32 changes: 32 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ libp2p = { version = "0.53.2", features = [
"identify",
"ping",
"dns",
"request-response",
"cbor"
] }
tokio = { version = "1.36.0", features = ["full"] }
eyre = "0.6.12"
Expand Down
1 change: 1 addition & 0 deletions docs/OPERATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ INTERACTIVE: bool (false) - If true, interactive repl will run
ENABLE_RPC: bool (true) - If true, rpc will be used for rules evaluation
ADMIN_API_SECRET: Option<String> (None) - Secret key used to access admin api routes
RATE_LIMIT_RPS: u32 (2) - Rate limit requests per second for the http api
SYNC_LOOKBACK_HOURS: u64 (6) - Number of hours to look back for syncing premints from another node
```

#### Logging
Expand Down
2 changes: 1 addition & 1 deletion examples/extra_rules_and_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn main() -> eyre::Result<()> {
rules.add_rule(metadata_rule!(only_odd_token_ids));
rules.add_rule(Box::new(MustStartWithA {}));

let ctl = mintpool::run::start_p2p_services(&config, rules).await?;
let ctl = mintpool::run::start_p2p_services(config.clone(), rules).await?;

// Add some custom routes in addition to the defaults. You could also add middleware or anything else you can do with axum.
let mut router = mintpool::api::router_with_defaults(&config);
Expand Down
8 changes: 7 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rand::Rng;
use crate::chain_list::CHAINS;
use crate::types::PremintName;

#[derive(Envconfig, Debug)]
#[derive(Envconfig, Debug, Clone)]
pub struct Config {
// Used to derive an ed25519 keypair for node identity
// Should be 32 bytes of random hex.
Expand Down Expand Up @@ -75,6 +75,9 @@ pub struct Config {
// If set to chain, will check the Zora Network MintpoolTrusted nodes contract for boot nodes
#[envconfig(from = "BOOT_NODES", default = "chain")]
pub boot_nodes: BootNodes,

#[envconfig(from = "SYNC_LOOKBACK_HOURS", default = "6")]
pub sync_lookback_hours: u64,
}

impl Config {
Expand All @@ -100,6 +103,7 @@ impl Config {
admin_api_secret: None,
rate_limit_rps: 1,
boot_nodes: BootNodes::None,
sync_lookback_hours: 6,
}
}
}
Expand Down Expand Up @@ -227,6 +231,7 @@ mod test {
admin_api_secret: None,
rate_limit_rps: 1,
boot_nodes: BootNodes::Chain,
sync_lookback_hours: 0,
};

let names = config.premint_names();
Expand Down Expand Up @@ -255,6 +260,7 @@ mod test {
admin_api_secret: None,
rate_limit_rps: 1,
boot_nodes: BootNodes::None,
sync_lookback_hours: 0,
};

let names = config.premint_names();
Expand Down
43 changes: 34 additions & 9 deletions src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use crate::chain::inclusion_claim_correct;
use crate::config::{ChainInclusionMode, Config};
use chrono::Utc;
use eyre::WrapErr;
use libp2p::PeerId;
use sqlx::SqlitePool;
use std::ops::Sub;
use std::time::{Duration, SystemTime};
use tokio::select;
use tokio::sync::{mpsc, oneshot};

use crate::p2p::NetworkState;
use crate::rules::{Results, RulesEngine};
use crate::storage::{PremintStorage, Reader, Writer};
use crate::storage::{list_all_with_options, PremintStorage, QueryOptions, Reader, Writer};
use crate::types::{
InclusionClaim, MintpoolNodeInfo, PeerInclusionClaim, PremintName, PremintTypes,
};
Expand Down Expand Up @@ -38,6 +41,13 @@ pub enum P2PEvent {
NetworkState(NetworkState),
PremintReceived(PremintTypes),
MintSeenOnchain(PeerInclusionClaim),
SyncRequest {
query: QueryOptions,
channel: oneshot::Sender<eyre::Result<Vec<PremintTypes>>>,
},
SyncResponse {
premints: Vec<PremintTypes>,
},
}

pub enum ControllerCommands {
Expand Down Expand Up @@ -71,13 +81,13 @@ pub struct Controller {
external_commands: mpsc::Receiver<ControllerCommands>,
store: PremintStorage,
rules: RulesEngine<PremintStorage>,
trusted_peers: Vec<PeerId>,
inclusion_mode: ChainInclusionMode,

config: Config,
}

impl Controller {
pub fn new(
config: &Config,
config: Config,
swarm_command_sender: mpsc::Sender<SwarmCommand>,
swarm_event_receiver: mpsc::Receiver<P2PEvent>,
external_commands: mpsc::Receiver<ControllerCommands>,
Expand All @@ -90,8 +100,7 @@ impl Controller {
external_commands,
store,
rules,
trusted_peers: config.trusted_peers(),
inclusion_mode: config.chain_inclusion_mode,
config,
}
}

Expand Down Expand Up @@ -127,6 +136,18 @@ impl Controller {
tracing::error!("Error handling onchain claim: {:?}", err);
}
}
P2PEvent::SyncRequest { query, channel } => {
let events = list_all_with_options(&self.store.db(), &query).await;
if let Err(Err(err)) = channel.send(events) {
tracing::error!("Error sending sync response: {:?}", err);
}
tracing::info!(histogram.sync_request_processed = 1);
}
P2PEvent::SyncResponse { premints } => {
for premint in premints {
let _ = self.validate_and_insert(premint).await;
}
}
}
}

Expand Down Expand Up @@ -205,7 +226,7 @@ impl Controller {
tracing::debug!("Marked as seen onchain {:?}", claim.clone());
}

if self.inclusion_mode == ChainInclusionMode::Check {
if self.config.chain_inclusion_mode == ChainInclusionMode::Check {
if let Err(err) = self
.swarm_command_sender
.send(SwarmCommand::SendOnchainMintFound(claim))
Expand Down Expand Up @@ -239,7 +260,7 @@ impl Controller {
}

async fn handle_event_onchain_claim(&self, peer_claim: PeerInclusionClaim) -> eyre::Result<()> {
match self.inclusion_mode {
match self.config.chain_inclusion_mode {
ChainInclusionMode::Check | ChainInclusionMode::Verify => {
let claim = peer_claim.claim;
let premint = self
Expand All @@ -262,7 +283,11 @@ impl Controller {
}
}
ChainInclusionMode::Trust => {
if self.trusted_peers.contains(&peer_claim.from_peer_id) {
if self
.config
.trusted_peers()
.contains(&peer_claim.from_peer_id)
{
self.store
.mark_seen_on_chain(peer_claim.claim.clone())
.await?;
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async fn main() -> eyre::Result<()> {

let mut rules = RulesEngine::new(&config);
rules.add_default_rules();
let ctl = start_p2p_services(&config, rules).await?;
let ctl = start_p2p_services(config.clone(), rules).await?;

let router = api::router_with_defaults(&config).merge(metrics_router);
api::start_api(&config, ctl.clone(), router, true).await?;
Expand Down
Loading

0 comments on commit f56e5c8

Please sign in to comment.