From 685fe75327d24520ccc41aef107dfc4729c580a9 Mon Sep 17 00:00:00 2001
From: Cole Mickens
Date: Mon, 12 Aug 2024 12:34:20 -0700
Subject: [PATCH] built-store-paths: mnc can reconnect in dnixd restarts

---
 magic-nix-cache/src/main.rs |  10 +--
 magic-nix-cache/src/pbh.rs  | 153 ++++++++++++++++++++++++------------
 2 files changed, 104 insertions(+), 59 deletions(-)

diff --git a/magic-nix-cache/src/main.rs b/magic-nix-cache/src/main.rs
index dfa9a7b..2e49c75 100644
--- a/magic-nix-cache/src/main.rs
+++ b/magic-nix-cache/src/main.rs
@@ -26,24 +26,16 @@ use std::collections::HashSet;
 use std::fs::{self, create_dir_all};
 use std::io::Write;
 use std::net::SocketAddr;
-use std::os::unix::fs::PermissionsExt;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
 use ::attic::nix_store::NixStore;
 use anyhow::{anyhow, Context, Result};
-use axum::body::Body;
 use axum::{extract::Extension, routing::get, Router};
 use clap::Parser;
-use futures::StreamExt;
-use http_body_util::BodyExt;
-use hyper_util::rt::{TokioExecutor, TokioIo};
 use serde::{Deserialize, Serialize};
-use tempfile::NamedTempFile;
 use tokio::fs::File;
 use tokio::io::AsyncWriteExt;
-use tokio::net::UnixStream;
-use tokio::process::Command;
 use tokio::sync::{oneshot, Mutex, RwLock};
 use tracing_subscriber::filter::EnvFilter;
 use tracing_subscriber::layer::SubscriberExt;
