Merge pull request #83 from DeterminateSystems/colemickens/subscribe-uds-built-paths

subscribe to determinate-nixd's uds socket and built-paths endpoint when available
This commit is contained in:
Cole Mickens 2024-08-15 16:41:31 -05:00 committed by GitHub
commit deeb8d1d79
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 262 additions and 113 deletions

View file

@ -47,6 +47,7 @@
stable.cargo
stable.rustfmt
stable.rust-src
stable.rust-analyzer
] ++ nixpkgs.lib.optionals (system == "x86_64-linux") [
targets.x86_64-unknown-linux-musl.stable.rust-std
] ++ nixpkgs.lib.optionals (system == "aarch64-linux") [

View file

@ -147,7 +147,7 @@ async fn post_enqueue_paths(
Ok(Json(EnqueuePathsResponse {}))
}
async fn enqueue_paths(state: &State, store_paths: Vec<StorePath>) -> Result<()> {
pub async fn enqueue_paths(state: &State, store_paths: Vec<StorePath>) -> Result<()> {
if let Some(gha_cache) = &state.gha_cache {
gha_cache
.enqueue_paths(state.store.clone(), store_paths.clone())

View file

@ -18,6 +18,7 @@ mod env;
mod error;
mod flakehub;
mod gha;
mod pbh;
mod telemetry;
mod util;
@ -25,7 +26,6 @@ use std::collections::HashSet;
use std::fs::{self, create_dir_all};
use std::io::Write;
use std::net::SocketAddr;
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
@ -33,10 +33,9 @@ use ::attic::nix_store::NixStore;
use anyhow::{anyhow, Context, Result};
use axum::{extract::Extension, routing::get, Router};
use clap::Parser;
use tempfile::NamedTempFile;
use serde::{Deserialize, Serialize};
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio::sync::{oneshot, Mutex, RwLock};
use tracing_subscriber::filter::EnvFilter;
use tracing_subscriber::layer::SubscriberExt;
@ -44,6 +43,17 @@ use tracing_subscriber::util::SubscriberInitExt;
use gha_cache::Credentials;
const DETERMINATE_STATE_DIR: &str = "/nix/var/determinate";
const DETERMINATE_NIXD_SOCKET_NAME: &str = "determinate-nixd.socket";
// TODO(colemickens): refactor, move with other UDS stuff (or all PBH stuff) to new file
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "c", rename_all = "kebab-case")]
pub struct BuiltPathResponseEventV1 {
pub drv: PathBuf,
pub outputs: Vec<PathBuf>,
}
type State = Arc<StateInner>;
/// GitHub Actions-powered Nix binary cache
@ -292,66 +302,6 @@ async fn main_cli() -> Result<()> {
None
};
/* Write the post-build hook script. Note that the shell script
* ignores errors, to avoid the Nix build from failing. */
let post_build_hook_script = {
let mut file = NamedTempFile::with_prefix("magic-nix-cache-build-hook-")
.with_context(|| "Creating a temporary file for the post-build hook")?;
file.write_all(
format!(
// NOTE(cole-h): We want to exit 0 even if the hook failed, otherwise it'll fail the
// build itself
"#! /bin/sh\nRUST_LOG=trace RUST_BACKTRACE=full {} --server {} || :\n",
std::env::current_exe()
.with_context(|| "Getting the path of magic-nix-cache")?
.display(),
args.listen
)
.as_bytes(),
)
.with_context(|| "Writing the post-build hook")?;
let path = file
.keep()
.with_context(|| "Keeping the post-build hook")?
.1;
fs::set_permissions(&path, fs::Permissions::from_mode(0o755))
.with_context(|| "Setting permissions on the post-build hook")?;
/* Copy the script to the Nix store so we know for sure that
* it's accessible to the Nix daemon, which might have a
* different /tmp from us. */
let res = Command::new("nix")
.args([
"--extra-experimental-features",
"nix-command",
"store",
"add-path",
&path.display().to_string(),
])
.output()
.await?;
if res.status.success() {
tokio::fs::remove_file(path).await?;
PathBuf::from(String::from_utf8_lossy(&res.stdout).trim())
} else {
path
}
};
/* Update nix.conf. */
nix_conf
.write_all(
format!(
"fallback = true\npost-build-hook = {}\n",
post_build_hook_script.display()
)
.as_bytes(),
)
.with_context(|| "Writing to nix.conf")?;
drop(nix_conf);
let diagnostic_endpoint = match args.diagnostic_endpoint.as_str() {
"" => {
tracing::info!("Diagnostics disabled.");
@ -375,6 +325,19 @@ async fn main_cli() -> Result<()> {
original_paths,
});
let dnixd_uds_socket_dir: &Path = Path::new(&DETERMINATE_STATE_DIR);
let dnixd_uds_socket_path = dnixd_uds_socket_dir.join(DETERMINATE_NIXD_SOCKET_NAME);
if dnixd_uds_socket_path.exists() {
tracing::info!("Subscribing to Determinate Nixd build events.");
crate::pbh::subscribe_uds_post_build_hook(dnixd_uds_socket_path, state.clone()).await?;
} else {
tracing::info!("Patching nix.conf to use a post-build-hook.");
crate::pbh::setup_legacy_post_build_hook(&args.listen, &mut nix_conf).await?;
}
drop(nix_conf);
let app = Router::new()
.route("/", get(root))
.merge(api::get_router())
@ -448,58 +411,10 @@ async fn main_cli() -> Result<()> {
Ok(())
}
async fn post_build_hook(out_paths: &str) -> Result<()> {
#[derive(Parser, Debug)]
struct Args {
/// `magic-nix-cache` daemon to connect to.
#[arg(short = 'l', long, default_value = "127.0.0.1:3000")]
server: SocketAddr,
}
let args = Args::parse();
let store_paths: Vec<_> = out_paths
.split_whitespace()
.map(|s| s.trim().to_owned())
.collect();
let request = api::EnqueuePathsRequest { store_paths };
let response = reqwest::Client::new()
.post(format!("http://{}/api/enqueue-paths", &args.server))
.header(reqwest::header::CONTENT_TYPE, "application/json")
.body(
serde_json::to_string(&request)
.with_context(|| "Decoding the response from the magic-nix-cache server")?,
)
.send()
.await;
match response {
Ok(response) if !response.status().is_success() => Err(anyhow!(
"magic-nix-cache server failed to enqueue the push request: {}\n{}",
response.status(),
response
.text()
.await
.unwrap_or_else(|_| "<no response text>".to_owned()),
))?,
Ok(response) => response
.json::<api::EnqueuePathsResponse>()
.await
.with_context(|| "magic-nix-cache-server didn't return a valid response")?,
Err(err) => {
Err(err).with_context(|| "magic-nix-cache server failed to send the enqueue request")?
}
};
Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
match std::env::var("OUT_PATHS") {
Ok(out_paths) => post_build_hook(&out_paths).await,
Ok(out_paths) => pbh::handle_legacy_post_build_hook(&out_paths).await,
Err(_) => main_cli().await,
}
}

233
magic-nix-cache/src/pbh.rs Normal file
View file

@ -0,0 +1,233 @@
use std::io::Write as _;
use std::net::SocketAddr;
use std::os::unix::fs::PermissionsExt as _;
use std::path::PathBuf;
use anyhow::anyhow;
use anyhow::Context as _;
use anyhow::Result;
use clap::Parser;
use futures::StreamExt as _;
use http_body_util::BodyExt as _;
use hyper_util::rt::TokioExecutor;
use hyper_util::rt::TokioIo;
use tempfile::NamedTempFile;
use tokio::net::UnixStream;
use tokio::process::Command;
use crate::BuiltPathResponseEventV1;
use crate::State;
pub async fn subscribe_uds_post_build_hook(
dnixd_uds_socket_path: PathBuf,
state: State,
) -> Result<()> {
tokio::spawn(async move {
let dnixd_uds_socket_path = &dnixd_uds_socket_path;
loop {
let Ok(socket_conn) = UnixStream::connect(dnixd_uds_socket_path).await else {
tracing::error!("built-paths: failed to connect to determinate-nixd's socket");
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
continue;
};
let stream = TokioIo::new(socket_conn);
let executor: TokioExecutor = TokioExecutor::new();
let sender_conn = hyper::client::conn::http2::handshake(executor, stream).await;
let Ok((mut sender, conn)) = sender_conn else {
tracing::error!("built-paths: failed to http2 handshake");
continue;
};
// NOTE(colemickens): for now we just drop the joinhandle and let it keep running
let _join_handle = tokio::task::spawn(async move {
if let Err(err) = conn.await {
tracing::error!("Connection failed: {:?}", err);
}
});
let request = http::Request::builder()
.method(http::Method::GET)
.uri("http://localhost/events")
.body(axum::body::Body::empty());
let Ok(request) = request else {
tracing::error!("built-paths: failed to create request to subscribe");
continue;
};
let response = sender.send_request(request).await;
let Ok(response) = response else {
tracing::error!("buit-paths: failed to send subscription request");
continue;
};
let mut data = response.into_data_stream();
while let Some(event_str) = data.next().await {
let event_str = match event_str {
Ok(event) => event,
Err(e) => {
tracing::error!("built-paths: error while receiving: {}", e);
break;
}
};
let Some(event_str) = event_str.strip_prefix("data: ".as_bytes()) else {
tracing::debug!("built-paths subscription: ignoring non-data frame");
continue;
};
let Ok(event): core::result::Result<BuiltPathResponseEventV1, _> =
serde_json::from_slice(event_str)
else {
tracing::error!(
"failed to decode built-path response as BuiltPathResponseEventV1"
);
continue;
};
let maybe_store_paths = event
.outputs
.iter()
.map(|path| {
state
.store
.follow_store_path(path)
.map_err(|_| anyhow!("failed to collect store paths"))
})
.collect::<Result<Vec<_>>>();
let Ok(store_paths) = maybe_store_paths else {
tracing::error!(
"built-paths: encountered an error aggregating build store paths"
);
continue;
};
tracing::debug!("about to enqueue paths: {:?}", store_paths);
if let Err(e) = crate::api::enqueue_paths(&state, store_paths).await {
tracing::error!(
"built-paths: failed to enqueue paths for drv ({}): {}",
event.drv.display(),
e
);
continue;
}
}
}
});
Ok(())
}
pub async fn setup_legacy_post_build_hook(
listen: &SocketAddr,
nix_conf: &mut std::fs::File,
) -> Result<()> {
/* Write the post-build hook script. Note that the shell script
* ignores errors, to avoid the Nix build from failing. */
let post_build_hook_script = {
let mut file = NamedTempFile::with_prefix("magic-nix-cache-build-hook-")
.with_context(|| "Creating a temporary file for the post-build hook")?;
file.write_all(
format!(
// NOTE(cole-h): We want to exit 0 even if the hook failed, otherwise it'll fail the
// build itself
"#! /bin/sh\nRUST_LOG=trace RUST_BACKTRACE=full {} --server {} || :\n",
std::env::current_exe()
.with_context(|| "Getting the path of magic-nix-cache")?
.display(),
listen
)
.as_bytes(),
)
.with_context(|| "Writing the post-build hook")?;
let path = file
.keep()
.with_context(|| "Keeping the post-build hook")?
.1;
std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o755))
.with_context(|| "Setting permissions on the post-build hook")?;
/* Copy the script to the Nix store so we know for sure that
* it's accessible to the Nix daemon, which might have a
* different /tmp from us. */
let res = Command::new("nix")
.args([
"--extra-experimental-features",
"nix-command",
"store",
"add-path",
&path.display().to_string(),
])
.output()
.await?;
if res.status.success() {
tokio::fs::remove_file(path).await?;
PathBuf::from(String::from_utf8_lossy(&res.stdout).trim())
} else {
path
}
};
/* Update nix.conf. */
nix_conf
.write_all(
format!(
"fallback = true\npost-build-hook = {}\n",
post_build_hook_script.display()
)
.as_bytes(),
)
.with_context(|| "Writing to nix.conf")?;
Ok(())
}
pub async fn handle_legacy_post_build_hook(out_paths: &str) -> Result<()> {
#[derive(Parser, Debug)]
struct Args {
/// `magic-nix-cache` daemon to connect to.
#[arg(short = 'l', long, default_value = "127.0.0.1:3000")]
server: SocketAddr,
}
let args = Args::parse();
let store_paths: Vec<_> = out_paths
.split_whitespace()
.map(|s| s.trim().to_owned())
.collect();
let request = crate::api::EnqueuePathsRequest { store_paths };
let response = reqwest::Client::new()
.post(format!("http://{}/api/enqueue-paths", &args.server))
.header(reqwest::header::CONTENT_TYPE, "application/json")
.body(
serde_json::to_string(&request)
.with_context(|| "Decoding the response from the magic-nix-cache server")?,
)
.send()
.await;
match response {
Ok(response) if !response.status().is_success() => Err(anyhow!(
"magic-nix-cache server failed to enqueue the push request: {}\n{}",
response.status(),
response
.text()
.await
.unwrap_or_else(|_| "<no response text>".to_owned()),
))?,
Ok(response) => response
.json::<crate::api::EnqueuePathsResponse>()
.await
.with_context(|| "magic-nix-cache-server didn't return a valid response")?,
Err(err) => {
Err(err).with_context(|| "magic-nix-cache server failed to send the enqueue request")?
}
};
Ok(())
}