Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update sync lookback hours and request-response behavior #59

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ libp2p = { version = "0.53.2", features = [
"identify",
"ping",
"dns",
"request-response",
"cbor"
] }
tokio = { version = "1.36.0", features = ["full"] }
eyre = "0.6.12"
Expand Down
1 change: 1 addition & 0 deletions docs/OPERATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ INTERACTIVE: bool (false) - If true, interactive repl will run
ENABLE_RPC: bool (true) - If true, rpc will be used for rules evaluation
ADMIN_API_SECRET: Option<String> (None) - Secret key used to access admin api routes
RATE_LIMIT_RPS: u32 (2) - Rate limit requests per second for the http api
SYNC_LOOKBACK_HOURS: u64 (6) - Number of hours to look back for syncing premints from another node
```

#### Logging
Expand Down
2 changes: 1 addition & 1 deletion examples/extra_rules_and_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn main() -> eyre::Result<()> {
rules.add_rule(metadata_rule!(only_odd_token_ids));
rules.add_rule(Box::new(MustStartWithA {}));

let ctl = mintpool::run::start_p2p_services(&config, rules).await?;
let ctl = mintpool::run::start_p2p_services(config.clone(), rules).await?;

// Add some custom routes in addition to the defaults. You could also add middleware or anything else you can do with axum.
let mut router = mintpool::api::router_with_defaults(&config);
Expand Down
8 changes: 7 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rand::Rng;
use crate::chain_list::CHAINS;
use crate::types::PremintName;

#[derive(Envconfig, Debug)]
#[derive(Envconfig, Debug, Clone)]
pub struct Config {
// Used to derive an ed25519 keypair for node identity
// Should be 32 bytes of random hex.
Expand Down Expand Up @@ -75,6 +75,9 @@ pub struct Config {
// If set to chain, will check the Zora Network MintpoolTrusted nodes contract for boot nodes
#[envconfig(from = "BOOT_NODES", default = "chain")]
pub boot_nodes: BootNodes,

#[envconfig(from = "SYNC_LOOKBACK_HOURS", default = "6")]
pub sync_lookback_hours: u64,
}

impl Config {
Expand All @@ -100,6 +103,7 @@ impl Config {
admin_api_secret: None,
rate_limit_rps: 1,
boot_nodes: BootNodes::None,
sync_lookback_hours: 6,
}
}
}
Expand Down Expand Up @@ -227,6 +231,7 @@ mod test {
admin_api_secret: None,
rate_limit_rps: 1,
boot_nodes: BootNodes::Chain,
sync_lookback_hours: 0,
};

let names = config.premint_names();
Expand Down Expand Up @@ -255,6 +260,7 @@ mod test {
admin_api_secret: None,
rate_limit_rps: 1,
boot_nodes: BootNodes::None,
sync_lookback_hours: 0,
};

let names = config.premint_names();
Expand Down
43 changes: 34 additions & 9 deletions src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use crate::chain::inclusion_claim_correct;
use crate::config::{ChainInclusionMode, Config};
use chrono::Utc;
use eyre::WrapErr;
use libp2p::PeerId;
use sqlx::SqlitePool;
use std::ops::Sub;
use std::time::{Duration, SystemTime};
use tokio::select;
use tokio::sync::{mpsc, oneshot};

use crate::p2p::NetworkState;
use crate::rules::{Results, RulesEngine};
use crate::storage::{PremintStorage, Reader, Writer};
use crate::storage::{list_all_with_options, PremintStorage, QueryOptions, Reader, Writer};
use crate::types::{
InclusionClaim, MintpoolNodeInfo, PeerInclusionClaim, PremintName, PremintTypes,
};
Expand Down Expand Up @@ -38,6 +41,13 @@ pub enum P2PEvent {
NetworkState(NetworkState),
PremintReceived(PremintTypes),
MintSeenOnchain(PeerInclusionClaim),
SyncRequest {
query: QueryOptions,
channel: oneshot::Sender<eyre::Result<Vec<PremintTypes>>>,
},
SyncResponse {
premints: Vec<PremintTypes>,
},
}

pub enum ControllerCommands {
Expand Down Expand Up @@ -71,13 +81,13 @@ pub struct Controller {
external_commands: mpsc::Receiver<ControllerCommands>,
store: PremintStorage,
rules: RulesEngine<PremintStorage>,
trusted_peers: Vec<PeerId>,
inclusion_mode: ChainInclusionMode,

config: Config,
}

impl Controller {
pub fn new(
config: &Config,
config: Config,
swarm_command_sender: mpsc::Sender<SwarmCommand>,
swarm_event_receiver: mpsc::Receiver<P2PEvent>,
external_commands: mpsc::Receiver<ControllerCommands>,
Expand All @@ -90,8 +100,7 @@ impl Controller {
external_commands,
store,
rules,
trusted_peers: config.trusted_peers(),
inclusion_mode: config.chain_inclusion_mode,
config,
}
}

Expand Down Expand Up @@ -127,6 +136,18 @@ impl Controller {
tracing::error!("Error handling onchain claim: {:?}", err);
}
}
P2PEvent::SyncRequest { query, channel } => {
let events = list_all_with_options(&self.store.db(), &query).await;
if let Err(Err(err)) = channel.send(events) {
tracing::error!("Error sending sync response: {:?}", err);
}
tracing::info!(histogram.sync_request_processed = 1);
}
P2PEvent::SyncResponse { premints } => {
for premint in premints {
let _ = self.validate_and_insert(premint).await;
}
}
}
}

Expand Down Expand Up @@ -205,7 +226,7 @@ impl Controller {
tracing::debug!("Marked as seen onchain {:?}", claim.clone());
}

if self.inclusion_mode == ChainInclusionMode::Check {
if self.config.chain_inclusion_mode == ChainInclusionMode::Check {
if let Err(err) = self
.swarm_command_sender
.send(SwarmCommand::SendOnchainMintFound(claim))
Expand Down Expand Up @@ -239,7 +260,7 @@ impl Controller {
}

async fn handle_event_onchain_claim(&self, peer_claim: PeerInclusionClaim) -> eyre::Result<()> {
match self.inclusion_mode {
match self.config.chain_inclusion_mode {
ChainInclusionMode::Check | ChainInclusionMode::Verify => {
let claim = peer_claim.claim;
let premint = self
Expand All @@ -262,7 +283,11 @@ impl Controller {
}
}
ChainInclusionMode::Trust => {
if self.trusted_peers.contains(&peer_claim.from_peer_id) {
if self
.config
.trusted_peers()
.contains(&peer_claim.from_peer_id)
{
self.store
.mark_seen_on_chain(peer_claim.claim.clone())
.await?;
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async fn main() -> eyre::Result<()> {

let mut rules = RulesEngine::new(&config);
rules.add_default_rules();
let ctl = start_p2p_services(&config, rules).await?;
let ctl = start_p2p_services(config.clone(), rules).await?;

let router = api::router_with_defaults(&config).merge(metrics_router);
api::start_api(&config, ctl.clone(), router, true).await?;
Expand Down
Loading
Loading