built-store-paths: mnc can reconnect in dnixd restarts
This commit is contained in:
parent
3a001d12e5
commit
685fe75327
|
@ -26,24 +26,16 @@ use std::collections::HashSet;
|
|||
use std::fs::{self, create_dir_all};
|
||||
use std::io::Write;
|
||||
use std::net::SocketAddr;
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use ::attic::nix_store::NixStore;
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use axum::body::Body;
|
||||
use axum::{extract::Extension, routing::get, Router};
|
||||
use clap::Parser;
|
||||
use futures::StreamExt;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper_util::rt::{TokioExecutor, TokioIo};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tempfile::NamedTempFile;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::net::UnixStream;
|
||||
use tokio::process::Command;
|
||||
use tokio::sync::{oneshot, Mutex, RwLock};
|
||||
use tracing_subscriber::filter::EnvFilter;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
|
@ -337,7 +329,7 @@ async fn main_cli() -> Result<()> {
|
|||
let dnixd_uds_socket_path = dnixd_uds_socket_dir.join(DETERMINATE_NIXD_SOCKET_NAME);
|
||||
|
||||
if dnixd_uds_socket_path.exists() {
|
||||
crate::pbh::subscribe_uds_post_build_hook(&dnixd_uds_socket_path, state.clone()).await?;
|
||||
crate::pbh::subscribe_uds_post_build_hook(dnixd_uds_socket_path, state.clone()).await?;
|
||||
} else {
|
||||
crate::pbh::setup_legacy_post_build_hook(&args.listen, &mut nix_conf).await?;
|
||||
}
|
||||
|
|
|
@ -1,13 +1,11 @@
|
|||
use std::io::Write as _;
|
||||
use std::net::SocketAddr;
|
||||
use std::os::unix::fs::PermissionsExt as _;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use anyhow::anyhow;
|
||||
use anyhow::Context as _;
|
||||
use anyhow::Result;
|
||||
use axum::body::Body;
|
||||
use clap::Parser;
|
||||
use futures::StreamExt as _;
|
||||
use http_body_util::BodyExt as _;
|
||||
|
@ -21,62 +19,104 @@ use crate::BuiltPathResponseEventV1;
|
|||
use crate::State;
|
||||
|
||||
pub async fn subscribe_uds_post_build_hook(
|
||||
dnixd_uds_socket_path: &Path,
|
||||
dnixd_uds_socket_path: PathBuf,
|
||||
state: State,
|
||||
) -> Result<()> {
|
||||
let stream = TokioIo::new(UnixStream::connect(dnixd_uds_socket_path).await?);
|
||||
tokio::spawn(async move {
|
||||
let dnixd_uds_socket_path = &dnixd_uds_socket_path;
|
||||
loop {
|
||||
let Ok(socket_conn) = UnixStream::connect(dnixd_uds_socket_path).await else {
|
||||
tracing::error!("built-paths: failed to connect to determinate-nixd's socket");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
|
||||
continue;
|
||||
};
|
||||
let stream = TokioIo::new(socket_conn);
|
||||
let executor: TokioExecutor = TokioExecutor::new();
|
||||
|
||||
// TODO(colemickens): loop retry/reconnect/etc
|
||||
let sender_conn = hyper::client::conn::http2::handshake(executor, stream).await;
|
||||
|
||||
let executor = TokioExecutor::new();
|
||||
let (mut sender, conn): (hyper::client::conn::http2::SendRequest<Body>, _) =
|
||||
hyper::client::conn::http2::handshake(executor, stream)
|
||||
.await
|
||||
.unwrap();
|
||||
let Ok((mut sender, conn)) = sender_conn else {
|
||||
tracing::error!("built-paths: failed to http2 handshake");
|
||||
continue;
|
||||
};
|
||||
|
||||
// NOTE(colemickens): for now we just drop the joinhandle and let it keep running
|
||||
let _join_handle = tokio::task::spawn(async move {
|
||||
if let Err(err) = conn.await {
|
||||
tracing::error!("Connection failed: {:?}", err);
|
||||
// NOTE(colemickens): for now we just drop the joinhandle and let it keep running
|
||||
let _join_handle = tokio::task::spawn(async move {
|
||||
if let Err(err) = conn.await {
|
||||
tracing::error!("Connection failed: {:?}", err);
|
||||
}
|
||||
});
|
||||
|
||||
let request = http::Request::builder()
|
||||
.method(http::Method::GET)
|
||||
.uri("http://localhost/built-paths")
|
||||
.body(axum::body::Body::empty());
|
||||
let Ok(request) = request else {
|
||||
tracing::error!("built-paths: failed to create request to subscribe");
|
||||
continue;
|
||||
};
|
||||
|
||||
let response = sender.send_request(request).await;
|
||||
let Ok(response) = response else {
|
||||
tracing::error!("buit-paths: failed to send subscription request");
|
||||
continue;
|
||||
};
|
||||
let mut data = response.into_data_stream();
|
||||
|
||||
while let Some(event_str) = data.next().await {
|
||||
let event_str = match event_str {
|
||||
Ok(event) => event,
|
||||
Err(e) => {
|
||||
tracing::error!("built-paths: error while receiving: {}", e);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let Some(event_str) = event_str.strip_prefix("data: ".as_bytes()) else {
|
||||
tracing::debug!("built-paths subscription: ignoring non-data frame");
|
||||
continue;
|
||||
};
|
||||
let Ok(event): core::result::Result<BuiltPathResponseEventV1, _> =
|
||||
serde_json::from_slice(event_str)
|
||||
else {
|
||||
tracing::error!(
|
||||
"failed to decode built-path response as BuiltPathResponseEventV1"
|
||||
);
|
||||
continue;
|
||||
};
|
||||
|
||||
// TODO(colemickens): error handling:::
|
||||
let maybe_store_paths = event
|
||||
.outputs
|
||||
.iter()
|
||||
.map(|path| {
|
||||
state
|
||||
.store
|
||||
.follow_store_path(path)
|
||||
.map_err(|_| anyhow!("ahhhhh"))
|
||||
})
|
||||
.collect::<Result<Vec<_>>>();
|
||||
|
||||
let Ok(store_paths) = maybe_store_paths else {
|
||||
tracing::error!(
|
||||
"built-paths: encountered an error aggregating build store paths"
|
||||
);
|
||||
continue;
|
||||
};
|
||||
|
||||
tracing::debug!("about to enqueue paths: {:?}", store_paths);
|
||||
if let Err(e) = crate::api::enqueue_paths(&state, store_paths).await {
|
||||
tracing::error!(
|
||||
"built-paths: failed to enqueue paths for drv ({}): {}",
|
||||
event.drv.display(),
|
||||
e
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let request = http::Request::builder()
|
||||
.method(http::Method::GET)
|
||||
.uri("http://localhost/built-paths")
|
||||
.body(axum::body::Body::empty())?;
|
||||
|
||||
let response = sender.send_request(request).await?;
|
||||
let mut data = response.into_data_stream();
|
||||
|
||||
while let Some(event_str) = data.next().await {
|
||||
let event_str = event_str.unwrap(); // TODO(colemickens): error handle
|
||||
|
||||
let event_str = match event_str.strip_prefix("data: ".as_bytes()) {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
tracing::debug!("built-paths subscription: ignoring non-data frame");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let event: BuiltPathResponseEventV1 = serde_json::from_slice(&event_str)?;
|
||||
|
||||
// TODO(colemickens): error handling:::
|
||||
let store_paths = event
|
||||
.outputs
|
||||
.iter()
|
||||
.map(|path| {
|
||||
state
|
||||
.store
|
||||
.follow_store_path(path)
|
||||
.map_err(|_| anyhow!("ahhhhh"))
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
tracing::debug!("about to enqueue paths: {:?}", store_paths);
|
||||
crate::api::enqueue_paths(&state, store_paths).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue