Skip to content

Commit

Permalink
poc
Browse files Browse the repository at this point in the history
  • Loading branch information
Okm165 committed Aug 8, 2024
1 parent ab981ba commit df45f7e
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 19 deletions.
89 changes: 89 additions & 0 deletions crates/delegator/src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use async_stream::stream;
use axum::{
extract::{Query, State},
response::{sse::Event, Sse},
Json,
};
use futures::StreamExt;
use hyper::StatusCode;
use serde::{Deserialize, Serialize};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::{io, time::Duration};
use tokio::sync::{broadcast, mpsc};
use tokio_stream::Stream;
use zetina_common::{hash, job::JobData, job_witness::JobWitness};

#[derive(Debug)]
pub struct ServerState {
pub delegate_tx: mpsc::Sender<JobData>,
pub finished_rx: broadcast::Receiver<JobWitness>,
}

impl Clone for ServerState {
fn clone(&self) -> Self {
Self {
delegate_tx: self.delegate_tx.to_owned(),
finished_rx: self.finished_rx.resubscribe(),
}
}
}

#[derive(Debug, Deserialize)]
pub struct DelegateRequest {
pie: Vec<u8>,
}

#[derive(Debug, Serialize)]
pub struct DelegateResponse {
job_hash: String,
}

pub async fn deletage_handler(
State(state): State<ServerState>,
Json(input): Json<DelegateRequest>,
) -> Result<Json<DelegateResponse>, StatusCode> {
let job_data = JobData::new(input.pie);
let job_data_hash = hash!(&job_data);
state.delegate_tx.send(job_data).await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(DelegateResponse { job_hash: job_data_hash.to_string() }))
}

#[derive(Debug, Deserialize)]
pub struct JobEventsRequest {
job_hash: String,
}

#[derive(Debug, Serialize)]
#[serde(tag = "type", content = "data")]
pub enum JobEventsResponse {
Finished(Vec<u8>),
}

