Trip a circuit breaker when we get a 429 so we don't keep doing useless work

This commit is contained in:
Graham Christensen 2024-06-12 15:44:26 -04:00
parent 6742c6a85e
commit 9c7b8e3fc9
4 changed files with 52 additions and 5 deletions

View file

@ -56,6 +56,7 @@
cargo-bloat cargo-bloat
cargo-edit cargo-edit
cargo-udeps cargo-udeps
cargo-watch
bacon bacon
age age

View file

@ -4,7 +4,7 @@
use std::fmt; use std::fmt;
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
@ -53,6 +53,9 @@ pub enum Error {
#[error("Failed to initialize the client: {0}")] #[error("Failed to initialize the client: {0}")]
InitError(Box<dyn std::error::Error + Send + Sync>), InitError(Box<dyn std::error::Error + Send + Sync>),
#[error("Circuit breaker tripped.")]
CircuitBreakerTripped,
#[error("Request error: {0}")] #[error("Request error: {0}")]
RequestError(#[from] reqwest::Error), // TODO: Better errors RequestError(#[from] reqwest::Error), // TODO: Better errors
@ -96,6 +99,8 @@ pub struct Api {
/// The concurrent upload limit. /// The concurrent upload limit.
concurrency_limit: Arc<Semaphore>, concurrency_limit: Arc<Semaphore>,
circuit_breaker_429_tripped: Arc<AtomicBool>,
/// Backend request statistics. /// Backend request statistics.
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
stats: RequestStats, stats: RequestStats,
@ -264,11 +269,16 @@ impl Api {
version_hasher, version_hasher,
client, client,
concurrency_limit: Arc::new(Semaphore::new(MAX_CONCURRENCY)), concurrency_limit: Arc::new(Semaphore::new(MAX_CONCURRENCY)),
circuit_breaker_429_tripped: Arc::new(AtomicBool::from(false)),
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
stats: Default::default(), stats: Default::default(),
}) })
} }
pub fn circuit_breaker_tripped(&self) -> bool {
self.circuit_breaker_429_tripped.load(Ordering::Relaxed)
}
/// Mutates the cache version/namespace. /// Mutates the cache version/namespace.
pub fn mutate_version(&mut self, data: &[u8]) { pub fn mutate_version(&mut self, data: &[u8]) {
self.version_hasher.update(data); self.version_hasher.update(data);
@ -324,6 +334,10 @@ impl Api {
where where
S: AsyncRead + Unpin + Send, S: AsyncRead + Unpin + Send,
{ {
if self.circuit_breaker_429_tripped.load(Ordering::Relaxed) {
return Err(Error::CircuitBreakerTripped);
}
let mut offset = 0; let mut offset = 0;
let mut futures = Vec::new(); let mut futures = Vec::new();
loop { loop {
@ -347,6 +361,7 @@ impl Api {
futures.push({ futures.push({
let client = self.client.clone(); let client = self.client.clone();
let concurrency_limit = self.concurrency_limit.clone(); let concurrency_limit = self.concurrency_limit.clone();
let circuit_breaker_429_tripped = self.circuit_breaker_429_tripped.clone();
let url = self.construct_url(&format!("caches/{}", allocation.0 .0)); let url = self.construct_url(&format!("caches/{}", allocation.0 .0));
tokio::task::spawn(async move { tokio::task::spawn(async move {
@ -380,6 +395,11 @@ impl Api {
drop(permit); drop(permit);
if let Err(Error::ApiError{ status: reqwest::StatusCode::TOO_MANY_REQUESTS, info: ref _info }) = r {
circuit_breaker_429_tripped.store(true, Ordering::Relaxed);
}
r r
}) })
}); });
@ -401,6 +421,11 @@ impl Api {
/// Downloads a file based on a list of key prefixes. /// Downloads a file based on a list of key prefixes.
pub async fn get_file_url(&self, keys: &[&str]) -> Result<Option<String>> { pub async fn get_file_url(&self, keys: &[&str]) -> Result<Option<String>> {
if self.circuit_breaker_429_tripped.load(Ordering::Relaxed) {
return Err(Error::CircuitBreakerTripped);
}
Ok(self Ok(self
.get_cache_entry(keys) .get_cache_entry(keys)
.await? .await?
@ -419,6 +444,10 @@ impl Api {
/// Retrieves a cache based on a list of key prefixes. /// Retrieves a cache based on a list of key prefixes.
async fn get_cache_entry(&self, keys: &[&str]) -> Result<Option<ArtifactCacheEntry>> { async fn get_cache_entry(&self, keys: &[&str]) -> Result<Option<ArtifactCacheEntry>> {
if self.circuit_breaker_429_tripped.load(Ordering::Relaxed) {
return Err(Error::CircuitBreakerTripped);
}
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
self.stats.get.fetch_add(1, Ordering::SeqCst); self.stats.get.fetch_add(1, Ordering::SeqCst);
@ -448,6 +477,11 @@ impl Api {
key: &str, key: &str,
cache_size: Option<usize>, cache_size: Option<usize>,
) -> Result<ReserveCacheResponse> { ) -> Result<ReserveCacheResponse> {
if self.circuit_breaker_429_tripped.load(Ordering::Relaxed) {
return Err(Error::CircuitBreakerTripped);
}
tracing::debug!("Reserving cache for {}", key); tracing::debug!("Reserving cache for {}", key);
let req = ReserveCacheRequest { let req = ReserveCacheRequest {
@ -473,6 +507,11 @@ impl Api {
/// Finalizes uploading to a cache. /// Finalizes uploading to a cache.
async fn commit_cache(&self, cache_id: CacheId, size: usize) -> Result<()> { async fn commit_cache(&self, cache_id: CacheId, size: usize) -> Result<()> {
if self.circuit_breaker_429_tripped.load(Ordering::Relaxed) {
return Err(Error::CircuitBreakerTripped);
}
tracing::debug!("Commiting cache {:?}", cache_id); tracing::debug!("Commiting cache {:?}", cache_id);
let req = CommitCacheRequest { size }; let req = CommitCacheRequest { size };

View file

@ -10,8 +10,8 @@ pub type Result<T> = std::result::Result<T, Error>;
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum Error { pub enum Error {
#[error("GitHub API error: {0}")] #[error("GitHub API error: {0}")]
Api(#[from] gha_cache::api::Error), Api(#[from] gha_cache::api::Error),
#[error("Not Found")] #[error("Not Found")]
NotFound, NotFound,

View file

@ -119,6 +119,13 @@ async fn worker(
break; break;
} }
Request::Upload(path) => { Request::Upload(path) => {
if (api.circuit_breaker_tripped()) {
tracing::trace!(
"GitHub Actions gave us a 429, so we're done.",
);
continue;
}
if !done.insert(path.clone()) { if !done.insert(path.clone()) {
continue; continue;
} }
@ -188,8 +195,8 @@ async fn upload_path(
tracing::debug!("Uploading '{}'", narinfo_path); tracing::debug!("Uploading '{}'", narinfo_path);
api.upload_file(narinfo_allocation, narinfo.as_bytes()) api.upload_file(narinfo_allocation, narinfo.as_bytes()).await?;
.await?;
metrics.narinfos_uploaded.incr(); metrics.narinfos_uploaded.incr();
narinfo_negative_cache narinfo_negative_cache