Automatically push new store paths at the end of workflow

This commit is contained in:
Zhaofeng Li 2023-05-19 02:48:52 -06:00
parent 33d85fe7aa
commit d683065dc1
6 changed files with 190 additions and 17 deletions

14
Cargo.lock generated
View file

@@ -123,6 +123,18 @@ dependencies = [
"tower-service",
]
[[package]]
name = "axum-macros"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bb524613be645939e280b7279f7b017f98cf7f5ef084ec374df373530e73277"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.15",
]
[[package]]
name = "base64"
version = "0.21.0"
@ -721,10 +733,12 @@ name = "nix-actions-cache"
version = "0.1.0"
dependencies = [
"axum",
"axum-macros",
"clap",
"daemonize",
"gha-cache",
"rand",
"serde",
"serde_json",
"thiserror",
"tokio",

View file

@ -9,10 +9,12 @@ edition = "2021"
gha-cache = { path = "../gha-cache" }
axum = "0.6.18"
axum-macros = "0.3.7"
clap = { version = "4.2.7", features = ["derive"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
tower-http = { version = "0.4.0", features = ["trace"] }
serde = { version = "1.0.162", features = ["derive"] }
serde_json = "1.0.96"
thiserror = "1.0.40"
tokio-stream = "0.1.14"

View file

@@ -0,0 +1,68 @@
//! Action API.
//!
//! This API is intended to be used by nix-installer-action.
use axum::{extract::Extension, routing::post, Json, Router};
use axum_macros::debug_handler;
use serde::Serialize;
use super::State;
use crate::error::Result;
use crate::util::{get_store_paths, upload_paths};
/// Response body for `POST /api/workflow-start`.
#[derive(Debug, Clone, Serialize)]
struct WorkflowStartResponse {
    // Number of store paths present when the workflow started.
    num_original_paths: usize,
}
/// Response body for `POST /api/workflow-finish`.
#[derive(Debug, Clone, Serialize)]
struct WorkflowFinishResponse {
    // Number of store paths recorded at workflow start.
    num_original_paths: usize,
    // Number of store paths present at workflow finish.
    num_final_paths: usize,
    // Paths in the final set but not the original set (i.e. what was pushed).
    num_new_paths: usize,
}
/// Builds the router for the workflow-lifecycle Action API endpoints.
pub fn get_router() -> Router {
    let router = Router::new();
    let router = router.route("/api/workflow-start", post(workflow_start));
    router.route("/api/workflow-finish", post(workflow_finish))
}
/// Records the set of store paths present when the workflow starts.
///
/// The snapshot is stored in shared state so the finish handler can later
/// compute which paths are new.
#[debug_handler]
async fn workflow_start(Extension(state): Extension<State>) -> Result<Json<WorkflowStartResponse>> {
    tracing::info!("Workflow started");

    // Take the snapshot first, then publish it under the lock.
    let snapshot = get_store_paths().await?;
    let mut original_paths = state.original_paths.lock().await;
    *original_paths = snapshot;
    let num_original_paths = original_paths.len();

    Ok(Json(WorkflowStartResponse { num_original_paths }))
}
/// Pushes new paths and shuts the server down.
///
/// Computes the difference between the current store contents and the
/// snapshot taken by `workflow_start`, uploads the new paths, then fires
/// the shutdown channel.
async fn workflow_finish(
    Extension(state): Extension<State>,
) -> Result<Json<WorkflowFinishResponse>> {
    tracing::info!("Workflow finished");
    let original_paths = state.original_paths.lock().await;
    let final_paths = get_store_paths().await?;
    let new_paths = final_paths
        .difference(&original_paths)
        .cloned()
        .collect::<Vec<_>>();

    // Capture the count before `upload_paths` consumes the Vec, so we don't
    // have to clone the entire list just to report its length afterwards.
    let num_new_paths = new_paths.len();
    tracing::info!("Pushing {} new paths", num_new_paths);
    upload_paths(new_paths).await?;

    // A repeated call to this endpoint finds the sender already taken, and
    // the receiver may be gone if shutdown raced us. Log instead of
    // panicking (the previous `unwrap`s would abort the handler task).
    if let Some(sender) = state.shutdown_sender.lock().await.take() {
        if sender.send(()).is_err() {
            tracing::warn!("Shutdown receiver was already dropped");
        }
    } else {
        tracing::warn!("Shutdown was already requested");
    }

    Ok(Json(WorkflowFinishResponse {
        num_original_paths: original_paths.len(),
        num_final_paths: final_paths.len(),
        num_new_paths,
    }))
}

View file

@ -1,5 +1,7 @@
//! Errors.
use std::io::Error as IoError;
use axum::{
http::StatusCode,
response::{IntoResponse, Response},
@ -20,6 +22,12 @@ pub enum Error {
#[error("Bad Request")]
BadRequest,
#[error("I/O error: {0}")]
IoError(#[from] IoError),
#[error("Failed to upload paths")]
FailedToUpload,
}
impl IntoResponse for Error {
@ -29,6 +37,7 @@ impl IntoResponse for Error {
Self::ApiError(_) => StatusCode::IM_A_TEAPOT,
Self::NotFound => StatusCode::NOT_FOUND,
Self::BadRequest => StatusCode::BAD_REQUEST,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
(code, format!("{}", self)).into_response()

View file

@ -13,23 +13,25 @@
deny(unused_imports, unused_mut, unused_variables,)
)]
mod api;
mod binary_cache;
mod error;
mod util;
use std::collections::HashSet;
use std::fs::{self, File};
use std::net::SocketAddr;
use std::os::fd::OwnedFd;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use axum::{
extract::Extension,
routing::{get, post},
Router,
};
use axum::{extract::Extension, routing::get, Router};
use clap::Parser;
use daemonize::Daemonize;
use tokio::{runtime::Runtime, sync::oneshot};
use tokio::{
runtime::Runtime,
sync::{oneshot, Mutex},
};
use tracing_subscriber::EnvFilter;
use gha_cache::{Api, Credentials};
@ -79,6 +81,9 @@ struct StateInner {
api: Api,
upstream: Option<String>,
shutdown_sender: Mutex<Option<oneshot::Sender<()>>>,
/// List of store paths originally present.
original_paths: Mutex<HashSet<PathBuf>>,
}
fn main() {
@ -109,11 +114,12 @@ fn main() {
api,
upstream: args.upstream.clone(),
shutdown_sender: Mutex::new(Some(shutdown_sender)),
original_paths: Mutex::new(HashSet::new()),
});
let app = Router::new()
.route("/", get(root))
.route("/api/finish", post(finish))
.merge(api::get_router())
.merge(binary_cache::get_router());
#[cfg(debug_assertions)]
@ -155,7 +161,7 @@ fn init_logging() {
return EnvFilter::new("info,gha_cache=debug,nix_action_cache=debug");
#[cfg(not(debug_assertions))]
return EnvFilter::default();
return EnvFilter::new("info");
});
tracing_subscriber::fmt().with_env_filter(filter).init();
}
@ -173,11 +179,3 @@ async fn dump_api_stats<B>(
/// Landing endpoint; returns a static banner string.
async fn root() -> &'static str {
"cache the world 🚀"
}
/// Legacy finish endpoint (removed by this commit in favor of the
/// `/api/workflow-finish` handler in the new `api` module).
async fn finish(Extension(state): Extension<State>) -> &'static str {
tracing::info!("Workflow finished - Pushing new store paths");
// NOTE(review): `lock().unwrap()` panics on a poisoned mutex and
// `take().unwrap()` panics if this endpoint is ever hit twice; the
// `send(()).unwrap()` panics if the shutdown receiver was dropped.
let sender = state.shutdown_sender.lock().unwrap().take().unwrap();
sender.send(()).unwrap();
"Shutting down"
}

View file

@@ -0,0 +1,82 @@
//! Utilities.
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use tokio::{fs, process::Command};
use crate::error::{Error, Result};
/// Returns the set of store paths currently present in `/nix/store`.
pub async fn get_store_paths() -> Result<HashSet<PathBuf>> {
    let store_dir = Path::new("/nix/store");
    let mut entries = fs::read_dir(store_dir).await?;
    let mut result = HashSet::new();

    while let Some(entry) = entries.next_entry().await? {
        let name = entry.file_name();
        let name = Path::new(&name);

        // Skip derivations, lock files, and non-UTF-8 (malformed) extensions.
        if matches!(
            name.extension().map(|ext| ext.to_str()),
            Some(None) | Some(Some("drv")) | Some(Some("lock"))
        ) {
            continue;
        }

        // Sources are not worth pushing to the cache.
        if name.to_str().map_or(false, |s| s.ends_with("-source")) {
            continue;
        }

        result.insert(store_dir.join(name));
    }

    Ok(result)
}
/// Uploads a list of store paths to the local cache endpoint.
///
/// Paths are pushed with `nix copy` in batches whose combined argument
/// length stays under ~1 MiB, to avoid exceeding OS command-line limits.
pub async fn upload_paths(mut paths: Vec<PathBuf>) -> Result<()> {
    // When the daemon started, Nix may not have been installed yet, so the
    // daemon's own PATH can be stale. Ask a login shell for the current one.
    let raw_path = Command::new("sh")
        .args(&["-lc", "echo $PATH"])
        .output()
        .await?
        .stdout;
    // `echo` appends a trailing newline; if left in place the final PATH
    // entry would end with '\n' and never match a real directory, so trim
    // it. Use a lossy conversion rather than panicking on non-UTF-8 bytes —
    // PATH contents are environment-influenced input.
    let env_path = String::from_utf8_lossy(&raw_path).trim().to_string();

    while !paths.is_empty() {
        let mut batch = Vec::new();
        let mut total_len = 0;

        // Fill the batch until the size budget is hit or paths run out.
        while total_len < 1024 * 1024 {
            match paths.pop() {
                Some(p) => {
                    total_len += p.as_os_str().len() + 1;
                    batch.push(p);
                }
                None => break,
            }
        }

        tracing::debug!("{} paths in this batch", batch.len());

        let status = Command::new("nix")
            .args(&["--extra-experimental-features", "nix-command"])
            // FIXME: Port and compression settings
            .args(&["copy", "--to", "http://127.0.0.1:3000"])
            .args(&batch)
            .env("PATH", &env_path)
            .status()
            .await?;

        if status.success() {
            tracing::debug!("Uploaded batch");
        } else {
            tracing::error!("Failed to upload batch: {:?}", status);
            return Err(Error::FailedToUpload);
        }
    }

    Ok(())
}