From d683065dc19d2c79ac7bf5d4b96da28b1ffbd414 Mon Sep 17 00:00:00 2001 From: Zhaofeng Li Date: Fri, 19 May 2023 02:48:52 -0600 Subject: [PATCH] Automatically push new store paths at the end of workflow --- Cargo.lock | 14 ++++++ nix-actions-cache/Cargo.toml | 2 + nix-actions-cache/src/api.rs | 68 ++++++++++++++++++++++++++++ nix-actions-cache/src/error.rs | 9 ++++ nix-actions-cache/src/main.rs | 32 +++++++------ nix-actions-cache/src/util.rs | 82 ++++++++++++++++++++++++++++++++++ 6 files changed, 190 insertions(+), 17 deletions(-) create mode 100644 nix-actions-cache/src/api.rs create mode 100644 nix-actions-cache/src/util.rs diff --git a/Cargo.lock b/Cargo.lock index 9f0b8bf..322cabd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -123,6 +123,18 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-macros" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bb524613be645939e280b7279f7b017f98cf7f5ef084ec374df373530e73277" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.15", +] + [[package]] name = "base64" version = "0.21.0" @@ -721,10 +733,12 @@ name = "nix-actions-cache" version = "0.1.0" dependencies = [ "axum", + "axum-macros", "clap", "daemonize", "gha-cache", "rand", + "serde", "serde_json", "thiserror", "tokio", diff --git a/nix-actions-cache/Cargo.toml b/nix-actions-cache/Cargo.toml index ce71b96..4ccf541 100644 --- a/nix-actions-cache/Cargo.toml +++ b/nix-actions-cache/Cargo.toml @@ -9,10 +9,12 @@ edition = "2021" gha-cache = { path = "../gha-cache" } axum = "0.6.18" +axum-macros = "0.3.7" clap = { version = "4.2.7", features = ["derive"] } tracing = "0.1.37" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } tower-http = { version = "0.4.0", features = ["trace"] } +serde = { version = "1.0.162", features = ["derive"] } serde_json = "1.0.96" thiserror = "1.0.40" tokio-stream = "0.1.14" diff --git a/nix-actions-cache/src/api.rs b/nix-actions-cache/src/api.rs new file mode 100644 index 0000000..66a888f --- /dev/null +++ b/nix-actions-cache/src/api.rs @@ -0,0 +1,68 @@ +//! Action API. +//! +//! This API is intended to be used by nix-installer-action. + +use axum::{extract::Extension, routing::post, Json, Router}; +use axum_macros::debug_handler; +use serde::Serialize; + +use super::State; +use crate::error::Result; +use crate::util::{get_store_paths, upload_paths}; + +#[derive(Debug, Clone, Serialize)] +struct WorkflowStartResponse { + num_original_paths: usize, +} + +#[derive(Debug, Clone, Serialize)] +struct WorkflowFinishResponse { + num_original_paths: usize, + num_final_paths: usize, + num_new_paths: usize, +} + +pub fn get_router() -> Router { + Router::new() + .route("/api/workflow-start", post(workflow_start)) + .route("/api/workflow-finish", post(workflow_finish)) +} + +/// Record existing paths. +#[debug_handler] +async fn workflow_start(Extension(state): Extension) -> Result> { + tracing::info!("Workflow started"); + + let mut original_paths = state.original_paths.lock().await; + *original_paths = get_store_paths().await?; + + Ok(Json(WorkflowStartResponse { + num_original_paths: original_paths.len(), + })) +} + +/// Push new paths and shut down. +async fn workflow_finish( + Extension(state): Extension, +) -> Result> { + tracing::info!("Workflow finished"); + + let original_paths = state.original_paths.lock().await; + let final_paths = get_store_paths().await?; + let new_paths = final_paths + .difference(&original_paths) + .cloned() + .collect::>(); + + tracing::info!("Pushing {} new paths", new_paths.len()); + upload_paths(new_paths.clone()).await?; + + let sender = state.shutdown_sender.lock().await.take().unwrap(); + sender.send(()).unwrap(); + + Ok(Json(WorkflowFinishResponse { + num_original_paths: original_paths.len(), + num_final_paths: final_paths.len(), + num_new_paths: new_paths.len(), + })) +} diff --git a/nix-actions-cache/src/error.rs b/nix-actions-cache/src/error.rs index cc02856..fffb0ea 100644 --- a/nix-actions-cache/src/error.rs +++ b/nix-actions-cache/src/error.rs @@ -1,5 +1,7 @@ //! Errors. +use std::io::Error as IoError; + use axum::{ http::StatusCode, response::{IntoResponse, Response}, @@ -20,6 +22,12 @@ pub enum Error { #[error("Bad Request")] BadRequest, + + #[error("I/O error: {0}")] + IoError(#[from] IoError), + + #[error("Failed to upload paths")] + FailedToUpload, } impl IntoResponse for Error { @@ -29,6 +37,7 @@ impl IntoResponse for Error { Self::ApiError(_) => StatusCode::IM_A_TEAPOT, Self::NotFound => StatusCode::NOT_FOUND, Self::BadRequest => StatusCode::BAD_REQUEST, + _ => StatusCode::INTERNAL_SERVER_ERROR, }; (code, format!("{}", self)).into_response() diff --git a/nix-actions-cache/src/main.rs b/nix-actions-cache/src/main.rs index b2f7f58..43cee38 100644 --- a/nix-actions-cache/src/main.rs +++ b/nix-actions-cache/src/main.rs @@ -13,23 +13,25 @@ deny(unused_imports, unused_mut, unused_variables,) )] +mod api; mod binary_cache; mod error; +mod util; +use std::collections::HashSet; use std::fs::{self, File}; use std::net::SocketAddr; use std::os::fd::OwnedFd; use std::path::PathBuf; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; -use axum::{ - extract::Extension, - routing::{get, post}, - Router, -}; +use axum::{extract::Extension, routing::get, Router}; use clap::Parser; use daemonize::Daemonize; -use tokio::{runtime::Runtime, sync::oneshot}; +use tokio::{ + runtime::Runtime, + sync::{oneshot, Mutex}, +}; use tracing_subscriber::EnvFilter; use gha_cache::{Api, Credentials}; @@ -79,6 +81,9 @@ struct StateInner { api: Api, upstream: Option, shutdown_sender: Mutex>>, + + /// List of store paths originally present. + original_paths: Mutex>, } fn main() { @@ -109,11 +114,12 @@ fn main() { api, upstream: args.upstream.clone(), shutdown_sender: Mutex::new(Some(shutdown_sender)), + original_paths: Mutex::new(HashSet::new()), }); let app = Router::new() .route("/", get(root)) - .route("/api/finish", post(finish)) + .merge(api::get_router()) .merge(binary_cache::get_router()); #[cfg(debug_assertions)] @@ -155,7 +161,7 @@ fn init_logging() { return EnvFilter::new("info,gha_cache=debug,nix_action_cache=debug"); #[cfg(not(debug_assertions))] - return EnvFilter::default(); + return EnvFilter::new("info"); }); tracing_subscriber::fmt().with_env_filter(filter).init(); } @@ -173,11 +179,3 @@ async fn dump_api_stats( async fn root() -> &'static str { "cache the world 🚀" } - -async fn finish(Extension(state): Extension) -> &'static str { - tracing::info!("Workflow finished - Pushing new store paths"); - let sender = state.shutdown_sender.lock().unwrap().take().unwrap(); - sender.send(()).unwrap(); - - "Shutting down" -} diff --git a/nix-actions-cache/src/util.rs b/nix-actions-cache/src/util.rs new file mode 100644 index 0000000..c5faeab --- /dev/null +++ b/nix-actions-cache/src/util.rs @@ -0,0 +1,82 @@ +//! Utilities. + +use std::collections::HashSet; +use std::path::{Path, PathBuf}; + +use tokio::{fs, process::Command}; + +use crate::error::{Error, Result}; + +/// Returns the list of store paths that are currently present. +pub async fn get_store_paths() -> Result> { + let store_dir = Path::new("/nix/store"); + let mut listing = fs::read_dir(store_dir).await?; + let mut paths = HashSet::new(); + while let Some(entry) = listing.next_entry().await? { + let file_name = entry.file_name(); + let file_name = Path::new(&file_name); + + if let Some(extension) = file_name.extension() { + match extension.to_str() { + None | Some("drv") | Some("lock") => { + // Malformed or not interesting + continue; + } + _ => {} + } + } + + if let Some(s) = file_name.to_str() { + // Let's not push any sources + if s.ends_with("-source") { + continue; + } + } + + paths.insert(store_dir.join(file_name)); + } + Ok(paths) +} + +/// Uploads a list of store paths to the cache. +pub async fn upload_paths(mut paths: Vec) -> Result<()> { + // When the daemon started Nix may not have been installed + let env_path = Command::new("sh") + .args(&["-lc", "echo $PATH"]) + .output() + .await? + .stdout; + let env_path = String::from_utf8(env_path) + .expect("PATH contains invalid UTF-8"); + + while !paths.is_empty() { + let mut batch = Vec::new(); + let mut total_len = 0; + + while !paths.is_empty() && total_len < 1024 * 1024 { + let p = paths.pop().unwrap(); + total_len += p.as_os_str().len() + 1; + batch.push(p); + } + + tracing::debug!("{} paths in this batch", batch.len()); + + let status = Command::new("nix") + .args(&["--extra-experimental-features", "nix-command"]) + // FIXME: Port and compression settings + .args(&["copy", "--to", "http://127.0.0.1:3000"]) + .args(&batch) + .env("PATH", &env_path) + .status() + .await?; + + if status.success() { + tracing::debug!("Uploaded batch"); + } else { + tracing::error!("Failed to upload batch: {:?}", status); + return Err(Error::FailedToUpload); + } + } + + Ok(()) +}