send and wait for dnixd flush signal

This commit is contained in:
Cole Mickens 2025-04-01 13:01:33 -07:00
parent cf00f851e1
commit 4631601e5d
3 changed files with 23 additions and 1 deletions

View file

@ -94,6 +94,10 @@ async fn workflow_finish(
}
};
// maybe here send the request to Dnixd to Flush
// save uuid from response
// then wait on receiver until we get that same uuid back
if let Some(gha_cache) = &state.gha_cache {
tracing::info!("Waiting for GitHub action cache uploads to finish");
gha_cache.shutdown().await?;

View file

@ -37,6 +37,7 @@ use clap::Parser;
use serde::{Deserialize, Serialize};
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::{oneshot, Mutex, RwLock};
use tracing_subscriber::filter::EnvFilter;
use tracing_subscriber::layer::SubscriberExt;
@ -227,6 +228,9 @@ struct StateInner {
/// The paths in the Nix store when Magic Nix Cache started, if store diffing is enabled.
original_paths: Option<Mutex<HashSet<PathBuf>>>,
/// The receiver side of the channel we use to get flush events from dnixd back to the workflow_shutdown handler
dnixd_flush_receiver: Option<UnboundedReceiver<uuid::Uuid>>,
}
#[derive(Debug, Clone)]
@ -422,6 +426,15 @@ async fn main_cli() -> Result<()> {
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let dnixd_flush_channel = if dnixd_available == Dnixd::Available {
Some(tokio::sync::mpsc::unbounded_channel())
} else {
None
};
// ????
let dnixd_flush_sender = dnixd_flush_channel.as_ref().map(|c| c.0.clone());
let dnixd_flush_receiver = dnixd_flush_channel.map(|c| c.1);
let original_paths = args.diff_store.then_some(Mutex::new(HashSet::new()));
let state = Arc::new(StateInner {
gha_cache,
@ -433,11 +446,12 @@ async fn main_cli() -> Result<()> {
flakehub_state: RwLock::new(flakehub_state),
logfile: guard.logfile,
original_paths,
dnixd_flush_receiver,
});
if dnixd_available == Dnixd::Available {
tracing::info!("Subscribing to Determinate Nixd build events.");
crate::pbh::subscribe_uds_post_build_hook(dnixd_uds_socket_path, state.clone()).await?;
crate::pbh::subscribe_uds_post_build_hook(dnixd_uds_socket_path, state.clone(), dnixd_flush_sender).await?;
} else {
tracing::info!("Patching nix.conf to use a post-build-hook.");
crate::pbh::setup_legacy_post_build_hook(&args.listen, &mut nix_conf).await?;

View file

@ -21,6 +21,7 @@ use crate::State;
pub async fn subscribe_uds_post_build_hook(
dnixd_uds_socket_path: PathBuf,
state: State,
dnixd_flush_sender: Option<tokio::sync::mpsc::UnboundedSender<uuid::Uuid>>,
) -> Result<()> {
tokio::spawn(async move {
let dnixd_uds_socket_path = &dnixd_uds_socket_path;
@ -79,6 +80,9 @@ pub async fn subscribe_uds_post_build_hook(
tracing::debug!("built-paths subscription: ignoring non-data frame");
continue;
};
// TODO: check for flush event of flush type - send it to the sender
let Ok(event): core::result::Result<BuiltPathResponseEventV1, _> =
serde_json::from_slice(event_str)
else {