From e5d511802278e25c7e78614e8cca7d911d2553fa Mon Sep 17 00:00:00 2001 From: Cole Mickens Date: Wed, 7 Aug 2024 17:44:18 -0700 Subject: [PATCH 01/12] uds subscription wip --- magic-nix-cache/src/main.rs | 165 ++++++++++++++++++++++++------------ 1 file changed, 112 insertions(+), 53 deletions(-) diff --git a/magic-nix-cache/src/main.rs b/magic-nix-cache/src/main.rs index 9ecc088..20aa088 100644 --- a/magic-nix-cache/src/main.rs +++ b/magic-nix-cache/src/main.rs @@ -31,11 +31,16 @@ use std::sync::Arc; use ::attic::nix_store::NixStore; use anyhow::{anyhow, Context, Result}; +use axum::body::Body; use axum::{extract::Extension, routing::get, Router}; use clap::Parser; +use http_body_util::BodyExt; +use hyper_util::rt::{TokioExecutor, TokioIo}; +use futures::StreamExt; use tempfile::NamedTempFile; use tokio::fs::File; use tokio::io::AsyncWriteExt; +use tokio::net::UnixStream; use tokio::process::Command; use tokio::sync::{oneshot, Mutex, RwLock}; use tracing_subscriber::filter::EnvFilter; @@ -44,6 +49,9 @@ use tracing_subscriber::util::SubscriberInitExt; use gha_cache::Credentials; +const DETERMINATE_STATE_DIR: &str = "/nix/var/determinate"; +const DETERMINATE_NIXD_SOCKET_NAME: &str = "determinate-nixd.socket"; + type State = Arc; /// GitHub Actions-powered Nix binary cache @@ -292,65 +300,116 @@ async fn main_cli() -> Result<()> { None }; - /* Write the post-build hook script. Note that the shell script - * ignores errors, to avoid the Nix build from failing. */ - let post_build_hook_script = { - let mut file = NamedTempFile::with_prefix("magic-nix-cache-build-hook-") - .with_context(|| "Creating a temporary file for the post-build hook")?; - file.write_all( - format!( - // NOTE(cole-h): We want to exit 0 even if the hook failed, otherwise it'll fail the - // build itself - "#! /bin/sh\nRUST_LOG=trace RUST_BACKTRACE=full {} --server {} || :\n", - std::env::current_exe() - .with_context(|| "Getting the path of magic-nix-cache")? 
- .display(), - args.listen - ) - .as_bytes(), - ) - .with_context(|| "Writing the post-build hook")?; - let path = file - .keep() - .with_context(|| "Keeping the post-build hook")? - .1; + let dnixd_uds_socket_dir: &Path = Path::new(&DETERMINATE_STATE_DIR); + let dnixd_uds_socket_path = dnixd_uds_socket_dir.join(DETERMINATE_NIXD_SOCKET_NAME); - fs::set_permissions(&path, fs::Permissions::from_mode(0o755)) - .with_context(|| "Setting permissions on the post-build hook")?; + if dnixd_uds_socket_path.exists() { + let stream = TokioIo::new(UnixStream::connect(dnixd_uds_socket_path).await?); + // let (mut sender, conn): (SendRequest, _) = + // hyper::client::conn::http1::handshake(stream).await?; - /* Copy the script to the Nix store so we know for sure that - * it's accessible to the Nix daemon, which might have a - * different /tmp from us. */ - let res = Command::new("nix") - .args([ - "--extra-experimental-features", - "nix-command", - "store", - "add-path", - &path.display().to_string(), - ]) - .output() - .await?; - if res.status.success() { - tokio::fs::remove_file(path).await?; - PathBuf::from(String::from_utf8_lossy(&res.stdout).trim()) - } else { - path + let executor = TokioExecutor::new(); + let (mut sender, conn): (hyper::client::conn::http2::SendRequest, _) = + hyper::client::conn::http2::handshake(executor, stream) + .await + .unwrap(); + + // NOTE(colemickens): for now we just drop the joinhandle and let it keep running + let _join_handle = tokio::task::spawn(async move { + if let Err(err) = conn.await { + tracing::error!("Connection failed: {:?}", err); + } + }); + + let request = http::Request::builder() + .method(http::Method::GET) + .uri("http://localhost/built-paths") + .body(axum::body::Body::empty())?; + + let response = sender.send_request(request).await?; + + // NOTE(to self): there was a note somewhere to use this _IF_ you wanted all data, including trialers etc + // so ... we'll see.. 
+ let mut data = response.into_data_stream(); + + while let Some(foo) = data.next().await { + tracing::info!("got {:?}", foo); } - }; + + // while let Some(frame_result) = response.frame().await { + // let frame = frame_result?; - /* Update nix.conf. */ - nix_conf - .write_all( - format!( - "fallback = true\npost-build-hook = {}\n", - post_build_hook_script.display() + // if let Some(segment) = frame.data_ref() { + // tokio::io::stdout() + // .write_all(segment.iter().as_slice()) + // .await?; + // } + // } + + // TODO: do something with these paths + } else { + // TODO: split into own function(s) + + /* Write the post-build hook script. Note that the shell script + * ignores errors, to avoid the Nix build from failing. */ + let post_build_hook_script = { + let mut file = NamedTempFile::with_prefix("magic-nix-cache-build-hook-") + .with_context(|| "Creating a temporary file for the post-build hook")?; + file.write_all( + format!( + // NOTE(cole-h): We want to exit 0 even if the hook failed, otherwise it'll fail the + // build itself + "#! /bin/sh\nRUST_LOG=trace RUST_BACKTRACE=full {} --server {} || :\n", + std::env::current_exe() + .with_context(|| "Getting the path of magic-nix-cache")? + .display(), + args.listen + ) + .as_bytes(), ) - .as_bytes(), - ) - .with_context(|| "Writing to nix.conf")?; + .with_context(|| "Writing the post-build hook")?; + let path = file + .keep() + .with_context(|| "Keeping the post-build hook")? + .1; - drop(nix_conf); + fs::set_permissions(&path, fs::Permissions::from_mode(0o755)) + .with_context(|| "Setting permissions on the post-build hook")?; + + /* Copy the script to the Nix store so we know for sure that + * it's accessible to the Nix daemon, which might have a + * different /tmp from us. 
*/ + let res = Command::new("nix") + .args([ + "--extra-experimental-features", + "nix-command", + "store", + "add-path", + &path.display().to_string(), + ]) + .output() + .await?; + if res.status.success() { + tokio::fs::remove_file(path).await?; + PathBuf::from(String::from_utf8_lossy(&res.stdout).trim()) + } else { + path + } + }; + + /* Update nix.conf. */ + nix_conf + .write_all( + format!( + "fallback = true\npost-build-hook = {}\n", + post_build_hook_script.display() + ) + .as_bytes(), + ) + .with_context(|| "Writing to nix.conf")?; + + drop(nix_conf); + } let diagnostic_endpoint = match args.diagnostic_endpoint.as_str() { "" => { From 57eb3e75c0376a21eeaf62684a91d692f4d229c6 Mon Sep 17 00:00:00 2001 From: Cole Mickens Date: Wed, 7 Aug 2024 17:44:18 -0700 Subject: [PATCH 02/12] uds subscription: enqueue_paths --- magic-nix-cache/src/api.rs | 2 +- magic-nix-cache/src/main.rs | 77 ++++++++++++++++++------------------- 2 files changed, 38 insertions(+), 41 deletions(-) diff --git a/magic-nix-cache/src/api.rs b/magic-nix-cache/src/api.rs index ae76120..7aa6c63 100644 --- a/magic-nix-cache/src/api.rs +++ b/magic-nix-cache/src/api.rs @@ -147,7 +147,7 @@ async fn post_enqueue_paths( Ok(Json(EnqueuePathsResponse {})) } -async fn enqueue_paths(state: &State, store_paths: Vec) -> Result<()> { +pub async fn enqueue_paths(state: &State, store_paths: Vec) -> Result<()> { if let Some(gha_cache) = &state.gha_cache { gha_cache .enqueue_paths(state.store.clone(), store_paths.clone()) diff --git a/magic-nix-cache/src/main.rs b/magic-nix-cache/src/main.rs index 20aa088..5e1a90e 100644 --- a/magic-nix-cache/src/main.rs +++ b/magic-nix-cache/src/main.rs @@ -300,13 +300,36 @@ async fn main_cli() -> Result<()> { None }; + let diagnostic_endpoint = match args.diagnostic_endpoint.as_str() { + "" => { + tracing::info!("Diagnostics disabled."); + None + } + url => Some(url), + }; + + let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + + let original_paths = 
args.diff_store.then_some(Mutex::new(HashSet::new())); + let state = Arc::new(StateInner { + gha_cache, + upstream: args.upstream.clone(), + shutdown_sender: Mutex::new(Some(shutdown_sender)), + narinfo_negative_cache, + metrics, + store, + flakehub_state: RwLock::new(flakehub_state), + logfile: guard.logfile, + original_paths, + }); + let dnixd_uds_socket_dir: &Path = Path::new(&DETERMINATE_STATE_DIR); let dnixd_uds_socket_path = dnixd_uds_socket_dir.join(DETERMINATE_NIXD_SOCKET_NAME); if dnixd_uds_socket_path.exists() { let stream = TokioIo::new(UnixStream::connect(dnixd_uds_socket_path).await?); - // let (mut sender, conn): (SendRequest, _) = - // hyper::client::conn::http1::handshake(stream).await?; + + // TODO(colemickens): loop retry/reconnect/etc let executor = TokioExecutor::new(); let (mut sender, conn): (hyper::client::conn::http2::SendRequest, _) = @@ -327,26 +350,23 @@ async fn main_cli() -> Result<()> { .body(axum::body::Body::empty())?; let response = sender.send_request(request).await?; - - // NOTE(to self): there was a note somewhere to use this _IF_ you wanted all data, including trialers etc - // so ... we'll see.. 
let mut data = response.into_data_stream(); while let Some(foo) = data.next().await { tracing::info!("got {:?}", foo); + + + let sp = String::from_utf8_lossy(&foo.unwrap()).to_string(); + let store_paths = vec![sp]; + + // TODO(colemickens): error handling::: + let store_paths = store_paths + .iter() + .map(|path| state.store.follow_store_path(path).map_err(|_| anyhow!("ahhhhh"))) + .collect::>>()?; + + crate::api::enqueue_paths(&state, store_paths).await?; } - - // while let Some(frame_result) = response.frame().await { - // let frame = frame_result?; - - // if let Some(segment) = frame.data_ref() { - // tokio::io::stdout() - // .write_all(segment.iter().as_slice()) - // .await?; - // } - // } - - // TODO: do something with these paths } else { // TODO: split into own function(s) @@ -411,29 +431,6 @@ async fn main_cli() -> Result<()> { drop(nix_conf); } - let diagnostic_endpoint = match args.diagnostic_endpoint.as_str() { - "" => { - tracing::info!("Diagnostics disabled."); - None - } - url => Some(url), - }; - - let (shutdown_sender, shutdown_receiver) = oneshot::channel(); - - let original_paths = args.diff_store.then_some(Mutex::new(HashSet::new())); - let state = Arc::new(StateInner { - gha_cache, - upstream: args.upstream.clone(), - shutdown_sender: Mutex::new(Some(shutdown_sender)), - narinfo_negative_cache, - metrics, - store, - flakehub_state: RwLock::new(flakehub_state), - logfile: guard.logfile, - original_paths, - }); - let app = Router::new() .route("/", get(root)) .merge(api::get_router()) From f82811cc9c82ed8ea1f0788bdaf44374dc2f173d Mon Sep 17 00:00:00 2001 From: Cole Mickens Date: Thu, 8 Aug 2024 16:23:24 -0700 Subject: [PATCH 03/12] hack: skip default keep-alive --- magic-nix-cache/src/main.rs | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/magic-nix-cache/src/main.rs b/magic-nix-cache/src/main.rs index 5e1a90e..ba8d747 100644 --- a/magic-nix-cache/src/main.rs +++ b/magic-nix-cache/src/main.rs @@ 
-37,6 +37,7 @@ use clap::Parser; use http_body_util::BodyExt; use hyper_util::rt::{TokioExecutor, TokioIo}; use futures::StreamExt; +use serde::{Deserialize,Serialize}; use tempfile::NamedTempFile; use tokio::fs::File; use tokio::io::AsyncWriteExt; @@ -52,6 +53,15 @@ use gha_cache::Credentials; const DETERMINATE_STATE_DIR: &str = "/nix/var/determinate"; const DETERMINATE_NIXD_SOCKET_NAME: &str = "determinate-nixd.socket"; + +// TODO(colemickens): refactor, move with other UDS stuff (or all PBH stuff) to new file +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(tag = "c", rename_all = "kebab-case")] +pub struct BuiltPathResponseEventV1 { + pub drv: PathBuf, + pub outputs: Vec, +} + type State = Arc; /// GitHub Actions-powered Nix binary cache @@ -352,15 +362,21 @@ async fn main_cli() -> Result<()> { let response = sender.send_request(request).await?; let mut data = response.into_data_stream(); - while let Some(foo) = data.next().await { - tracing::info!("got {:?}", foo); + while let Some(event_str) = data.next().await { + tracing::info!("got {:?}", event_str); + // TOOD: skip our keep-alive, maybe we should set it, we rely on the axum default "\n\n" right now + // but we need to skip those lines, anyway, and not bother trying to parse them - - let sp = String::from_utf8_lossy(&foo.unwrap()).to_string(); - let store_paths = vec![sp]; + // TODO(colemickens): error handle + let event_str = event_str.unwrap(); + if event_str == "\n\n" { + // TODO: hacky, could be better + continue + } + let event: BuiltPathResponseEventV1 = serde_json::from_slice(&event_str)?; // TODO(colemickens): error handling::: - let store_paths = store_paths + let store_paths = event.outputs .iter() .map(|path| state.store.follow_store_path(path).map_err(|_| anyhow!("ahhhhh"))) .collect::>>()?; From c0b8f7b57b0e26d425fc8291e19e249251f15ce4 Mon Sep 17 00:00:00 2001 From: Cole Mickens Date: Thu, 8 Aug 2024 16:26:48 -0700 Subject: [PATCH 04/12] hack: strip data frame prefix --- 
magic-nix-cache/src/main.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/magic-nix-cache/src/main.rs b/magic-nix-cache/src/main.rs index ba8d747..9379c78 100644 --- a/magic-nix-cache/src/main.rs +++ b/magic-nix-cache/src/main.rs @@ -373,6 +373,8 @@ async fn main_cli() -> Result<()> { // TODO: hacky, could be better continue } + // TOOD: another sorta hack + let event_str = event_str.strip_prefix("data: ".as_bytes()).unwrap(); // TODO: omg let event: BuiltPathResponseEventV1 = serde_json::from_slice(&event_str)?; // TODO(colemickens): error handling::: From 594748fe30344b9d5f0b906618892f98d87d6143 Mon Sep 17 00:00:00 2001 From: Cole Mickens Date: Fri, 9 Aug 2024 11:55:37 -0700 Subject: [PATCH 05/12] built-paths subscription: better heuristic for skipping event frames --- magic-nix-cache/src/main.rs | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/magic-nix-cache/src/main.rs b/magic-nix-cache/src/main.rs index 9379c78..8ff28f3 100644 --- a/magic-nix-cache/src/main.rs +++ b/magic-nix-cache/src/main.rs @@ -363,18 +363,15 @@ async fn main_cli() -> Result<()> { let mut data = response.into_data_stream(); while let Some(event_str) = data.next().await { - tracing::info!("got {:?}", event_str); - // TOOD: skip our keep-alive, maybe we should set it, we rely on the axum default "\n\n" right now - // but we need to skip those lines, anyway, and not bother trying to parse them + let event_str = event_str.unwrap(); // TODO(colemickens): error handle - // TODO(colemickens): error handle - let event_str = event_str.unwrap(); - if event_str == "\n\n" { - // TODO: hacky, could be better - continue - } - // TOOD: another sorta hack - let event_str = event_str.strip_prefix("data: ".as_bytes()).unwrap(); // TODO: omg + let event_str = match event_str.strip_prefix("data: ".as_bytes()) { + Some(s) => s, + None => { + tracing::debug!("built-paths subscription: ignoring non-data frame"); + continue; + }, + }; let event: 
BuiltPathResponseEventV1 = serde_json::from_slice(&event_str)?; // TODO(colemickens): error handling::: @@ -382,7 +379,8 @@ async fn main_cli() -> Result<()> { .iter() .map(|path| state.store.follow_store_path(path).map_err(|_| anyhow!("ahhhhh"))) .collect::>>()?; - + + tracing::debug!("about to enqueue paths: {:?}", store_paths); crate::api::enqueue_paths(&state, store_paths).await?; } } else { From 10cbd94f3c7c19af3c2a2ffc91128be4c570d9b1 Mon Sep 17 00:00:00 2001 From: Cole Mickens Date: Fri, 9 Aug 2024 13:58:24 -0700 Subject: [PATCH 06/12] flake.nix: plz obiwan rust-analyzer, save me --- flake.nix | 1 + 1 file changed, 1 insertion(+) diff --git a/flake.nix b/flake.nix index 6f3384b..c6b3a97 100644 --- a/flake.nix +++ b/flake.nix @@ -47,6 +47,7 @@ stable.cargo stable.rustfmt stable.rust-src + stable.rust-analyzer ] ++ nixpkgs.lib.optionals (system == "x86_64-linux") [ targets.x86_64-unknown-linux-musl.stable.rust-std ] ++ nixpkgs.lib.optionals (system == "aarch64-linux") [ From 0f476bd77503b31a06f28e01bcfecaa5acc7d027 Mon Sep 17 00:00:00 2001 From: Cole Mickens Date: Fri, 9 Aug 2024 13:58:24 -0700 Subject: [PATCH 07/12] factor 'legacy' pbh out into separate file --- magic-nix-cache/src/main.rs | 70 ++++++++----------------------------- magic-nix-cache/src/pbh.rs | 54 ++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 56 deletions(-) create mode 100644 magic-nix-cache/src/pbh.rs diff --git a/magic-nix-cache/src/main.rs b/magic-nix-cache/src/main.rs index 8ff28f3..d8aaa6f 100644 --- a/magic-nix-cache/src/main.rs +++ b/magic-nix-cache/src/main.rs @@ -18,6 +18,7 @@ mod env; mod error; mod flakehub; mod gha; +mod pbh; mod telemetry; mod util; @@ -34,10 +35,10 @@ use anyhow::{anyhow, Context, Result}; use axum::body::Body; use axum::{extract::Extension, routing::get, Router}; use clap::Parser; +use futures::StreamExt; use http_body_util::BodyExt; use hyper_util::rt::{TokioExecutor, TokioIo}; -use futures::StreamExt; -use serde::{Deserialize,Serialize}; 
+use serde::{Deserialize, Serialize}; use tempfile::NamedTempFile; use tokio::fs::File; use tokio::io::AsyncWriteExt; @@ -53,7 +54,6 @@ use gha_cache::Credentials; const DETERMINATE_STATE_DIR: &str = "/nix/var/determinate"; const DETERMINATE_NIXD_SOCKET_NAME: &str = "determinate-nixd.socket"; - // TODO(colemickens): refactor, move with other UDS stuff (or all PBH stuff) to new file #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(tag = "c", rename_all = "kebab-case")] @@ -338,7 +338,7 @@ async fn main_cli() -> Result<()> { if dnixd_uds_socket_path.exists() { let stream = TokioIo::new(UnixStream::connect(dnixd_uds_socket_path).await?); - + // TODO(colemickens): loop retry/reconnect/etc let executor = TokioExecutor::new(); @@ -370,14 +370,20 @@ async fn main_cli() -> Result<()> { None => { tracing::debug!("built-paths subscription: ignoring non-data frame"); continue; - }, + } }; let event: BuiltPathResponseEventV1 = serde_json::from_slice(&event_str)?; // TODO(colemickens): error handling::: - let store_paths = event.outputs + let store_paths = event + .outputs .iter() - .map(|path| state.store.follow_store_path(path).map_err(|_| anyhow!("ahhhhh"))) + .map(|path| { + state + .store + .follow_store_path(path) + .map_err(|_| anyhow!("ahhhhh")) + }) .collect::>>()?; tracing::debug!("about to enqueue paths: {:?}", store_paths); @@ -520,58 +526,10 @@ async fn main_cli() -> Result<()> { Ok(()) } -async fn post_build_hook(out_paths: &str) -> Result<()> { - #[derive(Parser, Debug)] - struct Args { - /// `magic-nix-cache` daemon to connect to. 
- #[arg(short = 'l', long, default_value = "127.0.0.1:3000")] - server: SocketAddr, - } - - let args = Args::parse(); - - let store_paths: Vec<_> = out_paths - .split_whitespace() - .map(|s| s.trim().to_owned()) - .collect(); - - let request = api::EnqueuePathsRequest { store_paths }; - - let response = reqwest::Client::new() - .post(format!("http://{}/api/enqueue-paths", &args.server)) - .header(reqwest::header::CONTENT_TYPE, "application/json") - .body( - serde_json::to_string(&request) - .with_context(|| "Decoding the response from the magic-nix-cache server")?, - ) - .send() - .await; - - match response { - Ok(response) if !response.status().is_success() => Err(anyhow!( - "magic-nix-cache server failed to enqueue the push request: {}\n{}", - response.status(), - response - .text() - .await - .unwrap_or_else(|_| "".to_owned()), - ))?, - Ok(response) => response - .json::() - .await - .with_context(|| "magic-nix-cache-server didn't return a valid response")?, - Err(err) => { - Err(err).with_context(|| "magic-nix-cache server failed to send the enqueue request")? - } - }; - - Ok(()) -} - #[tokio::main] async fn main() -> Result<()> { match std::env::var("OUT_PATHS") { - Ok(out_paths) => post_build_hook(&out_paths).await, + Ok(out_paths) => pbh::post_build_hook(&out_paths).await, Err(_) => main_cli().await, } } diff --git a/magic-nix-cache/src/pbh.rs b/magic-nix-cache/src/pbh.rs new file mode 100644 index 0000000..a0becb8 --- /dev/null +++ b/magic-nix-cache/src/pbh.rs @@ -0,0 +1,54 @@ +use std::net::SocketAddr; + +use anyhow::anyhow; +use anyhow::Context as _; +use anyhow::Result; +use clap::Parser; + +pub async fn post_build_hook(out_paths: &str) -> Result<()> { + #[derive(Parser, Debug)] + struct Args { + /// `magic-nix-cache` daemon to connect to. 
+ #[arg(short = 'l', long, default_value = "127.0.0.1:3000")] + server: SocketAddr, + } + + let args = Args::parse(); + + let store_paths: Vec<_> = out_paths + .split_whitespace() + .map(|s| s.trim().to_owned()) + .collect(); + + let request = crate::api::EnqueuePathsRequest { store_paths }; + + let response = reqwest::Client::new() + .post(format!("http://{}/api/enqueue-paths", &args.server)) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body( + serde_json::to_string(&request) + .with_context(|| "Decoding the response from the magic-nix-cache server")?, + ) + .send() + .await; + + match response { + Ok(response) if !response.status().is_success() => Err(anyhow!( + "magic-nix-cache server failed to enqueue the push request: {}\n{}", + response.status(), + response + .text() + .await + .unwrap_or_else(|_| "".to_owned()), + ))?, + Ok(response) => response + .json::() + .await + .with_context(|| "magic-nix-cache-server didn't return a valid response")?, + Err(err) => { + Err(err).with_context(|| "magic-nix-cache server failed to send the enqueue request")? 
+ } + }; + + Ok(()) +} From 3a001d12e58833c9439d456a40b609596163feea Mon Sep 17 00:00:00 2001 From: Cole Mickens Date: Mon, 12 Aug 2024 10:50:51 -0700 Subject: [PATCH 08/12] built-store-paths: finish pbh code rearrangement --- magic-nix-cache/src/main.rs | 119 ++---------------------------- magic-nix-cache/src/pbh.rs | 142 +++++++++++++++++++++++++++++++++++- 2 files changed, 146 insertions(+), 115 deletions(-) diff --git a/magic-nix-cache/src/main.rs b/magic-nix-cache/src/main.rs index d8aaa6f..dfa9a7b 100644 --- a/magic-nix-cache/src/main.rs +++ b/magic-nix-cache/src/main.rs @@ -337,122 +337,13 @@ async fn main_cli() -> Result<()> { let dnixd_uds_socket_path = dnixd_uds_socket_dir.join(DETERMINATE_NIXD_SOCKET_NAME); if dnixd_uds_socket_path.exists() { - let stream = TokioIo::new(UnixStream::connect(dnixd_uds_socket_path).await?); - - // TODO(colemickens): loop retry/reconnect/etc - - let executor = TokioExecutor::new(); - let (mut sender, conn): (hyper::client::conn::http2::SendRequest, _) = - hyper::client::conn::http2::handshake(executor, stream) - .await - .unwrap(); - - // NOTE(colemickens): for now we just drop the joinhandle and let it keep running - let _join_handle = tokio::task::spawn(async move { - if let Err(err) = conn.await { - tracing::error!("Connection failed: {:?}", err); - } - }); - - let request = http::Request::builder() - .method(http::Method::GET) - .uri("http://localhost/built-paths") - .body(axum::body::Body::empty())?; - - let response = sender.send_request(request).await?; - let mut data = response.into_data_stream(); - - while let Some(event_str) = data.next().await { - let event_str = event_str.unwrap(); // TODO(colemickens): error handle - - let event_str = match event_str.strip_prefix("data: ".as_bytes()) { - Some(s) => s, - None => { - tracing::debug!("built-paths subscription: ignoring non-data frame"); - continue; - } - }; - let event: BuiltPathResponseEventV1 = serde_json::from_slice(&event_str)?; - - // TODO(colemickens): error 
handling::: - let store_paths = event - .outputs - .iter() - .map(|path| { - state - .store - .follow_store_path(path) - .map_err(|_| anyhow!("ahhhhh")) - }) - .collect::>>()?; - - tracing::debug!("about to enqueue paths: {:?}", store_paths); - crate::api::enqueue_paths(&state, store_paths).await?; - } + crate::pbh::subscribe_uds_post_build_hook(&dnixd_uds_socket_path, state.clone()).await?; } else { - // TODO: split into own function(s) - - /* Write the post-build hook script. Note that the shell script - * ignores errors, to avoid the Nix build from failing. */ - let post_build_hook_script = { - let mut file = NamedTempFile::with_prefix("magic-nix-cache-build-hook-") - .with_context(|| "Creating a temporary file for the post-build hook")?; - file.write_all( - format!( - // NOTE(cole-h): We want to exit 0 even if the hook failed, otherwise it'll fail the - // build itself - "#! /bin/sh\nRUST_LOG=trace RUST_BACKTRACE=full {} --server {} || :\n", - std::env::current_exe() - .with_context(|| "Getting the path of magic-nix-cache")? - .display(), - args.listen - ) - .as_bytes(), - ) - .with_context(|| "Writing the post-build hook")?; - let path = file - .keep() - .with_context(|| "Keeping the post-build hook")? - .1; - - fs::set_permissions(&path, fs::Permissions::from_mode(0o755)) - .with_context(|| "Setting permissions on the post-build hook")?; - - /* Copy the script to the Nix store so we know for sure that - * it's accessible to the Nix daemon, which might have a - * different /tmp from us. */ - let res = Command::new("nix") - .args([ - "--extra-experimental-features", - "nix-command", - "store", - "add-path", - &path.display().to_string(), - ]) - .output() - .await?; - if res.status.success() { - tokio::fs::remove_file(path).await?; - PathBuf::from(String::from_utf8_lossy(&res.stdout).trim()) - } else { - path - } - }; - - /* Update nix.conf. 
*/ - nix_conf - .write_all( - format!( - "fallback = true\npost-build-hook = {}\n", - post_build_hook_script.display() - ) - .as_bytes(), - ) - .with_context(|| "Writing to nix.conf")?; - - drop(nix_conf); + crate::pbh::setup_legacy_post_build_hook(&args.listen, &mut nix_conf).await?; } + drop(nix_conf); + let app = Router::new() .route("/", get(root)) .merge(api::get_router()) @@ -529,7 +420,7 @@ async fn main_cli() -> Result<()> { #[tokio::main] async fn main() -> Result<()> { match std::env::var("OUT_PATHS") { - Ok(out_paths) => pbh::post_build_hook(&out_paths).await, + Ok(out_paths) => pbh::handle_legacy_post_build_hook(&out_paths).await, Err(_) => main_cli().await, } } diff --git a/magic-nix-cache/src/pbh.rs b/magic-nix-cache/src/pbh.rs index a0becb8..a81f62c 100644 --- a/magic-nix-cache/src/pbh.rs +++ b/magic-nix-cache/src/pbh.rs @@ -1,11 +1,151 @@ +use std::io::Write as _; use std::net::SocketAddr; +use std::os::unix::fs::PermissionsExt as _; +use std::path::Path; +use std::path::PathBuf; use anyhow::anyhow; use anyhow::Context as _; use anyhow::Result; +use axum::body::Body; use clap::Parser; +use futures::StreamExt as _; +use http_body_util::BodyExt as _; +use hyper_util::rt::TokioExecutor; +use hyper_util::rt::TokioIo; +use tempfile::NamedTempFile; +use tokio::net::UnixStream; +use tokio::process::Command; -pub async fn post_build_hook(out_paths: &str) -> Result<()> { +use crate::BuiltPathResponseEventV1; +use crate::State; + +pub async fn subscribe_uds_post_build_hook( + dnixd_uds_socket_path: &Path, + state: State, +) -> Result<()> { + let stream = TokioIo::new(UnixStream::connect(dnixd_uds_socket_path).await?); + + // TODO(colemickens): loop retry/reconnect/etc + + let executor = TokioExecutor::new(); + let (mut sender, conn): (hyper::client::conn::http2::SendRequest, _) = + hyper::client::conn::http2::handshake(executor, stream) + .await + .unwrap(); + + // NOTE(colemickens): for now we just drop the joinhandle and let it keep running + let 
_join_handle = tokio::task::spawn(async move { + if let Err(err) = conn.await { + tracing::error!("Connection failed: {:?}", err); + } + }); + + let request = http::Request::builder() + .method(http::Method::GET) + .uri("http://localhost/built-paths") + .body(axum::body::Body::empty())?; + + let response = sender.send_request(request).await?; + let mut data = response.into_data_stream(); + + while let Some(event_str) = data.next().await { + let event_str = event_str.unwrap(); // TODO(colemickens): error handle + + let event_str = match event_str.strip_prefix("data: ".as_bytes()) { + Some(s) => s, + None => { + tracing::debug!("built-paths subscription: ignoring non-data frame"); + continue; + } + }; + let event: BuiltPathResponseEventV1 = serde_json::from_slice(&event_str)?; + + // TODO(colemickens): error handling::: + let store_paths = event + .outputs + .iter() + .map(|path| { + state + .store + .follow_store_path(path) + .map_err(|_| anyhow!("ahhhhh")) + }) + .collect::>>()?; + + tracing::debug!("about to enqueue paths: {:?}", store_paths); + crate::api::enqueue_paths(&state, store_paths).await?; + } + + Ok(()) +} + +pub async fn setup_legacy_post_build_hook( + listen: &SocketAddr, + nix_conf: &mut std::fs::File, +) -> Result<()> { + /* Write the post-build hook script. Note that the shell script + * ignores errors, to avoid the Nix build from failing. */ + let post_build_hook_script = { + let mut file = NamedTempFile::with_prefix("magic-nix-cache-build-hook-") + .with_context(|| "Creating a temporary file for the post-build hook")?; + file.write_all( + format!( + // NOTE(cole-h): We want to exit 0 even if the hook failed, otherwise it'll fail the + // build itself + "#! /bin/sh\nRUST_LOG=trace RUST_BACKTRACE=full {} --server {} || :\n", + std::env::current_exe() + .with_context(|| "Getting the path of magic-nix-cache")? 
+ .display(), + listen + ) + .as_bytes(), + ) + .with_context(|| "Writing the post-build hook")?; + let path = file + .keep() + .with_context(|| "Keeping the post-build hook")? + .1; + + std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o755)) + .with_context(|| "Setting permissions on the post-build hook")?; + + /* Copy the script to the Nix store so we know for sure that + * it's accessible to the Nix daemon, which might have a + * different /tmp from us. */ + let res = Command::new("nix") + .args([ + "--extra-experimental-features", + "nix-command", + "store", + "add-path", + &path.display().to_string(), + ]) + .output() + .await?; + if res.status.success() { + tokio::fs::remove_file(path).await?; + PathBuf::from(String::from_utf8_lossy(&res.stdout).trim()) + } else { + path + } + }; + + /* Update nix.conf. */ + nix_conf + .write_all( + format!( + "fallback = true\npost-build-hook = {}\n", + post_build_hook_script.display() + ) + .as_bytes(), + ) + .with_context(|| "Writing to nix.conf")?; + + Ok(()) +} + +pub async fn handle_legacy_post_build_hook(out_paths: &str) -> Result<()> { #[derive(Parser, Debug)] struct Args { /// `magic-nix-cache` daemon to connect to. 
From 685fe75327d24520ccc41aef107dfc4729c580a9 Mon Sep 17 00:00:00 2001 From: Cole Mickens Date: Mon, 12 Aug 2024 12:34:20 -0700 Subject: [PATCH 09/12] built-store-paths: mnc can reconnect in dnixd restarts --- magic-nix-cache/src/main.rs | 10 +-- magic-nix-cache/src/pbh.rs | 140 +++++++++++++++++++++++------------- 2 files changed, 91 insertions(+), 59 deletions(-) diff --git a/magic-nix-cache/src/main.rs b/magic-nix-cache/src/main.rs index dfa9a7b..2e49c75 100644 --- a/magic-nix-cache/src/main.rs +++ b/magic-nix-cache/src/main.rs @@ -26,24 +26,16 @@ use std::collections::HashSet; use std::fs::{self, create_dir_all}; use std::io::Write; use std::net::SocketAddr; -use std::os::unix::fs::PermissionsExt; use std::path::{Path, PathBuf}; use std::sync::Arc; use ::attic::nix_store::NixStore; use anyhow::{anyhow, Context, Result}; -use axum::body::Body; use axum::{extract::Extension, routing::get, Router}; use clap::Parser; -use futures::StreamExt; -use http_body_util::BodyExt; -use hyper_util::rt::{TokioExecutor, TokioIo}; use serde::{Deserialize, Serialize}; -use tempfile::NamedTempFile; use tokio::fs::File; use tokio::io::AsyncWriteExt; -use tokio::net::UnixStream; -use tokio::process::Command; use tokio::sync::{oneshot, Mutex, RwLock}; use tracing_subscriber::filter::EnvFilter; use tracing_subscriber::layer::SubscriberExt; @@ -337,7 +329,7 @@ async fn main_cli() -> Result<()> { let dnixd_uds_socket_path = dnixd_uds_socket_dir.join(DETERMINATE_NIXD_SOCKET_NAME); if dnixd_uds_socket_path.exists() { - crate::pbh::subscribe_uds_post_build_hook(&dnixd_uds_socket_path, state.clone()).await?; + crate::pbh::subscribe_uds_post_build_hook(dnixd_uds_socket_path, state.clone()).await?; } else { crate::pbh::setup_legacy_post_build_hook(&args.listen, &mut nix_conf).await?; } diff --git a/magic-nix-cache/src/pbh.rs b/magic-nix-cache/src/pbh.rs index a81f62c..c12c84e 100644 --- a/magic-nix-cache/src/pbh.rs +++ b/magic-nix-cache/src/pbh.rs @@ -1,13 +1,11 @@ use std::io::Write as _; 
use std::net::SocketAddr; use std::os::unix::fs::PermissionsExt as _; -use std::path::Path; use std::path::PathBuf; use anyhow::anyhow; use anyhow::Context as _; use anyhow::Result; -use axum::body::Body; use clap::Parser; use futures::StreamExt as _; use http_body_util::BodyExt as _; @@ -21,62 +19,104 @@ use crate::BuiltPathResponseEventV1; use crate::State; pub async fn subscribe_uds_post_build_hook( - dnixd_uds_socket_path: &Path, + dnixd_uds_socket_path: PathBuf, state: State, ) -> Result<()> { - let stream = TokioIo::new(UnixStream::connect(dnixd_uds_socket_path).await?); + tokio::spawn(async move { + let dnixd_uds_socket_path = &dnixd_uds_socket_path; + loop { + let Ok(socket_conn) = UnixStream::connect(dnixd_uds_socket_path).await else { + tracing::error!("built-paths: failed to connect to determinate-nixd's socket"); + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + continue; + }; + let stream = TokioIo::new(socket_conn); + let executor: TokioExecutor = TokioExecutor::new(); - // TODO(colemickens): loop retry/reconnect/etc + let sender_conn = hyper::client::conn::http2::handshake(executor, stream).await; - let executor = TokioExecutor::new(); - let (mut sender, conn): (hyper::client::conn::http2::SendRequest<Body>, _) = - hyper::client::conn::http2::handshake(executor, stream) - .await - .unwrap(); + let Ok((mut sender, conn)) = sender_conn else { + tracing::error!("built-paths: failed to http2 handshake"); + continue; + }; - // NOTE(colemickens): for now we just drop the joinhandle and let it keep running - let _join_handle = tokio::task::spawn(async move { - if let Err(err) = conn.await { - tracing::error!("Connection failed: {:?}", err); + // NOTE(colemickens): for now we just drop the joinhandle and let it keep running + let _join_handle = tokio::task::spawn(async move { + if let Err(err) = conn.await { + tracing::error!("Connection failed: {:?}", err); + } + }); + + let request = http::Request::builder() + .method(http::Method::GET) + 
.uri("http://localhost/built-paths") + .body(axum::body::Body::empty()); + let Ok(request) = request else { + tracing::error!("built-paths: failed to create request to subscribe"); + continue; + }; + + let response = sender.send_request(request).await; + let Ok(response) = response else { + tracing::error!("buit-paths: failed to send subscription request"); + continue; + }; + let mut data = response.into_data_stream(); + + while let Some(event_str) = data.next().await { + let event_str = match event_str { + Ok(event) => event, + Err(e) => { + tracing::error!("built-paths: error while receiving: {}", e); + break; + } + }; + + let Some(event_str) = event_str.strip_prefix("data: ".as_bytes()) else { + tracing::debug!("built-paths subscription: ignoring non-data frame"); + continue; + }; + let Ok(event): core::result::Result<BuiltPathResponseEventV1, _> = + serde_json::from_slice(event_str) + else { + tracing::error!( + "failed to decode built-path response as BuiltPathResponseEventV1" + ); + continue; + }; + + // TODO(colemickens): error handling::: + let maybe_store_paths = event + .outputs + .iter() + .map(|path| { + state + .store + .follow_store_path(path) + .map_err(|_| anyhow!("ahhhhh")) + }) + .collect::<Result<Vec<_>>>(); + + let Ok(store_paths) = maybe_store_paths else { + tracing::error!( + "built-paths: encountered an error aggregating build store paths" + ); + continue; + }; + + tracing::debug!("about to enqueue paths: {:?}", store_paths); + if let Err(e) = crate::api::enqueue_paths(&state, store_paths).await { + tracing::error!( + "built-paths: failed to enqueue paths for drv ({}): {}", + event.drv.display(), + e + ); + continue; + } + } } }); - let request = http::Request::builder() - .method(http::Method::GET) - .uri("http://localhost/built-paths") - .body(axum::body::Body::empty())?; - - let response = sender.send_request(request).await?; - let mut data = response.into_data_stream(); - - while let Some(event_str) = data.next().await { - let event_str = event_str.unwrap(); // TODO(colemickens): 
error handle - - let event_str = match event_str.strip_prefix("data: ".as_bytes()) { - Some(s) => s, - None => { - tracing::debug!("built-paths subscription: ignoring non-data frame"); - continue; - } - }; - let event: BuiltPathResponseEventV1 = serde_json::from_slice(&event_str)?; - - // TODO(colemickens): error handling::: - let store_paths = event - .outputs - .iter() - .map(|path| { - state - .store - .follow_store_path(path) - .map_err(|_| anyhow!("ahhhhh")) - }) - .collect::<Result<Vec<_>>>()?; - - tracing::debug!("about to enqueue paths: {:?}", store_paths); - crate::api::enqueue_paths(&state, store_paths).await?; - } - Ok(()) } From a6daff9a652f49001d65824aa17308b5dc088cbd Mon Sep 17 00:00:00 2001 From: Cole Mickens Date: Mon, 12 Aug 2024 18:43:57 -0700 Subject: [PATCH 10/12] built-paths: switch to using /events from /built-paths --- magic-nix-cache/src/pbh.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/magic-nix-cache/src/pbh.rs b/magic-nix-cache/src/pbh.rs index c12c84e..cabf407 100644 --- a/magic-nix-cache/src/pbh.rs +++ b/magic-nix-cache/src/pbh.rs @@ -49,7 +49,7 @@ pub async fn subscribe_uds_post_build_hook( let request = http::Request::builder() .method(http::Method::GET) - .uri("http://localhost/built-paths") + .uri("http://localhost/events") .body(axum::body::Body::empty()); let Ok(request) = request else { tracing::error!("built-paths: failed to create request to subscribe"); From c41207df35cdb504efe35bccee37f676384f8017 Mon Sep 17 00:00:00 2001 From: Cole Mickens Date: Tue, 13 Aug 2024 12:19:55 -0700 Subject: [PATCH 11/12] built-paths: feedback around error handling for store path collection --- magic-nix-cache/src/pbh.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/magic-nix-cache/src/pbh.rs b/magic-nix-cache/src/pbh.rs index cabf407..62f2823 100644 --- a/magic-nix-cache/src/pbh.rs +++ b/magic-nix-cache/src/pbh.rs @@ -85,7 +85,6 @@ pub async fn subscribe_uds_post_build_hook( continue; }; - // TODO(colemickens): 
error handling::: let maybe_store_paths = event .outputs .iter() @@ -93,7 +92,7 @@ pub async fn subscribe_uds_post_build_hook( state .store .follow_store_path(path) - .map_err(|_| anyhow!("ahhhhh")) + .map_err(|_| anyhow!("failed to collect store paths")) }) .collect::<Result<Vec<_>>>(); From 7c6300cfdcdc6d6fcd6adeccd4f2ed214da1e0f0 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Tue, 13 Aug 2024 23:39:13 -0400 Subject: [PATCH 12/12] Update magic-nix-cache/src/main.rs --- magic-nix-cache/src/main.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/magic-nix-cache/src/main.rs b/magic-nix-cache/src/main.rs index 2e49c75..4c26bcc 100644 --- a/magic-nix-cache/src/main.rs +++ b/magic-nix-cache/src/main.rs @@ -329,8 +329,10 @@ async fn main_cli() -> Result<()> { let dnixd_uds_socket_path = dnixd_uds_socket_dir.join(DETERMINATE_NIXD_SOCKET_NAME); if dnixd_uds_socket_path.exists() { + tracing::info!("Subscribing to Determinate Nixd build events."); crate::pbh::subscribe_uds_post_build_hook(dnixd_uds_socket_path, state.clone()).await?; } else { + tracing::info!("Patching nix.conf to use a post-build-hook."); crate::pbh::setup_legacy_post_build_hook(&args.listen, &mut nix_conf).await?; }