From 9c7b8e3fc99011bf191be3fe71daab1f3c969d8f Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Wed, 12 Jun 2024 15:44:26 -0400 Subject: [PATCH] Trip a circuit breaker when we get a 429 so we don't keep doing useless work --- flake.nix | 1 + gha-cache/src/api.rs | 41 +++++++++++++++++++++++++++++++++++- magic-nix-cache/src/error.rs | 4 ++-- magic-nix-cache/src/gha.rs | 11 ++++++++-- 4 files changed, 52 insertions(+), 5 deletions(-) diff --git a/flake.nix b/flake.nix index 0638acb..704d321 100644 --- a/flake.nix +++ b/flake.nix @@ -56,6 +56,7 @@ cargo-bloat cargo-edit cargo-udeps + cargo-watch bacon age diff --git a/gha-cache/src/api.rs b/gha-cache/src/api.rs index a04a10b..31e1508 100644 --- a/gha-cache/src/api.rs +++ b/gha-cache/src/api.rs @@ -4,7 +4,7 @@ use std::fmt; #[cfg(debug_assertions)] -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use async_trait::async_trait; @@ -53,6 +53,9 @@ pub enum Error { #[error("Failed to initialize the client: {0}")] InitError(Box), + #[error("Circuit breaker tripped.")] + CircuitBreakerTripped, + #[error("Request error: {0}")] RequestError(#[from] reqwest::Error), // TODO: Better errors @@ -96,6 +99,8 @@ pub struct Api { /// The concurrent upload limit. concurrency_limit: Arc, + circuit_breaker_429_tripped: Arc, + /// Backend request statistics. #[cfg(debug_assertions)] stats: RequestStats, @@ -264,11 +269,16 @@ impl Api { version_hasher, client, concurrency_limit: Arc::new(Semaphore::new(MAX_CONCURRENCY)), + circuit_breaker_429_tripped: Arc::new(AtomicBool::from(false)), #[cfg(debug_assertions)] stats: Default::default(), }) } + pub fn circuit_breaker_tripped(&self) -> bool { + self.circuit_breaker_429_tripped.load(Ordering::Relaxed) + } + /// Mutates the cache version/namespace. pub fn mutate_version(&mut self, data: &[u8]) { self.version_hasher.update(data); @@ -324,6 +334,10 @@ impl Api { where S: AsyncRead + Unpin + Send, { + if self.circuit_breaker_429_tripped.load(Ordering::Relaxed) { + return Err(Error::CircuitBreakerTripped); + } + let mut offset = 0; let mut futures = Vec::new(); loop { @@ -347,6 +361,7 @@ impl Api { futures.push({ let client = self.client.clone(); let concurrency_limit = self.concurrency_limit.clone(); + let circuit_breaker_429_tripped = self.circuit_breaker_429_tripped.clone(); let url = self.construct_url(&format!("caches/{}", allocation.0 .0)); tokio::task::spawn(async move { @@ -380,6 +395,11 @@ impl Api { drop(permit); + + if let Err(Error::ApiError{ status: reqwest::StatusCode::TOO_MANY_REQUESTS, info: ref _info }) = r { + circuit_breaker_429_tripped.store(true, Ordering::Relaxed); + } + r }) }); @@ -401,6 +421,11 @@ impl Api { /// Downloads a file based on a list of key prefixes. pub async fn get_file_url(&self, keys: &[&str]) -> Result> { + + if self.circuit_breaker_429_tripped.load(Ordering::Relaxed) { + return Err(Error::CircuitBreakerTripped); + } + Ok(self .get_cache_entry(keys) .await? @@ -419,6 +444,10 @@ impl Api { /// Retrieves a cache based on a list of key prefixes. async fn get_cache_entry(&self, keys: &[&str]) -> Result> { + if self.circuit_breaker_429_tripped.load(Ordering::Relaxed) { + return Err(Error::CircuitBreakerTripped); + } + #[cfg(debug_assertions)] self.stats.get.fetch_add(1, Ordering::SeqCst); @@ -448,6 +477,11 @@ impl Api { key: &str, cache_size: Option, ) -> Result { + + if self.circuit_breaker_429_tripped.load(Ordering::Relaxed) { + return Err(Error::CircuitBreakerTripped); + } + tracing::debug!("Reserving cache for {}", key); let req = ReserveCacheRequest { @@ -473,6 +507,11 @@ impl Api { /// Finalizes uploading to a cache. async fn commit_cache(&self, cache_id: CacheId, size: usize) -> Result<()> { + + if self.circuit_breaker_429_tripped.load(Ordering::Relaxed) { + return Err(Error::CircuitBreakerTripped); + } + tracing::debug!("Commiting cache {:?}", cache_id); let req = CommitCacheRequest { size }; diff --git a/magic-nix-cache/src/error.rs b/magic-nix-cache/src/error.rs index ec1b8d3..e80ea70 100644 --- a/magic-nix-cache/src/error.rs +++ b/magic-nix-cache/src/error.rs @@ -10,8 +10,8 @@ pub type Result = std::result::Result; #[derive(Error, Debug)] pub enum Error { - #[error("GitHub API error: {0}")] - Api(#[from] gha_cache::api::Error), + #[error("GitHub API error: {0}")] + Api(#[from] gha_cache::api::Error), #[error("Not Found")] NotFound, diff --git a/magic-nix-cache/src/gha.rs b/magic-nix-cache/src/gha.rs index 756b96b..e1f0aa6 100644 --- a/magic-nix-cache/src/gha.rs +++ b/magic-nix-cache/src/gha.rs @@ -119,6 +119,13 @@ async fn worker( break; } Request::Upload(path) => { + if (api.circuit_breaker_tripped()) { + tracing::trace!( + "GitHub Actions gave us a 429, so we're done.", + ); + continue; + } + if !done.insert(path.clone()) { continue; } @@ -188,8 +195,8 @@ async fn upload_path( tracing::debug!("Uploading '{}'", narinfo_path); - api.upload_file(narinfo_allocation, narinfo.as_bytes()) - .await?; + api.upload_file(narinfo_allocation, narinfo.as_bytes()).await?; + metrics.narinfos_uploaded.incr(); narinfo_negative_cache