Make store diffing optional

This commit is contained in:
Cole Helbling 2024-05-15 13:29:51 -07:00
parent 67647c9997
commit 6a58908c6b
2 changed files with 61 additions and 44 deletions

View file

@@ -12,14 +12,14 @@ use crate::error::{Error, Result};
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
struct WorkflowStartResponse { struct WorkflowStartResponse {
num_original_paths: usize, num_original_paths: Option<usize>,
} }
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
struct WorkflowFinishResponse { struct WorkflowFinishResponse {
num_original_paths: usize, num_original_paths: Option<usize>,
num_final_paths: usize, num_final_paths: Option<usize>,
num_new_paths: usize, num_new_paths: Option<usize>,
} }
pub fn get_router() -> Router { pub fn get_router() -> Router {
@@ -33,18 +33,23 @@ pub fn get_router() -> Router {
#[debug_handler] #[debug_handler]
async fn workflow_start(Extension(state): Extension<State>) -> Result<Json<WorkflowStartResponse>> { async fn workflow_start(Extension(state): Extension<State>) -> Result<Json<WorkflowStartResponse>> {
tracing::info!("Workflow started"); tracing::info!("Workflow started");
let mut original_paths = state.original_paths.lock().await; let reply = if let Some(original_paths) = &state.original_paths {
*original_paths = crate::util::get_store_paths(&state.store).await?; let mut original_paths = original_paths.lock().await;
*original_paths = crate::util::get_store_paths(&state.store).await?;
let reply = WorkflowStartResponse { let reply = WorkflowStartResponse {
num_original_paths: original_paths.len(), num_original_paths: Some(original_paths.len()),
};
state.metrics.num_original_paths.set(original_paths.len());
reply
} else {
WorkflowStartResponse {
num_original_paths: None,
}
}; };
state
.metrics
.num_original_paths
.set(reply.num_original_paths);
Ok(Json(reply)) Ok(Json(reply))
} }
@@ -54,22 +59,42 @@ async fn workflow_finish(
) -> Result<Json<WorkflowFinishResponse>> { ) -> Result<Json<WorkflowFinishResponse>> {
tracing::info!("Workflow finished"); tracing::info!("Workflow finished");
let original_paths = state.original_paths.lock().await; let response = if let Some(original_paths) = &state.original_paths {
let final_paths = crate::util::get_store_paths(&state.store).await?; let original_paths = original_paths.lock().await;
let new_paths = final_paths let final_paths = crate::util::get_store_paths(&state.store).await?;
.difference(&original_paths) let new_paths = final_paths
.cloned() .difference(&original_paths)
.map(|path| state.store.follow_store_path(path).map_err(Error::Attic)) .cloned()
.collect::<Result<Vec<_>>>()?; .map(|path| state.store.follow_store_path(path).map_err(Error::Attic))
.collect::<Result<Vec<_>>>()?;
let num_original_paths = original_paths.len(); let num_original_paths = original_paths.len();
let num_final_paths = final_paths.len(); let num_final_paths = final_paths.len();
let num_new_paths = new_paths.len(); let num_new_paths = new_paths.len();
// NOTE(cole-h): If we're substituting from an upstream cache, those paths won't have the let reply = WorkflowFinishResponse {
// post-build-hook run on it, so we diff the store to ensure we cache everything we can. num_original_paths: Some(num_original_paths),
tracing::info!("Diffing the store and uploading any new paths before we shut down"); num_final_paths: Some(num_final_paths),
enqueue_paths(&state, new_paths).await?; num_new_paths: Some(num_new_paths),
};
state.metrics.num_original_paths.set(num_original_paths);
state.metrics.num_final_paths.set(num_final_paths);
state.metrics.num_new_paths.set(num_new_paths);
// NOTE(cole-h): If we're substituting from an upstream cache, those paths won't have the
// post-build-hook run on it, so we diff the store to ensure we cache everything we can.
tracing::info!("Diffing the store and uploading any new paths before we shut down");
enqueue_paths(&state, new_paths).await?;
reply
} else {
WorkflowFinishResponse {
num_original_paths: None,
num_final_paths: None,
num_new_paths: None,
}
};
if let Some(gha_cache) = &state.gha_cache { if let Some(gha_cache) = &state.gha_cache {
tracing::info!("Waiting for GitHub action cache uploads to finish"); tracing::info!("Waiting for GitHub action cache uploads to finish");
@@ -94,20 +119,7 @@ async fn workflow_finish(
println!("\n{logfile_contents}\n"); println!("\n{logfile_contents}\n");
} }
let reply = WorkflowFinishResponse { Ok(Json(response))
num_original_paths,
num_final_paths,
num_new_paths,
};
state
.metrics
.num_original_paths
.set(reply.num_original_paths);
state.metrics.num_final_paths.set(reply.num_final_paths);
state.metrics.num_new_paths.set(reply.num_new_paths);
Ok(Json(reply))
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]

View file

@@ -120,6 +120,10 @@ struct Args {
/// File to write to when indicating startup. /// File to write to when indicating startup.
#[arg(long)] #[arg(long)]
startup_notification_file: Option<PathBuf>, startup_notification_file: Option<PathBuf>,
/// Whether or not to diff the store before and after Magic Nix Cache runs
#[arg(long, default_value_t = false)]
diff_store: bool,
} }
impl Args { impl Args {
@@ -166,8 +170,8 @@ struct StateInner {
/// Where all of tracing will log to when GitHub Actions is run in debug mode /// Where all of tracing will log to when GitHub Actions is run in debug mode
logfile: Option<PathBuf>, logfile: Option<PathBuf>,
/// The paths in the Nix store when Magic Nix Cache started. /// The paths in the Nix store when Magic Nix Cache started, if store diffing is enabled.
original_paths: Mutex<HashSet<PathBuf>>, original_paths: Option<Mutex<HashSet<PathBuf>>>,
} }
async fn main_cli() -> Result<()> { async fn main_cli() -> Result<()> {
@@ -356,6 +360,7 @@ async fn main_cli() -> Result<()> {
let (shutdown_sender, shutdown_receiver) = oneshot::channel(); let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let original_paths = args.diff_store.then_some(Mutex::new(HashSet::new()));
let state = Arc::new(StateInner { let state = Arc::new(StateInner {
gha_cache, gha_cache,
upstream: args.upstream.clone(), upstream: args.upstream.clone(),
@@ -365,7 +370,7 @@ async fn main_cli() -> Result<()> {
store, store,
flakehub_state: RwLock::new(flakehub_state), flakehub_state: RwLock::new(flakehub_state),
logfile: guard.logfile, logfile: guard.logfile,
original_paths: Mutex::new(HashSet::new()), original_paths,
}); });
let app = Router::new() let app = Router::new()