diff --git a/.github/workflows/check-and-test.yaml b/.github/workflows/check-and-test.yaml index b517f9d..5f6c221 100644 --- a/.github/workflows/check-and-test.yaml +++ b/.github/workflows/check-and-test.yaml @@ -78,3 +78,9 @@ jobs: - name: Run nix to test magic-nix-cache-action run: | nix develop --command echo "just testing" + - name: Exhaust our GitHub Actions Cache tokens + # Generally skip this step since it is so intensive + if: ${{ false }} + run: | + date >> README.md + nix build .#veryLongChain -v diff --git a/flake.nix b/flake.nix index 560d87c..0eb1d20 100644 --- a/flake.nix +++ b/flake.nix @@ -43,6 +43,36 @@ magic-nix-cache = pkgs.callPackage ./package.nix { }; #inherit (cranePkgs) magic-nix-cache; default = magic-nix-cache; + + veryLongChain = + let + ctx = ./README.md; + + # Function to write the current date to a file + startFile = + pkgs.stdenv.mkDerivation { + name = "start-file"; + buildCommand = '' + cat ${ctx} > $out + ''; + }; + + # Recursive function to create a chain of derivations + createChain = n: startFile: + pkgs.stdenv.mkDerivation { + name = "chain-${toString n}"; + src = + if n == 0 then + startFile + else createChain (n - 1) startFile; + buildCommand = '' + echo $src > $out + ''; + }; + + in + # Starting point of the chain + createChain 200 startFile; }); devShells = forEachSupportedSystem ({ pkgs, cranePkgs, lib }: { @@ -56,10 +86,15 @@ cargo-bloat cargo-edit cargo-udeps + cargo-watch bacon age - ]; + ] ++ lib.optionals pkgs.stdenv.isDarwin (with pkgs.darwin.apple_sdk.frameworks; [ + SystemConfiguration + ]); + + NIX_CFLAGS_LINK = lib.optionalString pkgs.stdenv.isDarwin "-lc++abi"; }; /* diff --git a/gha-cache/src/api.rs b/gha-cache/src/api.rs index a04a10b..c1852df 100644 --- a/gha-cache/src/api.rs +++ b/gha-cache/src/api.rs @@ -4,7 +4,8 @@ use std::fmt; #[cfg(debug_assertions)] -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use async_trait::async_trait; @@ -53,6 +54,11 @@ pub enum Error { #[error("Failed to initialize the client: {0}")] InitError(Box), + #[error( + "GitHub Actions Cache throttled Magic Nix Cache. Not trying to use it again on this run." + )] + CircuitBreakerTripped, + #[error("Request error: {0}")] RequestError(#[from] reqwest::Error), // TODO: Better errors @@ -96,6 +102,8 @@ pub struct Api { /// The concurrent upload limit. concurrency_limit: Arc, + circuit_breaker_429_tripped: Arc, + /// Backend request statistics. #[cfg(debug_assertions)] stats: RequestStats, @@ -264,11 +272,16 @@ impl Api { version_hasher, client, concurrency_limit: Arc::new(Semaphore::new(MAX_CONCURRENCY)), + circuit_breaker_429_tripped: Arc::new(AtomicBool::from(false)), #[cfg(debug_assertions)] stats: Default::default(), }) } + pub fn circuit_breaker_tripped(&self) -> bool { + self.circuit_breaker_429_tripped.load(Ordering::Relaxed) + } + /// Mutates the cache version/namespace. pub fn mutate_version(&mut self, data: &[u8]) { self.version_hasher.update(data); @@ -324,6 +337,10 @@ impl Api { where S: AsyncRead + Unpin + Send, { + if self.circuit_breaker_tripped() { + return Err(Error::CircuitBreakerTripped); + } + let mut offset = 0; let mut futures = Vec::new(); loop { @@ -347,6 +364,7 @@ impl Api { futures.push({ let client = self.client.clone(); let concurrency_limit = self.concurrency_limit.clone(); + let circuit_breaker_429_tripped = self.circuit_breaker_429_tripped.clone(); let url = self.construct_url(&format!("caches/{}", allocation.0 .0)); tokio::task::spawn(async move { @@ -380,6 +398,8 @@ impl Api { drop(permit); + circuit_breaker_429_tripped.check_result(&r); + r }) }); @@ -401,6 +421,10 @@ impl Api { /// Downloads a file based on a list of key prefixes. pub async fn get_file_url(&self, keys: &[&str]) -> Result> { + if self.circuit_breaker_tripped() { + return Err(Error::CircuitBreakerTripped); + } + Ok(self .get_cache_entry(keys) .await? @@ -419,6 +443,10 @@ impl Api { /// Retrieves a cache based on a list of key prefixes. async fn get_cache_entry(&self, keys: &[&str]) -> Result> { + if self.circuit_breaker_tripped() { + return Err(Error::CircuitBreakerTripped); + } + #[cfg(debug_assertions)] self.stats.get.fetch_add(1, Ordering::SeqCst); @@ -431,6 +459,8 @@ impl Api { .check_json() .await; + self.circuit_breaker_429_tripped.check_result(&res); + match res { Ok(entry) => Ok(Some(entry)), Err(Error::DecodeError { status, .. }) if status == StatusCode::NO_CONTENT => Ok(None), @@ -448,6 +478,10 @@ impl Api { key: &str, cache_size: Option, ) -> Result { + if self.circuit_breaker_tripped() { + return Err(Error::CircuitBreakerTripped); + } + tracing::debug!("Reserving cache for {}", key); let req = ReserveCacheRequest { @@ -466,13 +500,19 @@ impl Api { .send() .await? .check_json() - .await?; + .await; - Ok(res) + self.circuit_breaker_429_tripped.check_result(&res); + + res } /// Finalizes uploading to a cache. async fn commit_cache(&self, cache_id: CacheId, size: usize) -> Result<()> { + if self.circuit_breaker_tripped() { + return Err(Error::CircuitBreakerTripped); + } + tracing::debug!("Commiting cache {:?}", cache_id); let req = CommitCacheRequest { size }; @@ -480,13 +520,18 @@ impl Api { #[cfg(debug_assertions)] self.stats.post.fetch_add(1, Ordering::SeqCst); - self.client + if let Err(e) = self + .client .post(self.construct_url(&format!("caches/{}", cache_id.0))) .json(&req) .send() .await? .check() - .await?; + .await + { + self.circuit_breaker_429_tripped.check_err(&e); + return Err(e); + } Ok(()) } @@ -554,3 +599,27 @@ async fn handle_error(res: reqwest::Response) -> Error { Error::ApiError { status, info } } + +trait AtomicCircuitBreaker { + fn check_err(&self, e: &Error); + fn check_result(&self, r: &std::result::Result); +} + +impl AtomicCircuitBreaker for AtomicBool { + fn check_result(&self, r: &std::result::Result) { + if let Err(ref e) = r { + self.check_err(e) + } + } + + fn check_err(&self, e: &Error) { + if let Error::ApiError { + status: reqwest::StatusCode::TOO_MANY_REQUESTS, + info: ref _info, + } = e + { + tracing::info!("Disabling GitHub Actions Cache due to 429: Too Many Requests"); + self.store(true, Ordering::Relaxed); + } + } +} diff --git a/magic-nix-cache/src/flakehub.rs b/magic-nix-cache/src/flakehub.rs index 00e0d18..1336d51 100644 --- a/magic-nix-cache/src/flakehub.rs +++ b/magic-nix-cache/src/flakehub.rs @@ -38,9 +38,24 @@ pub async fn init_cache( ) -> Result { // Parse netrc to get the credentials for api.flakehub.com. let netrc = { - let mut netrc_file = File::open(flakehub_api_server_netrc).await?; + let mut netrc_file = File::open(flakehub_api_server_netrc).await.map_err(|e| { + Error::Internal(format!( + "Failed to open {}: {}", + flakehub_api_server_netrc.display(), + e + )) + })?; let mut netrc_contents = String::new(); - netrc_file.read_to_string(&mut netrc_contents).await?; + netrc_file + .read_to_string(&mut netrc_contents) + .await + .map_err(|e| { + Error::Internal(format!( + "Failed to read {} contents: {}", + flakehub_api_server_netrc.display(), + e + )) + })?; netrc_rs::Netrc::parse(netrc_contents, false).map_err(Error::Netrc)? }; @@ -84,7 +99,15 @@ pub async fn init_cache( .create(false) .append(true) .open(flakehub_api_server_netrc) - .await?; + .await + .map_err(|e| { + Error::Internal(format!( + "Failed to open {} for appending: {}", + flakehub_api_server_netrc.display(), + e + )) + })?; + netrc_file .write_all( format!( @@ -93,7 +116,14 @@ pub async fn init_cache( ) .as_bytes(), ) - .await?; + .await + .map_err(|e| { + Error::Internal(format!( + "Failed to write credentials to {}: {}", + flakehub_api_server_netrc.display(), + e + )) + })?; } let server_config = ServerConfig { @@ -315,7 +345,7 @@ async fn rewrite_github_actions_token( let token_response: TokenResponse = token_response .json() .await - .with_context(|| String::from("converting response into json"))?; + .with_context(|| "converting response into json")?; let new_github_jwt_string = token_response.value; let netrc_contents = tokio::fs::read_to_string(netrc_path) diff --git a/magic-nix-cache/src/gha.rs b/magic-nix-cache/src/gha.rs index 756b96b..00e1ba2 100644 --- a/magic-nix-cache/src/gha.rs +++ b/magic-nix-cache/src/gha.rs @@ -119,6 +119,11 @@ async fn worker( break; } Request::Upload(path) => { + if api.circuit_breaker_tripped() { + tracing::trace!("GitHub Actions gave us a 429, so we're done.",); + continue; + } + if !done.insert(path.clone()) { continue; } @@ -190,6 +195,7 @@ async fn upload_path( api.upload_file(narinfo_allocation, narinfo.as_bytes()) .await?; + metrics.narinfos_uploaded.incr(); narinfo_negative_cache