From 4631601e5db112eb9ea0bb1bccda981ebb487644 Mon Sep 17 00:00:00 2001 From: Cole Mickens Date: Tue, 1 Apr 2025 13:01:33 -0700 Subject: [PATCH] send and wait for dnixd flush signal --- magic-nix-cache/src/api.rs | 4 ++++ magic-nix-cache/src/main.rs | 16 +++++++++++++++- magic-nix-cache/src/pbh.rs | 4 ++++ 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/magic-nix-cache/src/api.rs b/magic-nix-cache/src/api.rs index 1fa145e..49fdb7a 100644 --- a/magic-nix-cache/src/api.rs +++ b/magic-nix-cache/src/api.rs @@ -94,6 +94,10 @@ async fn workflow_finish( } }; + // maybe here send the request to Dnixd to Flush + // save uuid from response + // then wait on receiver until we get that same uuid back + if let Some(gha_cache) = &state.gha_cache { tracing::info!("Waiting for GitHub action cache uploads to finish"); gha_cache.shutdown().await?; diff --git a/magic-nix-cache/src/main.rs b/magic-nix-cache/src/main.rs index 4d7151e..01d561d 100644 --- a/magic-nix-cache/src/main.rs +++ b/magic-nix-cache/src/main.rs @@ -37,6 +37,7 @@ use clap::Parser; use serde::{Deserialize, Serialize}; use tokio::fs::File; use tokio::io::AsyncWriteExt; +use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::{oneshot, Mutex, RwLock}; use tracing_subscriber::filter::EnvFilter; use tracing_subscriber::layer::SubscriberExt; @@ -227,6 +228,9 @@ struct StateInner { /// The paths in the Nix store when Magic Nix Cache started, if store diffing is enabled. original_paths: Option>>, + + /// The receiver side of the channel we use to get flush events from dnixd back to the workflow_shutdown handler + dnixd_flush_receiver: Option>, } #[derive(Debug, Clone)] @@ -422,6 +426,15 @@ async fn main_cli() -> Result<()> { let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + let dnixd_flush_channel = if dnixd_available == Dnixd::Available { + Some(tokio::sync::mpsc::unbounded_channel()) + } else { + None + }; + // ???? + let dnixd_flush_sender = dnixd_flush_channel.as_ref().map(|c| c.0.clone()); + let dnixd_flush_receiver = dnixd_flush_channel.map(|c| c.1); + let original_paths = args.diff_store.then_some(Mutex::new(HashSet::new())); let state = Arc::new(StateInner { gha_cache, @@ -433,11 +446,12 @@ async fn main_cli() -> Result<()> { flakehub_state: RwLock::new(flakehub_state), logfile: guard.logfile, original_paths, + dnixd_flush_receiver, }); if dnixd_available == Dnixd::Available { tracing::info!("Subscribing to Determinate Nixd build events."); - crate::pbh::subscribe_uds_post_build_hook(dnixd_uds_socket_path, state.clone()).await?; + crate::pbh::subscribe_uds_post_build_hook(dnixd_uds_socket_path, state.clone(), dnixd_flush_sender).await?; } else { tracing::info!("Patching nix.conf to use a post-build-hook."); crate::pbh::setup_legacy_post_build_hook(&args.listen, &mut nix_conf).await?; diff --git a/magic-nix-cache/src/pbh.rs b/magic-nix-cache/src/pbh.rs index 43f5cc3..a39ab61 100644 --- a/magic-nix-cache/src/pbh.rs +++ b/magic-nix-cache/src/pbh.rs @@ -21,6 +21,7 @@ use crate::State; pub async fn subscribe_uds_post_build_hook( dnixd_uds_socket_path: PathBuf, state: State, + dnixd_flush_sender: Option>, ) -> Result<()> { tokio::spawn(async move { let dnixd_uds_socket_path = &dnixd_uds_socket_path; @@ -79,6 +80,9 @@ pub async fn subscribe_uds_post_build_hook( tracing::debug!("built-paths subscription: ignoring non-data frame"); continue; }; + + // TODO: check for flush event of flush type - send it to the sender + let Ok(event): core::result::Result = serde_json::from_slice(event_str) else {