From 6bf609975a45edf01d2fe3c05cee8f491dbf4346 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Wed, 13 Dec 2023 22:17:08 +0100 Subject: [PATCH] Push paths to FlakeHub from the post-build-hook Also get rid of daemonization, it causes too many problems with tokio. --- magic-nix-cache/src/api.rs | 47 +++++++++---- magic-nix-cache/src/flakehub.rs | 58 ++++++--------- magic-nix-cache/src/main.rs | 120 ++++++++++++++++++-------------- 3 files changed, 125 insertions(+), 100 deletions(-) diff --git a/magic-nix-cache/src/api.rs b/magic-nix-cache/src/api.rs index 3a8ea2f..299f040 100644 --- a/magic-nix-cache/src/api.rs +++ b/magic-nix-cache/src/api.rs @@ -6,7 +6,7 @@ use std::net::SocketAddr; use axum::{extract::Extension, http::uri::Uri, routing::post, Json, Router}; use axum_macros::debug_handler; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use super::State; use crate::error::Result; @@ -28,6 +28,7 @@ pub fn get_router() -> Router { Router::new() .route("/api/workflow-start", post(workflow_start)) .route("/api/workflow-finish", post(workflow_finish)) + .route("/api/enqueue-paths", post(enqueue_paths)) } /// Record existing paths. @@ -62,20 +63,14 @@ async fn workflow_finish( upload_paths(new_paths.clone(), &store_uri).await?; } - if let Some(attic_state) = &state.flakehub_state { - tracing::info!("Pushing {} new paths to Attic", new_paths.len()); - - let new_paths = new_paths - .iter() - .map(|path| state.store.follow_store_path(path).unwrap()) - .collect(); - - crate::flakehub::push(attic_state, state.store.clone(), new_paths).await?; - } - let sender = state.shutdown_sender.lock().await.take().unwrap(); sender.send(()).unwrap(); + // Wait for the Attic push workers to finish. + if let Some(attic_state) = state.flakehub_state.write().await.take() { + attic_state.push_session.wait().await.unwrap(); + } + let reply = WorkflowFinishResponse { num_original_paths: original_paths.len(), num_final_paths: final_paths.len(), @@ -101,3 +96,31 @@ fn make_store_uri(self_endpoint: &SocketAddr) -> String { .unwrap() .to_string() } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EnqueuePathsRequest { + pub store_paths: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EnqueuePathsResponse {} + +/// Schedule paths in the local Nix store for uploading. +async fn enqueue_paths( + Extension(state): Extension, + Json(req): Json, +) -> Result> { + tracing::info!("Enqueueing {:?}", req.store_paths); + + let store_paths: Vec<_> = req + .store_paths + .iter() + .map(|path| state.store.follow_store_path(path).unwrap()) + .collect(); + + if let Some(flakehub_state) = &*state.flakehub_state.read().await { + crate::flakehub::enqueue_paths(flakehub_state, store_paths).await.unwrap(); + } + + Ok(Json(EnqueuePathsResponse {})) +} diff --git a/magic-nix-cache/src/flakehub.rs b/magic-nix-cache/src/flakehub.rs index c935ee7..43c92d5 100644 --- a/magic-nix-cache/src/flakehub.rs +++ b/magic-nix-cache/src/flakehub.rs @@ -2,6 +2,7 @@ use crate::error::Result; use attic::api::v1::cache_config::{CreateCacheRequest, KeypairConfig}; use attic::cache::CacheSliceIdentifier; use attic::nix_store::{NixStore, StorePath}; +use attic_client::push::{PushSession, PushSessionConfig}; use attic_client::{ api::{ApiClient, ApiError}, config::ServerConfig, @@ -23,13 +24,14 @@ pub struct State { pub substituter: String, - api: ApiClient, + pub push_session: PushSession, } pub async fn init_cache( flakehub_api_server: &str, flakehub_api_server_netrc: &Path, flakehub_cache_server: &str, + store: Arc, ) -> Result { // Parse netrc to get the credentials for api.flakehub.com. let netrc = { @@ -225,15 +227,7 @@ pub async fn init_cache( tracing::info!("Created cache {} on {}.", cache_name, flakehub_cache_server); } - Ok(State { - cache, - substituter: flakehub_cache_server.to_owned(), - api, - }) -} - -pub async fn push(state: &State, store: Arc, store_paths: Vec) -> Result<()> { - let cache_config = state.api.get_cache_config(&state.cache).await.unwrap(); + let cache_config = api.get_cache_config(&cache).await.unwrap(); let push_config = PushConfig { num_workers: 5, // FIXME: use number of CPUs? @@ -242,37 +236,31 @@ pub async fn push(state: &State, store: Arc, store_paths: Vec, +) -> Result<()> { + state.push_session.queue_many(store_paths).unwrap(); Ok(()) } diff --git a/magic-nix-cache/src/main.rs b/magic-nix-cache/src/main.rs index 076cdf5..86abd29 100644 --- a/magic-nix-cache/src/main.rs +++ b/magic-nix-cache/src/main.rs @@ -21,21 +21,16 @@ mod telemetry; mod util; use std::collections::HashSet; -use std::fs::{self, create_dir_all, File, OpenOptions}; +use std::fs::{self, create_dir_all, OpenOptions}; use std::io::Write; use std::net::SocketAddr; -use std::os::fd::OwnedFd; use std::path::{Path, PathBuf}; use std::sync::Arc; use ::attic::nix_store::NixStore; use axum::{extract::Extension, routing::get, Router}; use clap::Parser; -use daemonize::Daemonize; -use tokio::{ - runtime::Runtime, - sync::{oneshot, Mutex, RwLock}, -}; +use tokio::sync::{oneshot, Mutex, RwLock}; use tracing_subscriber::filter::EnvFilter; use gha_cache::{Api, Credentials}; @@ -82,12 +77,6 @@ struct Args { )] diagnostic_endpoint: String, - /// Daemonize the server. - /// - /// This is for use in the GitHub Action only. - #[arg(long, hide = true)] - daemon_dir: Option, - /// The FlakeHub API server. #[arg(long)] flakehub_api_server: Option, @@ -142,10 +131,10 @@ struct StateInner { store: Arc, /// FlakeHub cache state. - flakehub_state: Option, + flakehub_state: RwLock>, } -fn main() { +async fn main_cli() { init_logging(); let args = Args::parse(); @@ -169,18 +158,16 @@ fn main() { .flakehub_api_server_netrc .expect("--flakehub-api-server-netrc is required"); - let rt = Runtime::new().unwrap(); - - match rt.block_on(async { - flakehub::init_cache( - &args - .flakehub_api_server - .expect("--flakehub-api-server is required"), - &flakehub_api_server_netrc, - &flakehub_cache_server, - ) - .await - }) { + match flakehub::init_cache( + &args + .flakehub_api_server + .expect("--flakehub-api-server is required"), + &flakehub_api_server_netrc, + &flakehub_cache_server, + store.clone(), + ) + .await + { Ok(state) => { nix_conf .write_all( @@ -236,7 +223,15 @@ fn main() { }; nix_conf - .write_all("fallback = true\n".as_bytes()) + .write_all( + format!( + "fallback = true\npost-build-hook = {}\n", + std::env::current_exe() + .expect("Getting the path of magic-nix-cache") + .display() + ) + .as_bytes(), + ) .expect("Writing to nix.conf"); drop(nix_conf); @@ -260,7 +255,7 @@ fn main() { self_endpoint: args.listen.to_owned(), metrics: telemetry::TelemetryReport::new(), store, - flakehub_state, + flakehub_state: RwLock::new(flakehub_state), }); let app = Router::new() @@ -275,35 +270,54 @@ fn main() { let app = app.layer(Extension(state.clone())); - if args.daemon_dir.is_some() { - let dir = args.daemon_dir.as_ref().unwrap(); - let logfile: OwnedFd = File::create(dir.join("daemon.log")).unwrap().into(); - let daemon = Daemonize::new() - .pid_file(dir.join("daemon.pid")) - .stdout(File::from(logfile.try_clone().unwrap())) - .stderr(File::from(logfile)); + tracing::info!("Listening on {}", args.listen); + let ret = axum::Server::bind(&args.listen) + .serve(app.into_make_service()) + .with_graceful_shutdown(async move { + shutdown_receiver.await.ok(); + tracing::info!("Shutting down"); + }) + .await; - tracing::info!("Forking into the background"); - daemon.start().expect("Failed to fork into the background"); + if let Some(diagnostic_endpoint) = diagnostic_endpoint { + state.metrics.send(diagnostic_endpoint).await; } - let rt = Runtime::new().unwrap(); - rt.block_on(async move { - tracing::info!("Listening on {}", args.listen); - let ret = axum::Server::bind(&args.listen) - .serve(app.into_make_service()) - .with_graceful_shutdown(async move { - shutdown_receiver.await.ok(); - tracing::info!("Shutting down"); + ret.unwrap() +} - if let Some(diagnostic_endpoint) = diagnostic_endpoint { - state.metrics.send(diagnostic_endpoint).await; - } - }) - .await; +async fn post_build_hook(out_paths: &str) { + let store_paths: Vec<_> = out_paths.lines().map(str::to_owned).collect(); - ret.unwrap() - }); + let request = api::EnqueuePathsRequest { store_paths }; + + let response = reqwest::Client::new() + .post(format!( + "http://{}/api/enqueue-paths", + std::env::var("INPUT_LISTEN").unwrap_or_else(|_| "127.0.0.1:37515".to_owned()) + )) + .header("Content-Type", "application/json") + .body(serde_json::to_string(&request).unwrap()) + .send() + .await + .unwrap(); + + if !response.status().is_success() { + eprintln!( + "magic-nix-cache server failed to enqueue the push request: {}", + response.status() + ); + } else { + response.json::().await.unwrap(); + } +} + +#[tokio::main] +async fn main() { + match std::env::var("OUT_PATHS") { + Ok(out_paths) => post_build_hook(&out_paths).await, + Err(_) => main_cli().await, + } } fn init_logging() {