diff --git a/crates/common/src/job.rs b/crates/common/src/job.rs
index 7bdd242..408e36b 100644
--- a/crates/common/src/job.rs
+++ b/crates/common/src/job.rs
@@ -1,6 +1,6 @@
 use crate::hash;
 use cairo_vm::vm::runners::cairo_pie::CairoPie;
-use libp2p::PeerId;
+use libp2p::{kad, PeerId};
 use serde::{Deserialize, Serialize};
 use starknet::signers::{SigningKey, VerifyingKey};
 use starknet_crypto::{poseidon_hash_many, FieldElement, Signature};
@@ -101,17 +101,10 @@ impl Display for Job {
     }
 }
 
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct JobBid {
     pub identity: PeerId,
-    pub job_hash: u64,
-    pub price: u64,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
-pub struct JobDelegation {
-    pub identity: PeerId,
-    pub job: Job,
+    pub job_key: kad::RecordKey,
     pub price: u64,
 }
diff --git a/crates/common/src/job_trace.rs b/crates/common/src/job_trace.rs
index ac674a4..f4421c7 100644
--- a/crates/common/src/job_trace.rs
+++ b/crates/common/src/job_trace.rs
@@ -1,7 +1,9 @@
 use crate::hash;
+use libp2p::kad;
 use std::{
     fmt::Display,
     hash::{DefaultHasher, Hash, Hasher},
+    mem::ManuallyDrop,
 };
 use tempfile::NamedTempFile;
@@ -13,11 +15,38 @@
 #[derive(Debug)]
 pub struct JobTrace {
-    pub job_hash: u64,
+    pub job_key: kad::RecordKey,
     pub air_public_input: NamedTempFile, // Temporary file containing the public input
     pub air_private_input: NamedTempFile, // Temporary file containing the private input; memory and trace files must exist for this to be valid
-    pub memory: NamedTempFile, // Temporary file containing memory data (required for air_private_input validity)
-    pub trace: NamedTempFile, // Temporary file containing trace data (required for air_private_input validity)
+    pub memory: ManuallyDrop<NamedTempFile>, // Temporary file containing memory data (required for air_private_input validity)
+    pub trace: ManuallyDrop<NamedTempFile>, // Temporary file containing trace data (required for air_private_input validity)
+}
+
+impl JobTrace {
+    pub fn new(
+        job_key: kad::RecordKey,
+        air_public_input: NamedTempFile,
+        air_private_input: NamedTempFile,
+        memory: NamedTempFile,
+        trace: NamedTempFile,
+    ) -> Self {
+        Self {
+            job_key,
+            air_public_input,
+            air_private_input,
+            memory: ManuallyDrop::new(memory),
+            trace: ManuallyDrop::new(trace),
+        }
+    }
+}
+
+impl Drop for JobTrace {
+    fn drop(&mut self) {
+        unsafe {
+            ManuallyDrop::drop(&mut self.memory);
+            ManuallyDrop::drop(&mut self.trace);
+        }
+    }
 }
 
 impl Hash for JobTrace {
diff --git a/crates/common/src/job_witness.rs b/crates/common/src/job_witness.rs
index 3ccb237..9446be4 100644
--- a/crates/common/src/job_witness.rs
+++ b/crates/common/src/job_witness.rs
@@ -1,4 +1,5 @@
 use crate::hash;
+use libp2p::kad;
 use serde::{Deserialize, Serialize};
 use std::{
     fmt::Display,
@@ -14,7 +15,7 @@
 #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
 pub struct JobWitness {
-    pub job_hash: u64,
+    pub job_key: kad::RecordKey,
     pub proof: Vec<u8>,
 }
diff --git a/crates/delegator/src/api.rs b/crates/delegator/src/api.rs
index fe57936..0418192 100644
--- a/crates/delegator/src/api.rs
+++ b/crates/delegator/src/api.rs
@@ -6,7 +6,7 @@
 };
 use futures::StreamExt;
 use hyper::StatusCode;
-use libp2p::PeerId;
+use libp2p::{kad, PeerId};
 use serde::{Deserialize, Serialize};
 use std::hash::{DefaultHasher, Hash, Hasher};
 use std::{io, time::Duration};
@@ -19,7 +19,7 @@ use crate::delegator::DelegatorEvent;
 #[derive(Debug)]
 pub struct ServerState {
     pub delegate_tx: mpsc::Sender<JobData>,
-    pub events_rx: broadcast::Receiver<(u64, DelegatorEvent)>,
+    pub events_rx: broadcast::Receiver<(kad::RecordKey, DelegatorEvent)>,
 }
 
 impl Clone for ServerState {
@@ -39,7 +39,7 @@ pub struct DelegateRequest {
 
 #[derive(Debug, Serialize)]
 pub struct DelegateResponse {
-    job_hash: String,
+    job_hash: kad::RecordKey,
 }
 
 pub async fn deletage_handler(
@@ -47,14 +47,14 @@
     Json(input): Json<DelegateRequest>,
 ) -> Result<Json<DelegateResponse>, StatusCode> {
     let job_data = JobData::new(input.pie);
-    let job_data_hash = hash!(&job_data);
+    let job_data_hash = kad::RecordKey::new(&hash!(job_data).to_be_bytes());
     state.delegate_tx.send(job_data).await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-    Ok(Json(DelegateResponse { job_hash: job_data_hash.to_string() }))
+    Ok(Json(DelegateResponse { job_hash: job_data_hash }))
 }
 
 #[derive(Debug, Deserialize)]
 pub struct JobEventsRequest {
-    job_hash: String,
+    job_hash: kad::RecordKey,
 }
 
 #[derive(Debug, Serialize)]
@@ -70,8 +70,7 @@ pub async fn job_events_handler(
     Query(input): Query<JobEventsRequest>,
 ) -> Sse<impl Stream<Item = Result<Event, io::Error>>> {
     let stream = stream! {
-        let job_hash = input.job_hash.parse::<u64>()
-            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
+        let job_hash = input.job_hash;
         loop {
             tokio::select! {
                 Ok((hash, event)) = state.events_rx.recv() => {
diff --git a/crates/delegator/src/bid_queue.rs b/crates/delegator/src/bid_queue.rs
index f10fa60..c958442 100644
--- a/crates/delegator/src/bid_queue.rs
+++ b/crates/delegator/src/bid_queue.rs
@@ -1,7 +1,7 @@
-use libp2p::PeerId;
+use libp2p::{kad, PeerId};
 use std::{collections::BTreeMap, future::Future, pin::Pin, time::Duration};
 use tokio::{sync::mpsc, time::sleep};
-use zetina_common::{job::Job, process::Process};
+use zetina_common::process::Process;
 
 pub struct BidQueue {}
 
@@ -19,17 +19,21 @@ impl Default for BidQueue {
 
 impl BidQueue {
     pub fn run<'future>(
-        job: Job,
+        job_hash: kad::RecordKey,
     ) -> (
-        Process<'future, Result<(Job, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>,
+        Process<'future, Result<(kad::RecordKey, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>,
         mpsc::Sender<(u64, PeerId)>,
     ) {
         let (terminate_tx, mut terminate_rx) = mpsc::channel::<()>(10);
         let (bid_tx, mut bid_rx) = mpsc::channel::<(u64, PeerId)>(10);
         let future: Pin<
             Box<
-                dyn Future<Output = Result<(Job, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>
-                    + Send
+                dyn Future<
+                        Output = Result<
+                            (kad::RecordKey, BTreeMap<u64, Vec<PeerId>>),
+                            BidControllerError,
+                        >,
+                    > + Send
                     + '_,
             >,
         > = Box::pin(async move {
@@ -53,7 +57,7 @@
                     }
                 }
                 _ = sleep(duration) => {
-                    break Ok((job, bids.take().ok_or(BidControllerError::BidsTerminated)?))
+                    break Ok((job_hash, bids.take().ok_or(BidControllerError::BidsTerminated)?))
                 }
                 _ = terminate_rx.recv() => {
                     break Err(BidControllerError::TaskTerminated);
diff --git a/crates/delegator/src/delegator.rs b/crates/delegator/src/delegator.rs
index 0b2798d..6b3fad5 100644
--- a/crates/delegator/src/delegator.rs
+++ b/crates/delegator/src/delegator.rs
@@ -1,7 +1,7 @@
 use crate::bid_queue::{BidControllerError, BidQueue};
 use futures::stream::FuturesUnordered;
 use futures::Stream;
-use libp2p::{gossipsub, PeerId};
+use libp2p::{gossipsub, kad, PeerId};
 use starknet::signers::SigningKey;
 use std::collections::{BTreeMap, HashMap};
 use std::hash::{DefaultHasher, Hash, Hasher};
@@ -13,10 +13,10 @@ use tokio_stream::StreamExt;
 use tracing::{error, info};
 use zetina_common::graceful_shutdown::shutdown_signal;
 use zetina_common::hash;
-use zetina_common::job::{Job, JobData, JobDelegation};
+use zetina_common::job::{Job, JobBid, JobData};
 use zetina_common::process::Process;
 use zetina_peer::swarm::{
-    DelegationMessage, GossipsubMessage, MarketMessage, PeerBehaviourEvent, Topic,
+    DelegationMessage, GossipsubMessage, KademliaMessage, MarketMessage, PeerBehaviourEvent, Topic,
 };
 
 pub struct Delegator {
@@ -27,28 +27,28 @@
     pub fn new(
         mut swarm_events: Pin<Box<dyn Stream<Item = PeerBehaviourEvent> + Send>>,
         gossipsub_tx: Sender<GossipsubMessage>,
+        kademlia_tx: Sender<KademliaMessage>,
         mut delegate_rx: mpsc::Receiver<JobData>,
-        events_tx: broadcast::Sender<(u64, DelegatorEvent)>,
+        events_tx: broadcast::Sender<(kad::RecordKey, DelegatorEvent)>,
         signing_key: SigningKey,
     ) -> Self {
         Self {
             handle: Some(tokio::spawn(async move {
                 let mut job_bid_scheduler = FuturesUnordered::<
-                    Process<Result<(Job, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>,
+                    Process<
+                        Result<(kad::RecordKey, BTreeMap<u64, Vec<PeerId>>), BidControllerError>,
+                    >,
                 >::new();
-                let mut job_hash_store = HashMap::<u64, mpsc::Sender<(u64, PeerId)>>::new();
+                let mut job_hash_store =
+                    HashMap::<kad::RecordKey, mpsc::Sender<(u64, PeerId)>>::new();
                 loop {
                     tokio::select! {
                         Some(job_data) = delegate_rx.recv() => {
                             let job = Job::try_from_job_data(job_data, &signing_key);
-                            gossipsub_tx.send(GossipsubMessage {
-                                topic: Topic::Market.into(),
-                                data: serde_json::to_vec(&MarketMessage::Job(job.to_owned()))?
-                            }).await?;
-                            info!("Propagated job: {} for bidding", hash!(job));
-                            let (process, bid_tx) = BidQueue::run(job.to_owned());
-                            job_bid_scheduler.push(process);
-                            job_hash_store.insert(hash!(job), bid_tx);
+                            let job_key = kad::RecordKey::new(&hash!(job).to_be_bytes());
+                            kademlia_tx.send(KademliaMessage::PUT(
+                                (job_key, serde_json::to_vec(&job)?)
+                            )).await?;
                         },
                         Some(event) = swarm_events.next() => {
                             match event {
@@ -56,10 +56,10 @@
                                     if message.topic == Topic::Market.into() {
                                         match serde_json::from_slice::<MarketMessage>(&message.data)? {
                                             MarketMessage::JobBid(job_bid) => {
-                                                if let Some(bid_tx) = job_hash_store.get_mut(&job_bid.job_hash) {
-                                                    info!("Received job bid: {} price: {} from: {}", job_bid.job_hash, job_bid.price, job_bid.identity);
+                                                if let Some(bid_tx) = job_hash_store.get_mut(&job_bid.job_key) {
+                                                    info!("Received job bid: {} price: {} from: {}", hex::encode(&job_bid.job_key), job_bid.price, job_bid.identity);
                                                     bid_tx.send((job_bid.price, job_bid.identity)).await?;
-                                                    events_tx.send((job_bid.job_hash, DelegatorEvent::BidReceived(job_bid.identity)))?;
+                                                    events_tx.send((job_bid.job_key, DelegatorEvent::BidReceived(job_bid.identity)))?;
                                                 }
                                             }
                                             _ => {}
@@ -68,27 +68,42 @@
                                     if message.topic == Topic::Delegation.into() {
                                         match serde_json::from_slice::<DelegationMessage>(&message.data)? {
                                             DelegationMessage::Finished(job_witness) => {
-                                                info!("Received finished job: {}", job_witness.job_hash);
-                                                events_tx.send((job_witness.job_hash, DelegatorEvent::Finished(job_witness.proof)))?;
+                                                info!("Received finished job: {}", hex::encode(&job_witness.job_key));
+                                                events_tx.send((job_witness.job_key, DelegatorEvent::Finished(job_witness.proof)))?;
                                             }
                                             _ => {}
                                         }
                                     }
+                                },
+                                PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { result, .. }) => {
+                                    match result {
+                                        kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => {
+                                            gossipsub_tx.send(GossipsubMessage {
+                                                topic: Topic::Market.into(),
+                                                data: serde_json::to_vec(&MarketMessage::JobBidPropagation(key.to_owned()))?
+                                            }).await?;
+                                            info!("Propagated job: {} for bidding", hex::encode(&key));
+                                            let (process, bid_tx) = BidQueue::run(key.to_owned());
+                                            job_bid_scheduler.push(process);
+                                            job_hash_store.insert(key, bid_tx);
+                                        }
+                                        _ => {}
+                                    }
                                 }
                                 _ => {}
                             }
                         }
-                        Some(Ok((job, bids))) = job_bid_scheduler.next() => {
-                            job_hash_store.remove(&hash!(job));
+                        Some(Ok((job_key, bids))) = job_bid_scheduler.next() => {
+                            job_hash_store.remove(&job_key);
                             let bid = bids.first_key_value().unwrap();
                             let price = *bid.0;
                             let identity = *bid.1.first().unwrap();
-                            info!("Job {} delegated to best bidder: {}", hash!(job), identity);
+                            info!("Job {} delegated to best bidder: {}", hex::encode(&job_key), identity);
                             gossipsub_tx.send(GossipsubMessage {
                                 topic: Topic::Delegation.into(),
-                                data: serde_json::to_vec(&DelegationMessage::Delegate(JobDelegation{identity, job: job.to_owned(), price}))?
+                                data: serde_json::to_vec(&DelegationMessage::Delegate(JobBid{identity, job_key: job_key.to_owned(), price}))?
                             }).await?;
-                            events_tx.send((hash!(job), DelegatorEvent::Delegated(identity)))?;
+                            events_tx.send((job_key, DelegatorEvent::Delegated(identity)))?;
                         }
                         _ = shutdown_signal() => {
                             break
@@ -125,8 +140,13 @@
     #[error("mpsc_send_error GossipsubMessage")]
     MpscSendErrorGossipsubMessage(#[from] mpsc::error::SendError<GossipsubMessage>),
 
+    #[error("mpsc_send_error KademliaMessage")]
+    MpscSendErrorKademliaMessage(#[from] mpsc::error::SendError<KademliaMessage>),
+
     #[error("mpsc_send_error DelegatorEvent")]
-    BreadcastSendErrorDelegatorEvent(#[from] broadcast::error::SendError<(u64, DelegatorEvent)>),
+    BroadcastSendErrorDelegatorEvent(
+        #[from] broadcast::error::SendError<(kad::RecordKey, DelegatorEvent)>,
+    ),
 
     #[error("mpsc_send_error JobBid")]
     MpscSendErrorJobBid(#[from] mpsc::error::SendError<(u64, PeerId)>),
diff --git a/crates/delegator/src/main.rs b/crates/delegator/src/main.rs
index e2b50cc..d27d713 100644
--- a/crates/delegator/src/main.rs
+++ b/crates/delegator/src/main.rs
@@ -10,7 +10,7 @@
 };
 use clap::Parser;
 use delegator::{Delegator, DelegatorEvent};
-use libp2p::Multiaddr;
+use libp2p::{kad, Multiaddr};
 use starknet::{core::types::FieldElement, signers::SigningKey};
 use std::{str::FromStr, time::Duration};
 use tokio::{
@@ -24,7 +24,7 @@
 };
 use tracing_subscriber::EnvFilter;
 use zetina_common::{graceful_shutdown::shutdown_signal, job::JobData};
-use zetina_peer::swarm::{GossipsubMessage, SwarmRunner};
+use zetina_peer::swarm::{GossipsubMessage, KademliaMessage, SwarmRunner};
 
 #[derive(Parser)]
 struct Cli {
@@ -71,11 +71,12 @@
         .unwrap();
 
     let (gossipsub_tx, gossipsub_rx) = mpsc::channel::<GossipsubMessage>(100);
-    let (delegate_tx, delegate_rx) = mpsc::channel::<JobData>(100);
-    let (events_tx, events_rx) = broadcast::channel::<(u64, DelegatorEvent)>(100);
-    let swarm_events = swarm_runner.run(gossipsub_rx);
+    let (kademlia_tx, kademlia_rx) = mpsc::channel::<KademliaMessage>(100);
+    let swarm_events = swarm_runner.run(gossipsub_rx, kademlia_rx);
 
-    Delegator::new(swarm_events, gossipsub_tx, delegate_rx, events_tx, signing_key);
+    let (delegate_tx, delegate_rx) = mpsc::channel::<JobData>(100);
+    let (events_tx, events_rx) = broadcast::channel::<(kad::RecordKey, DelegatorEvent)>(100);
+    Delegator::new(swarm_events, gossipsub_tx, kademlia_tx, delegate_rx, events_tx, signing_key);
 
     // Create a `TcpListener` using tokio.
     let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs
index 753d226..16c5fd8 100644
--- a/crates/executor/src/executor.rs
+++ b/crates/executor/src/executor.rs
@@ -1,18 +1,19 @@
 use futures::{stream::FuturesUnordered, Stream};
-use libp2p::{gossipsub, PeerId};
-use std::hash::{DefaultHasher, Hash, Hasher};
+use libp2p::{gossipsub, kad, PeerId};
+use std::collections::HashSet;
 use std::pin::Pin;
 use thiserror::Error;
 use tokio::sync::mpsc;
 use tokio::{sync::mpsc::Sender, task::JoinHandle};
 use tokio_stream::StreamExt;
 use tracing::{error, info};
+use zetina_common::job::Job;
 use zetina_common::{
-    graceful_shutdown::shutdown_signal, hash, job::JobBid, job_trace::JobTrace,
-    job_witness::JobWitness, process::Process,
+    graceful_shutdown::shutdown_signal, job::JobBid, job_trace::JobTrace, job_witness::JobWitness,
+    process::Process,
 };
 use zetina_peer::swarm::{
-    DelegationMessage, GossipsubMessage, MarketMessage, PeerBehaviourEvent, Topic,
+    DelegationMessage, GossipsubMessage, KademliaMessage, MarketMessage, PeerBehaviourEvent, Topic,
 };
 use zetina_prover::{
     errors::ProverControllerError, stone_prover::StoneProver, traits::ProverController,
@@ -30,6 +31,7 @@
         identity: PeerId,
         mut swarm_events: Pin<Box<dyn Stream<Item = PeerBehaviourEvent> + Send>>,
         gossipsub_tx: Sender<GossipsubMessage>,
+        kademlia_tx: Sender<KademliaMessage>,
        runner: CairoRunner,
        prover: StoneProver,
     ) -> Self {
@@ -41,6 +43,8 @@
                     Process<'_, Result<JobWitness, ProverControllerError>>,
                 >::new();
 
+                let mut job_hash_store = HashSet::<kad::RecordKey>::new();
+
                 loop {
                     tokio::select! {
                         Some(event) = swarm_events.next() => {
@@ -48,13 +52,13 @@
                             PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { message, .. }) => {
                                 if message.topic == Topic::Market.into() {
                                     match serde_json::from_slice::<MarketMessage>(&message.data)? {
-                                        MarketMessage::Job(job) => {
+                                        MarketMessage::JobBidPropagation(job_key) => {
                                             gossipsub_tx
                                                 .send(GossipsubMessage {
                                                     topic: Topic::Market.into(),
                                                     data: serde_json::to_vec(&MarketMessage::JobBid(JobBid {
                                                         identity,
-                                                        job_hash: hash!(job),
+                                                        job_key,
                                                         price: (runner_scheduler.len() * prover_scheduler.len()) as u64,
                                                     }))?
                                                 })
@@ -67,23 +71,42 @@
                                 match serde_json::from_slice::<DelegationMessage>(&message.data)? {
                                     DelegationMessage::Delegate(job_delegation) => {
                                         if job_delegation.identity == identity {
-                                            info!("Scheduled running of job: {}", hash!(job_delegation.job));
-                                            runner_scheduler.push(runner.run(job_delegation.job)?);
+                                            info!("Received delegation of job: {}", hex::encode(&job_delegation.job_key));
+                                            job_hash_store.insert(job_delegation.job_key.to_owned());
+                                            kademlia_tx.send(KademliaMessage::GET(job_delegation.job_key)).await?;
                                         }
                                     }
                                     _ => {}
                                 }
                             }
                         }
+                        PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { result, .. }) => {
+                            match result {
+                                kad::QueryResult::GetRecord(Ok(
+                                    kad::GetRecordOk::FoundRecord(kad::PeerRecord {
+                                        record: kad::Record { key, value, .. },
+                                        ..
+                                    })
+                                )) => {
+                                    if job_hash_store.contains(&key) {
+                                        let job: Job = serde_json::from_slice(&value)?;
+                                        info!("Received job: {}", hex::encode(&key));
+                                        runner_scheduler.push(runner.run(job)?);
+                                        job_hash_store.remove(&key);
+                                    }
+                                }
+                                _ => {}
+                            }
+                        }
                         _ => {}
                     }
                 }
                 Some(Ok(job_trace)) = runner_scheduler.next() => {
-                    info!("Scheduled proving of job_trace: {}", &job_trace.job_hash);
+                    info!("Scheduled proving of job_trace: {}", hex::encode(&job_trace.job_key));
                     prover_scheduler.push(prover.run(job_trace)?);
                 },
                 Some(Ok(job_witness)) = prover_scheduler.next() => {
-                    info!("Finished proving: {}", &job_witness.job_hash);
+                    info!("Finished proving: {}", hex::encode(&job_witness.job_key));
                     gossipsub_tx.send(GossipsubMessage {
                         topic: Topic::Delegation.into(),
                         data: serde_json::to_vec(&DelegationMessage::Finished(job_witness))?
@@ -120,8 +143,11 @@
     #[error("runner_controller_error")]
     RunnerControllerError(#[from] RunnerControllerError),
 
-    #[error("mpsc_send_error")]
-    MpscSendError(#[from] mpsc::error::SendError<GossipsubMessage>),
+    #[error("mpsc_send_error GossipsubMessage")]
+    MpscSendErrorGossipsubMessage(#[from] mpsc::error::SendError<GossipsubMessage>),
+
+    #[error("mpsc_send_error KademliaMessage")]
+    MpscSendErrorKademliaMessage(#[from] mpsc::error::SendError<KademliaMessage>),
 
     #[error("io")]
     Io(#[from] std::io::Error),
diff --git a/crates/executor/src/main.rs b/crates/executor/src/main.rs
index 58f257b..cf11c53 100644
--- a/crates/executor/src/main.rs
+++ b/crates/executor/src/main.rs
@@ -11,7 +11,7 @@
 use tokio::{net::TcpListener, sync::mpsc};
 use tower_http::{timeout::TimeoutLayer, trace::TraceLayer};
 use tracing_subscriber::EnvFilter;
 use zetina_common::graceful_shutdown::shutdown_signal;
-use zetina_peer::swarm::{GossipsubMessage, SwarmRunner};
+use zetina_peer::swarm::{GossipsubMessage, KademliaMessage, SwarmRunner};
 use zetina_prover::stone_prover::StoneProver;
 use zetina_runner::cairo_runner::CairoRunner;
 
@@ -68,12 +68,12 @@
         .unwrap();
 
     let (gossipsub_tx, gossipsub_rx) = mpsc::channel::<GossipsubMessage>(100);
-    let swarm_events = swarm_runner.run(gossipsub_rx);
+    let (kademlia_tx, kademlia_rx) = mpsc::channel::<KademliaMessage>(100);
+    let swarm_events = swarm_runner.run(gossipsub_rx, kademlia_rx);
 
     let runner = CairoRunner::new(bootloader_program_path, signing_key.verifying_key());
     let prover = StoneProver::new();
-
-    Executor::new(identity, swarm_events, gossipsub_tx, runner, prover);
+    Executor::new(identity, swarm_events, gossipsub_tx, kademlia_tx, runner, prover);
 
     // Create a `TcpListener` using tokio.
     let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
diff --git a/crates/peer/Cargo.toml b/crates/peer/Cargo.toml
index bbabccf..d7e5cdc 100644
--- a/crates/peer/Cargo.toml
+++ b/crates/peer/Cargo.toml
@@ -11,6 +11,7 @@ version.workspace = true
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+hex.workspace = true
 async-stream.workspace = true
 futures.workspace = true
 thiserror.workspace = true
diff --git a/crates/peer/src/swarm.rs b/crates/peer/src/swarm.rs
index 04e4e27..dc649d0 100644
--- a/crates/peer/src/swarm.rs
+++ b/crates/peer/src/swarm.rs
@@ -3,20 +3,23 @@
 use futures::stream::Stream;
 use libp2p::futures::StreamExt;
 use libp2p::gossipsub::{self, IdentTopic, TopicHash};
 use libp2p::identity::Keypair;
+use libp2p::kad::store::MemoryStore;
+use libp2p::kad::Mode;
 use libp2p::swarm::{DialError, NetworkBehaviour, SwarmEvent};
-use libp2p::{noise, tcp, yamux, Multiaddr, Swarm, SwarmBuilder};
+use libp2p::{kad, noise, tcp, yamux, Multiaddr, Swarm, SwarmBuilder};
 use serde::{Deserialize, Serialize};
 use std::pin::Pin;
 use std::time::Duration;
 use tokio::sync::mpsc;
 use tracing::{debug, error, info};
 use zetina_common::graceful_shutdown::shutdown_signal;
-use zetina_common::job::{Job, JobBid, JobDelegation};
+use zetina_common::job::{Job, JobBid};
 use zetina_common::job_witness::JobWitness;
 
 #[derive(NetworkBehaviour)]
 pub struct PeerBehaviour {
     gossipsub: gossipsub::Behaviour,
+    kademlia: kad::Behaviour<MemoryStore>,
 }
 
 pub struct SwarmRunner {
@@ -53,11 +56,18 @@
     }
 }
 
+#[derive(Debug)]
 pub struct GossipsubMessage {
     pub topic: IdentTopic,
     pub data: Vec<u8>,
 }
 
+#[derive(Debug)]
+pub enum KademliaMessage {
+    GET(kad::RecordKey),
+    PUT((kad::RecordKey, Vec<u8>)),
+}
+
 #[derive(Debug, Serialize, Deserialize)]
 pub enum NetworkingMessage {
     Multiaddr(Multiaddr),
@@ -66,12 +76,13 @@
 #[derive(Debug, Serialize, Deserialize)]
 pub enum MarketMessage {
     Job(Job),
+    JobBidPropagation(kad::RecordKey),
     JobBid(JobBid),
 }
 
 #[derive(Debug, Serialize, Deserialize)]
 pub enum DelegationMessage {
-    Delegate(JobDelegation),
+    Delegate(JobBid),
     Finished(JobWitness),
 }
@@ -90,6 +101,10 @@
             )?
             .with_quic()
             .with_behaviour(|p2p_keypair| PeerBehaviour {
+                kademlia: kad::Behaviour::new(
+                    p2p_keypair.public().to_peer_id(),
+                    MemoryStore::new(p2p_keypair.public().to_peer_id()),
+                ),
                 gossipsub: Self::init_gossip(p2p_keypair).unwrap(),
             })?
             .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
@@ -98,6 +113,7 @@
         swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Networking.as_str()))?;
         swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Market.as_str()))?;
         swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Delegation.as_str()))?;
+        swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server));
 
         // swarm.listen_on("/ip4/0.0.0.0/udp/5678/quic-v1".parse()?)?;
         swarm.listen_on(listen_multiaddr)?;
@@ -122,18 +138,38 @@
     pub fn run(
         mut self,
         mut gossipsub_message: mpsc::Receiver<GossipsubMessage>,
+        mut kademlia_message: mpsc::Receiver<KademliaMessage>,
     ) -> Pin<Box<dyn Stream<Item = PeerBehaviourEvent> + Send>> {
         let stream = stream! {
             loop {
                 tokio::select! {
                     Some(message) = gossipsub_message.recv() => {
-                        debug!{"Sending gossipsub_message: topic {}, data {:?}", message.topic, message.data};
+                        debug!{"Sending gossipsub_message: topic {}, data {}", message.topic, hex::encode(&message.data)};
                         if let Err(e) = self.swarm
                             .behaviour_mut()
                             .gossipsub
                             .publish(message.topic, message.data) {
-                            error!("Publish error: {e:?}");
+                            error!("Gossipsub error: {e:?}");
+                        }
+                    },
+                    Some(message) = kademlia_message.recv() => {
+                        debug!{"Sending kademlia_message: {:?}", message};
+                        match message {
+                            KademliaMessage::GET(key) => {
+                                self.swarm.behaviour_mut().kademlia.get_record(key);
+                            },
+                            KademliaMessage::PUT((key, data)) => {
+                                let record = kad::Record {
+                                    key,
+                                    value: data,
+                                    publisher: Some(*self.swarm.local_peer_id()),
+                                    expires: None,
+                                };
+                                if let Err(e) = self.swarm.behaviour_mut().kademlia.put_record(record, kad::Quorum::One) {
+                                    error!("Kademlia error: {e:?}");
+                                }
+                            },
                         }
                     },
                     event = self.swarm.select_next_some() => match event {
@@ -175,13 +211,59 @@
                             message,
                         });
                     }
-                    SwarmEvent::ConnectionEstablished { peer_id, connection_id, num_established, .. } => {
+                    SwarmEvent::ConnectionEstablished { peer_id, connection_id, num_established, endpoint, .. } => {
                        info!{"Connection established: peer_id {}, connection_id {}, num_established {}", peer_id, connection_id, num_established};
                        self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
+                        self.swarm.behaviour_mut().kademlia.add_address(&peer_id, endpoint.get_remote_address().to_owned());
                    }
-                    SwarmEvent::ConnectionClosed { peer_id, connection_id, num_established, .. } => {
+                    SwarmEvent::ConnectionClosed { peer_id, connection_id, num_established, endpoint, .. } => {
                        info!{"Connection closed: peer_id {}, connection_id {}, num_established {}", peer_id, connection_id, num_established};
-                        self.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
+                        if num_established == 0 {
+                            self.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
+                            self.swarm.behaviour_mut().kademlia.remove_address(&peer_id, endpoint.get_remote_address());
+                        }
+                    }
+                    SwarmEvent::Behaviour(PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { id, result, stats, step })) => {
+                        match result {
+                            kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders { key, providers, .. })) => {
+                                for peer in providers {
+                                    info!("Peer {peer:?} provides key {}", hex::encode(&key));
+                                }
+                            }
+                            kad::QueryResult::GetProviders(Err(err)) => {
+                                error!("Failed to get providers: {err:?}");
+                            }
+                            kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(kad::PeerRecord { peer, record }))) => {
+                                info!("Successfully got record {}", hex::encode(&record.key));
+
+                                yield PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { id,
+                                    result: kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(kad::PeerRecord { peer, record }))),
+                                    stats, step })
+                            }
+                            kad::QueryResult::GetRecord(Ok(_)) => {}
+                            kad::QueryResult::GetRecord(Err(err)) => {
+                                error!("Failed to get record: {err:?}");
+                            }
+                            kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => {
+                                info!("Successfully put record {}", hex::encode(&key));
+
+                                yield PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { id,
+                                    result: kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })),
+                                    stats, step })
+                            }
+                            kad::QueryResult::PutRecord(Err(err)) => {
+                                error!("Failed to put record: {err:?}");
+                            }
+                            kad::QueryResult::StartProviding(Ok(kad::AddProviderOk { key })) => {
+                                info!("Successfully put provider record {}", hex::encode(&key));
+                            }
+                            kad::QueryResult::StartProviding(Err(err)) => {
+                                error!("Failed to put provider record: {err:?}");
+                            }
+                            event => {
+                                debug!("Unhandled event: {:?}", event);
+                            }
+                        }
+                    }
                     SwarmEvent::Behaviour(event) => {
                         yield event;
diff --git a/crates/prover/Cargo.toml b/crates/prover/Cargo.toml
index e9cf6dc..2f89ba0 100644
--- a/crates/prover/Cargo.toml
+++ b/crates/prover/Cargo.toml
@@ -11,6 +11,8 @@ version.workspace = true
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+hex.workspace = true
+libp2p.workspace = true
 async-process.workspace = true
 cairo-proof-parser.workspace = true
 futures.workspace = true
diff --git a/crates/prover/src/stone_prover/mod.rs b/crates/prover/src/stone_prover/mod.rs
index 76bd689..4986858 100644
--- a/crates/prover/src/stone_prover/mod.rs
+++ b/crates/prover/src/stone_prover/mod.rs
@@ -8,14 +8,13 @@
 use futures::Future;
 use serde_json::Value;
 use std::{
     fs,
-    hash::{DefaultHasher, Hash, Hasher},
     io::{Read, Write},
     pin::Pin,
 };
 use tempfile::NamedTempFile;
 use tokio::{process::Command, select, sync::mpsc};
 use tracing::debug;
-use zetina_common::{hash, job_trace::JobTrace, job_witness::JobWitness, process::Process};
+use zetina_common::{job_trace::JobTrace, job_witness::JobWitness, process::Process};
 
 pub mod tests;
 pub mod types;
@@ -67,9 +66,7 @@
                 .stdout(Stdio::null())
                 .spawn()?;
 
-            let job_trace_hash = hash!(job_trace);
-
-            debug!("task {} spawned", job_trace_hash);
+            debug!("task {} spawned", hex::encode(&job_trace.job_key));
 
             loop {
                 select! {
@@ -91,7 +88,7 @@
                     let mut proof = Vec::new();
                     out_file.read_to_end(&mut proof)?;
 
-                    Ok(JobWitness { job_hash: job_trace.job_hash, proof })
+                    Ok(JobWitness { job_key: job_trace.job_key.to_owned(), proof })
         });
 
         Ok(Process::new(future, terminate_tx))
diff --git a/crates/prover/src/stone_prover/tests/models.rs b/crates/prover/src/stone_prover/tests/models.rs
index b5d8650..b718f2b 100644
--- a/crates/prover/src/stone_prover/tests/models.rs
+++ b/crates/prover/src/stone_prover/tests/models.rs
@@ -1,3 +1,4 @@
+use libp2p::kad;
 use std::{env, fs, io::Write, path::PathBuf};
 use tempfile::NamedTempFile;
 use zetina_common::job_trace::JobTrace;
@@ -28,12 +29,12 @@
     trace.write_all(&fs::read(trace_path).unwrap()).unwrap();
 
     TestFixture {
-        job_trace: JobTrace {
-            job_hash: u64::default(),
+        job_trace: JobTrace::new(
+            kad::RecordKey::new(&[0]),
             air_public_input,
             air_private_input,
             memory,
             trace,
-        },
+        ),
     }
 }
diff --git a/crates/runner/Cargo.toml b/crates/runner/Cargo.toml
index a3c0234..0866f59 100644
--- a/crates/runner/Cargo.toml
+++ b/crates/runner/Cargo.toml
@@ -11,6 +11,7 @@ version.workspace = true
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+hex.workspace = true
 async-process.workspace = true
 futures.workspace = true
 libp2p.workspace = true
diff --git a/crates/runner/src/cairo_runner/mod.rs b/crates/runner/src/cairo_runner/mod.rs
index 269f3d5..2453b9c 100644
--- a/crates/runner/src/cairo_runner/mod.rs
+++ b/crates/runner/src/cairo_runner/mod.rs
@@ -2,6 +2,7 @@ use self::types::input::SimpleBootloaderInput;
 use crate::{errors::RunnerControllerError, traits::RunnerController};
 use async_process::Stdio;
 use futures::Future;
+use libp2p::kad;
 use starknet::signers::VerifyingKey;
 use std::{
     hash::{DefaultHasher, Hash, Hasher},
@@ -36,7 +37,7 @@
         let future: Pin<
             Box<dyn Future<Output = Result<JobTrace, RunnerControllerError>> + Send + '_>,
         > = Box::pin(async move {
-            let job_hash = hash!(job);
+            let job_key = kad::RecordKey::new(&hash!(job).to_be_bytes());
 
             let layout: &str = Layout::Starknet.into();
             let mut cairo_pie = NamedTempFile::new()?;
@@ -77,7 +78,7 @@
                 .stdout(Stdio::null())
                 .spawn()?;
 
-            debug!("task {} spawned", job_hash);
+            debug!("task {} spawned", hex::encode(&job_key));
 
             loop {
                 select! {
@@ -95,7 +96,7 @@
                     }
                 }
             }
-            Ok(JobTrace { job_hash, air_public_input, air_private_input, memory, trace })
+            Ok(JobTrace::new(job_key, air_public_input, air_private_input, memory, trace))
         });
 
         Ok(Process::new(future, terminate_tx))