pub async fn job_events_handler(
State(mut state): State<ServerState>,
Query(input): Query<JobEventsRequest>,
) -> Sse<impl Stream<Item = Result<Event, io::Error>>> {
let stream = stream! {
let job_hash = input.job_hash.parse::<u64>()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
loop {
tokio::select! {
Ok(job_witness) = state.finished_rx.recv() => {
if job_witness.job_hash == job_hash {
yield Event::default()
.json_data(JobEventsResponse::Finished(job_witness.proof))
.map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()));
}
}
else => break
}
}
}
.boxed();

Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(5))
.text("keep-alive-text"),
)
}
15 changes: 10 additions & 5 deletions crates/delegator/src/delegator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ use libp2p::gossipsub;
use starknet::signers::SigningKey;
use std::pin::Pin;
use thiserror::Error;
use tokio::sync::mpsc::{self, Receiver};
use tokio::sync::{broadcast, mpsc};
use tokio::{sync::mpsc::Sender, task::JoinHandle};
use tokio_stream::StreamExt;
use tracing::{error, info};
use zetina_common::graceful_shutdown::shutdown_signal;
use zetina_common::job::{Job, JobData, JobDelegation};
use zetina_common::job_witness::JobWitness;
use zetina_peer::swarm::{
DelegationMessage, GossipsubMessage, MarketMessage, PeerBehaviourEvent, Topic,
};
Expand All @@ -24,7 +25,8 @@ impl Delegator {
pub fn new(
mut swarm_events: Pin<Box<dyn Stream<Item = PeerBehaviourEvent> + Send>>,
gossipsub_tx: Sender<GossipsubMessage>,
mut delegate_rx: Receiver<JobData>,
mut delegate_rx: mpsc::Receiver<JobData>,
finished_tx: broadcast::Sender<JobWitness>,
signing_key: SigningKey,
) -> Self {
Self {
Expand Down Expand Up @@ -65,7 +67,7 @@ impl Delegator {
match serde_json::from_slice::<DelegationMessage>(&message.data)? {
DelegationMessage::Finished(job_witness) => {
info!("Received finished job: {}", job_witness.job_hash);
todo!()
finished_tx.send(job_witness)?;
}
_ => {}
}
Expand Down Expand Up @@ -95,8 +97,11 @@ impl Drop for Delegator {

#[derive(Error, Debug)]
pub enum Error {
#[error("mpsc_send_error")]
MpscSendError(#[from] mpsc::error::SendError<GossipsubMessage>),
#[error("mpsc_send_error GossipsubMessage")]
MpscSendErrorGossipsubMessage(#[from] mpsc::error::SendError<GossipsubMessage>),

#[error("mpsc_send_error JobWitness")]
BreadcastSendErrorJobWitness(#[from] broadcast::error::SendError<JobWitness>),

#[error("io")]
Io(#[from] std::io::Error),
Expand Down
2 changes: 1 addition & 1 deletion crates/delegator/src/job_bid_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl JobBidQueue {
pub fn get_best(&self, job_hash: u64) -> Option<(Job, PeerId, u64)> {
self.map.get(&job_hash).and_then(|(job, bids)| {
bids.iter()
.max_by_key(|&(_, price)| price)
.min_by_key(|&(_, price)| price)
.map(|(peer_id, &price)| (job.clone(), *peer_id, price))
})
}
Expand Down
44 changes: 32 additions & 12 deletions crates/delegator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,29 @@
pub mod api;
pub mod delegator;
pub mod job_bid_queue;

use axum::Router;
use api::ServerState;
use axum::{
extract::DefaultBodyLimit,
routing::{get, post},
Router,
};
use clap::Parser;
use delegator::Delegator;
use libp2p::Multiaddr;
use starknet::{core::types::FieldElement, signers::SigningKey};
use std::{str::FromStr, time::Duration};
use tokio::{net::TcpListener, sync::mpsc};
use tower_http::{timeout::TimeoutLayer, trace::TraceLayer};
use tokio::{
net::TcpListener,
sync::{broadcast, mpsc},
};
use tower_http::{
cors::{Any, CorsLayer},
timeout::TimeoutLayer,
trace::TraceLayer,
};
use tracing_subscriber::EnvFilter;
use zetina_common::{graceful_shutdown::shutdown_signal, job::JobData};
use zetina_common::{graceful_shutdown::shutdown_signal, job::JobData, job_witness::JobWitness};
use zetina_peer::swarm::{GossipsubMessage, SwarmRunner};

#[derive(Parser)]
Expand Down Expand Up @@ -53,22 +66,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let (gossipsub_tx, gossipsub_rx) = mpsc::channel::<GossipsubMessage>(100);
let (delegate_tx, delegate_rx) = mpsc::channel::<JobData>(100);
let (finished_tx, finished_rx) = broadcast::channel::<JobWitness>(100);
let swarm_events = swarm_runner.run(gossipsub_rx);

Delegator::new(swarm_events, gossipsub_tx, delegate_rx, signing_key);
Delegator::new(swarm_events, gossipsub_tx, delegate_rx, finished_tx, signing_key);

// Create a `TcpListener` using tokio.
let listener = TcpListener::bind("0.0.0.0:3010").await.unwrap();
let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();

// Run the server with graceful shutdown
axum::serve(
listener,
Router::new().layer((
TraceLayer::new_for_http(),
// Graceful shutdown will wait for outstanding requests to complete. Add a timeout so
// requests don't hang forever.
TimeoutLayer::new(Duration::from_secs(10)),
)),
Router::new()
.route("/delegate", post(api::deletage_handler))
.route("/job_events", get(api::job_events_handler))
.layer((
TraceLayer::new_for_http(),
// Graceful shutdown will wait for outstanding requests to complete. Add a timeout so
// requests don't hang forever.
TimeoutLayer::new(Duration::from_secs(10)),
CorsLayer::new().allow_origin(Any).allow_methods(Any).allow_headers(Any),
DefaultBodyLimit::disable(),
))
.with_state(ServerState { delegate_tx, finished_rx }),
)
.with_graceful_shutdown(shutdown_signal())
.await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Executor::new(identity, swarm_events, gossipsub_tx, runner, prover);

// Create a `TcpListener` using tokio.
let listener = TcpListener::bind("0.0.0.0:3010").await.unwrap();
let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();

// Run the server with graceful shutdown
axum::serve(
Expand Down

0 comments on commit df45f7e

Please sign in to comment.