@@ -337,7 +329,7 @@ async fn main_cli() -> Result<()> {
     let dnixd_uds_socket_path = dnixd_uds_socket_dir.join(DETERMINATE_NIXD_SOCKET_NAME);
 
     if dnixd_uds_socket_path.exists() {
-        crate::pbh::subscribe_uds_post_build_hook(&dnixd_uds_socket_path, state.clone()).await?;
+        crate::pbh::subscribe_uds_post_build_hook(dnixd_uds_socket_path, state.clone()).await?;
     } else {
         crate::pbh::setup_legacy_post_build_hook(&args.listen, &mut nix_conf).await?;
     }
diff --git a/magic-nix-cache/src/pbh.rs b/magic-nix-cache/src/pbh.rs
index a81f62c..c12c84e 100644
--- a/magic-nix-cache/src/pbh.rs
+++ b/magic-nix-cache/src/pbh.rs
@@ -1,13 +1,11 @@
 use std::io::Write as _;
 use std::net::SocketAddr;
 use std::os::unix::fs::PermissionsExt as _;
-use std::path::Path;
 use std::path::PathBuf;
 
 use anyhow::anyhow;
 use anyhow::Context as _;
 use anyhow::Result;
-use axum::body::Body;
 use clap::Parser;
 use futures::StreamExt as _;
 use http_body_util::BodyExt as _;
@@ -21,62 +19,117 @@ use crate::BuiltPathResponseEventV1;
 use crate::State;
 
+/// Subscribes to determinate-nixd's `/built-paths` event stream over its Unix
+/// domain socket and enqueues the output paths of every reported build.
+///
+/// The subscription runs in a detached background task that reconnects
+/// whenever the connection cannot be established or is lost, for example
+/// across a determinate-nixd restart. This function returns `Ok(())` as soon
+/// as the task has been spawned.
 pub async fn subscribe_uds_post_build_hook(
-    dnixd_uds_socket_path: &Path,
+    dnixd_uds_socket_path: PathBuf,
     state: State,
 ) -> Result<()> {
-    let stream = TokioIo::new(UnixStream::connect(dnixd_uds_socket_path).await?);
+    tokio::spawn(async move {
+        let dnixd_uds_socket_path = &dnixd_uds_socket_path;
+        // Pause before retrying after any failed attempt; without it a socket
+        // that accepts connections but fails later would be retried in a hot loop.
+        let retry_delay = std::time::Duration::from_secs(10);
+        loop {
+            let Ok(socket_conn) = UnixStream::connect(dnixd_uds_socket_path).await else {
+                tracing::error!("built-paths: failed to connect to determinate-nixd's socket");
+                tokio::time::sleep(retry_delay).await;
+                continue;
+            };
+            let stream = TokioIo::new(socket_conn);
+            let executor: TokioExecutor = TokioExecutor::new();
 
-    // TODO(colemickens): loop retry/reconnect/etc
+            let sender_conn = hyper::client::conn::http2::handshake(executor, stream).await;
 
-    let executor = TokioExecutor::new();
-    let (mut sender, conn): (hyper::client::conn::http2::SendRequest<Body>, _) =
-        hyper::client::conn::http2::handshake(executor, stream)
-            .await
-            .unwrap();
+            let Ok((mut sender, conn)) = sender_conn else {
+                tracing::error!("built-paths: failed to http2 handshake");
+                tokio::time::sleep(retry_delay).await;
+                continue;
+            };
 
-    // NOTE(colemickens): for now we just drop the joinhandle and let it keep running
-    let _join_handle = tokio::task::spawn(async move {
-        if let Err(err) = conn.await {
-            tracing::error!("Connection failed: {:?}", err);
+            // NOTE(colemickens): for now we just drop the joinhandle and let it keep running
+            let _join_handle = tokio::task::spawn(async move {
+                if let Err(err) = conn.await {
+                    tracing::error!("Connection failed: {:?}", err);
+                }
+            });
+
+            let request = http::Request::builder()
+                .method(http::Method::GET)
+                .uri("http://localhost/built-paths")
+                .body(axum::body::Body::empty());
+            let Ok(request) = request else {
+                tracing::error!("built-paths: failed to create request to subscribe");
+                tokio::time::sleep(retry_delay).await;
+                continue;
+            };
+
+            let response = sender.send_request(request).await;
+            let Ok(response) = response else {
+                tracing::error!("built-paths: failed to send subscription request");
+                tokio::time::sleep(retry_delay).await;
+                continue;
+            };
+            let mut data = response.into_data_stream();
+
+            while let Some(event_str) = data.next().await {
+                let event_str = match event_str {
+                    Ok(event) => event,
+                    Err(e) => {
+                        tracing::error!("built-paths: error while receiving: {}", e);
+                        break;
+                    }
+                };
+
+                let Some(event_str) = event_str.strip_prefix("data: ".as_bytes()) else {
+                    tracing::debug!("built-paths subscription: ignoring non-data frame");
+                    continue;
+                };
+                let Ok(event): core::result::Result<BuiltPathResponseEventV1, _> =
+                    serde_json::from_slice(event_str)
+                else {
+                    tracing::error!(
+                        "failed to decode built-path response as BuiltPathResponseEventV1"
+                    );
+                    continue;
+                };
+
+                // TODO(colemickens): error handling:::
+                let maybe_store_paths = event
+                    .outputs
+                    .iter()
+                    .map(|path| {
+                        state
+                            .store
+                            .follow_store_path(path)
+                            .map_err(|_| anyhow!("ahhhhh"))
+                    })
+                    .collect::<Result<Vec<_>>>();
+
+                let Ok(store_paths) = maybe_store_paths else {
+                    tracing::error!(
+                        "built-paths: encountered an error aggregating build store paths"
+                    );
+                    continue;
+                };
+
+                tracing::debug!("about to enqueue paths: {:?}", store_paths);
+                if let Err(e) = crate::api::enqueue_paths(&state, store_paths).await {
+                    tracing::error!(
+                        "built-paths: failed to enqueue paths for drv ({}): {}",
+                        event.drv.display(),
+                        e
+                    );
+                    continue;
+                }
+            }
         }
     });
 
-    let request = http::Request::builder()
-        .method(http::Method::GET)
-        .uri("http://localhost/built-paths")
-        .body(axum::body::Body::empty())?;
-
-    let response = sender.send_request(request).await?;
-    let mut data = response.into_data_stream();
-
-    while let Some(event_str) = data.next().await {
-        let event_str = event_str.unwrap(); // TODO(colemickens): error handle
-
-        let event_str = match event_str.strip_prefix("data: ".as_bytes()) {
-            Some(s) => s,
-            None => {
-                tracing::debug!("built-paths subscription: ignoring non-data frame");
-                continue;
-            }
-        };
-        let event: BuiltPathResponseEventV1 = serde_json::from_slice(&event_str)?;
-
-        // TODO(colemickens): error handling:::
-        let store_paths = event
-            .outputs
-            .iter()
-            .map(|path| {
-                state
-                    .store
-                    .follow_store_path(path)
-                    .map_err(|_| anyhow!("ahhhhh"))
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        tracing::debug!("about to enqueue paths: {:?}", store_paths);
-        crate::api::enqueue_paths(&state, store_paths).await?;
-    }
-
     Ok(())
 }
 