diff --git a/magic-nix-cache/src/api.rs b/magic-nix-cache/src/api.rs index 5b1cdb8..c9437d2 100644 --- a/magic-nix-cache/src/api.rs +++ b/magic-nix-cache/src/api.rs @@ -2,6 +2,7 @@ //! //! This API is intended to be used by nix-installer-action. +use attic::nix_store::StorePath; use axum::{extract::Extension, routing::post, Json, Router}; use axum_macros::debug_handler; use serde::{Deserialize, Serialize}; @@ -10,28 +11,46 @@ use super::State; use crate::error::{Error, Result}; #[derive(Debug, Clone, Serialize)] -struct WorkflowStartResponse {} +struct WorkflowStartResponse { + num_original_paths: Option<usize>, +} #[derive(Debug, Clone, Serialize)] struct WorkflowFinishResponse { - //num_new_paths: usize, + num_original_paths: Option<usize>, + num_final_paths: Option<usize>, + num_new_paths: Option<usize>, } pub fn get_router() -> Router { Router::new() .route("/api/workflow-start", post(workflow_start)) .route("/api/workflow-finish", post(workflow_finish)) - .route("/api/enqueue-paths", post(enqueue_paths)) + .route("/api/enqueue-paths", post(post_enqueue_paths)) } /// Record existing paths. #[debug_handler] -async fn workflow_start( - Extension(_state): Extension<State>, -) -> Result<Json<WorkflowStartResponse>> { +async fn workflow_start(Extension(state): Extension<State>) -> Result<Json<WorkflowStartResponse>> { tracing::info!("Workflow started"); + let reply = if let Some(original_paths) = &state.original_paths { + let mut original_paths = original_paths.lock().await; + *original_paths = crate::util::get_store_paths(&state.store).await?; - Ok(Json(WorkflowStartResponse {})) + let reply = WorkflowStartResponse { + num_original_paths: Some(original_paths.len()), + }; + + state.metrics.num_original_paths.set(original_paths.len()); + + reply + } else { + WorkflowStartResponse { + num_original_paths: None, + } + }; + + Ok(Json(reply)) } /// Push new paths and shut down. 
@@ -40,6 +59,43 @@ async fn workflow_finish( ) -> Result<Json<WorkflowFinishResponse>> { tracing::info!("Workflow finished"); + let response = if let Some(original_paths) = &state.original_paths { + let original_paths = original_paths.lock().await; + let final_paths = crate::util::get_store_paths(&state.store).await?; + let new_paths = final_paths + .difference(&original_paths) + .cloned() + .map(|path| state.store.follow_store_path(path).map_err(Error::Attic)) + .collect::<Result<Vec<_>>>()?; + + let num_original_paths = original_paths.len(); + let num_final_paths = final_paths.len(); + let num_new_paths = new_paths.len(); + + let reply = WorkflowFinishResponse { + num_original_paths: Some(num_original_paths), + num_final_paths: Some(num_final_paths), + num_new_paths: Some(num_new_paths), + }; + + state.metrics.num_original_paths.set(num_original_paths); + state.metrics.num_final_paths.set(num_final_paths); + state.metrics.num_new_paths.set(num_new_paths); + + // NOTE(cole-h): If we're substituting from an upstream cache, those paths won't have the + // post-build-hook run on it, so we diff the store to ensure we cache everything we can. + tracing::info!("Diffing the store and uploading any new paths before we shut down"); + enqueue_paths(&state, new_paths).await?; + + reply + } else { + WorkflowFinishResponse { + num_original_paths: None, + num_final_paths: None, + num_new_paths: None, + } + }; + if let Some(gha_cache) = &state.gha_cache { tracing::info!("Waiting for GitHub action cache uploads to finish"); gha_cache.shutdown().await?; @@ -63,11 +119,7 @@ async fn workflow_finish( println!("\n{logfile_contents}\n"); } - let reply = WorkflowFinishResponse {}; - - //state.metrics.num_new_paths.set(num_new_paths); - - Ok(Json(reply)) + Ok(Json(response)) } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -80,7 +132,7 @@ pub struct EnqueuePathsResponse {} /// Schedule paths in the local Nix store for uploading. 
#[tracing::instrument(skip_all)] -async fn enqueue_paths( +async fn post_enqueue_paths( Extension(state): Extension<State>, Json(req): Json<EnqueuePathsRequest>, ) -> Result<Json<EnqueuePathsResponse>> { @@ -92,6 +144,12 @@ async fn enqueue_paths( .map(|path| state.store.follow_store_path(path).map_err(Error::Attic)) .collect::<Result<Vec<_>>>()?; + enqueue_paths(&state, store_paths).await?; + + Ok(Json(EnqueuePathsResponse {})) +} + +async fn enqueue_paths(state: &State, store_paths: Vec<StorePath>) -> Result<()> { if let Some(gha_cache) = &state.gha_cache { gha_cache .enqueue_paths(state.store.clone(), store_paths.clone()) @@ -102,5 +160,5 @@ async fn enqueue_paths( crate::flakehub::enqueue_paths(flakehub_state, store_paths).await?; } - Ok(Json(EnqueuePathsResponse {})) + Ok(()) } diff --git a/magic-nix-cache/src/main.rs b/magic-nix-cache/src/main.rs index ec75316..d2cc09a 100644 --- a/magic-nix-cache/src/main.rs +++ b/magic-nix-cache/src/main.rs @@ -19,6 +19,7 @@ mod error; mod flakehub; mod gha; mod telemetry; +mod util; use std::collections::HashSet; use std::fs::{self, create_dir_all}; @@ -119,6 +120,10 @@ struct Args { /// File to write to when indicating startup. #[arg(long)] startup_notification_file: Option<PathBuf>, + + /// Whether or not to diff the store before and after Magic Nix Cache runs + #[arg(long, default_value_t = false)] + diff_store: bool, } impl Args { @@ -164,6 +169,9 @@ struct StateInner { /// Where all of tracing will log to when GitHub Actions is run in debug mode logfile: Option<PathBuf>, + + /// The paths in the Nix store when Magic Nix Cache started, if store diffing is enabled. 
+ original_paths: Option<Mutex<HashSet<PathBuf>>>, } async fn main_cli() -> Result<()> { @@ -352,6 +360,7 @@ async fn main_cli() -> Result<()> { let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + let original_paths = args.diff_store.then_some(Mutex::new(HashSet::new())); let state = Arc::new(StateInner { gha_cache, upstream: args.upstream.clone(), @@ -361,6 +370,7 @@ async fn main_cli() -> Result<()> { store, flakehub_state: RwLock::new(flakehub_state), logfile: guard.logfile, + original_paths, }); let app = Router::new() diff --git a/magic-nix-cache/src/util.rs b/magic-nix-cache/src/util.rs new file mode 100644 index 0000000..2c7e759 --- /dev/null +++ b/magic-nix-cache/src/util.rs @@ -0,0 +1,42 @@ +//! Utilities. + +use std::collections::HashSet; +use std::path::{Path, PathBuf}; + +use attic::nix_store::NixStore; + +use crate::error::Result; + +/// Returns the list of store paths that are currently present. +pub async fn get_store_paths(store: &NixStore) -> Result<HashSet<PathBuf>> { + // FIXME: use the Nix API. + let store_dir = store.store_dir(); + let mut listing = tokio::fs::read_dir(store_dir).await?; + let mut paths = HashSet::new(); + while let Some(entry) = listing.next_entry().await? { + let file_name = entry.file_name(); + let file_name = Path::new(&file_name); + + if let Some(extension) = file_name.extension() { + match extension.to_str() { + None | Some("drv") | Some("chroot") => { + tracing::debug!( + "skipping file with weird or uninteresting extension {extension:?}" + ); + continue; + } + _ => {} + } + } + + if let Some(s) = file_name.to_str() { + // Special paths (so far only `.links`) + if s == ".links" { + continue; + } + } + + paths.insert(store_dir.join(file_name)); + } + Ok(paths) +}