Skip to content

Commit

Permalink
MAJOR refactor of processes & runner tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Okm165 committed Apr 19, 2024
1 parent 46206b0 commit 55dd1b4
Show file tree
Hide file tree
Showing 14 changed files with 278 additions and 140 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ libp2p = { version = "0.53.2", features = [
] }
libsecp256k1 = "0.7.1"
num-bigint = "0.4.4"
rand = "0.8.5"
serde = "1.0.197"
serde_json = "1.0.115"
starknet = "0.9.0"
Expand All @@ -56,7 +57,6 @@ tracing = "0.1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
zip-extensions = "0.6.2"


sharp-p2p-common = { path = "crates/common" }
sharp-p2p-delegator = { path = "crates/delegator" }
sharp-p2p-executor = { path = "crates/executor" }
Expand Down
2 changes: 2 additions & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ license-file.workspace = true
[dependencies]
bincode.workspace = true
cairo-felt.workspace = true
futures.workspace= true
hex.workspace = true
libp2p.workspace = true
libsecp256k1.workspace = true
Expand All @@ -18,3 +19,4 @@ serde_json.workspace = true
serde.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tokio.workspace = true
31 changes: 30 additions & 1 deletion crates/common/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::hash;
use libsecp256k1::{curve::Scalar, sign, PublicKey, SecretKey, Signature};
use libsecp256k1::{curve::Scalar, sign, Message, PublicKey, SecretKey, Signature};
use std::{
fmt::Display,
fs,
hash::{DefaultHasher, Hash, Hasher},
path::PathBuf,
};

