Skip to content

Commit

Permalink
Suit new judge report APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
slhmy committed Jun 15, 2024
1 parent 204fcc5 commit 3cd1da1
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 38 deletions.
12 changes: 10 additions & 2 deletions judger/src/agent/http.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use reqwest::Url;

pub struct HttpClient {
client: reqwest::Client,
base_url: String,
Expand All @@ -9,7 +11,13 @@ impl HttpClient {
Self { client, base_url }
}

pub fn post(&self, path: String) -> reqwest::RequestBuilder {
self.client.post(format!("{}{}", self.base_url, path))
pub fn post(&self, path: String) -> Result<reqwest::RequestBuilder, anyhow::Error> {
let url = Url::parse(&format!("{}{}", self.base_url, path))?;
Ok(self.client.post(url))
}

pub fn put(&self, path: String) -> Result<reqwest::RequestBuilder, anyhow::Error> {
let url = Url::parse(&format!("{}{}", self.base_url, path))?;
Ok(self.client.put(url))
}
}
133 changes: 115 additions & 18 deletions judger/src/agent/platform/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::http::HttpClient;
use judge_core::{compiler::Language, judge::result::JudgeResultInfo};
use judge_core::{compiler::Language, judge::result::JudgeVerdict};

pub struct PlatformClient {
client: HttpClient,
Expand All @@ -12,16 +12,41 @@ impl PlatformClient {
}
}

pub async fn pick_task(&self) -> Result<Option<JudgeTask>, anyhow::Error> {
pick_task(&self.client).await
pub async fn pick_judge_task(&self) -> Result<Option<JudgeTask>, anyhow::Error> {
pick_judge_task(&self.client).await
}

pub async fn report_task(
pub async fn report_judge_result_count(
&self,
judge_uid: &str,
result_count: usize,
) -> Result<(), anyhow::Error> {
report_judge_result_count(&self.client, judge_uid, result_count).await
}

pub async fn report_judge_result(
&self,
judge_uid: &str,
verdict: JudgeVerdict,
time_usage_ms: usize,
memory_usage_bytes: usize,
) -> Result<(), anyhow::Error> {
report_judge_result(
&self.client,
judge_uid,
verdict,
time_usage_ms,
memory_usage_bytes,
)
.await
}

pub async fn report_judge_task(
&self,
stream_id: &str,
results: Vec<JudgeResultInfo>,
verdict: JudgeVerdict,
) -> Result<(), anyhow::Error> {
report_task(&self.client, stream_id, results).await
report_task(&self.client, stream_id, verdict).await
}
}

Expand All @@ -45,12 +70,16 @@ struct PickTaskResponse {
task: JudgeTask,
}

async fn pick_task(client: &HttpClient) -> Result<Option<JudgeTask>, anyhow::Error> {
async fn pick_judge_task(client: &HttpClient) -> Result<Option<JudgeTask>, anyhow::Error> {
let pick_url = "api/v1/judge/task/pick";
let body = PickTaskBody {
consumer: "".to_string(),
};
let response = client.post(pick_url.to_string()).json(&body).send().await?;
let response = client
.post(pick_url.to_string())?
.json(&body)
.send()
.await?;

match response.status() {
reqwest::StatusCode::OK => Ok(Some(response.json::<PickTaskResponse>().await?.task)),
Expand All @@ -66,29 +95,97 @@ async fn pick_task(client: &HttpClient) -> Result<Option<JudgeTask>, anyhow::Err
}

#[derive(Serialize)]
struct ReportTaskBody {
struct ReportJudgeResultCountBody {
#[serde(rename = "judgeUID")]
judge_uid: String,
#[serde(rename = "resultCount")]
result_count: usize,
}

async fn report_judge_result_count(
client: &HttpClient,
judge_uid: &str,
result_count: usize,
) -> Result<(), anyhow::Error> {
let report_url = "api/v1/judge/task/report/result-count";
let body = ReportJudgeResultCountBody {
judge_uid: judge_uid.to_owned(),
result_count,
};
let response = client
.put(report_url.to_string())?
.json(&body)
.send()
.await?;

match response.status() {
reqwest::StatusCode::OK => Ok(()),
_ => Err(anyhow::anyhow!("Report JudgeResultCount Failed")),
}
}

#[derive(Serialize)]
struct ReportJudgeResultBody {
#[serde(rename = "judgeUID")]
judge_uid: String,
verdict: JudgeVerdict,
#[serde(rename = "timeUsageMS")]
time_usage_ms: usize,
#[serde(rename = "memoryUsageBytes")]
memory_usage_bytes: usize,
}

async fn report_judge_result(
client: &HttpClient,
judge_uid: &str,
verdict: JudgeVerdict,
time_usage_ms: usize,
memory_usage_bytes: usize,
) -> Result<(), anyhow::Error> {
let report_url = "api/v1/judge/task/report/result";
let body = ReportJudgeResultBody {
judge_uid: judge_uid.to_owned(),
verdict,
time_usage_ms,
memory_usage_bytes,
};
let response = client
.post(report_url.to_string())?
.json(&body)
.send()
.await?;

match response.status() {
reqwest::StatusCode::OK => Ok(()),
_ => Err(anyhow::anyhow!("Report JudgeResult Failed")),
}
}

#[derive(Serialize)]
struct ReportJudgeTaskBody {
consumer: String,
stream_id: String,
verdict_json: String,
#[serde(rename = "redisStreamID")]
redis_stream_id: String,
verdict: JudgeVerdict,
}
#[derive(Deserialize, Debug)]
struct ReportTaskResponse {
struct ReportJudgeTaskResponse {
message: String,
}

async fn report_task(
client: &HttpClient,
stream_id: &str,
results: Vec<JudgeResultInfo>,
verdict: JudgeVerdict,
) -> Result<(), anyhow::Error> {
let report_url = "api/v1/judge/task/report";
let body = ReportTaskBody {
let body = ReportJudgeTaskBody {
consumer: "".to_string(),
stream_id: stream_id.to_owned(),
verdict_json: serde_json::to_string(&results).unwrap(),
redis_stream_id: stream_id.to_owned(),
verdict,
};
let response = client
.post(report_url.to_string())
.put(report_url.to_string())?
.json(&body)
.send()
.await?;
Expand All @@ -97,7 +194,7 @@ async fn report_task(
reqwest::StatusCode::OK => {
log::debug!(
"Report message: {:?}",
response.json::<ReportTaskResponse>().await?.message
response.json::<ReportJudgeTaskResponse>().await?.message
);
Ok(())
}
Expand Down
26 changes: 16 additions & 10 deletions judger/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,17 @@ async fn main() -> std::io::Result<()> {
problem_slug,
language,
src_path,
} => judge(
maybe_rclone_client,
opt.problem_package_bucket,
opt.problem_package_dir,
problem_slug,
language,
src_path,
),
} => {
judge(
maybe_rclone_client,
opt.problem_package_bucket,
opt.problem_package_dir,
problem_slug,
language,
src_path,
)
.await
}
}
}

Expand Down Expand Up @@ -95,7 +98,7 @@ async fn serve(
.await
}

fn judge(
async fn judge(
maybe_rclone_client: Option<RcloneClient>,
problem_package_bucket: String,
problem_package_dir: PathBuf,
Expand Down Expand Up @@ -126,7 +129,10 @@ fn judge(
}
};

match worker.run_judge(problem_slug, language, code) {
match worker
.run_judge("".to_string(), problem_slug, language, code)
.await
{
Ok(result) => {
println!("{:?}", result);
}
Expand Down
54 changes: 46 additions & 8 deletions judger/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,33 @@ impl JudgeWorker {
let mut interval = interval(Duration::from_secs(self.interval_sec));
loop {
interval.tick().await;
match platform_client.pick_task().await {
match platform_client.pick_judge_task().await {
Ok(maybe_task) => {
if maybe_task.is_none() {
continue;
}
let task = maybe_task.unwrap();
log::info!("Received task: {:?}", task);
match self.run_judge(
task.problem_slug.clone(),
task.language,
task.code.clone(),
) {
match self
.run_judge(
task.judge_uid.clone(),
task.problem_slug.clone(),
task.language,
task.code.clone(),
)
.await
{
Ok(results) => {
let mut verdict = JudgeVerdict::Accepted;
for result in results.iter() {
if result.verdict != JudgeVerdict::Accepted {
verdict = result.verdict.clone();
break;
}
}

let report_response = platform_client
.report_task(&task.redis_stream_id.clone(), results)
.report_judge_task(&task.redis_stream_id.clone(), verdict)
.await;
if report_response.is_err() {
log::debug!(
Expand All @@ -94,12 +106,15 @@ impl JudgeWorker {
}
}

pub fn run_judge(
pub async fn run_judge(
&self,
judge_uid: String,
problem_slug: String,
language: Language,
code: String,
) -> Result<Vec<JudgeResultInfo>, anyhow::Error> {
let platform_client = self.maybe_platform_client.as_ref().unwrap();

if let Some(rclone_client) = self.maybe_rclone_client.as_ref() {
rclone_client.sync_bucket(&self.package_bucket, &self.package_dir)?;
}
Expand Down Expand Up @@ -146,6 +161,16 @@ impl JudgeWorker {
let builder = new_builder_result.expect("builder creater error");
log::debug!("Builder created: {:?}", builder);
let mut results: Vec<JudgeResultInfo> = vec![];

// TODO: need to move out from this function
// because it's effecting the CLI judge usage too
let _ = platform_client
.report_judge_result_count(&judge_uid, builder.testdata_configs.len())
.await
.map_err(|e| {
log::warn!("Failed to report judge result count: {:?}", e);
});

for idx in 0..builder.testdata_configs.len() {
let judge_config = JudgeConfig {
test_data: builder.testdata_configs[idx].clone(),
Expand All @@ -158,6 +183,19 @@ impl JudgeWorker {
anyhow::anyhow!("Failed to run judge: {:?}", e)
})?;
log::debug!("Judge result: {:?}", result);

let _ = platform_client
.report_judge_result(
&judge_uid,
result.verdict.clone(),
result.time_usage.as_millis() as usize,
result.memory_usage_bytes as usize,
)
.await
.map_err(|e| {
log::warn!("Failed to report judge result count: {:?}", e);
});

results.push(result);
}

Expand Down

0 comments on commit 3cd1da1

Please sign in to comment.