diff --git a/magic-nix-cache/src/gha.rs b/magic-nix-cache/src/gha.rs index c6de710..f32d970 100644 --- a/magic-nix-cache/src/gha.rs +++ b/magic-nix-cache/src/gha.rs @@ -35,6 +35,7 @@ impl GhaCache { cache_version: Option, store: Arc, metrics: Arc, + narinfo_negative_cache: Arc>>, ) -> Result { let mut api = Api::new(credentials)?; @@ -48,8 +49,16 @@ impl GhaCache { let api2 = api.clone(); - let worker_result = - tokio::task::spawn(async move { worker(&api2, store, channel_rx, metrics).await }); + let worker_result = tokio::task::spawn(async move { + worker( + &api2, + store, + channel_rx, + metrics, + narinfo_negative_cache.clone(), + ) + .await + }); Ok(GhaCache { api, @@ -100,6 +109,7 @@ async fn worker( store: Arc, mut channel_rx: UnboundedReceiver, metrics: Arc, + narinfo_negative_cache: Arc>>, ) -> Result<()> { let mut done = HashSet::new(); @@ -113,7 +123,15 @@ async fn worker( continue; } - if let Err(err) = upload_path(api, store.clone(), &path, metrics.clone()).await { + if let Err(err) = upload_path( + api, + store.clone(), + &path, + metrics.clone(), + narinfo_negative_cache.clone(), + ) + .await + { tracing::error!( "Upload of path '{}' failed: {}", store.get_full_path(&path).display(), @@ -132,6 +150,7 @@ async fn upload_path( store: Arc, path: &StorePath, metrics: Arc, + narinfo_negative_cache: Arc>>, ) -> Result<()> { let path_info = store.query_path_info(path.clone()).await?; @@ -173,9 +192,14 @@ async fn upload_path( .await?; metrics.narinfos_uploaded.incr(); + narinfo_negative_cache + .write() + .await + .remove(&path.to_hash().to_string()); + tracing::info!( "Uploaded '{}' to the GitHub Action Cache", - store.get_full_path(&path).display() + store.get_full_path(path).display() ); Ok(()) diff --git a/magic-nix-cache/src/main.rs b/magic-nix-cache/src/main.rs index 3b8ea68..44158f6 100644 --- a/magic-nix-cache/src/main.rs +++ b/magic-nix-cache/src/main.rs @@ -123,7 +123,7 @@ struct StateInner { shutdown_sender: Mutex>>, /// Set of store path hashes that are not present in GHAC. - narinfo_negative_cache: RwLock>, + narinfo_negative_cache: Arc>>, /// Metrics for sending to perf at shutdown metrics: Arc, @@ -154,6 +154,8 @@ async fn main_cli() -> Result<()> { let store = Arc::new(NixStore::connect()?); + let narinfo_negative_cache = Arc::new(RwLock::new(HashSet::new())); + let flakehub_state = if args.use_flakehub { let flakehub_cache_server = args .flakehub_cache_server @@ -234,6 +236,7 @@ async fn main_cli() -> Result<()> { args.cache_version, store.clone(), metrics.clone(), + narinfo_negative_cache.clone(), ) .with_context(|| "Failed to initialize GitHub Actions Cache API")?; @@ -301,7 +304,7 @@ async fn main_cli() -> Result<()> { gha_cache, upstream: args.upstream.clone(), shutdown_sender: Mutex::new(Some(shutdown_sender)), - narinfo_negative_cache: RwLock::new(HashSet::new()), + narinfo_negative_cache, metrics, store, flakehub_state: RwLock::new(flakehub_state),