Trigger GHA cache uploads from the post-build hook

Also, a worker task now does the uploads directly rather than having
magic-nix-cache invoke "nix copy" via HTTP to itself.
Eelco Dolstra 2024-02-29 16:31:29 +01:00
parent a560959d65
commit 625e95f484
6 changed files with 247 additions and 178 deletions
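
The heart of the change is the queue-and-worker pattern in the new magic-nix-cache/src/gha.rs (shown in full below): handlers enqueue store paths on an unbounded channel, a spawned tokio task dedupes and uploads them, and a Shutdown message lets workflow_finish wait for pending uploads to drain. The following condensed, self-contained sketch illustrates that pattern only; StorePath and upload_path are placeholders standing in for the real attic and gha_cache types, not the commit's actual code.

use std::collections::HashSet;

use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;

// Placeholder for attic::nix_store::StorePath.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct StorePath(String);

#[derive(Debug)]
enum Request {
    Shutdown,
    Upload(StorePath),
}

struct UploadQueue {
    tx: UnboundedSender<Request>,
    worker: JoinHandle<()>,
}

impl UploadQueue {
    fn new() -> Self {
        let (tx, rx) = unbounded_channel();
        // The worker owns the receiving end and runs until it sees Shutdown.
        let worker = tokio::spawn(worker_loop(rx));
        UploadQueue { tx, worker }
    }

    // Called from request handlers (e.g. the enqueue_paths handler that the
    // post-build hook ends up invoking); never blocks on the actual upload.
    fn enqueue(&self, path: StorePath) {
        let _ = self.tx.send(Request::Upload(path));
    }

    // workflow_finish uses the equivalent of this to wait for pending uploads.
    async fn shutdown(self) {
        let _ = self.tx.send(Request::Shutdown);
        let _ = self.worker.await;
    }
}

async fn worker_loop(mut rx: UnboundedReceiver<Request>) {
    let mut done = HashSet::new();
    while let Some(req) = rx.recv().await {
        match req {
            Request::Shutdown => break,
            Request::Upload(path) => {
                // Upload each path at most once; a failure is logged, not fatal.
                if !done.insert(path.clone()) {
                    continue;
                }
                if let Err(err) = upload_path(&path).await {
                    eprintln!("upload of {:?} failed: {err}", path);
                }
            }
        }
    }
}

// Stand-in for the real upload: streaming the path's NAR and narinfo to the
// GitHub Actions cache via gha_cache::Api.
async fn upload_path(path: &StorePath) -> Result<(), String> {
    println!("uploading {:?}", path);
    Ok(())
}

#[tokio::main]
async fn main() {
    let queue = UploadQueue::new();
    queue.enqueue(StorePath("example-store-path".into()));
    queue.shutdown().await;
}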

@@ -2,26 +2,19 @@
//!
//! This API is intended to be used by nix-installer-action.
use std::net::SocketAddr;
use axum::{extract::Extension, http::uri::Uri, routing::post, Json, Router};
use axum::{extract::Extension, routing::post, Json, Router};
use axum_macros::debug_handler;
use serde::{Deserialize, Serialize};
use super::State;
use crate::error::{Error, Result};
use crate::util::{get_store_paths, upload_paths};
#[derive(Debug, Clone, Serialize)]
struct WorkflowStartResponse {
num_original_paths: usize,
}
struct WorkflowStartResponse {}
#[derive(Debug, Clone, Serialize)]
struct WorkflowFinishResponse {
num_original_paths: usize,
num_final_paths: usize,
num_new_paths: usize,
//num_new_paths: usize,
}
pub fn get_router() -> Router {
@@ -33,15 +26,12 @@ pub fn get_router() -> Router {
/// Record existing paths.
#[debug_handler]
async fn workflow_start(Extension(state): Extension<State>) -> Result<Json<WorkflowStartResponse>> {
async fn workflow_start(
Extension(_state): Extension<State>,
) -> Result<Json<WorkflowStartResponse>> {
tracing::info!("Workflow started");
let mut original_paths = state.original_paths.lock().await;
*original_paths = get_store_paths(&state.store).await?;
Ok(Json(WorkflowStartResponse {
num_original_paths: original_paths.len(),
}))
Ok(Json(WorkflowStartResponse {}))
}
/// Push new paths and shut down.
@@ -49,17 +39,10 @@ async fn workflow_finish(
Extension(state): Extension<State>,
) -> Result<Json<WorkflowFinishResponse>> {
tracing::info!("Workflow finished");
let original_paths = state.original_paths.lock().await;
let final_paths = get_store_paths(&state.store).await?;
let new_paths = final_paths
.difference(&original_paths)
.cloned()
.collect::<Vec<_>>();
if state.api.is_some() {
tracing::info!("Pushing {} new paths to GHA cache", new_paths.len());
let store_uri = make_store_uri(&state.self_endpoint);
upload_paths(new_paths.clone(), &store_uri).await?;
if let Some(gha_cache) = &state.gha_cache {
tracing::info!("Waiting for GitHub action cache uploads to finish");
gha_cache.shutdown().await?;
}
if let Some(sender) = state.shutdown_sender.lock().await.take() {
@@ -69,36 +52,18 @@ async fn workflow_finish(
// Wait for the Attic push workers to finish.
if let Some(attic_state) = state.flakehub_state.write().await.take() {
tracing::info!("Waiting for FlakeHub cache uploads to finish");
attic_state.push_session.wait().await?;
}
}
let reply = WorkflowFinishResponse {
num_original_paths: original_paths.len(),
num_final_paths: final_paths.len(),
num_new_paths: new_paths.len(),
};
let reply = WorkflowFinishResponse {};
state
.metrics
.num_original_paths
.set(reply.num_original_paths);
state.metrics.num_final_paths.set(reply.num_final_paths);
state.metrics.num_new_paths.set(reply.num_new_paths);
//state.metrics.num_new_paths.set(num_new_paths);
Ok(Json(reply))
}
fn make_store_uri(self_endpoint: &SocketAddr) -> String {
Uri::builder()
.scheme("http")
.authority(self_endpoint.to_string())
.path_and_query("/?compression=zstd&parallel-compression=true")
.build()
.expect("Cannot construct URL to self")
.to_string()
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnqueuePathsRequest {
pub store_paths: Vec<String>,
@@ -120,6 +85,12 @@ async fn enqueue_paths(
.map(|path| state.store.follow_store_path(path).map_err(Error::Attic))
.collect::<Result<Vec<_>>>()?;
if let Some(gha_cache) = &state.gha_cache {
gha_cache
.enqueue_paths(state.store.clone(), store_paths.clone())
.await?;
}
if let Some(flakehub_state) = &*state.flakehub_state.read().await {
crate::flakehub::enqueue_paths(flakehub_state, store_paths).await?;
}

@@ -61,8 +61,8 @@ async fn get_narinfo(
return pull_through(&state, &path);
}
if let Some(api) = &state.api {
if let Some(url) = api.get_file_url(&[&key]).await? {
if let Some(gha_cache) = &state.gha_cache {
if let Some(url) = gha_cache.api.get_file_url(&[&key]).await? {
state.metrics.narinfos_served.incr();
return Ok(Redirect::temporary(&url));
}
@@ -75,6 +75,7 @@ async fn get_narinfo(
state.metrics.narinfos_negative_cache_misses.incr();
pull_through(&state, &path)
}
async fn put_narinfo(
Extension(state): Extension<State>,
Path(path): Path<String>,
@@ -90,15 +91,15 @@ async fn put_narinfo(
return Err(Error::BadRequest);
}
let api = state.api.as_ref().ok_or(Error::GHADisabled)?;
let gha_cache = state.gha_cache.as_ref().ok_or(Error::GHADisabled)?;
let store_path_hash = components[0].to_string();
let key = format!("{}.narinfo", store_path_hash);
let allocation = api.allocate_file_with_random_suffix(&key).await?;
let allocation = gha_cache.api.allocate_file_with_random_suffix(&key).await?;
let stream = StreamReader::new(
body.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))),
);
api.upload_file(allocation, stream).await?;
gha_cache.api.upload_file(allocation, stream).await?;
state.metrics.narinfos_uploaded.incr();
state
@@ -112,9 +113,10 @@ async fn put_narinfo(
async fn get_nar(Extension(state): Extension<State>, Path(path): Path<String>) -> Result<Redirect> {
if let Some(url) = state
.api
.gha_cache
.as_ref()
.ok_or(Error::GHADisabled)?
.api
.get_file_url(&[&path])
.await?
{
@@ -129,18 +131,22 @@ async fn get_nar(Extension(state): Extension<State>, Path(path): Path<String>) -
Err(Error::NotFound)
}
}
async fn put_nar(
Extension(state): Extension<State>,
Path(path): Path<String>,
body: BodyStream,
) -> Result<()> {
let api = state.api.as_ref().ok_or(Error::GHADisabled)?;
let gha_cache = state.gha_cache.as_ref().ok_or(Error::GHADisabled)?;
let allocation = api.allocate_file_with_random_suffix(&path).await?;
let allocation = gha_cache
.api
.allocate_file_with_random_suffix(&path)
.await?;
let stream = StreamReader::new(
body.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))),
);
api.upload_file(allocation, stream).await?;
gha_cache.api.upload_file(allocation, stream).await?;
state.metrics.nars_uploaded.incr();
Ok(())

@@ -22,9 +22,6 @@ pub enum Error {
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("Failed to upload paths")]
FailedToUpload,
#[error("GHA cache is disabled")]
GHADisabled,

magic-nix-cache/src/gha.rs (new file, 193 lines)

@@ -0,0 +1,193 @@
use std::{collections::HashSet, sync::Arc};
use crate::error::{Error, Result};
use crate::telemetry;
use attic::nix_store::{NixStore, StorePath, ValidPathInfo};
use attic_server::narinfo::{Compression, NarInfo};
use futures::stream::StreamExt;
use gha_cache::{Api, Credentials};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
RwLock,
};
pub struct GhaCache {
/// The GitHub Actions Cache API.
pub api: Arc<Api>,
/// The future from the completion of the worker.
worker_result: RwLock<Option<tokio::task::JoinHandle<Result<()>>>>,
channel_tx: UnboundedSender<Request>,
}
#[derive(Debug)]
enum Request {
Shutdown,
Upload(StorePath),
}
impl GhaCache {
pub fn new(
credentials: Credentials,
cache_version: Option<String>,
store: Arc<NixStore>,
metrics: Arc<telemetry::TelemetryReport>,
) -> Result<GhaCache> {
let mut api = Api::new(credentials)?;
if let Some(cache_version) = &cache_version {
api.mutate_version(cache_version.as_bytes());
}
let (channel_tx, channel_rx) = unbounded_channel();
let api = Arc::new(api);
let api2 = api.clone();
let worker_result =
tokio::task::spawn(async move { worker(&api2, store, channel_rx, metrics).await });
Ok(GhaCache {
api,
worker_result: RwLock::new(Some(worker_result)),
channel_tx,
})
}
pub async fn shutdown(&self) -> Result<()> {
if let Some(worker_result) = self.worker_result.write().await.take() {
self.channel_tx
.send(Request::Shutdown)
.expect("Cannot send shutdown message");
worker_result.await.unwrap()
} else {
Ok(())
}
}
pub async fn enqueue_paths(
&self,
store: Arc<NixStore>,
store_paths: Vec<StorePath>,
) -> Result<()> {
// FIXME: make sending the closure optional. We might want to
// only send the paths that have been built by the user, under
// the assumption that everything else is already in a binary
// cache.
// FIXME: compute_fs_closure_multi doesn't return a
// toposort, though it doesn't really matter for the GHA
// cache.
let closure = store
.compute_fs_closure_multi(store_paths, false, false, false)
.await?;
for p in closure {
self.channel_tx
.send(Request::Upload(p))
.map_err(|_| Error::Internal("Cannot send upload message".to_owned()))?;
}
Ok(())
}
}
async fn worker(
api: &Api,
store: Arc<NixStore>,
mut channel_rx: UnboundedReceiver<Request>,
metrics: Arc<telemetry::TelemetryReport>,
) -> Result<()> {
let mut done = HashSet::new();
while let Some(req) = channel_rx.recv().await {
match req {
Request::Shutdown => {
break;
}
Request::Upload(path) => {
if !done.insert(path.clone()) {
continue;
}
if let Err(err) = upload_path(api, store.clone(), &path, metrics.clone()).await {
tracing::error!(
"Upload of path '{}' failed: {}",
store.get_full_path(&path).display(),
err
);
}
}
}
}
Ok(())
}
async fn upload_path(
api: &Api,
store: Arc<NixStore>,
path: &StorePath,
metrics: Arc<telemetry::TelemetryReport>,
) -> Result<()> {
let path_info = store.query_path_info(path.clone()).await?;
// Upload the NAR.
let nar_path = format!("nar/{}.nar", path_info.nar_hash.to_base32());
let nar_allocation = api.allocate_file_with_random_suffix(&nar_path).await?;
let mut nar_stream = store.nar_from_path(path.clone());
let mut nar: Vec<u8> = vec![];
// FIXME: make this streaming and compress.
while let Some(data) = nar_stream.next().await {
nar.append(&mut data?);
}
tracing::info!("Uploading NAR {} (size {})", nar_path, nar.len());
api.upload_file(nar_allocation, &nar[..]).await?;
metrics.nars_uploaded.incr();
// Upload the narinfo.
let narinfo_path = format!("{}.narinfo", path.to_hash());
let narinfo_allocation = api.allocate_file_with_random_suffix(&narinfo_path).await?;
let narinfo = path_info_to_nar_info(store.clone(), &path_info, nar_path)
.to_string()
.unwrap();
tracing::info!("Uploading {}: {}", narinfo_path, narinfo);
api.upload_file(narinfo_allocation, narinfo.as_bytes())
.await?;
metrics.narinfos_uploaded.incr();
Ok(())
}
// FIXME: move to attic.
fn path_info_to_nar_info(store: Arc<NixStore>, path_info: &ValidPathInfo, url: String) -> NarInfo {
NarInfo {
store_path: store.get_full_path(&path_info.path),
url,
compression: Compression::None,
file_hash: None,
file_size: None,
nar_hash: path_info.nar_hash.clone(),
nar_size: path_info.nar_size as usize,
references: path_info
.references
.iter()
.map(|r| r.file_name().unwrap().to_str().unwrap().to_owned())
.collect(),
system: None,
deriver: None,
signatures: None,
ca: path_info.ca.clone(),
}
}

@@ -16,8 +16,8 @@ mod api;
mod binary_cache;
mod error;
mod flakehub;
mod gha;
mod telemetry;
mod util;
use std::collections::HashSet;
use std::fs::{self, create_dir_all, OpenOptions};
@@ -35,7 +35,7 @@ use tempfile::NamedTempFile;
use tokio::sync::{oneshot, Mutex, RwLock};
use tracing_subscriber::filter::EnvFilter;
use gha_cache::{Api, Credentials};
use gha_cache::Credentials;
type State = Arc<StateInner>;
@@ -113,8 +113,8 @@ struct Args {
/// The global server state.
struct StateInner {
/// The GitHub Actions Cache API.
api: Option<Api>,
/// State for uploading to the GHA cache.
gha_cache: Option<gha::GhaCache>,
/// The upstream cache.
upstream: Option<String>,
@@ -122,19 +122,11 @@ struct StateInner {
/// The sender half of the oneshot channel to trigger a shutdown.
shutdown_sender: Mutex<Option<oneshot::Sender<()>>>,
/// List of store paths originally present.
original_paths: Mutex<HashSet<PathBuf>>,
/// Set of store path hashes that are not present in GHAC.
narinfo_nagative_cache: RwLock<HashSet<String>>,
/// Endpoint of ourselves.
///
/// This is used by our Action API to invoke `nix copy` to upload new paths.
self_endpoint: SocketAddr,
/// Metrics for sending to perf at shutdown
metrics: telemetry::TelemetryReport,
metrics: Arc<telemetry::TelemetryReport>,
/// Connection to the local Nix store.
store: Arc<NixStore>,
@@ -148,6 +140,8 @@ async fn main_cli() -> Result<()> {
let args = Args::parse();
let metrics = Arc::new(telemetry::TelemetryReport::new());
if let Some(parent) = Path::new(&args.nix_conf).parent() {
create_dir_all(parent).with_context(|| "Creating parent directories of nix.conf")?;
}
@@ -213,7 +207,7 @@ async fn main_cli() -> Result<()> {
None
};
let api = if args.use_gha_cache {
let gha_cache = if args.use_gha_cache {
let credentials = if let Some(credentials_file) = &args.credentials_file {
tracing::info!("Loading credentials from {:?}", credentials_file);
let bytes = fs::read(credentials_file).with_context(|| {
@@ -235,19 +229,20 @@ async fn main_cli() -> Result<()> {
.with_context(|| "Failed to load credentials from environment (see README.md)")?
};
let mut api = Api::new(credentials)
let gha_cache = gha::GhaCache::new(
credentials,
args.cache_version,
store.clone(),
metrics.clone(),
)
.with_context(|| "Failed to initialize GitHub Actions Cache API")?;
if let Some(cache_version) = &args.cache_version {
api.mutate_version(cache_version.as_bytes());
}
nix_conf
.write_all(format!("extra-substituters = http://{}?trusted=1&compression=zstd&parallel-compression=true&priority=1\n", args.listen).as_bytes())
.with_context(|| "Writing to nix.conf")?;
tracing::info!("Native GitHub Action cache is enabled.");
Some(api)
Some(gha_cache)
} else {
tracing::info!("Native GitHub Action cache is disabled.");
None
@@ -303,13 +298,11 @@ async fn main_cli() -> Result<()> {
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let state = Arc::new(StateInner {
api,
gha_cache,
upstream: args.upstream.clone(),
shutdown_sender: Mutex::new(Some(shutdown_sender)),
original_paths: Mutex::new(HashSet::new()),
narinfo_nagative_cache: RwLock::new(HashSet::new()),
self_endpoint: args.listen.to_owned(),
metrics: telemetry::TelemetryReport::new(),
metrics,
store,
flakehub_state: RwLock::new(flakehub_state),
});
@@ -451,8 +444,8 @@ async fn dump_api_stats<B>(
request: axum::http::Request<B>,
next: axum::middleware::Next<B>,
) -> axum::response::Response {
if let Some(api) = &state.api {
api.dump_stats();
if let Some(gha_cache) = &state.gha_cache {
gha_cache.api.dump_stats();
}
next.run(request).await
}

@@ -1,91 +0,0 @@
//! Utilities.
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use attic::nix_store::NixStore;
use tokio::{fs, process::Command};
use crate::error::{Error, Result};
/// Returns the list of store paths that are currently present.
pub async fn get_store_paths(store: &NixStore) -> Result<HashSet<PathBuf>> {
// FIXME: use the Nix API.
let store_dir = store.store_dir();
let mut listing = fs::read_dir(store_dir).await?;
let mut paths = HashSet::new();
while let Some(entry) = listing.next_entry().await? {
let file_name = entry.file_name();
let file_name = Path::new(&file_name);
if let Some(extension) = file_name.extension() {
match extension.to_str() {
None | Some("drv") | Some("lock") | Some("chroot") => {
// Malformed or not interesting
continue;
}
_ => {}
}
}
if let Some(s) = file_name.to_str() {
// Let's not push any sources
if s.ends_with("-source") {
continue;
}
// Special paths (so far only `.links`)
if s.starts_with('.') {
continue;
}
}
paths.insert(store_dir.join(file_name));
}
Ok(paths)
}
/// Uploads a list of store paths to a store URI.
pub async fn upload_paths(mut paths: Vec<PathBuf>, store_uri: &str) -> Result<()> {
// When the daemon started Nix may not have been installed
let env_path = Command::new("sh")
.args(["-lc", "echo $PATH"])
.output()
.await?
.stdout;
let env_path = String::from_utf8(env_path)
.map_err(|_| Error::Config("PATH contains invalid UTF-8".to_owned()))?;
while !paths.is_empty() {
let mut batch = Vec::new();
let mut total_len = 0;
while total_len < 1024 * 1024 {
if let Some(p) = paths.pop() {
total_len += p.as_os_str().len() + 1;
batch.push(p);
} else {
break;
}
}
tracing::debug!("{} paths in this batch", batch.len());
let status = Command::new("nix")
.args(["--extra-experimental-features", "nix-command"])
.args(["copy", "--to", store_uri])
.args(&batch)
.env("PATH", &env_path)
.status()
.await?;
if status.success() {
tracing::debug!("Uploaded batch");
} else {
tracing::error!("Failed to upload batch: {:?}", status);
return Err(Error::FailedToUpload);
}
}
Ok(())
}