Skip to content

Commit

Permalink
delegator filter
Browse files Browse the repository at this point in the history
  • Loading branch information
Okm165 committed Jun 30, 2024
1 parent b306238 commit d00de81
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 19 deletions.
1 change: 1 addition & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ publish = true

[dependencies]
cairo-vm.workspace = true
rand.workspace = true
futures.workspace = true
hex.workspace = true
libp2p.workspace = true
Expand Down
8 changes: 8 additions & 0 deletions crates/common/src/job_record.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{collections::BTreeSet, hash::Hash};

use rand::Rng;

pub struct JobRecord<J> {
ordered_set: BTreeSet<J>,
}
Expand All @@ -21,6 +23,12 @@ where
}

pub async fn take_job(&mut self) -> Option<J> {
// add random wait to simulate network overhead
let random = {
let mut rng = rand::thread_rng();
rng.gen_range(0..1000)
};
tokio::time::sleep(std::time::Duration::from_millis(random)).await;
self.ordered_set.pop_last()
}

Expand Down
1 change: 1 addition & 0 deletions crates/common/src/job_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tempfile::NamedTempFile;

#[derive(Debug)]
pub struct JobTrace {
pub job_hash: u64,
pub air_public_input: NamedTempFile, // Temporary file containing the public input
pub air_private_input: NamedTempFile, // Temporary file containing the private input; memory and trace files must exist for this to be valid
pub memory: NamedTempFile, // Temporary file containing memory data (required for air_private_input validity)
Expand Down
1 change: 1 addition & 0 deletions crates/common/src/job_witness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::{

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct JobWitness {
pub job_hash: u64,
pub proof: Vec<u8>, // Serialized proof
}

Expand Down
5 changes: 4 additions & 1 deletion crates/delegator/proto/delegator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ service DelegatorService {
}

message DelegateRequest { bytes cairo_pie = 1; }
message DelegateResponse { bytes proof = 1; }
message DelegateResponse {
bytes proof = 1;
uint64 job_hash = 2;
}
6 changes: 2 additions & 4 deletions crates/delegator/src/swarm/event_loop.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use super::PeerBehaviour;
use crate::swarm::PeerBehaviourEvent;
use futures::StreamExt;
use libp2p::{
gossipsub::{self, IdentTopic},
Expand All @@ -9,10 +11,6 @@ use tokio::sync::mpsc::{self, Receiver};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};

use crate::swarm::PeerBehaviourEvent;

use super::PeerBehaviour;

pub(crate) async fn swarm_loop(
mut swarm: Swarm<PeerBehaviour>,
mut transmit_topics: Vec<(IdentTopic, Receiver<Vec<u8>>)>,
Expand Down
20 changes: 13 additions & 7 deletions crates/delegator/src/tonic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
pub mod proto {
tonic::include_proto!("delegator");
}
use proto::delegator_service_server::DelegatorService;
use proto::{DelegateRequest, DelegateResponse};

use async_stream::stream;
use futures::{Stream, StreamExt, TryStreamExt};
use starknet::signers::SigningKey;
use std::collections::HashSet;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::pin::Pin;
use tokio::{
Expand All @@ -15,12 +22,6 @@ use zetina_common::{
job_witness::JobWitness,
};

pub mod proto {
tonic::include_proto!("delegator");
}
use crate::tonic::proto::delegator_service_server::DelegatorService;
use proto::{DelegateRequest, DelegateResponse};

pub struct DelegatorGRPCServer {
signing_key: SigningKey,
job_topic_tx: mpsc::Sender<Vec<u8>>,
Expand Down Expand Up @@ -50,18 +51,23 @@ impl DelegatorService for DelegatorGRPCServer {
let mut witness_channel = self.job_witness_rx.resubscribe();
let signing_key = self.signing_key.clone();

let mut queue_set = HashSet::<u64>::new();

let out_stream = stream! {
loop {
select! {
Ok(request) = in_stream.select_next_some() => {
let job_data = JobData::new(0, request.cairo_pie);
let job = Job::try_from_job_data(job_data, &signing_key);
queue_set.insert(hash!(job));
let serialized_job = serde_json::to_string(&job).unwrap();
job_channel.send(serialized_job.into()).await.unwrap();
info!("Sent a new job: {}", hash!(&job));
}
Ok(rx) = witness_channel.recv() => {
yield Ok(DelegateResponse { proof: rx.proof })
if let Some(job_hash) = queue_set.take(&rx.job_hash) {
yield Ok(DelegateResponse { job_hash, proof: rx.proof })
}
}
else => {
yield Err(Status::cancelled(""))
Expand Down
6 changes: 2 additions & 4 deletions crates/executor/src/swarm/event_loop.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use super::PeerBehaviour;
use crate::swarm::PeerBehaviourEvent;
use futures::StreamExt;
use libp2p::{
gossipsub::{self, IdentTopic},
Expand All @@ -9,10 +11,6 @@ use tokio::sync::mpsc::{self, Receiver};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};

use crate::swarm::PeerBehaviourEvent;

use super::PeerBehaviour;

pub(crate) async fn swarm_loop(
mut swarm: Swarm<PeerBehaviour>,
mut transmit_topics: Vec<(IdentTopic, Receiver<Vec<u8>>)>,
Expand Down
2 changes: 1 addition & 1 deletion crates/prover/src/stone_prover/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl ProverController for StoneProver {
let mut proof = Vec::new();
out_file.read_to_end(&mut proof)?;

Ok(JobWitness { proof })
Ok(JobWitness { job_hash: job_trace.job_hash, proof })
});

Ok(Process::new(future, terminate_tx))
Expand Down
10 changes: 9 additions & 1 deletion crates/prover/src/stone_prover/tests/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,13 @@ pub fn fixture() -> TestFixture {
let mut trace = NamedTempFile::new().unwrap();
trace.write_all(&fs::read(trace_path).unwrap()).unwrap();

TestFixture { job_trace: JobTrace { air_public_input, air_private_input, memory, trace } }
TestFixture {
job_trace: JobTrace {
job_hash: u64::default(),
air_public_input,
air_private_input,
memory,
trace,
},
}
}
2 changes: 1 addition & 1 deletion crates/runner/src/cairo_runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl RunnerController for CairoRunner {
}
}
}
Ok(JobTrace { air_public_input, air_private_input, memory, trace })
Ok(JobTrace { job_hash, air_public_input, air_private_input, memory, trace })
});

Ok(Process::new(future, terminate_tx))
Expand Down

0 comments on commit d00de81

Please sign in to comment.