From 625e95f4849ee57dc760e087d8b95953d39a4df3 Mon Sep 17 00:00:00 2001
From: Eelco Dolstra
Date: Thu, 29 Feb 2024 16:31:29 +0100
Subject: [PATCH] Trigger GHA cache uploads from the post-build hook

Also, a worker task now does the uploads directly rather than having
magic-nix-cache invoke "nix copy" via HTTP to itself.
---
 magic-nix-cache/src/api.rs          |  67 +++-------
 magic-nix-cache/src/binary_cache.rs |  24 ++--
 magic-nix-cache/src/error.rs        |   3 -
 magic-nix-cache/src/gha.rs          | 193 ++++++++++++++++++++++++++++
 magic-nix-cache/src/main.rs         |  47 +++----
 magic-nix-cache/src/util.rs         |  91 -------------
 6 files changed, 247 insertions(+), 178 deletions(-)
 create mode 100644 magic-nix-cache/src/gha.rs
 delete mode 100644 magic-nix-cache/src/util.rs

diff --git a/magic-nix-cache/src/api.rs b/magic-nix-cache/src/api.rs
index db0435f..ae643bd 100644
--- a/magic-nix-cache/src/api.rs
+++ b/magic-nix-cache/src/api.rs
@@ -2,26 +2,19 @@
 //!
 //! This API is intended to be used by nix-installer-action.
 
-use std::net::SocketAddr;
-
-use axum::{extract::Extension, http::uri::Uri, routing::post, Json, Router};
+use axum::{extract::Extension, routing::post, Json, Router};
 use axum_macros::debug_handler;
 use serde::{Deserialize, Serialize};
 
 use super::State;
 use crate::error::{Error, Result};
-use crate::util::{get_store_paths, upload_paths};
 
 #[derive(Debug, Clone, Serialize)]
-struct WorkflowStartResponse {
-    num_original_paths: usize,
-}
+struct WorkflowStartResponse {}
 
 #[derive(Debug, Clone, Serialize)]
 struct WorkflowFinishResponse {
-    num_original_paths: usize,
-    num_final_paths: usize,
-    num_new_paths: usize,
+    //num_new_paths: usize,
 }
 
 pub fn get_router() -> Router {
@@ -33,15 +26,12 @@ pub fn get_router() -> Router {
 
 /// Record existing paths.
 #[debug_handler]
-async fn workflow_start(Extension(state): Extension<State>) -> Result<Json<WorkflowStartResponse>> {
+async fn workflow_start(
+    Extension(_state): Extension<State>,
+) -> Result<Json<WorkflowStartResponse>> {
     tracing::info!("Workflow started");
 
-    let mut original_paths = state.original_paths.lock().await;
-    *original_paths = get_store_paths(&state.store).await?;
-
-    Ok(Json(WorkflowStartResponse {
-        num_original_paths: original_paths.len(),
-    }))
+    Ok(Json(WorkflowStartResponse {}))
 }
 
 /// Push new paths and shut down.
@@ -49,17 +39,10 @@ async fn workflow_finish(
     Extension(state): Extension<State>,
 ) -> Result<Json<WorkflowFinishResponse>> {
     tracing::info!("Workflow finished");
-    let original_paths = state.original_paths.lock().await;
-    let final_paths = get_store_paths(&state.store).await?;
-    let new_paths = final_paths
-        .difference(&original_paths)
-        .cloned()
-        .collect::<Vec<_>>();
 
-    if state.api.is_some() {
-        tracing::info!("Pushing {} new paths to GHA cache", new_paths.len());
-        let store_uri = make_store_uri(&state.self_endpoint);
-        upload_paths(new_paths.clone(), &store_uri).await?;
+    if let Some(gha_cache) = &state.gha_cache {
+        tracing::info!("Waiting for GitHub action cache uploads to finish");
+        gha_cache.shutdown().await?;
     }
 
     if let Some(sender) = state.shutdown_sender.lock().await.take() {
@@ -69,36 +52,18 @@ async fn workflow_finish(
         // Wait for the Attic push workers to finish.
         if let Some(attic_state) = state.flakehub_state.write().await.take() {
+            tracing::info!("Waiting for FlakeHub cache uploads to finish");
             attic_state.push_session.wait().await?;
         }
     }
 
-    let reply = WorkflowFinishResponse {
-        num_original_paths: original_paths.len(),
-        num_final_paths: final_paths.len(),
-        num_new_paths: new_paths.len(),
-    };
+    let reply = WorkflowFinishResponse {};
 
-    state
-        .metrics
-        .num_original_paths
-        .set(reply.num_original_paths);
-    state.metrics.num_final_paths.set(reply.num_final_paths);
-    state.metrics.num_new_paths.set(reply.num_new_paths);
+    //state.metrics.num_new_paths.set(num_new_paths);
 
     Ok(Json(reply))
 }
 
-fn make_store_uri(self_endpoint: &SocketAddr) -> String {
-    Uri::builder()
-        .scheme("http")
-        .authority(self_endpoint.to_string())
-        .path_and_query("/?compression=zstd&parallel-compression=true")
-        .build()
-        .expect("Cannot construct URL to self")
-        .to_string()
-}
-
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct EnqueuePathsRequest {
     pub store_paths: Vec<String>,
 }
@@ -120,6 +85,12 @@ async fn enqueue_paths(
         .map(|path| state.store.follow_store_path(path).map_err(Error::Attic))
         .collect::<Result<Vec<_>>>()?;
 
+    if let Some(gha_cache) = &state.gha_cache {
+        gha_cache
+            .enqueue_paths(state.store.clone(), store_paths.clone())
+            .await?;
+    }
+
     if let Some(flakehub_state) = &*state.flakehub_state.read().await {
         crate::flakehub::enqueue_paths(flakehub_state, store_paths).await?;
     }
diff --git a/magic-nix-cache/src/binary_cache.rs b/magic-nix-cache/src/binary_cache.rs
index 316548e..f8ea5c4 100644
--- a/magic-nix-cache/src/binary_cache.rs
+++ b/magic-nix-cache/src/binary_cache.rs
@@ -61,8 +61,8 @@ async fn get_narinfo(
         return pull_through(&state, &path);
     }
 
-    if let Some(api) = &state.api {
-        if let Some(url) = api.get_file_url(&[&key]).await? {
+    if let Some(gha_cache) = &state.gha_cache {
+        if let Some(url) = gha_cache.api.get_file_url(&[&key]).await? {
             state.metrics.narinfos_served.incr();
             return Ok(Redirect::temporary(&url));
         }
@@ -75,6 +75,7 @@ async fn get_narinfo(
     state.metrics.narinfos_negative_cache_misses.incr();
     pull_through(&state, &path)
 }
+
 async fn put_narinfo(
     Extension(state): Extension<State>,
     Path(path): Path<String>,
@@ -90,15 +91,15 @@ async fn put_narinfo(
         return Err(Error::BadRequest);
     }
 
-    let api = state.api.as_ref().ok_or(Error::GHADisabled)?;
+    let gha_cache = state.gha_cache.as_ref().ok_or(Error::GHADisabled)?;
 
     let store_path_hash = components[0].to_string();
     let key = format!("{}.narinfo", store_path_hash);
-    let allocation = api.allocate_file_with_random_suffix(&key).await?;
+    let allocation = gha_cache.api.allocate_file_with_random_suffix(&key).await?;
     let stream = StreamReader::new(
         body.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))),
     );
-    api.upload_file(allocation, stream).await?;
+    gha_cache.api.upload_file(allocation, stream).await?;
     state.metrics.narinfos_uploaded.incr();
 
     state
@@ -112,9 +113,10 @@ async fn put_narinfo(
 
 async fn get_nar(Extension(state): Extension<State>, Path(path): Path<String>) -> Result<Redirect> {
     if let Some(url) = state
-        .api
+        .gha_cache
         .as_ref()
         .ok_or(Error::GHADisabled)?
+        .api
         .get_file_url(&[&path])
         .await?
     {
@@ -129,18 +131,22 @@ async fn get_nar(Extension(state): Extension<State>, Path(path): Path<String>) -
         Err(Error::NotFound)
     }
 }
+
 async fn put_nar(
     Extension(state): Extension<State>,
     Path(path): Path<String>,
     body: BodyStream,
 ) -> Result<()> {
-    let api = state.api.as_ref().ok_or(Error::GHADisabled)?;
+    let gha_cache = state.gha_cache.as_ref().ok_or(Error::GHADisabled)?;
 
-    let allocation = api.allocate_file_with_random_suffix(&path).await?;
+    let allocation = gha_cache
+        .api
+        .allocate_file_with_random_suffix(&path)
+        .await?;
     let stream = StreamReader::new(
         body.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))),
     );
-    api.upload_file(allocation, stream).await?;
+    gha_cache.api.upload_file(allocation, stream).await?;
     state.metrics.nars_uploaded.incr();
 
     Ok(())
diff --git a/magic-nix-cache/src/error.rs b/magic-nix-cache/src/error.rs
index 314f428..7acaec6 100644
--- a/magic-nix-cache/src/error.rs
+++ b/magic-nix-cache/src/error.rs
@@ -22,9 +22,6 @@ pub enum Error {
     #[error("I/O error: {0}")]
     Io(#[from] std::io::Error),
 
-    #[error("Failed to upload paths")]
-    FailedToUpload,
-
     #[error("GHA cache is disabled")]
     GHADisabled,
 
diff --git a/magic-nix-cache/src/gha.rs b/magic-nix-cache/src/gha.rs
new file mode 100644
index 0000000..6ede0f0
--- /dev/null
+++ b/magic-nix-cache/src/gha.rs
@@ -0,0 +1,193 @@
+use std::{collections::HashSet, sync::Arc};
+
+use crate::error::{Error, Result};
+use crate::telemetry;
+use attic::nix_store::{NixStore, StorePath, ValidPathInfo};
+use attic_server::narinfo::{Compression, NarInfo};
+use futures::stream::StreamExt;
+use gha_cache::{Api, Credentials};
+use tokio::sync::{
+    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
+    RwLock,
+};
+
+pub struct GhaCache {
+    /// The GitHub Actions Cache API.
+    pub api: Arc<Api>,
+
+    /// The future from the completion of the worker.
+    worker_result: RwLock<Option<tokio::task::JoinHandle<Result<()>>>>,
+
+    channel_tx: UnboundedSender<Request>,
+}
+
+#[derive(Debug)]
+enum Request {
+    Shutdown,
+    Upload(StorePath),
+}
+
+impl GhaCache {
+    pub fn new(
+        credentials: Credentials,
+        cache_version: Option<String>,
+        store: Arc<NixStore>,
+        metrics: Arc<telemetry::TelemetryReport>,
+    ) -> Result<GhaCache> {
+        let mut api = Api::new(credentials)?;
+
+        if let Some(cache_version) = &cache_version {
+            api.mutate_version(cache_version.as_bytes());
+        }
+
+        let (channel_tx, channel_rx) = unbounded_channel();
+
+        let api = Arc::new(api);
+
+        let api2 = api.clone();
+
+        let worker_result =
+            tokio::task::spawn(async move { worker(&api2, store, channel_rx, metrics).await });
+
+        Ok(GhaCache {
+            api,
+            worker_result: RwLock::new(Some(worker_result)),
+            channel_tx,
+        })
+    }
+
+    pub async fn shutdown(&self) -> Result<()> {
+        if let Some(worker_result) = self.worker_result.write().await.take() {
+            self.channel_tx
+                .send(Request::Shutdown)
+                .expect("Cannot send shutdown message");
+            worker_result.await.unwrap()
+        } else {
+            Ok(())
+        }
+    }
+
+    pub async fn enqueue_paths(
+        &self,
+        store: Arc<NixStore>,
+        store_paths: Vec<StorePath>,
+    ) -> Result<()> {
+        // FIXME: make sending the closure optional. We might want to
+        // only send the paths that have been built by the user, under
+        // the assumption that everything else is already in a binary
+        // cache.
+        // FIXME: compute_fs_closure_multi doesn't return a
+        // toposort, though it doesn't really matter for the GHA
+        // cache.
+        let closure = store
+            .compute_fs_closure_multi(store_paths, false, false, false)
+            .await?;
+
+        for p in closure {
+            self.channel_tx
+                .send(Request::Upload(p))
+                .map_err(|_| Error::Internal("Cannot send upload message".to_owned()))?;
+        }
+
+        Ok(())
+    }
+}
+
+async fn worker(
+    api: &Api,
+    store: Arc<NixStore>,
+    mut channel_rx: UnboundedReceiver<Request>,
+    metrics: Arc<telemetry::TelemetryReport>,
+) -> Result<()> {
+    let mut done = HashSet::new();
+
+    while let Some(req) = channel_rx.recv().await {
+        match req {
+            Request::Shutdown => {
+                break;
+            }
+            Request::Upload(path) => {
+                if !done.insert(path.clone()) {
+                    continue;
+                }
+
+                if let Err(err) = upload_path(api, store.clone(), &path, metrics.clone()).await {
+                    tracing::error!(
+                        "Upload of path '{}' failed: {}",
+                        store.get_full_path(&path).display(),
+                        err
+                    );
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
+
+async fn upload_path(
+    api: &Api,
+    store: Arc<NixStore>,
+    path: &StorePath,
+    metrics: Arc<telemetry::TelemetryReport>,
+) -> Result<()> {
+    let path_info = store.query_path_info(path.clone()).await?;
+
+    // Upload the NAR.
+    let nar_path = format!("nar/{}.nar", path_info.nar_hash.to_base32());
+
+    let nar_allocation = api.allocate_file_with_random_suffix(&nar_path).await?;
+
+    let mut nar_stream = store.nar_from_path(path.clone());
+
+    let mut nar: Vec<u8> = vec![];
+
+    // FIXME: make this streaming and compress.
+    while let Some(data) = nar_stream.next().await {
+        nar.append(&mut data?);
+    }
+
+    tracing::info!("Uploading NAR {} (size {})", nar_path, nar.len());
+
+    api.upload_file(nar_allocation, &nar[..]).await?;
+    metrics.nars_uploaded.incr();
+
+    // Upload the narinfo.
+    let narinfo_path = format!("{}.narinfo", path.to_hash());
+
+    let narinfo_allocation = api.allocate_file_with_random_suffix(&narinfo_path).await?;
+
+    let narinfo = path_info_to_nar_info(store.clone(), &path_info, nar_path)
+        .to_string()
+        .unwrap();
+
+    tracing::info!("Uploading {}: {}", narinfo_path, narinfo);
+
+    api.upload_file(narinfo_allocation, narinfo.as_bytes())
+        .await?;
+    metrics.narinfos_uploaded.incr();
+
+    Ok(())
+}
+
+// FIXME: move to attic.
+fn path_info_to_nar_info(store: Arc<NixStore>, path_info: &ValidPathInfo, url: String) -> NarInfo {
+    NarInfo {
+        store_path: store.get_full_path(&path_info.path),
+        url,
+        compression: Compression::None,
+        file_hash: None,
+        file_size: None,
+        nar_hash: path_info.nar_hash.clone(),
+        nar_size: path_info.nar_size as usize,
+        references: path_info
+            .references
+            .iter()
+            .map(|r| r.file_name().unwrap().to_str().unwrap().to_owned())
+            .collect(),
+        system: None,
+        deriver: None,
+        signatures: None,
+        ca: path_info.ca.clone(),
+    }
+}
diff --git a/magic-nix-cache/src/main.rs b/magic-nix-cache/src/main.rs
index 6c32d4e..8fe3f39 100644
--- a/magic-nix-cache/src/main.rs
+++ b/magic-nix-cache/src/main.rs
@@ -16,8 +16,8 @@ mod api;
 mod binary_cache;
 mod error;
 mod flakehub;
+mod gha;
 mod telemetry;
-mod util;
 
 use std::collections::HashSet;
 use std::fs::{self, create_dir_all, OpenOptions};
@@ -35,7 +35,7 @@ use tempfile::NamedTempFile;
 use tokio::sync::{oneshot, Mutex, RwLock};
 use tracing_subscriber::filter::EnvFilter;
 
-use gha_cache::{Api, Credentials};
+use gha_cache::Credentials;
 
 type State = Arc<StateInner>;
 
@@ -113,8 +113,8 @@ struct Args {
 
 /// The global server state.
 struct StateInner {
-    /// The GitHub Actions Cache API.
-    api: Option<Api>,
+    /// State for uploading to the GHA cache.
+    gha_cache: Option<gha::GhaCache>,
 
     /// The upstream cache.
     upstream: Option<String>,
@@ -122,19 +122,11 @@ struct StateInner {
     /// The sender half of the oneshot channel to trigger a shutdown.
     shutdown_sender: Mutex<Option<oneshot::Sender<()>>>,
 
-    /// List of store paths originally present.
-    original_paths: Mutex<HashSet<PathBuf>>,
-
     /// Set of store path hashes that are not present in GHAC.
     narinfo_nagative_cache: RwLock<HashSet<String>>,
 
-    /// Endpoint of ourselves.
-    ///
-    /// This is used by our Action API to invoke `nix copy` to upload new paths.
-    self_endpoint: SocketAddr,
-
     /// Metrics for sending to perf at shutdown
-    metrics: telemetry::TelemetryReport,
+    metrics: Arc<telemetry::TelemetryReport>,
 
     /// Connection to the local Nix store.
     store: Arc<NixStore>,
@@ -148,6 +140,8 @@ async fn main_cli() -> Result<()> {
 
     let args = Args::parse();
 
+    let metrics = Arc::new(telemetry::TelemetryReport::new());
+
     if let Some(parent) = Path::new(&args.nix_conf).parent() {
         create_dir_all(parent).with_context(|| "Creating parent directories of nix.conf")?;
     }
@@ -213,7 +207,7 @@ async fn main_cli() -> Result<()> {
         None
     };
 
-    let api = if args.use_gha_cache {
+    let gha_cache = if args.use_gha_cache {
        let credentials = if let Some(credentials_file) = &args.credentials_file {
            tracing::info!("Loading credentials from {:?}", credentials_file);
            let bytes = fs::read(credentials_file).with_context(|| {
@@ -235,19 +229,20 @@ async fn main_cli() -> Result<()> {
                .with_context(|| "Failed to load credentials from environment (see README.md)")?
        };
 
-        let mut api = Api::new(credentials)
-            .with_context(|| "Failed to initialize GitHub Actions Cache API")?;
-
-        if let Some(cache_version) = &args.cache_version {
-            api.mutate_version(cache_version.as_bytes());
-        }
+        let gha_cache = gha::GhaCache::new(
+            credentials,
+            args.cache_version,
+            store.clone(),
+            metrics.clone(),
+        )
+        .with_context(|| "Failed to initialize GitHub Actions Cache API")?;
 
        nix_conf
            .write_all(format!("extra-substituters = http://{}?trusted=1&compression=zstd&parallel-compression=true&priority=1\n", args.listen).as_bytes())
            .with_context(|| "Writing to nix.conf")?;
 
        tracing::info!("Native GitHub Action cache is enabled.");
-        Some(api)
+        Some(gha_cache)
     } else {
        tracing::info!("Native GitHub Action cache is disabled.");
        None
@@ -303,13 +298,11 @@ async fn main_cli() -> Result<()> {
     let (shutdown_sender, shutdown_receiver) = oneshot::channel();
 
     let state = Arc::new(StateInner {
-        api,
+        gha_cache,
        upstream: args.upstream.clone(),
        shutdown_sender: Mutex::new(Some(shutdown_sender)),
-        original_paths: Mutex::new(HashSet::new()),
        narinfo_nagative_cache: RwLock::new(HashSet::new()),
-        self_endpoint: args.listen.to_owned(),
-        metrics: telemetry::TelemetryReport::new(),
+        metrics,
        store,
        flakehub_state: RwLock::new(flakehub_state),
     });
@@ -451,8 +444,8 @@ async fn dump_api_stats(
     request: axum::http::Request<B>,
     next: axum::middleware::Next<B>,
 ) -> axum::response::Response {
-    if let Some(api) = &state.api {
-        api.dump_stats();
+    if let Some(gha_cache) = &state.gha_cache {
+        gha_cache.api.dump_stats();
     }
     next.run(request).await
 }
diff --git a/magic-nix-cache/src/util.rs b/magic-nix-cache/src/util.rs
deleted file mode 100644
index e9f916e..0000000
--- a/magic-nix-cache/src/util.rs
+++ /dev/null
@@ -1,91 +0,0 @@
-//! Utilities.
-
-use std::collections::HashSet;
-use std::path::{Path, PathBuf};
-
-use attic::nix_store::NixStore;
-use tokio::{fs, process::Command};
-
-use crate::error::{Error, Result};
-
-/// Returns the list of store paths that are currently present.
-pub async fn get_store_paths(store: &NixStore) -> Result<HashSet<PathBuf>> {
-    // FIXME: use the Nix API.
-    let store_dir = store.store_dir();
-    let mut listing = fs::read_dir(store_dir).await?;
-    let mut paths = HashSet::new();
-    while let Some(entry) = listing.next_entry().await? {
-        let file_name = entry.file_name();
-        let file_name = Path::new(&file_name);
-
-        if let Some(extension) = file_name.extension() {
-            match extension.to_str() {
-                None | Some("drv") | Some("lock") | Some("chroot") => {
-                    // Malformed or not interesting
-                    continue;
-                }
-                _ => {}
-            }
-        }
-
-        if let Some(s) = file_name.to_str() {
-            // Let's not push any sources
-            if s.ends_with("-source") {
-                continue;
-            }
-
-            // Special paths (so far only `.links`)
-            if s.starts_with('.') {
-                continue;
-            }
-        }
-
-        paths.insert(store_dir.join(file_name));
-    }
-    Ok(paths)
-}
-
-/// Uploads a list of store paths to a store URI.
-pub async fn upload_paths(mut paths: Vec<PathBuf>, store_uri: &str) -> Result<()> {
-    // When the daemon started Nix may not have been installed
-    let env_path = Command::new("sh")
-        .args(["-lc", "echo $PATH"])
-        .output()
-        .await?
-        .stdout;
-    let env_path = String::from_utf8(env_path)
-        .map_err(|_| Error::Config("PATH contains invalid UTF-8".to_owned()))?;
-
-    while !paths.is_empty() {
-        let mut batch = Vec::new();
-        let mut total_len = 0;
-
-        while total_len < 1024 * 1024 {
-            if let Some(p) = paths.pop() {
-                total_len += p.as_os_str().len() + 1;
-                batch.push(p);
-            } else {
-                break;
-            }
-        }
-
-        tracing::debug!("{} paths in this batch", batch.len());
-
-        let status = Command::new("nix")
-            .args(["--extra-experimental-features", "nix-command"])
-            .args(["copy", "--to", store_uri])
-            .args(&batch)
-            .env("PATH", &env_path)
-            .status()
-            .await?;
-
-        if status.success() {
-            tracing::debug!("Uploaded batch");
-        } else {
-            tracing::error!("Failed to upload batch: {:?}", status);
-            return Err(Error::FailedToUpload);
-        }
-    }
-
-    Ok(())
-}
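Illustration (not part of the patch): the change above assumes a Nix post-build hook that forwards freshly built paths to the daemon's enqueue API, where the worker task uploads their closure to the GHA cache. A minimal sketch of such a hook helper follows. Only the EnqueuePathsRequest shape (store_paths: Vec<String>) comes from the diff; the endpoint path, the port, and the use of reqwest (blocking + json features) are assumptions for illustration — the real daemon address is whatever args.listen is set to. Nix passes the newly built store paths to a post-build hook in the OUT_PATHS environment variable.

// Hypothetical post-build hook helper; endpoint and port are placeholders.
use serde::Serialize;

#[derive(Serialize)]
struct EnqueuePathsRequest {
    store_paths: Vec<String>,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Nix sets OUT_PATHS to a space-separated list of newly built store paths.
    let store_paths: Vec<String> = std::env::var("OUT_PATHS")?
        .split_whitespace()
        .map(str::to_owned)
        .collect();

    // Placeholder address; use the daemon's actual listen address and API route.
    reqwest::blocking::Client::new()
        .post("http://127.0.0.1:37515/api/enqueue-paths")
        .json(&EnqueuePathsRequest { store_paths })
        .send()?
        .error_for_status()?;

    Ok(())
}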