From 811c2a54563c7bceda408328e0e59b25aa464356 Mon Sep 17 00:00:00 2001 From: Bartosz Nowak Date: Sun, 28 Apr 2024 22:19:54 +0200 Subject: [PATCH] too many picked job issues fixed --- crates/delegator/src/main.rs | 21 +++++++++++++++------ crates/executor/src/main.rs | 5 ++--- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/crates/delegator/src/main.rs b/crates/delegator/src/main.rs index a59e6b1..d450828 100644 --- a/crates/delegator/src/main.rs +++ b/crates/delegator/src/main.rs @@ -1,16 +1,18 @@ #![deny(unused_crate_dependencies)] -use futures::StreamExt; +use futures::{stream::FuturesUnordered, StreamExt}; use libp2p::gossipsub::Event; use sharp_p2p_common::{ hash, job::Job, network::Network, node_account::NodeAccount, + process::Process, topic::{gossipsub_ident_topic, Topic}, }; use sharp_p2p_compiler::{ - cairo_compiler::tests::models::fixture, cairo_compiler::CairoCompiler, + cairo_compiler::{tests::models::fixture, CairoCompiler}, + errors::CompilerControllerError, traits::CompilerController, }; use sharp_p2p_peer::{registry::RegistryHandler, swarm::SwarmRunner}; @@ -60,6 +62,9 @@ async fn main() -> Result<(), Box> { let compiler = CairoCompiler::new(node_account.get_signing_key(), registry_address); + let mut compiler_scheduler = + FuturesUnordered::>>::new(); + // Read cairo program path from stdin let mut stdin = BufReader::new(stdin()).lines(); @@ -68,10 +73,9 @@ async fn main() -> Result<(), Box> { Ok(Some(_)) = stdin.next_line() => { // TODO: handle fixture better way let fixture = fixture(); - let job = compiler.run(fixture.program_path, fixture.program_input_path).unwrap().await.unwrap(); - let serialized_job = serde_json::to_string(&job).unwrap(); - send_topic_tx.send(serialized_job.into()).await?; - info!("Sent a new job: {}", hash!(&job)); + compiler_scheduler.push(compiler.run(fixture.program_path.clone(), fixture.program_input_path)?); + info!("Scheduled compiling program at path: {:?}", fixture.program_path); + }, Some(event) = message_stream.next() => { match event { @@ -100,6 +104,11 @@ async fn main() -> Result<(), Box> { Some(Ok(event_vec)) = event_stream.next() => { debug!("{:?}", event_vec); }, + Some(Ok(job)) = compiler_scheduler.next() => { + let serialized_job = serde_json::to_string(&job).unwrap(); + send_topic_tx.send(serialized_job.into()).await?; + info!("Sent a new job: {}", hash!(&job)); + }, else => break } } diff --git a/crates/executor/src/main.rs b/crates/executor/src/main.rs index 7c70015..df5dea0 100644 --- a/crates/executor/src/main.rs +++ b/crates/executor/src/main.rs @@ -29,7 +29,7 @@ use tokio::{ use tracing::{debug, info}; use tracing_subscriber::EnvFilter; -const MAX_PARALLEL_JOBS: usize = 2; +const MAX_PARALLEL_JOBS: usize = 1; #[tokio::main] async fn main() -> Result<(), Box> { @@ -128,8 +128,7 @@ async fn main() -> Result<(), Box> { else => break }; - if runner_scheduler.len() < MAX_PARALLEL_JOBS - && prover_scheduler.len() < MAX_PARALLEL_JOBS + if runner_scheduler.len() + prover_scheduler.len() < MAX_PARALLEL_JOBS && !job_record.is_empty() { if let Some(job) = job_record.take_job().await {