Skip to content

Commit

Permalink
too many picked job issues fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
Okm165 committed Apr 28, 2024
1 parent b701e92 commit 811c2a5
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 9 deletions.
21 changes: 15 additions & 6 deletions crates/delegator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
#![deny(unused_crate_dependencies)]

use futures::StreamExt;
use futures::{stream::FuturesUnordered, StreamExt};
use libp2p::gossipsub::Event;
use sharp_p2p_common::{
hash,
job::Job,
network::Network,
node_account::NodeAccount,
process::Process,
topic::{gossipsub_ident_topic, Topic},
};
use sharp_p2p_compiler::{
cairo_compiler::tests::models::fixture, cairo_compiler::CairoCompiler,
cairo_compiler::{tests::models::fixture, CairoCompiler},
errors::CompilerControllerError,
traits::CompilerController,
};
use sharp_p2p_peer::{registry::RegistryHandler, swarm::SwarmRunner};
Expand Down Expand Up @@ -60,6 +62,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let compiler = CairoCompiler::new(node_account.get_signing_key(), registry_address);

let mut compiler_scheduler =
FuturesUnordered::<Process<'_, Result<Job, CompilerControllerError>>>::new();

// Read cairo program path from stdin
let mut stdin = BufReader::new(stdin()).lines();

Expand All @@ -68,10 +73,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(Some(_)) = stdin.next_line() => {
// TODO: handle fixture better way
let fixture = fixture();
let job = compiler.run(fixture.program_path, fixture.program_input_path).unwrap().await.unwrap();
let serialized_job = serde_json::to_string(&job).unwrap();
send_topic_tx.send(serialized_job.into()).await?;
info!("Sent a new job: {}", hash!(&job));
compiler_scheduler.push(compiler.run(fixture.program_path.clone(), fixture.program_input_path)?);
info!("Scheduled compiling program at path: {:?}", fixture.program_path);

},
Some(event) = message_stream.next() => {
match event {
Expand Down Expand Up @@ -100,6 +104,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Some(Ok(event_vec)) = event_stream.next() => {
debug!("{:?}", event_vec);
},
Some(Ok(job)) = compiler_scheduler.next() => {
let serialized_job = serde_json::to_string(&job).unwrap();
send_topic_tx.send(serialized_job.into()).await?;
info!("Sent a new job: {}", hash!(&job));
},
else => break
}
}
Expand Down
5 changes: 2 additions & 3 deletions crates/executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tokio::{
use tracing::{debug, info};
use tracing_subscriber::EnvFilter;

const MAX_PARALLEL_JOBS: usize = 2;
const MAX_PARALLEL_JOBS: usize = 1;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand Down Expand Up @@ -128,8 +128,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
else => break
};

if runner_scheduler.len() < MAX_PARALLEL_JOBS
&& prover_scheduler.len() < MAX_PARALLEL_JOBS
if runner_scheduler.len() + prover_scheduler.len() < MAX_PARALLEL_JOBS
&& !job_record.is_empty()
{
if let Some(job) = job_record.take_job().await {
Expand Down

0 comments on commit 811c2a5

Please sign in to comment.