diff --git a/magic-nix-cache/src/main.rs b/magic-nix-cache/src/main.rs index 8ff28f3..d8aaa6f 100644 --- a/magic-nix-cache/src/main.rs +++ b/magic-nix-cache/src/main.rs @@ -18,6 +18,7 @@ mod env; mod error; mod flakehub; mod gha; +mod pbh; mod telemetry; mod util; @@ -34,10 +35,10 @@ use anyhow::{anyhow, Context, Result}; use axum::body::Body; use axum::{extract::Extension, routing::get, Router}; use clap::Parser; +use futures::StreamExt; use http_body_util::BodyExt; use hyper_util::rt::{TokioExecutor, TokioIo}; -use futures::StreamExt; -use serde::{Deserialize,Serialize}; +use serde::{Deserialize, Serialize}; use tempfile::NamedTempFile; use tokio::fs::File; use tokio::io::AsyncWriteExt; @@ -53,7 +54,6 @@ use gha_cache::Credentials; const DETERMINATE_STATE_DIR: &str = "/nix/var/determinate"; const DETERMINATE_NIXD_SOCKET_NAME: &str = "determinate-nixd.socket"; - // TODO(colemickens): refactor, move with other UDS stuff (or all PBH stuff) to new file #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(tag = "c", rename_all = "kebab-case")] @@ -338,7 +338,7 @@ async fn main_cli() -> Result<()> { if dnixd_uds_socket_path.exists() { let stream = TokioIo::new(UnixStream::connect(dnixd_uds_socket_path).await?); - + // TODO(colemickens): loop retry/reconnect/etc let executor = TokioExecutor::new(); @@ -370,14 +370,20 @@ async fn main_cli() -> Result<()> { None => { tracing::debug!("built-paths subscription: ignoring non-data frame"); continue; - }, + } }; let event: BuiltPathResponseEventV1 = serde_json::from_slice(&event_str)?; // TODO(colemickens): error handling::: - let store_paths = event.outputs + let store_paths = event + .outputs .iter() - .map(|path| state.store.follow_store_path(path).map_err(|_| anyhow!("ahhhhh"))) + .map(|path| { + state + .store + .follow_store_path(path) + .map_err(|_| anyhow!("ahhhhh")) + }) .collect::>>()?; tracing::debug!("about to enqueue paths: {:?}", store_paths); @@ -520,58 +526,10 @@ async fn main_cli() -> Result<()> { Ok(()) } -async fn post_build_hook(out_paths: &str) -> Result<()> { - #[derive(Parser, Debug)] - struct Args { - /// `magic-nix-cache` daemon to connect to. - #[arg(short = 'l', long, default_value = "127.0.0.1:3000")] - server: SocketAddr, - } - - let args = Args::parse(); - - let store_paths: Vec<_> = out_paths - .split_whitespace() - .map(|s| s.trim().to_owned()) - .collect(); - - let request = api::EnqueuePathsRequest { store_paths }; - - let response = reqwest::Client::new() - .post(format!("http://{}/api/enqueue-paths", &args.server)) - .header(reqwest::header::CONTENT_TYPE, "application/json") - .body( - serde_json::to_string(&request) - .with_context(|| "Decoding the response from the magic-nix-cache server")?, - ) - .send() - .await; - - match response { - Ok(response) if !response.status().is_success() => Err(anyhow!( - "magic-nix-cache server failed to enqueue the push request: {}\n{}", - response.status(), - response - .text() - .await - .unwrap_or_else(|_| "".to_owned()), - ))?, - Ok(response) => response - .json::() - .await - .with_context(|| "magic-nix-cache-server didn't return a valid response")?, - Err(err) => { - Err(err).with_context(|| "magic-nix-cache server failed to send the enqueue request")? - } - }; - - Ok(()) -} - #[tokio::main] async fn main() -> Result<()> { match std::env::var("OUT_PATHS") { - Ok(out_paths) => post_build_hook(&out_paths).await, + Ok(out_paths) => pbh::post_build_hook(&out_paths).await, Err(_) => main_cli().await, } } diff --git a/magic-nix-cache/src/pbh.rs b/magic-nix-cache/src/pbh.rs new file mode 100644 index 0000000..a0becb8 --- /dev/null +++ b/magic-nix-cache/src/pbh.rs @@ -0,0 +1,54 @@ +use std::net::SocketAddr; + +use anyhow::anyhow; +use anyhow::Context as _; +use anyhow::Result; +use clap::Parser; + +pub async fn post_build_hook(out_paths: &str) -> Result<()> { + #[derive(Parser, Debug)] + struct Args { + /// `magic-nix-cache` daemon to connect to. + #[arg(short = 'l', long, default_value = "127.0.0.1:3000")] + server: SocketAddr, + } + + let args = Args::parse(); + + let store_paths: Vec<_> = out_paths + .split_whitespace() + .map(|s| s.trim().to_owned()) + .collect(); + + let request = crate::api::EnqueuePathsRequest { store_paths }; + + let response = reqwest::Client::new() + .post(format!("http://{}/api/enqueue-paths", &args.server)) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body( + serde_json::to_string(&request) + .with_context(|| "Decoding the response from the magic-nix-cache server")?, + ) + .send() + .await; + + match response { + Ok(response) if !response.status().is_success() => Err(anyhow!( + "magic-nix-cache server failed to enqueue the push request: {}\n{}", + response.status(), + response + .text() + .await + .unwrap_or_else(|_| "".to_owned()), + ))?, + Ok(response) => response + .json::() + .await + .with_context(|| "magic-nix-cache-server didn't return a valid response")?, + Err(err) => { + Err(err).with_context(|| "magic-nix-cache server failed to send the enqueue request")? + } + }; + + Ok(()) +}