Skip to content

Commit

Permalink
Implement P2P and controller layers for request / response based sync
Browse files Browse the repository at this point in the history
  • Loading branch information
erikreppel authored and ligustah committed Apr 26, 2024
1 parent 8cb8929 commit 3435ec1
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 12 deletions.
32 changes: 32 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ libp2p = { version = "0.53.2", features = [
"identify",
"ping",
"dns",
"request-response",
"cbor"
] }
tokio = { version = "1.36.0", features = ["full"] }
eyre = "0.6.12"
Expand Down
2 changes: 1 addition & 1 deletion examples/extra_rules_and_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn main() -> eyre::Result<()> {
rules.add_rule(metadata_rule!(only_odd_token_ids));
rules.add_rule(Box::new(MustStartWithA {}));

let ctl = mintpool::run::start_p2p_services(&config, rules).await?;
let ctl = mintpool::run::start_p2p_services(config.clone(), rules).await?;

// Add some custom routes in addition to the defaults. You could also add middleware or anything else you can do with axum.
let mut router = mintpool::api::router_with_defaults(&config);
Expand Down
3 changes: 2 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rand::Rng;
use crate::chain_list::CHAINS;
use crate::types::PremintName;

#[derive(Envconfig, Debug)]
#[derive(Envconfig, Debug, Clone)]
pub struct Config {
// Used to derive an ed25519 keypair for node identity
// Should be 32 bytes of random hex.
Expand Down Expand Up @@ -103,6 +103,7 @@ impl Config {
admin_api_secret: None,
rate_limit_rps: 1,
boot_nodes: BootNodes::None,
sync_lookback_hours: 6,
}
}
}
Expand Down
21 changes: 20 additions & 1 deletion src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::sync::{mpsc, oneshot};

use crate::p2p::NetworkState;
use crate::rules::{Results, RulesEngine};
use crate::storage::{PremintStorage, Reader, Writer};
use crate::storage::{list_all_with_options, PremintStorage, QueryOptions, Reader, Writer};
use crate::types::{
InclusionClaim, MintpoolNodeInfo, PeerInclusionClaim, PremintName, PremintTypes,
};
Expand Down Expand Up @@ -41,6 +41,13 @@ pub enum P2PEvent {
NetworkState(NetworkState),
PremintReceived(PremintTypes),
MintSeenOnchain(PeerInclusionClaim),
SyncRequest {
query: QueryOptions,
channel: oneshot::Sender<eyre::Result<Vec<PremintTypes>>>,
},
SyncResponse {
premints: Vec<PremintTypes>,
},
}

pub enum ControllerCommands {
Expand Down Expand Up @@ -132,6 +139,18 @@ impl Controller {
tracing::error!("Error handling onchain claim: {:?}", err);
}
}
P2PEvent::SyncRequest { query, channel } => {
let events = list_all_with_options(&self.store.db(), &query).await;
if let Err(Err(err)) = channel.send(events) {
tracing::error!("Error sending sync response: {:?}", err);
}
tracing::info!(histogram.sync_request_processed = 1);
}
P2PEvent::SyncResponse { premints } => {
for premint in premints {
let _ = self.validate_and_insert(premint).await;
}
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async fn main() -> eyre::Result<()> {

let mut rules = RulesEngine::new(&config);
rules.add_default_rules();
let ctl = start_p2p_services(&config, rules).await?;
let ctl = start_p2p_services(config.clone(), rules).await?;

let router = api::router_with_defaults(&config).merge(metrics_router);
api::start_api(&config, ctl.clone(), router, true).await?;
Expand Down
67 changes: 66 additions & 1 deletion src/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::config::Config;
use crate::controller::{P2PEvent, SwarmCommand};
use crate::storage::QueryOptions;
use crate::types::{
claims_topic_hashes, InclusionClaim, MintpoolNodeInfo, PeerInclusionClaim, Premint,
PremintName, PremintTypes,
Expand All @@ -14,19 +15,25 @@ use libp2p::kad::store::MemoryStore;
use libp2p::kad::GetProvidersOk::FoundProviders;
use libp2p::kad::{Addresses, QueryResult, RecordKey};
use libp2p::multiaddr::Protocol;
use libp2p::request_response::{Message, ProtocolSupport};
use libp2p::swarm::{ConnectionId, NetworkBehaviour, NetworkInfo, SwarmEvent};
use libp2p::{gossipsub, kad, noise, tcp, yamux, Multiaddr, PeerId};
use libp2p::{
gossipsub, kad, noise, request_response, tcp, yamux, Multiaddr, PeerId, StreamProtocol,
};
use sha256::digest;
use std::hash::Hasher;
use std::time::Duration;
use tokio::select;
use tower::load::pending_requests;
use tower_http::request_id;

#[derive(NetworkBehaviour)]
pub struct MintpoolBehaviour {
gossipsub: gossipsub::Behaviour,
kad: kad::Behaviour<MemoryStore>,
identify: libp2p::identify::Behaviour,
ping: libp2p::ping::Behaviour,
request_response: request_response::cbor::Behaviour<QueryOptions, Vec<PremintTypes>>,
}

pub struct SwarmController {
Expand Down Expand Up @@ -120,6 +127,13 @@ impl SwarmController {
public_key,
)),
ping: libp2p::ping::Behaviour::new(libp2p::ping::Config::new()),
request_response: request_response::cbor::Behaviour::new(
[(
StreamProtocol::new("/mintpool-sync/1"),
ProtocolSupport::Full,
)],
request_response::Config::default(),
),
}
})?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
Expand Down Expand Up @@ -286,6 +300,10 @@ impl SwarmController {
}
}