/*
Expand All @@ -24,6 +26,33 @@ pub struct Job {
pub signature: Signature, // The signature of the delegator, used in the bootloader stage to confirm authenticity of the Job<->Delegator relationship
}

impl Job {
pub fn new(
reward: u32,
num_of_steps: u32,
cairo_pie_file: PathBuf,
registry_address: &str,
secret_key: SecretKey,
) -> Self {
Self {
reward,
num_of_steps,
cairo_pie: fs::read(cairo_pie_file).unwrap(),
registry_address: registry_address.to_string(),
public_key: PublicKey::from_secret_key(&secret_key),
signature: libsecp256k1::sign(
// TODO proper impl just mocked rn for tests
&Message::parse(&[
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
]),
&secret_key,
)
.0,
}
}
}

impl Default for Job {
fn default() -> Self {
let secret_key = &SecretKey::default();
Expand Down
1 change: 1 addition & 0 deletions crates/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ pub mod job;
pub mod job_trace;
pub mod job_witness;
pub mod network;
pub mod process;
pub mod topic;
pub mod vec252;
35 changes: 35 additions & 0 deletions crates/common/src/process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use futures::{Future, FutureExt};
use std::pin::Pin;
use tokio::sync::mpsc;

pub struct Process<'future, PR> {
future: Pin<Box<dyn Future<Output = PR> + 'future>>,
abort: mpsc::Sender<()>,
}

impl<'future, PR> Process<'future, PR> {
pub fn new(
future: Pin<Box<dyn Future<Output = PR> + 'future>>,
abort: mpsc::Sender<()>,
) -> Self {
Self { future, abort }
}

pub async fn abort(&self) -> Result<(), mpsc::error::SendError<()>> {
self.abort.send(()).await
}

pub fn into_parts(self) -> (Pin<Box<dyn Future<Output = PR> + 'future>>, mpsc::Sender<()>) {
(self.future, self.abort)
}
}

impl<'future, PR> Future for Process<'future, PR> {
type Output = PR;
fn poll(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
self.future.poll_unpin(cx)
}
}
10 changes: 5 additions & 5 deletions crates/prover/build.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// use std::process::Command;
use std::process::Command;

fn main() {
// Check if stone-prover command is present
// Command::new("cpu_air_prover")
// .arg("--help")
// .output()
// .expect("Failed to execute cpu_air_prover command");
Command::new("cpu_air_prover")
.arg("--help")
.output()
.expect("Failed to execute cpu_air_prover command");
}
3 changes: 3 additions & 0 deletions crates/runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ license-file.workspace = true
async-process.workspace = true
cairo-proof-parser.workspace = true
futures.workspace= true
hex.workspace = true
itertools.workspace = true
libsecp256k1.workspace = true
rand.workspace = true
serde_json.workspace = true
serde.workspace = true
sharp-p2p-common.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions crates/runner/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ fn main() {
Command::new("cairo-compile")
.arg("--cairo_path")
.arg(&workspace_root.join(&cairo_path))
.arg(&workspace_root.join(&cairo_path).join(&bootloader_path))
.arg(&workspace_root.join(&cairo_path).join(bootloader_path))
.arg("--output")
.arg(&out_dir.join(&bootloader_out_name))
.arg(&out_dir.join(bootloader_out_name))
.arg("--proof_mode")
.output()
.expect("bootloader compile failed");
Expand Down
189 changes: 87 additions & 102 deletions crates/runner/src/cairo_runner/mod.rs
Original file line number Diff line number Diff line change
@@ -1,122 +1,107 @@
use crate::{
errors::RunnerControllerError,
traits::{Runner, RunnerController},
traits::RunnerController,
types::{
input::{BootloaderInput, Task},
input::{BootloaderInput, BootloaderTask},
layout::Layout,
},
};
use sharp_p2p_common::{hash, job::Job, job_trace::JobTrace};
use async_process::Stdio;
use futures::Future;
use sharp_p2p_common::{hash, job::Job, job_trace::JobTrace, process::Process};
use std::{
collections::HashMap,
hash::{DefaultHasher, Hash, Hasher},
pin::Pin,
};
use std::{env, io::Write, path::PathBuf};
use std::{io::Write, path::PathBuf};
use tempfile::NamedTempFile;
use tokio::process::{Child, Command};
use tracing::{debug, trace};
use tokio::{process::Command, select, sync::mpsc};
use tracing::debug;

#[cfg(test)]
pub mod tests;

pub struct CairoRunner {
tasks: HashMap<u64, Child>,
program_path: PathBuf,
}

impl Runner for CairoRunner {
fn init() -> impl RunnerController {
Self { tasks: HashMap::new() }
impl CairoRunner {
pub fn new(program_path: PathBuf) -> Self {
Self { program_path }
}
}

impl RunnerController for CairoRunner {
async fn run(&mut self, job: Job) -> Result<JobTrace, RunnerControllerError> {
let cargo_target_dir =
PathBuf::from(env::var("CARGO_TARGET_DIR").expect("CARGO_TARGET_DIR env not present"));
let bootloader_out_name = PathBuf::from(
env::var("BOOTLOADER_OUT_NAME").expect("BOOTLOADER_OUT_NAME env not present"),
);

let program = cargo_target_dir.join(&bootloader_out_name);
let layout: &str = Layout::RecursiveWithPoseidon.into();

let mut cairo_pie = NamedTempFile::new()?;
cairo_pie.write_all(&job.cairo_pie)?;

let input = BootloaderInput {
tasks: vec![Task { path: cairo_pie.path().to_path_buf(), ..Default::default() }],
..Default::default()
};

let mut program_input = NamedTempFile::new()?;
program_input.write_all(&serde_json::to_string(&input)?.into_bytes())?;

// outputs
let air_public_input = NamedTempFile::new()?;
let air_private_input = NamedTempFile::new()?;
let trace = NamedTempFile::new()?;
let memory = NamedTempFile::new()?;

let task = Command::new("cairo-run")
.arg("--program")
.arg(program.as_path())
.arg("--layout")
.arg(layout)
.arg("--program_input")
.arg(program_input.path())
.arg("--air_public_input")
.arg(air_public_input.path())
.arg("--air_private_input")
.arg(air_private_input.path())
.arg("--trace_file")
.arg(trace.path())
.arg("--memory_file")
.arg(memory.path())
.arg("--proof_mode")
.arg("--print_output")
.spawn()?;

let job_hash = hash!(job);

debug!("task {} spawned", job_hash);
self.tasks.insert(job_hash.to_owned(), task);

let task_status = self
.tasks
.get_mut(&job_hash)
.ok_or(RunnerControllerError::TaskNotFound)?
.wait()
.await?;

trace!("task {} woke up", job_hash);
if !task_status.success() {
debug!("task terminated {}", job_hash);
return Err(RunnerControllerError::TaskTerminated);
}

let task_output = self
.tasks
.remove(&job_hash)
.ok_or(RunnerControllerError::TaskNotFound)?
.wait_with_output()
.await?;
trace!("task {} output {:?}", job_hash, task_output);

Ok(JobTrace { air_public_input, air_private_input, memory, trace })
}

fn terminate(&mut self, job_hash: u64) -> Result<(), RunnerControllerError> {
self.tasks.get_mut(&job_hash).ok_or(RunnerControllerError::TaskNotFound)?.start_kill()?;
trace!("task scheduled for termination {}", job_hash);
Ok(())
}

fn drop(mut self) -> Result<(), RunnerControllerError> {
let keys: Vec<u64> = self.tasks.keys().cloned().collect();
for job_hash in keys.iter() {
self.tasks
.get_mut(job_hash)
.ok_or(RunnerControllerError::TaskNotFound)?
.start_kill()?;
trace!("task scheduled for termination {}", job_hash);
}
Ok(())
type ProcessResult = Result<JobTrace, RunnerControllerError>;
fn run(&self, job: Job) -> Result<Process<Self::ProcessResult>, RunnerControllerError> {
let (terminate_tx, mut terminate_rx) = mpsc::channel::<()>(10);
let future: Pin<Box<dyn Future<Output = Self::ProcessResult> + '_>> =
Box::pin(async move {
let layout: &str = Layout::RecursiveWithPoseidon.into();

let mut cairo_pie = NamedTempFile::new()?;
cairo_pie.write_all(&job.cairo_pie)?;

let input = BootloaderInput {
tasks: vec![BootloaderTask {
path: cairo_pie.path().to_path_buf(),
..Default::default()
}],
..Default::default()
};

let mut program_input = NamedTempFile::new()?;
program_input.write_all(&serde_json::to_string(&input)?.into_bytes())?;

// outputs
let air_public_input = NamedTempFile::new()?;
let air_private_input = NamedTempFile::new()?;
let trace = NamedTempFile::new()?;
let memory = NamedTempFile::new()?;

let mut task = Command::new("cairo-run")
.arg("--program")
.arg(self.program_path.as_path())
.arg("--layout")
.arg(layout)
.arg("--program_input")
.arg(program_input.path())
.arg("--air_public_input")
.arg(air_public_input.path())
.arg("--air_private_input")
.arg(air_private_input.path())
.arg("--trace_file")
.arg(trace.path())
.arg("--memory_file")
.arg(memory.path())
.arg("--proof_mode")
.arg("--print_output")
.stdout(Stdio::null())
.spawn()?;

let job_hash = hash!(job);

debug!("task {} spawned", job_hash);

loop {
select! {
output = task.wait() => {
debug!("{:?}", output);
if !output?.success() {
return Err(RunnerControllerError::TaskTerminated);
}
let output = task.wait_with_output().await?;
debug!("{:?}", output);
break;
}
Some(()) = terminate_rx.recv() => {
task.start_kill()?;
}
}
}
Ok(JobTrace { air_public_input, air_private_input, memory, trace })
});

Ok(Process::new(future, terminate_tx))
}
}
Binary file not shown.
Loading

0 comments on commit 55dd1b4

Please sign in to comment.