rsc: update read_job transaction hit path #1671

Open
wants to merge 12 commits into master
302 changes: 159 additions & 143 deletions rust/rsc/src/bin/rsc/read_job.rs
@@ -5,13 +5,14 @@ use crate::types::{
 };
 use axum::Json;
 use entity::{job, job_use, output_dir, output_file, output_symlink};
+use entity::prelude::Blob;
 use futures::future::join_all;
 use hyper::StatusCode;
 use rand::{thread_rng, Rng};
 use rsc::database;
 use sea_orm::DatabaseTransaction;
 use sea_orm::{
     prelude::Uuid, ActiveModelTrait, ActiveValue::*, ColumnTrait, DatabaseConnection, DbErr,
-    EntityTrait, ModelTrait, QueryFilter, TransactionTrait,
+    EntityTrait, ModelTrait, QueryFilter, TransactionTrait, ConnectionTrait,
 };
 use std::collections::HashMap;
 use std::sync::{Arc, RwLock};
@@ -34,26 +35,55 @@ async fn record_miss(hash: String, conn: Arc<DatabaseConnection>) {
 }

 #[tracing::instrument(skip(db, stores))]
-async fn resolve_blob(
-    id: Uuid,
-    db: &DatabaseTransaction,
+async fn resolve_blobs<T: ConnectionTrait>(
+    ids: &Vec<Uuid>,
+    db: &T,
     stores: &HashMap<Uuid, Arc<dyn blob::DebugBlobStore + Sync + Send>>,
-) -> Result<ResolvedBlob, String> {
-    let Ok(Some(blob)) = entity::prelude::Blob::find_by_id(id).one(db).await else {
-        return Err(format!("Unable to find blob {} by id", id));
-    };
-
-    let Some(store) = stores.get(&blob.store_id) else {
-        return Err(format!(
-            "Unable to find backing store {} for blob {}",
-            blob.store_id, id
-        ));
-    };
-
-    return Ok(ResolvedBlob {
-        id: blob.id,
-        url: store.download_url(blob.key).await,
-    });
+) -> Result<HashMap<Uuid, ResolvedBlob>, String> {
+    // Postgres has a 65,535 parameter limit, so chunk the IDs to stay below it:
+    // https://www.postgresql.org/docs/current/limits.html
+    const CHUNK_SIZE: usize = 50_000;
+
+    let mut resolved_map = HashMap::new();
+
+    for chunk in ids.chunks(CHUNK_SIZE) {
+        // Fetch the chunk's blobs in a single query
+        let blob_map: HashMap<Uuid, entity::blob::Model> = Blob::find()
+            .filter(entity::blob::Column::Id.is_in(chunk.to_vec()))
+            .all(db)
+            .await
+            .map_err(|e| format!("Failed to query blobs, database error: {}", e))?
+            .into_iter()
+            .map(|b| (b.id, b))
+            .collect();
+
+        // Ensure we have all requested blobs
+        for &id in chunk {
+            if !blob_map.contains_key(&id) {
+                return Err(format!("Unable to find blob {} by id", id));
+            }
+        }
+
+        // Resolve all download URLs in parallel
+        let futures = blob_map.iter().map(|(id, blob)| {
+            let store_opt = stores.get(&blob.store_id).cloned();
+            let key = blob.key.clone();
+
+            async move {
+                let store = store_opt.ok_or_else(|| {
+                    format!("Unable to find backing store {} for blob {}", blob.store_id, id)
+                })?;
+                let url = store.download_url(key).await;
+                Ok::<(Uuid, ResolvedBlob), String>((*id, ResolvedBlob { id: *id, url }))
+            }
+        });
+
+        let results = join_all(futures).await;
+
+        let partial_map: HashMap<Uuid, ResolvedBlob> = results.into_iter().collect::<Result<_, _>>()?;
+        resolved_map.extend(partial_map);
+    }
+
+    Ok(resolved_map)
 }
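
A side note on the arithmetic behind CHUNK_SIZE: every element passed to is_in binds one query parameter, so capping chunks at 50,000 leaves headroom under Postgres's 65,535 bind-parameter ceiling. A self-contained illustration of how the chunking plays out (not code from this PR):

fn main() {
    const CHUNK_SIZE: usize = 50_000;
    // A request needing 120,000 blob IDs resolves in ceil(120_000 / 50_000) = 3
    // queries, each binding at most 50,000 parameters.
    let ids: Vec<u64> = (0..120_000).collect();
    let queries = ids.chunks(CHUNK_SIZE).count();
    assert_eq!(queries, 3);
    println!("{} IDs -> {} chunked queries", ids.len(), queries);
}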

 #[tracing::instrument(skip_all)]
@@ -63,152 +93,138 @@ pub async fn read_job(
     blob_stores: HashMap<Uuid, Arc<dyn blob::DebugBlobStore + Sync + Send>>,
 ) -> (StatusCode, Json<ReadJobResponse>) {
     let hash = payload.hash();
+    let hash_for_spawns = hash.clone();

-    // TODO: This transaction is quite large with a bunch of "serialized" queries. If read_job
-    // becomes a bottleneck it should be rewritten such that joining on promises is delayed for as
-    // long as possible. Another option would be to collect all blob ids ahead of time and make a
-    // single db query to list them all out instead of a query per blob id.
-    let result = conn
+    // Fetch the job and related entities in a single transaction
+    let fetch_result = conn
         .as_ref()
-        .transaction::<_, (Option<Uuid>, ReadJobResponse), DbErr>(|txn| {
-            let hash = hash.clone();
+        .transaction::<_, Option<(job::Model, Vec<output_file::Model>, Vec<output_symlink::Model>, Vec<output_dir::Model>)>, DbErr>(|txn| {
             Box::pin(async move {
                 let Some(matching_job) = job::Entity::find()
                     .filter(job::Column::Hash.eq(hash.clone()))
                     .one(txn)
                     .await?
                 else {
                     tracing::info!(%hash, "Miss");
-                    return Ok((None, ReadJobResponse::NoMatch));
+                    return Ok(None);
                 };

+                let output_files = matching_job.find_related(output_file::Entity).all(txn).await?;
+                let output_symlinks = matching_job.find_related(output_symlink::Entity).all(txn).await?;
+                let output_dirs = matching_job.find_related(output_dir::Entity).all(txn).await?;

-                tracing::info!(%hash, "Hit");
-                let output_files = matching_job
-                    .find_related(output_file::Entity)
-                    .all(txn)
-                    .await?
-                    .into_iter()
-                    .map(|m| {
-                        let stores_copy = blob_stores.clone();
-                        async move {
-                            let blob = resolve_blob(m.blob_id, txn, &stores_copy).await?;
-
-                            Ok(ResolvedBlobFile {
-                                path: m.path,
-                                mode: m.mode,
-                                blob,
-                            })
-                        }
-                    });
-
-                let output_files: Result<Vec<ResolvedBlobFile>, String> =
-                    futures::future::join_all(output_files)
-                        .await
-                        .into_iter()
-                        .collect();
-
-                let output_files = match output_files {
-                    Err(err) => {
-                        tracing::error! {%err, "Failed to resolve all output files. Resolving job as a cache miss."};
-                        return Ok((None, ReadJobResponse::NoMatch))
-                    },
-                    Ok(files) => files,
-                };
+                Ok(Some((matching_job, output_files, output_symlinks, output_dirs)))
             })
         })
        .await;

+    let hash_copy = hash_for_spawns.clone();

[Review comment, Contributor]: Why isn't this just hash.clone()?
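
For reference, the pattern the reviewer is pointing at (cloning hash right at each move site instead of pre-naming copies) can be shown in isolation. A minimal standalone sketch, assuming the tokio runtime; none of this is code from the PR:

#[tokio::main]
async fn main() {
    let hash = String::from("abc123");

    // Clone immediately before each move into a spawned task...
    let h = hash.clone();
    let t1 = tokio::spawn(async move { println!("recording miss for {h}") });

    // ...and again at the next move site; the original stays usable throughout.
    let h = hash.clone();
    let t2 = tokio::spawn(async move { println!("recording hit for {h}") });

    let _ = tokio::join!(t1, t2);
    println!("original still available: {hash}");
}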

+    let Some((matching_job, output_files, output_symlinks, output_dirs)) = fetch_result.ok().flatten() else {
+        tokio::spawn(async move {
+            record_miss(hash_copy, conn.clone()).await;
+        });
+        return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
+    };

-                let output_symlinks = matching_job
-                    .find_related(output_symlink::Entity)
-                    .all(txn)
-                    .await?
-                    .into_iter()
-                    .map(|m| Symlink {
-                        path: m.path,
-                        link: m.link,
-                    })
-                    .collect();
-
-                let output_dirs = matching_job
-                    .find_related(output_dir::Entity)
-                    .all(txn)
-                    .await?
-                    .into_iter()
-                    .map(|m| Dir {
-                        path: m.path,
-                        mode: m.mode,
-                        hidden: Some(m.hidden),
-                    })
-                    .collect();
-
-                let stdout_blob = match resolve_blob(matching_job.stdout_blob_id, txn, &blob_stores).await {
-                    Err(err) => {
-                        tracing::error! {%err, "Failed to resolve stdout blob. Resolving job as a cache miss."};
-                        return Ok((None, ReadJobResponse::NoMatch))
-                    },
-                    Ok(blob) => blob,
-                };
-
-                let stderr_blob = match resolve_blob(matching_job.stderr_blob_id, txn, &blob_stores).await {
-                    Err(err) => {
-                        tracing::error! {%err, "Failed to resolve stderr blob. Resolving job as a cache miss."};
-                        return Ok((None, ReadJobResponse::NoMatch))
-                    },
-                    Ok(blob) => blob,
-                };
+    // Collect all the blob IDs we need to resolve
+    let mut blob_ids: Vec<Uuid> = output_files.iter().map(|f| f.blob_id).collect();
+    blob_ids.push(matching_job.stdout_blob_id);
+    blob_ids.push(matching_job.stderr_blob_id);
+
+    // Resolve all needed blobs in one go
+    let resolved_blob_map = match resolve_blobs(&blob_ids, conn.as_ref(), &blob_stores).await {
+        Ok(map) => map,
+        Err(err) => {
+            tracing::error!(%err, "Failed to resolve blobs. Resolving job as a cache miss.");
+            return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
+        }
+    };
+
+    // Construct a ResolvedBlobFile for each output file
+    let output_files = output_files
+        .into_iter()
+        .map(|m| {
+            let blob_id = m.blob_id;
+            let resolved_blob = resolved_blob_map.get(&blob_id).cloned().ok_or_else(|| {
+                format!("Missing resolved blob for {}", blob_id)
[Review comment, Contributor]: Do we want to track the job that had a blob fetch failure? I can see that being useful debug info when something is failing.
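
One hypothetical way to act on this suggestion, with names assumed from the surrounding diff rather than taken from the PR:

use uuid::Uuid;

// Hypothetical helper: fold the job ID into the error so a blob fetch
// failure can be traced back to the job whose read triggered it; the
// format! call above could then become missing_blob_error(blob_id, matching_job.id).
fn missing_blob_error(blob_id: Uuid, job_id: Uuid) -> String {
    format!("Missing resolved blob {blob_id} for job {job_id}")
}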

+            })?;
+            Ok(ResolvedBlobFile {
+                path: m.path,
+                mode: m.mode,
+                blob: resolved_blob,
+            })
+        })
+        .collect::<Result<Vec<_>, String>>();

-    match result {
-        Ok((Some(job_id), response)) => {
-            // If we get a match we want to record the use but we don't
-            // want to block sending the response on it so we spawn a task
-            // to go do that.
-            let mut status = StatusCode::NOT_FOUND;
-            if let ReadJobResponse::Match { .. } = response {
-                status = StatusCode::OK;
-                let shared_conn = conn.clone();
-                tokio::spawn(async move {
-                    record_hit(job_id, hash, shared_conn).await;
-                });
-            }
-            (status, Json(response))
-        }
-        Ok((None, _)) => {
-            let shared_conn = conn.clone();
-            tokio::spawn(async move {
-                record_miss(hash, shared_conn).await;
-            });
-            (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch))
-        }
+    let output_files = match output_files {
+        Ok(files) => files,
+        Err(err) => {
+            tracing::error!(%err, "Failed to resolve all output files. Resolving job as a cache miss.");
+            return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
+        }
+    };

+    // Collect other resolved entities
+    let output_symlinks: Vec<Symlink> = output_symlinks
+        .into_iter()
+        .map(|m| Symlink {
+            path: m.path,
+            link: m.link,
+        })
+        .collect();
+
+    let output_dirs: Vec<Dir> = output_dirs
+        .into_iter()
+        .map(|m| Dir {
+            path: m.path,
+            mode: m.mode,
+            hidden: Some(m.hidden),
+        })
+        .collect();
+
+    // Resolve stdout and stderr blobs from the map
+    let stdout_blob = match resolved_blob_map.get(&matching_job.stdout_blob_id) {
+        Some(blob) => blob.clone(),
+        None => {
+            tracing::error!("Failed to resolve stdout blob. Resolving job as a cache miss.");
+            return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
+        }
-        Err(cause) => {
-            tracing::error! {
-                %cause,
-                "failed to read job"
-            };
-            (
-                StatusCode::INTERNAL_SERVER_ERROR,
-                Json(ReadJobResponse::NoMatch),
-            )
-        }
-    }
+    };

+    let stderr_blob = match resolved_blob_map.get(&matching_job.stderr_blob_id) {
+        Some(blob) => blob.clone(),
+        None => {
+            tracing::error!("Failed to resolve stderr blob. Resolving job as a cache miss.");
+            return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
+        }
+    };

+    // Construct response
+    let response = ReadJobResponse::Match {
+        output_symlinks,
+        output_dirs,
+        output_files,
+        stdout_blob,
+        stderr_blob,
+        status: matching_job.status,
+        runtime: matching_job.runtime,
+        cputime: matching_job.cputime,
+        memory: matching_job.memory as u64,
+        ibytes: matching_job.i_bytes as u64,
+        obytes: matching_job.o_bytes as u64,
+    };
+
+    let job_id = matching_job.id;
+    let hash_copy = hash_for_spawns.clone();
[Review comment, Contributor]: Again it feels like we didn't need to create this separate hash_for_spawns variable and could have instead continued to clone hash.

+    tracing::info!(%hash_copy, "Hit");
+    tokio::spawn(async move {
+        record_hit(job_id, hash_copy, conn.clone()).await;
+    });
+
+    (StatusCode::OK, Json(response))
 }


 #[tracing::instrument(skip_all)]
 pub async fn allow_job(
     Json(payload): Json<AllowJobPayload>,
2 changes: 1 addition & 1 deletion rust/rsc/src/bin/rsc/types.rs
@@ -228,7 +228,7 @@ pub enum PostBlobResponse {
     Ok { blobs: Vec<PostBlobResponsePart> },
 }

-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct ResolvedBlob {
     pub id: Uuid,
     pub url: String,
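
A note on why Clone shows up here: the reworked hit path copies ResolvedBlob values out of resolved_blob_map with .cloned() (for stdout, stderr, and each output file), which requires ResolvedBlob: Clone; that is presumably what this derive change is for.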