SwarmEvent::Behaviour(MintpoolBehaviourEvent::RequestResponse(event)) => {
self.handle_request_response_event(event).await
}

SwarmEvent::Dialing { peer_id, .. } => {
tracing::info!("Dialing: {:?}", peer_id)
}
Expand Down Expand Up @@ -549,6 +567,53 @@ impl SwarmController {
all_external_addresses: self.swarm.external_addresses().cloned().collect(),
}
}

async fn handle_request_response_event(
&mut self,
event: request_response::Event<QueryOptions, Vec<PremintTypes>>,
) -> eyre::Result<()> {
match event {
request_response::Event::Message { peer, message } => match message {
Message::Request {
request_id,
request,
channel,
} => {
tracing::info!(
request_id = request_id.to_string(),
"processing request for sync"
);
let (snd, recv) = tokio::sync::oneshot::channel();
self.event_sender
.send(P2PEvent::SyncRequest {
query: request,
channel: snd,
})
.await?;
let result = recv.await??;
self.swarm
.behaviour_mut()
.request_response
.send_response(channel, result)
.map_err(|e| eyre::eyre!("Error sending response: {:?}", e))?
}
Message::Response {
request_id,
response,
} => {
tracing::info!(
request_id = request_id.to_string(),
"received response for sync"
);
self.event_sender
.send(P2PEvent::SyncResponse { premints: response })
.await?;
}
},
other => tracing::info!("mintpool sync request/response event: {:?}", other),
}
Ok(())
}
}

fn gossipsub_message_id(message: &gossipsub::Message) -> gossipsub::MessageId {
Expand Down
10 changes: 5 additions & 5 deletions src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ use crate::types::Premint;
/// All interactions with the controller should be done through `ControllerInterface` for memory safety.
/// Recommended to use this function when extending mintpool as a library, but if you're feeling bold you can reproduce what its doing.
pub async fn start_p2p_services(
config: &Config,
config: Config,
rules: RulesEngine<PremintStorage>,
) -> eyre::Result<ControllerInterface> {
let id_keys = make_keypair(config)
let id_keys = make_keypair(&config)
.expect("Failed to create keypair, node cannot start. Confirm secret is 32 bytes of hex (0x + 64 hex chars)");
let (event_send, event_recv) = tokio::sync::mpsc::channel(1024);
let (swrm_cmd_send, swrm_recv) = tokio::sync::mpsc::channel(1024);
let (ext_cmd_send, ext_cmd_recv) = tokio::sync::mpsc::channel(1024);

let store = PremintStorage::new(config).await;
let store = PremintStorage::new(&config).await;

let mut swarm_controller = SwarmController::new(id_keys, config, swrm_recv, event_send);
let mut swarm_controller = SwarmController::new(id_keys, &config, swrm_recv, event_send);
let mut controller = Controller::new(
&config,
config.clone(),
swrm_cmd_send,
event_recv,
ext_cmd_recv,
Expand Down
4 changes: 2 additions & 2 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::types::{InclusionClaim, Premint, PremintName, PremintTypes};
use alloy_primitives::Address;
use async_trait::async_trait;
use eyre::WrapErr;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::Row;
use sqlx::{QueryBuilder, Sqlite, SqlitePool};
Expand Down Expand Up @@ -231,7 +231,7 @@ pub async fn list_all(db: &SqlitePool) -> eyre::Result<Vec<PremintTypes>> {
Ok(premints)
}

#[derive(Deserialize)]
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct QueryOptions {
pub chain_id: Option<u64>,
pub kind: Option<String>,
Expand Down

0 comments on commit 3435ec1

Please sign in to comment.