Skip to content

Commit

Permalink
new executor
Browse files Browse the repository at this point in the history
  • Loading branch information
Okm165 committed Jun 30, 2024
1 parent 13fd761 commit b306238
Show file tree
Hide file tree
Showing 15 changed files with 516 additions and 404 deletions.
8 changes: 7 additions & 1 deletion crates/common/src/job_witness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,17 @@ use std::{
This serialized proof can be deserialized into a StarkProof object by the verifier to proceed with the verification of the statement.
*/

#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct JobWitness {
pub proof: Vec<u8>, // Serialized proof
}

impl Hash for JobWitness {
fn hash<H: Hasher>(&self, state: &mut H) {
self.proof.hash(state);
}
}

impl Display for JobWitness {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", hex::encode(hash!(self).to_be_bytes()))
Expand Down
4 changes: 2 additions & 2 deletions crates/common/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use std::pin::Pin;
use tokio::sync::mpsc;

pub struct Process<'future, PR> {
future: Pin<Box<dyn Future<Output = PR> + 'future>>,
future: Pin<Box<dyn Future<Output = PR> + Send + 'future>>,
abort: mpsc::Sender<()>,
}

impl<'future, PR> Process<'future, PR> {
pub fn new(
future: Pin<Box<dyn Future<Output = PR> + 'future>>,
future: Pin<Box<dyn Future<Output = PR> + Send + 'future>>,
abort: mpsc::Sender<()>,
) -> Self {
Self { future, abort }
Expand Down
113 changes: 57 additions & 56 deletions crates/compiler/src/cairo_compiler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,78 +30,79 @@ impl<'identity> CompilerController for CairoCompiler<'identity> {
program_input_path: PathBuf,
) -> Result<Process<Result<Job, CompilerControllerError>>, CompilerControllerError> {
let (terminate_tx, mut terminate_rx) = mpsc::channel::<()>(10);
let future: Pin<Box<dyn Future<Output = Result<Job, CompilerControllerError>> + '_>> =
Box::pin(async move {
let layout: &str = Layout::RecursiveWithPoseidon.into();
let future: Pin<
Box<dyn Future<Output = Result<Job, CompilerControllerError>> + Send + '_>,
> = Box::pin(async move {
let layout: &str = Layout::RecursiveWithPoseidon.into();

let output = NamedTempFile::new()?;
let output = NamedTempFile::new()?;

let mut task = Command::new("cairo-compile")
.arg(program_path.as_path())
.arg("--output")
.arg(output.path())
.arg("--proof_mode")
.stdout(Stdio::null())
.spawn()?;
let mut task = Command::new("cairo-compile")
.arg(program_path.as_path())
.arg("--output")
.arg(output.path())
.arg("--proof_mode")
.stdout(Stdio::null())
.spawn()?;

debug!("program {:?} is compiling... ", program_path);
debug!("program {:?} is compiling... ", program_path);

loop {
select! {
output = task.wait() => {
debug!("{:?}", output);
if !output?.success() {
return Err(CompilerControllerError::TaskTerminated);
}
let output = task.wait_with_output().await?;
debug!("{:?}", output);
break;
}
Some(()) = terminate_rx.recv() => {
task.start_kill()?;
loop {
select! {
output = task.wait() => {
debug!("{:?}", output);
if !output?.success() {
return Err(CompilerControllerError::TaskTerminated);
}
let output = task.wait_with_output().await?;
debug!("{:?}", output);
break;
}
Some(()) = terminate_rx.recv() => {
task.start_kill()?;
}
}
}

let mut cairo_pie = NamedTempFile::new()?;
let mut cairo_pie = NamedTempFile::new()?;

let mut task = Command::new("cairo-run")
.arg("--program")
.arg(output.path())
.arg("--layout")
.arg(layout)
.arg("--program_input")
.arg(program_input_path)
.arg("--cairo_pie_output")
.arg(cairo_pie.path())
.arg("--print_output")
.stdout(Stdio::null())
.spawn()?;
let mut task = Command::new("cairo-run")
.arg("--program")
.arg(output.path())
.arg("--layout")
.arg(layout)
.arg("--program_input")
.arg(program_input_path)
.arg("--cairo_pie_output")
.arg(cairo_pie.path())
.arg("--print_output")
.stdout(Stdio::null())
.spawn()?;

debug!("program {:?} is generating PIE... ", program_path);
debug!("program {:?} is generating PIE... ", program_path);

loop {
select! {
output = task.wait() => {
debug!("{:?}", output);
if !output?.success() {
return Err(CompilerControllerError::TaskTerminated);
}
let output = task.wait_with_output().await?;
debug!("{:?}", output);
break;
}
Some(()) = terminate_rx.recv() => {
task.start_kill()?;
loop {
select! {
output = task.wait() => {
debug!("{:?}", output);
if !output?.success() {
return Err(CompilerControllerError::TaskTerminated);
}
let output = task.wait_with_output().await?;
debug!("{:?}", output);
break;
}
Some(()) = terminate_rx.recv() => {
task.start_kill()?;
}
}
}

let mut cairo_pie_compressed = Vec::new();
cairo_pie.read_to_end(&mut cairo_pie_compressed)?;
let mut cairo_pie_compressed = Vec::new();
cairo_pie.read_to_end(&mut cairo_pie_compressed)?;

Ok(Job::try_from_job_data(JobData::new(0, cairo_pie_compressed), self.signing_key))
});
Ok(Job::try_from_job_data(JobData::new(0, cairo_pie_compressed), self.signing_key))
});

Ok(Process::new(future, terminate_tx))
}
Expand Down
1 change: 1 addition & 0 deletions crates/delegator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ prost.workspace = true
async-stream.workspace = true
tokio-util.workspace = true
tokio-stream.workspace = true
thiserror.workspace = true

[build-dependencies]
tonic-build.workspace = true
75 changes: 66 additions & 9 deletions crates/delegator/src/delegator.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,73 @@
pub mod event_loop;

use event_loop::delegator_loop;
use futures::executor::block_on;
use futures::FutureExt;
use libp2p::gossipsub::Event;
use std::hash::{DefaultHasher, Hash, Hasher};
use tokio::{
sync::{broadcast, mpsc},
task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
use zetina_common::job_witness::JobWitness;
use tracing::info;
use zetina_common::{
hash,
job::Job,
job_witness::JobWitness,
network::Network,
topic::{gossipsub_ident_topic, Topic},
};

pub struct Delegator {
cancellation_token: CancellationToken,
handle: Option<JoinHandle<()>>,
handle: Option<JoinHandle<Result<(), DelegatorError>>>,
}

impl Delegator {
pub fn new(
job_witness_tx: broadcast::Sender<JobWitness>,
events_rx: mpsc::Receiver<Event>,
mut events_rx: mpsc::Receiver<Event>,
) -> Self {
let cancellation_token = CancellationToken::new();

Self {
cancellation_token: cancellation_token.to_owned(),
handle: Some(tokio::spawn(async move {
delegator_loop(events_rx, job_witness_tx, cancellation_token).boxed().await
loop {
tokio::select! {
Some(event) = events_rx.recv() => {
match event {
Event::Message { message, .. } => {
// Received a new-job message from the network
if message.topic == gossipsub_ident_topic(Network::Sepolia, Topic::NewJob).into() {
let job: Job = serde_json::from_slice(&message.data)?;
info!("Received a new job event: {}", hash!(&job));
}
// Received a picked-job message from the network
if message.topic == gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob).into() {
let job: Job = serde_json::from_slice(&message.data)?;
info!("Received picked job event: {}", hash!(&job));
}
// Received a finished-job message from the network
if message.topic == gossipsub_ident_topic(Network::Sepolia, Topic::FinishedJob).into() {
let job_witness: JobWitness = serde_json::from_slice(&message.data)?;
info!("Received finished job event: {}", hash!(&job_witness));
job_witness_tx.send(job_witness)?;
}
},
Event::Subscribed { peer_id, topic } => {
info!("{} subscribed to the topic {}", peer_id.to_string(), topic.to_string());
},
Event::Unsubscribed { peer_id, topic }=> {
info!("{} unsubscribed to the topic {}", peer_id.to_string(), topic.to_string());
},
_ => {}
}
},
_ = cancellation_token.cancelled() => {
break
}
else => break
}
}
Ok(())
})),
}
}
Expand All @@ -36,7 +77,23 @@ impl Drop for Delegator {
fn drop(&mut self) {
self.cancellation_token.cancel();
block_on(async move {
self.handle.take().unwrap().await.unwrap();
if let Some(handle) = self.handle.take() {
handle.await.unwrap().unwrap();
}
})
}
}

use thiserror::Error;

#[derive(Error, Debug)]
pub enum DelegatorError {
#[error("broadcast_send_error")]
BroadcastSendError(#[from] tokio::sync::broadcast::error::SendError<JobWitness>),

#[error("io")]
Io(#[from] std::io::Error),

#[error("serde")]
Serde(#[from] serde_json::Error),
}
56 changes: 0 additions & 56 deletions crates/delegator/src/delegator/event_loop.rs

This file was deleted.

4 changes: 3 additions & 1 deletion crates/delegator/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ impl Drop for SwarmRunner {
fn drop(&mut self) {
self.cancellation_token.cancel();
block_on(async move {
self.handle.take().unwrap().await.unwrap();
if let Some(handle) = self.handle.take() {
handle.await.unwrap();
}
})
}
}
3 changes: 2 additions & 1 deletion crates/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ tracing-subscriber.workspace = true
tracing.workspace = true
starknet.workspace = true
async-stream.workspace = true
tokio-util.workspace = true
tokio-util.workspace = true
thiserror.workspace = true
Loading

0 comments on commit b306238

Please sign in to comment.