Skip to content

Commit

Permalink
Set a metric field for when GHA 429's
Browse files Browse the repository at this point in the history
  • Loading branch information
grahamc authored and cole-h committed Jan 16, 2025
1 parent 3cb38e2 commit 37d2f6a
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 12 deletions.
44 changes: 33 additions & 11 deletions gha-cache/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const MAX_CONCURRENCY: usize = 4;

type Result<T> = std::result::Result<T, Error>;

pub type CircuitBreakerTrippedCallback = Arc<Box<dyn Fn() + Send + Sync>>;

/// An API error.
#[derive(Error, Debug)]
pub enum Error {
Expand Down Expand Up @@ -82,7 +84,6 @@ pub enum Error {
TooManyCollisions,
}

#[derive(Debug)]
pub struct Api {
/// Credentials to access the cache.
credentials: Credentials,
Expand All @@ -104,6 +105,8 @@ pub struct Api {

circuit_breaker_429_tripped: Arc<AtomicBool>,

circuit_breaker_429_tripped_callback: CircuitBreakerTrippedCallback,

/// Backend request statistics.
#[cfg(debug_assertions)]
stats: RequestStats,
Expand Down Expand Up @@ -242,7 +245,10 @@ impl fmt::Display for ApiErrorInfo {
}

impl Api {
pub fn new(credentials: Credentials) -> Result<Self> {
pub fn new(
credentials: Credentials,
circuit_breaker_429_tripped_callback: CircuitBreakerTrippedCallback,
) -> Result<Self> {
let mut headers = HeaderMap::new();
let auth_header = {
let mut h = HeaderValue::from_str(&format!("Bearer {}", credentials.runtime_token))
Expand Down Expand Up @@ -273,6 +279,7 @@ impl Api {
client,
concurrency_limit: Arc::new(Semaphore::new(MAX_CONCURRENCY)),
circuit_breaker_429_tripped: Arc::new(AtomicBool::from(false)),
circuit_breaker_429_tripped_callback,
#[cfg(debug_assertions)]
stats: Default::default(),
})
Expand Down Expand Up @@ -366,6 +373,8 @@ impl Api {
let client = self.client.clone();
let concurrency_limit = self.concurrency_limit.clone();
let circuit_breaker_429_tripped = self.circuit_breaker_429_tripped.clone();
let circuit_breaker_429_tripped_callback =
self.circuit_breaker_429_tripped_callback.clone();
let url = self.construct_url(&format!("caches/{}", allocation.0 .0));

tokio::task::spawn(async move {
Expand Down Expand Up @@ -402,7 +411,8 @@ impl Api {

drop(permit);

circuit_breaker_429_tripped.check_result(&r);
circuit_breaker_429_tripped
.check_result(&r, &circuit_breaker_429_tripped_callback);

r
})
Expand Down Expand Up @@ -465,7 +475,8 @@ impl Api {
.check_json()
.await;

self.circuit_breaker_429_tripped.check_result(&res);
self.circuit_breaker_429_tripped
.check_result(&res, &self.circuit_breaker_429_tripped_callback);

match res {
Ok(entry) => Ok(Some(entry)),
Expand Down Expand Up @@ -508,7 +519,8 @@ impl Api {
.check_json()
.await;

self.circuit_breaker_429_tripped.check_result(&res);
self.circuit_breaker_429_tripped
.check_result(&res, &self.circuit_breaker_429_tripped_callback);

res
}
Expand All @@ -535,7 +547,8 @@ impl Api {
.check()
.await
{
self.circuit_breaker_429_tripped.check_err(&e);
self.circuit_breaker_429_tripped
.check_err(&e, &self.circuit_breaker_429_tripped_callback);
return Err(e);
}

Expand Down Expand Up @@ -610,18 +623,26 @@ async fn handle_error(res: reqwest::Response) -> Error {
}

trait AtomicCircuitBreaker {
fn check_err(&self, e: &Error);
fn check_result<T>(&self, r: &std::result::Result<T, Error>);
fn check_err(&self, e: &Error, callback: &CircuitBreakerTrippedCallback);
fn check_result<T>(
&self,
r: &std::result::Result<T, Error>,
callback: &CircuitBreakerTrippedCallback,
);
}

impl AtomicCircuitBreaker for AtomicBool {
fn check_result<T>(&self, r: &std::result::Result<T, Error>) {
fn check_result<T>(
&self,
r: &std::result::Result<T, Error>,
callback: &CircuitBreakerTrippedCallback,
) {
if let Err(ref e) = r {
self.check_err(e)
self.check_err(e, callback)
}
}

fn check_err(&self, e: &Error) {
fn check_err(&self, e: &Error, callback: &CircuitBreakerTrippedCallback) {
if let Error::ApiError {
status: reqwest::StatusCode::TOO_MANY_REQUESTS,
..
Expand All @@ -636,6 +657,7 @@ impl AtomicCircuitBreaker for AtomicBool {
";
println!("::notice title={title}::{msg}");
self.store(true, Ordering::Relaxed);
callback();
}
}
}
10 changes: 9 additions & 1 deletion magic-nix-cache/src/gha.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,15 @@ impl GhaCache {
metrics: Arc<telemetry::TelemetryReport>,
narinfo_negative_cache: Arc<RwLock<HashSet<String>>>,
) -> Result<GhaCache> {
let mut api = Api::new(credentials)?;
let cb_metrics = metrics.clone();
let mut api = Api::new(
credentials,
Arc::new(Box::new(move || {
cb_metrics
.tripped_429
.store(true, std::sync::atomic::Ordering::Relaxed);
})),
)?;

if let Some(cache_version) = &cache_version {
api.mutate_version(cache_version.as_bytes());
Expand Down
2 changes: 2 additions & 0 deletions magic-nix-cache/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub struct TelemetryReport {
pub num_original_paths: Metric,
pub num_final_paths: Metric,
pub num_new_paths: Metric,

pub tripped_429: std::sync::atomic::AtomicBool,
}

#[derive(Debug, Default, serde::Serialize)]
Expand Down

0 comments on commit 37d2f6a

Please sign in to comment.