diff --git a/rust/rsc/src/bin/rsc/read_job.rs b/rust/rsc/src/bin/rsc/read_job.rs
index f2ce8691e..d2f157a9e 100644
--- a/rust/rsc/src/bin/rsc/read_job.rs
+++ b/rust/rsc/src/bin/rsc/read_job.rs
@@ -5,13 +5,14 @@ use crate::types::{
 };
 use axum::Json;
 use entity::{job, job_use, output_dir, output_file, output_symlink};
+use entity::prelude::Blob;
+use futures::future::join_all;
 use hyper::StatusCode;
 use rand::{thread_rng, Rng};
 use rsc::database;
-use sea_orm::DatabaseTransaction;
 use sea_orm::{
     prelude::Uuid, ActiveModelTrait, ActiveValue::*, ColumnTrait, DatabaseConnection, DbErr,
-    EntityTrait, ModelTrait, QueryFilter, TransactionTrait,
+    EntityTrait, ModelTrait, QueryFilter, TransactionTrait, ConnectionTrait,
 };
 use std::collections::HashMap;
 use std::sync::{Arc, RwLock};
@@ -34,26 +35,55 @@ async fn record_miss(hash: String, conn: Arc<DatabaseConnection>) {
 }
 
 #[tracing::instrument(skip(db, stores))]
-async fn resolve_blob(
-    id: Uuid,
-    db: &DatabaseTransaction,
+async fn resolve_blobs<T: ConnectionTrait>(
+    ids: &Vec<Uuid>,
+    db: &T,
     stores: &HashMap<Uuid, Arc<dyn blob_store::DebugBlobStore + Sync + Send>>,
-) -> Result<ResolvedBlob, String> {
-    let Ok(Some(blob)) = entity::prelude::Blob::find_by_id(id).one(db).await else {
-        return Err(format!("Unable to find blob {} by id", id));
-    };
+) -> Result<HashMap<Uuid, ResolvedBlob>, String> {
+    // Postgres has a 65,535 bind-parameter limit, so chunk the IDs to stay below it: https://www.postgresql.org/docs/current/limits.html
+    const CHUNK_SIZE: usize = 50_000;
 
-    let Some(store) = stores.get(&blob.store_id) else {
-        return Err(format!(
-            "Unable to find backing store {} for blob {}",
-            blob.store_id, id
-        ));
-    };
+    let mut resolved_map = HashMap::new();
+
+    for chunk in ids.chunks(CHUNK_SIZE) {
+        // Fetch each chunk of blobs in a single query
+        let blob_map: HashMap<Uuid, entity::blob::Model> = Blob::find()
+            .filter(entity::blob::Column::Id.is_in(chunk.to_vec()))
+            .all(db)
+            .await
+            .map_err(|e| format!("Failed to query blobs, database error: {}", e))?
+            .into_iter()
+            .map(|b| (b.id, b))
+            .collect();
 
-    return Ok(ResolvedBlob {
-        id: blob.id,
-        url: store.download_url(blob.key).await,
-    });
+        // Ensure we found all of the requested blobs
+        for &id in chunk {
+            if !blob_map.contains_key(&id) {
+                return Err(format!("Unable to find blob {} by id", id));
+            }
+        }
+
+        // Resolve all download URLs in parallel
+        let futures = blob_map.iter().map(|(id, blob)| {
+            let store_opt = stores.get(&blob.store_id).cloned();
+            let key = blob.key.clone();
+
+            async move {
+                let store = store_opt.ok_or_else(|| {
+                    format!("Unable to find backing store {} for blob {}", blob.store_id, id)
+                })?;
+                let url = store.download_url(key).await;
+                Ok::<(Uuid, ResolvedBlob), String>((*id, ResolvedBlob { id: *id, url }))
+            }
+        });
+
+        let results = join_all(futures).await;
+
+        let partial_map: HashMap<Uuid, ResolvedBlob> = results.into_iter().collect::<Result<_, String>>()?;
+        resolved_map.extend(partial_map);
+    }
+
+    Ok(resolved_map)
 }
 
 #[tracing::instrument(skip_all)]
@@ -63,15 +93,12 @@ pub async fn read_job(
     blob_stores: HashMap<Uuid, Arc<dyn blob_store::DebugBlobStore + Sync + Send>>,
 ) -> (StatusCode, Json<ReadJobResponse>) {
     let hash = payload.hash();
+    let hash_for_spawns = hash.clone();
-    // TODO: This transaction is quite large with a bunch of "serialized" queries. If read_job
-    // becomes a bottleneck it should be rewritten such that joining on promises is delayed for as
-    // long as possible. Another option would be to collect all blob ids ahead of time and make a
-    // single db query to list them all out instead of a query per blob id.
-    let result = conn
+    // Fetch the job and related entities in a single transaction
+    let fetch_result = conn
         .as_ref()
-        .transaction::<_, (Option<Uuid>, ReadJobResponse), DbErr>(|txn| {
-            let hash = hash.clone();
+        .transaction::<_, Option<(job::Model, Vec<output_file::Model>, Vec<output_symlink::Model>, Vec<output_dir::Model>)>, DbErr>(|txn| {
             Box::pin(async move {
                 let Some(matching_job) = job::Entity::find()
                     .filter(job::Column::Hash.eq(hash.clone()))
                     .one(txn)
@@ -79,136 +106,125 @@
                     .await?
                 else {
                     tracing::info!(%hash, "Miss");
-                    return Ok((None, ReadJobResponse::NoMatch));
+                    return Ok(None);
                 };
+
+                let output_files = matching_job.find_related(output_file::Entity).all(txn).await?;
+                let output_symlinks = matching_job.find_related(output_symlink::Entity).all(txn).await?;
+                let output_dirs = matching_job.find_related(output_dir::Entity).all(txn).await?;
 
-                tracing::info!(%hash, "Hit");
-                let output_files = matching_job
-                    .find_related(output_file::Entity)
-                    .all(txn)
-                    .await?
-                    .into_iter()
-                    .map(|m| {
-                        let stores_copy = blob_stores.clone();
-                        async move {
-                            let blob = resolve_blob(m.blob_id, txn, &stores_copy).await?;
-
-                            Ok(ResolvedBlobFile {
-                                path: m.path,
-                                mode: m.mode,
-                                blob,
-                            })
-                        }
-                    });
-
-                let output_files: Result<Vec<ResolvedBlobFile>, String> =
-                    futures::future::join_all(output_files)
-                        .await
-                        .into_iter()
-                        .collect();
-
-                let output_files = match output_files {
-                    Err(err) => {
-                        tracing::error! {%err, "Failed to resolve all output files. Resolving job as a cache miss."};
-                        return Ok((None, ReadJobResponse::NoMatch))
-                    },
-                    Ok(files) => files,
-                };
+                Ok(Some((matching_job, output_files, output_symlinks, output_dirs)))
+            })
+        })
+        .await;
+
+    let hash_copy = hash_for_spawns.clone();
+    let Some((matching_job, output_files, output_symlinks, output_dirs)) = fetch_result.ok().flatten() else {
+        tokio::spawn(async move {
+            record_miss(hash_copy, conn.clone()).await;
+        });
+        return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
+    };
 
-                let output_symlinks = matching_job
-                    .find_related(output_symlink::Entity)
-                    .all(txn)
-                    .await?
-                    .into_iter()
-                    .map(|m| Symlink {
-                        path: m.path,
-                        link: m.link,
-                    })
-                    .collect();
-
-                let output_dirs = matching_job
-                    .find_related(output_dir::Entity)
-                    .all(txn)
-                    .await?
-                    .into_iter()
-                    .map(|m| Dir {
-                        path: m.path,
-                        mode: m.mode,
-                        hidden: Some(m.hidden),
-                    })
-                    .collect();
-
-                let stdout_blob = match resolve_blob(matching_job.stdout_blob_id, txn, &blob_stores).await {
-                    Err(err) => {
-                        tracing::error! {%err, "Failed to resolve stdout blob. Resolving job as a cache miss."};
-                        return Ok((None, ReadJobResponse::NoMatch))
-                    },
-                    Ok(blob) => blob,
-                };
+    // Collect all the blob IDs we need to resolve
+    let mut blob_ids: Vec<Uuid> = output_files.iter().map(|f| f.blob_id).collect();
+    blob_ids.push(matching_job.stdout_blob_id);
+    blob_ids.push(matching_job.stderr_blob_id);
 
-                let stderr_blob = match resolve_blob(matching_job.stderr_blob_id, txn, &blob_stores).await {
-                    Err(err) => {
-                        tracing::error! {%err, "Failed to resolve stderr blob. Resolving job as a cache miss."};
-                        return Ok((None, ReadJobResponse::NoMatch))
-                    },
-                    Ok(blob) => blob,
-                };
+    // Resolve all needed blobs in one go
+    let resolved_blob_map = match resolve_blobs(&blob_ids, conn.as_ref(), &blob_stores).await {
+        Ok(map) => map,
+        Err(err) => {
+            tracing::error!(%err, "Failed to resolve blobs. Resolving job as a cache miss.");
+            return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
+        }
+    };
 
-                Ok((
-                    Some(matching_job.id),
-                    ReadJobResponse::Match {
-                        output_symlinks,
-                        output_dirs,
-                        output_files,
-                        stdout_blob,
-                        stderr_blob,
-                        status: matching_job.status,
-                        runtime: matching_job.runtime,
-                        cputime: matching_job.cputime,
-                        memory: matching_job.memory as u64,
-                        ibytes: matching_job.i_bytes as u64,
-                        obytes: matching_job.o_bytes as u64,
-                    },
-                ))
+    // Construct a ResolvedBlobFile for each output file
+    let output_files = output_files
+        .into_iter()
+        .map(|m| {
+            let blob_id = m.blob_id;
+            let resolved_blob = resolved_blob_map.get(&blob_id).cloned().ok_or_else(|| {
+                format!("Missing resolved blob for {}", blob_id)
+            })?;
+            Ok(ResolvedBlobFile {
+                path: m.path,
+                mode: m.mode,
+                blob: resolved_blob,
             })
         })
-        .await;
+        .collect::<Result<Vec<_>, String>>();
 
-    match result {
-        Ok((Some(job_id), response)) => {
-            // If we get a match we want to record the use but we don't
-            // want to block sending the response on it so we spawn a task
-            // to go do that.
-            let mut status = StatusCode::NOT_FOUND;
-            if let ReadJobResponse::Match { .. } = response {
-                status = StatusCode::OK;
-                let shared_conn = conn.clone();
-                tokio::spawn(async move {
-                    record_hit(job_id, hash, shared_conn).await;
-                });
-            }
-            (status, Json(response))
+    let output_files = match output_files {
+        Ok(files) => files,
+        Err(err) => {
+            tracing::error!(%err, "Failed to resolve all output files. Resolving job as a cache miss.");
+            return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
         }
-        Ok((None, _)) => {
-            let shared_conn = conn.clone();
-            tokio::spawn(async move {
-                record_miss(hash, shared_conn).await;
-            });
-            (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch))
+    };
+
+    // Collect the other related entities into response types
+    let output_symlinks: Vec<Symlink> = output_symlinks
+        .into_iter()
+        .map(|m| Symlink {
+            path: m.path,
+            link: m.link,
+        })
+        .collect();
+
+    let output_dirs: Vec<Dir> = output_dirs
+        .into_iter()
+        .map(|m| Dir {
+            path: m.path,
+            mode: m.mode,
+            hidden: Some(m.hidden),
+        })
+        .collect();
+
+    // Resolve stdout and stderr blobs from the map
+    let stdout_blob = match resolved_blob_map.get(&matching_job.stdout_blob_id) {
+        Some(blob) => blob.clone(),
+        None => {
+            tracing::error!("Failed to resolve stdout blob. Resolving job as a cache miss.");
+            return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
         }
-        Err(cause) => {
-            tracing::error! {
-                %cause,
-                "failed to read job"
-            };
-            (
-                StatusCode::INTERNAL_SERVER_ERROR,
-                Json(ReadJobResponse::NoMatch),
-            )
+    };
+
+    let stderr_blob = match resolved_blob_map.get(&matching_job.stderr_blob_id) {
+        Some(blob) => blob.clone(),
+        None => {
+            tracing::error!("Failed to resolve stderr blob. Resolving job as a cache miss.");
+            return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
         }
-    }
+    };
+
+    // Construct the response
+    let response = ReadJobResponse::Match {
+        output_symlinks,
+        output_dirs,
+        output_files,
+        stdout_blob,
+        stderr_blob,
+        status: matching_job.status,
+        runtime: matching_job.runtime,
+        cputime: matching_job.cputime,
+        memory: matching_job.memory as u64,
+        ibytes: matching_job.i_bytes as u64,
+        obytes: matching_job.o_bytes as u64,
+    };
+
+    let job_id = matching_job.id;
+    let hash_copy = hash_for_spawns.clone();
+    tracing::info!(hash = %hash_copy, "Hit");
+    tokio::spawn(async move {
+        record_hit(job_id, hash_copy, conn.clone()).await;
+    });
+
+    (StatusCode::OK, Json(response))
 }
+
 #[tracing::instrument(skip_all)]
 pub async fn allow_job(
     Json(payload): Json<AllowJobPayload>,
diff --git a/rust/rsc/src/bin/rsc/types.rs b/rust/rsc/src/bin/rsc/types.rs
index f719db73e..194a38db5 100644
--- a/rust/rsc/src/bin/rsc/types.rs
+++ b/rust/rsc/src/bin/rsc/types.rs
@@ -228,7 +228,7 @@ pub enum PostBlobResponse {
     Ok { blobs: Vec },
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct ResolvedBlob {
     pub id: Uuid,
     pub url: String,
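
Reviewer note (not part of the diff): the change above replaces a per-blob query inside one
large transaction with a single bulk lookup per 50,000-ID chunk, followed by concurrent URL
resolution. Below is a minimal standalone sketch of that chunk-then-join shape; fetch_chunk
and download_url are hypothetical stand-ins for the sea-orm query and the blob store call:

    use futures::future::join_all;
    use std::collections::HashMap;

    // Stay well under Postgres's 65,535 bind-parameter limit per query.
    const CHUNK_SIZE: usize = 50_000;

    // Hypothetical stand-in for the single bulk database query per chunk.
    async fn fetch_chunk(ids: &[u64]) -> Result<Vec<(u64, String)>, String> {
        Ok(ids.iter().map(|&id| (id, format!("key-{}", id))).collect())
    }

    // Hypothetical stand-in for the per-blob download-URL call.
    async fn download_url(key: String) -> String {
        format!("https://blobs.example/{}", key)
    }

    async fn resolve_all(ids: &[u64]) -> Result<HashMap<u64, String>, String> {
        let mut resolved = HashMap::new();
        for chunk in ids.chunks(CHUNK_SIZE) {
            // One "query" per chunk...
            let rows = fetch_chunk(chunk).await?;
            // ...then resolve every URL in the chunk concurrently.
            let urls = join_all(rows.into_iter().map(|(id, key)| async move {
                (id, download_url(key).await)
            }))
            .await;
            resolved.extend(urls);
        }
        Ok(resolved)
    }

The shared map is also why types.rs now derives Clone on ResolvedBlob: a single resolved
blob can back several output files plus stdout/stderr, and each ResolvedBlobFile takes its
own copy out of the HashMap<Uuid, ResolvedBlob>.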