diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 9fb3ff1..4b72211 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -13,6 +13,7 @@ publish = true [dependencies] cairo-vm.workspace = true +rand.workspace = true futures.workspace = true hex.workspace = true libp2p.workspace = true diff --git a/crates/common/src/job_record.rs b/crates/common/src/job_record.rs index 326a3f3..4f61eb8 100644 --- a/crates/common/src/job_record.rs +++ b/crates/common/src/job_record.rs @@ -1,5 +1,7 @@ use std::{collections::BTreeSet, hash::Hash}; +use rand::Rng; + pub struct JobRecord { ordered_set: BTreeSet, } @@ -21,6 +23,12 @@ where } pub async fn take_job(&mut self) -> Option { + // add random wait to simulate network overhead + let random = { + let mut rng = rand::thread_rng(); + rng.gen_range(0..1000) + }; + tokio::time::sleep(std::time::Duration::from_millis(random)).await; self.ordered_set.pop_last() } diff --git a/crates/common/src/job_trace.rs b/crates/common/src/job_trace.rs index f5870ea..ac674a4 100644 --- a/crates/common/src/job_trace.rs +++ b/crates/common/src/job_trace.rs @@ -13,6 +13,7 @@ use tempfile::NamedTempFile; #[derive(Debug)] pub struct JobTrace { + pub job_hash: u64, pub air_public_input: NamedTempFile, // Temporary file containing the public input pub air_private_input: NamedTempFile, // Temporary file containing the private input; memory and trace files must exist for this to be valid pub memory: NamedTempFile, // Temporary file containing memory data (required for air_private_input validity) diff --git a/crates/common/src/job_witness.rs b/crates/common/src/job_witness.rs index 799d195..1f3c405 100644 --- a/crates/common/src/job_witness.rs +++ b/crates/common/src/job_witness.rs @@ -14,6 +14,7 @@ use std::{ #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] pub struct JobWitness { + pub job_hash: u64, pub proof: Vec, // Serialized proof } diff --git a/crates/delegator/proto/delegator.proto b/crates/delegator/proto/delegator.proto index 2712477..bac9d9b 100644 --- a/crates/delegator/proto/delegator.proto +++ b/crates/delegator/proto/delegator.proto @@ -7,4 +7,7 @@ service DelegatorService { } message DelegateRequest { bytes cairo_pie = 1; } -message DelegateResponse { bytes proof = 1; } \ No newline at end of file +message DelegateResponse { + bytes proof = 1; + uint64 job_hash = 2; +} \ No newline at end of file diff --git a/crates/delegator/src/swarm/event_loop.rs b/crates/delegator/src/swarm/event_loop.rs index be90ed9..c512467 100644 --- a/crates/delegator/src/swarm/event_loop.rs +++ b/crates/delegator/src/swarm/event_loop.rs @@ -1,3 +1,5 @@ +use super::PeerBehaviour; +use crate::swarm::PeerBehaviourEvent; use futures::StreamExt; use libp2p::{ gossipsub::{self, IdentTopic}, @@ -9,10 +11,6 @@ use tokio::sync::mpsc::{self, Receiver}; use tokio_util::sync::CancellationToken; use tracing::{debug, error}; -use crate::swarm::PeerBehaviourEvent; - -use super::PeerBehaviour; - pub(crate) async fn swarm_loop( mut swarm: Swarm, mut transmit_topics: Vec<(IdentTopic, Receiver>)>, diff --git a/crates/delegator/src/tonic.rs b/crates/delegator/src/tonic.rs index 3e51060..0c57c8c 100644 --- a/crates/delegator/src/tonic.rs +++ b/crates/delegator/src/tonic.rs @@ -1,6 +1,13 @@ +pub mod proto { + tonic::include_proto!("delegator"); +} +use proto::delegator_service_server::DelegatorService; +use proto::{DelegateRequest, DelegateResponse}; + use async_stream::stream; use futures::{Stream, StreamExt, TryStreamExt}; use starknet::signers::SigningKey; +use std::collections::HashSet; use std::hash::{DefaultHasher, Hash, Hasher}; use std::pin::Pin; use tokio::{ @@ -15,12 +22,6 @@ use zetina_common::{ job_witness::JobWitness, }; -pub mod proto { - tonic::include_proto!("delegator"); -} -use crate::tonic::proto::delegator_service_server::DelegatorService; -use proto::{DelegateRequest, DelegateResponse}; - pub struct DelegatorGRPCServer { signing_key: SigningKey, job_topic_tx: mpsc::Sender>, @@ -50,18 +51,23 @@ impl DelegatorService for DelegatorGRPCServer { let mut witness_channel = self.job_witness_rx.resubscribe(); let signing_key = self.signing_key.clone(); + let mut queue_set = HashSet::::new(); + let out_stream = stream! { loop { select! { Ok(request) = in_stream.select_next_some() => { let job_data = JobData::new(0, request.cairo_pie); let job = Job::try_from_job_data(job_data, &signing_key); + queue_set.insert(hash!(job)); let serialized_job = serde_json::to_string(&job).unwrap(); job_channel.send(serialized_job.into()).await.unwrap(); info!("Sent a new job: {}", hash!(&job)); } Ok(rx) = witness_channel.recv() => { - yield Ok(DelegateResponse { proof: rx.proof }) + if let Some(job_hash) = queue_set.take(&rx.job_hash) { + yield Ok(DelegateResponse { job_hash, proof: rx.proof }) + } } else => { yield Err(Status::cancelled("")) diff --git a/crates/executor/src/swarm/event_loop.rs b/crates/executor/src/swarm/event_loop.rs index a0e5921..c2c5f70 100644 --- a/crates/executor/src/swarm/event_loop.rs +++ b/crates/executor/src/swarm/event_loop.rs @@ -1,3 +1,5 @@ +use super::PeerBehaviour; +use crate::swarm::PeerBehaviourEvent; use futures::StreamExt; use libp2p::{ gossipsub::{self, IdentTopic}, @@ -9,10 +11,6 @@ use tokio::sync::mpsc::{self, Receiver}; use tokio_util::sync::CancellationToken; use tracing::{debug, error}; -use crate::swarm::PeerBehaviourEvent; - -use super::PeerBehaviour; - pub(crate) async fn swarm_loop( mut swarm: Swarm, mut transmit_topics: Vec<(IdentTopic, Receiver>)>, diff --git a/crates/prover/src/stone_prover/mod.rs b/crates/prover/src/stone_prover/mod.rs index 11f3d7d..76bd689 100644 --- a/crates/prover/src/stone_prover/mod.rs +++ b/crates/prover/src/stone_prover/mod.rs @@ -91,7 +91,7 @@ impl ProverController for StoneProver { let mut proof = Vec::new(); out_file.read_to_end(&mut proof)?; - Ok(JobWitness { proof }) + Ok(JobWitness { job_hash: job_trace.job_hash, proof }) }); Ok(Process::new(future, terminate_tx)) diff --git a/crates/prover/src/stone_prover/tests/models.rs b/crates/prover/src/stone_prover/tests/models.rs index d58629a..b5d8650 100644 --- a/crates/prover/src/stone_prover/tests/models.rs +++ b/crates/prover/src/stone_prover/tests/models.rs @@ -27,5 +27,13 @@ pub fn fixture() -> TestFixture { let mut trace = NamedTempFile::new().unwrap(); trace.write_all(&fs::read(trace_path).unwrap()).unwrap(); - TestFixture { job_trace: JobTrace { air_public_input, air_private_input, memory, trace } } + TestFixture { + job_trace: JobTrace { + job_hash: u64::default(), + air_public_input, + air_private_input, + memory, + trace, + }, + } } diff --git a/crates/runner/src/cairo_runner/mod.rs b/crates/runner/src/cairo_runner/mod.rs index 1d50e6b..269f3d5 100644 --- a/crates/runner/src/cairo_runner/mod.rs +++ b/crates/runner/src/cairo_runner/mod.rs @@ -95,7 +95,7 @@ impl RunnerController for CairoRunner { } } } - Ok(JobTrace { air_public_input, air_private_input, memory, trace }) + Ok(JobTrace { job_hash, air_public_input, air_private_input, memory, trace }) }); Ok(Process::new(future, terminate_tx))