From 4e2b37be36557b29dd437630f2879bf8af1ab354 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Thu, 16 Jan 2025 11:13:00 -0500 Subject: [PATCH] Set a metric field for when GHA 429's --- gha-cache/src/api.rs | 44 ++++++++++++++++++++++++-------- magic-nix-cache/src/gha.rs | 10 +++++++- magic-nix-cache/src/telemetry.rs | 2 ++ 3 files changed, 44 insertions(+), 12 deletions(-) diff --git a/gha-cache/src/api.rs b/gha-cache/src/api.rs index 7d56626..3b42d66 100644 --- a/gha-cache/src/api.rs +++ b/gha-cache/src/api.rs @@ -48,6 +48,8 @@ const MAX_CONCURRENCY: usize = 4; type Result = std::result::Result; +pub type CircuitBreakerTrippedCallback = Arc>; + /// An API error. #[derive(Error, Debug)] pub enum Error { @@ -82,7 +84,6 @@ pub enum Error { TooManyCollisions, } -#[derive(Debug)] pub struct Api { /// Credentials to access the cache. credentials: Credentials, @@ -104,6 +105,8 @@ pub struct Api { circuit_breaker_429_tripped: Arc, + circuit_breaker_429_tripped_callback: CircuitBreakerTrippedCallback, + /// Backend request statistics. #[cfg(debug_assertions)] stats: RequestStats, @@ -242,7 +245,10 @@ impl fmt::Display for ApiErrorInfo { } impl Api { - pub fn new(credentials: Credentials) -> Result { + pub fn new( + credentials: Credentials, + circuit_breaker_429_tripped_callback: CircuitBreakerTrippedCallback, + ) -> Result { let mut headers = HeaderMap::new(); let auth_header = { let mut h = HeaderValue::from_str(&format!("Bearer {}", credentials.runtime_token)) @@ -273,6 +279,7 @@ impl Api { client, concurrency_limit: Arc::new(Semaphore::new(MAX_CONCURRENCY)), circuit_breaker_429_tripped: Arc::new(AtomicBool::from(false)), + circuit_breaker_429_tripped_callback, #[cfg(debug_assertions)] stats: Default::default(), }) @@ -366,6 +373,8 @@ impl Api { let client = self.client.clone(); let concurrency_limit = self.concurrency_limit.clone(); let circuit_breaker_429_tripped = self.circuit_breaker_429_tripped.clone(); + let circuit_breaker_429_tripped_callback = + self.circuit_breaker_429_tripped_callback.clone(); let url = self.construct_url(&format!("caches/{}", allocation.0 .0)); tokio::task::spawn(async move { @@ -402,7 +411,8 @@ impl Api { drop(permit); - circuit_breaker_429_tripped.check_result(&r); + circuit_breaker_429_tripped + .check_result(&r, &circuit_breaker_429_tripped_callback); r }) @@ -465,7 +475,8 @@ impl Api { .check_json() .await; - self.circuit_breaker_429_tripped.check_result(&res); + self.circuit_breaker_429_tripped + .check_result(&res, &self.circuit_breaker_429_tripped_callback); match res { Ok(entry) => Ok(Some(entry)), @@ -508,7 +519,8 @@ impl Api { .check_json() .await; - self.circuit_breaker_429_tripped.check_result(&res); + self.circuit_breaker_429_tripped + .check_result(&res, &self.circuit_breaker_429_tripped_callback); res } @@ -535,7 +547,8 @@ impl Api { .check() .await { - self.circuit_breaker_429_tripped.check_err(&e); + self.circuit_breaker_429_tripped + .check_err(&e, &self.circuit_breaker_429_tripped_callback); return Err(e); } @@ -610,18 +623,26 @@ async fn handle_error(res: reqwest::Response) -> Error { } trait AtomicCircuitBreaker { - fn check_err(&self, e: &Error); - fn check_result(&self, r: &std::result::Result); + fn check_err(&self, e: &Error, callback: &CircuitBreakerTrippedCallback); + fn check_result( + &self, + r: &std::result::Result, + callback: &CircuitBreakerTrippedCallback, + ); } impl AtomicCircuitBreaker for AtomicBool { - fn check_result(&self, r: &std::result::Result) { + fn check_result( + &self, + r: &std::result::Result, + callback: &CircuitBreakerTrippedCallback, + ) { if let Err(ref e) = r { - self.check_err(e) + self.check_err(e, callback) } } - fn check_err(&self, e: &Error) { + fn check_err(&self, e: &Error, callback: &CircuitBreakerTrippedCallback) { if let Error::ApiError { status: reqwest::StatusCode::TOO_MANY_REQUESTS, .. @@ -636,6 +657,7 @@ impl AtomicCircuitBreaker for AtomicBool { "; println!("::notice title={title}::{msg}"); self.store(true, Ordering::Relaxed); + callback(); } } } diff --git a/magic-nix-cache/src/gha.rs b/magic-nix-cache/src/gha.rs index cd4df57..086c97d 100644 --- a/magic-nix-cache/src/gha.rs +++ b/magic-nix-cache/src/gha.rs @@ -37,7 +37,15 @@ impl GhaCache { metrics: Arc, narinfo_negative_cache: Arc>>, ) -> Result { - let mut api = Api::new(credentials)?; + let cb_metrics = metrics.clone(); + let mut api = Api::new( + credentials, + Arc::new(Box::new(move || { + cb_metrics + .tripped_429 + .store(true, std::sync::atomic::Ordering::Relaxed); + })), + )?; if let Some(cache_version) = &cache_version { api.mutate_version(cache_version.as_bytes()); diff --git a/magic-nix-cache/src/telemetry.rs b/magic-nix-cache/src/telemetry.rs index c9dfc3e..59de261 100644 --- a/magic-nix-cache/src/telemetry.rs +++ b/magic-nix-cache/src/telemetry.rs @@ -28,6 +28,8 @@ pub struct TelemetryReport { pub num_original_paths: Metric, pub num_final_paths: Metric, pub num_new_paths: Metric, + + pub tripped_429: std::sync::atomic::AtomicBool, } #[derive(Debug, Default, serde::Serialize)]