Merge pull request #63 from DeterminateSystems/bring-back-store-diffing

Bring back store diffing
This commit is contained in:
Cole Helbling 2024-05-22 13:10:02 -07:00 committed by GitHub
commit 545d3b7bac
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 124 additions and 14 deletions

View file

@ -2,6 +2,7 @@
//! //!
//! This API is intended to be used by nix-installer-action. //! This API is intended to be used by nix-installer-action.
use attic::nix_store::StorePath;
use axum::{extract::Extension, routing::post, Json, Router}; use axum::{extract::Extension, routing::post, Json, Router};
use axum_macros::debug_handler; use axum_macros::debug_handler;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -10,28 +11,46 @@ use super::State;
use crate::error::{Error, Result}; use crate::error::{Error, Result};
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
struct WorkflowStartResponse {} struct WorkflowStartResponse {
num_original_paths: Option<usize>,
}
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
struct WorkflowFinishResponse { struct WorkflowFinishResponse {
//num_new_paths: usize, num_original_paths: Option<usize>,
num_final_paths: Option<usize>,
num_new_paths: Option<usize>,
} }
pub fn get_router() -> Router { pub fn get_router() -> Router {
Router::new() Router::new()
.route("/api/workflow-start", post(workflow_start)) .route("/api/workflow-start", post(workflow_start))
.route("/api/workflow-finish", post(workflow_finish)) .route("/api/workflow-finish", post(workflow_finish))
.route("/api/enqueue-paths", post(enqueue_paths)) .route("/api/enqueue-paths", post(post_enqueue_paths))
} }
/// Record existing paths. /// Record existing paths.
#[debug_handler] #[debug_handler]
async fn workflow_start( async fn workflow_start(Extension(state): Extension<State>) -> Result<Json<WorkflowStartResponse>> {
Extension(_state): Extension<State>,
) -> Result<Json<WorkflowStartResponse>> {
tracing::info!("Workflow started"); tracing::info!("Workflow started");
let reply = if let Some(original_paths) = &state.original_paths {
let mut original_paths = original_paths.lock().await;
*original_paths = crate::util::get_store_paths(&state.store).await?;
Ok(Json(WorkflowStartResponse {})) let reply = WorkflowStartResponse {
num_original_paths: Some(original_paths.len()),
};
state.metrics.num_original_paths.set(original_paths.len());
reply
} else {
WorkflowStartResponse {
num_original_paths: None,
}
};
Ok(Json(reply))
} }
/// Push new paths and shut down. /// Push new paths and shut down.
@ -40,6 +59,43 @@ async fn workflow_finish(
) -> Result<Json<WorkflowFinishResponse>> { ) -> Result<Json<WorkflowFinishResponse>> {
tracing::info!("Workflow finished"); tracing::info!("Workflow finished");
let response = if let Some(original_paths) = &state.original_paths {
let original_paths = original_paths.lock().await;
let final_paths = crate::util::get_store_paths(&state.store).await?;
let new_paths = final_paths
.difference(&original_paths)
.cloned()
.map(|path| state.store.follow_store_path(path).map_err(Error::Attic))
.collect::<Result<Vec<_>>>()?;
let num_original_paths = original_paths.len();
let num_final_paths = final_paths.len();
let num_new_paths = new_paths.len();
let reply = WorkflowFinishResponse {
num_original_paths: Some(num_original_paths),
num_final_paths: Some(num_final_paths),
num_new_paths: Some(num_new_paths),
};
state.metrics.num_original_paths.set(num_original_paths);
state.metrics.num_final_paths.set(num_final_paths);
state.metrics.num_new_paths.set(num_new_paths);
// NOTE(cole-h): If we're substituting from an upstream cache, those paths won't have the
// post-build-hook run on it, so we diff the store to ensure we cache everything we can.
tracing::info!("Diffing the store and uploading any new paths before we shut down");
enqueue_paths(&state, new_paths).await?;
reply
} else {
WorkflowFinishResponse {
num_original_paths: None,
num_final_paths: None,
num_new_paths: None,
}
};
if let Some(gha_cache) = &state.gha_cache { if let Some(gha_cache) = &state.gha_cache {
tracing::info!("Waiting for GitHub action cache uploads to finish"); tracing::info!("Waiting for GitHub action cache uploads to finish");
gha_cache.shutdown().await?; gha_cache.shutdown().await?;
@ -63,11 +119,7 @@ async fn workflow_finish(
println!("\n{logfile_contents}\n"); println!("\n{logfile_contents}\n");
} }
let reply = WorkflowFinishResponse {}; Ok(Json(response))
//state.metrics.num_new_paths.set(num_new_paths);
Ok(Json(reply))
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@ -80,7 +132,7 @@ pub struct EnqueuePathsResponse {}
/// Schedule paths in the local Nix store for uploading. /// Schedule paths in the local Nix store for uploading.
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn enqueue_paths( async fn post_enqueue_paths(
Extension(state): Extension<State>, Extension(state): Extension<State>,
Json(req): Json<EnqueuePathsRequest>, Json(req): Json<EnqueuePathsRequest>,
) -> Result<Json<EnqueuePathsResponse>> { ) -> Result<Json<EnqueuePathsResponse>> {
@ -92,6 +144,12 @@ async fn enqueue_paths(
.map(|path| state.store.follow_store_path(path).map_err(Error::Attic)) .map(|path| state.store.follow_store_path(path).map_err(Error::Attic))
.collect::<Result<Vec<_>>>()?; .collect::<Result<Vec<_>>>()?;
enqueue_paths(&state, store_paths).await?;
Ok(Json(EnqueuePathsResponse {}))
}
async fn enqueue_paths(state: &State, store_paths: Vec<StorePath>) -> Result<()> {
if let Some(gha_cache) = &state.gha_cache { if let Some(gha_cache) = &state.gha_cache {
gha_cache gha_cache
.enqueue_paths(state.store.clone(), store_paths.clone()) .enqueue_paths(state.store.clone(), store_paths.clone())
@ -102,5 +160,5 @@ async fn enqueue_paths(
crate::flakehub::enqueue_paths(flakehub_state, store_paths).await?; crate::flakehub::enqueue_paths(flakehub_state, store_paths).await?;
} }
Ok(Json(EnqueuePathsResponse {})) Ok(())
} }

View file

@ -19,6 +19,7 @@ mod error;
mod flakehub; mod flakehub;
mod gha; mod gha;
mod telemetry; mod telemetry;
mod util;
use std::collections::HashSet; use std::collections::HashSet;
use std::fs::{self, create_dir_all}; use std::fs::{self, create_dir_all};
@ -119,6 +120,10 @@ struct Args {
/// File to write to when indicating startup. /// File to write to when indicating startup.
#[arg(long)] #[arg(long)]
startup_notification_file: Option<PathBuf>, startup_notification_file: Option<PathBuf>,
/// Whether or not to diff the store before and after Magic Nix Cache runs
#[arg(long, default_value_t = false)]
diff_store: bool,
} }
impl Args { impl Args {
@ -164,6 +169,9 @@ struct StateInner {
/// Where all of tracing will log to when GitHub Actions is run in debug mode /// Where all of tracing will log to when GitHub Actions is run in debug mode
logfile: Option<PathBuf>, logfile: Option<PathBuf>,
/// The paths in the Nix store when Magic Nix Cache started, if store diffing is enabled.
original_paths: Option<Mutex<HashSet<PathBuf>>>,
} }
async fn main_cli() -> Result<()> { async fn main_cli() -> Result<()> {
@ -352,6 +360,7 @@ async fn main_cli() -> Result<()> {
let (shutdown_sender, shutdown_receiver) = oneshot::channel(); let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let original_paths = args.diff_store.then_some(Mutex::new(HashSet::new()));
let state = Arc::new(StateInner { let state = Arc::new(StateInner {
gha_cache, gha_cache,
upstream: args.upstream.clone(), upstream: args.upstream.clone(),
@ -361,6 +370,7 @@ async fn main_cli() -> Result<()> {
store, store,
flakehub_state: RwLock::new(flakehub_state), flakehub_state: RwLock::new(flakehub_state),
logfile: guard.logfile, logfile: guard.logfile,
original_paths,
}); });
let app = Router::new() let app = Router::new()

View file

@ -0,0 +1,42 @@
//! Utilities.
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use attic::nix_store::NixStore;
use crate::error::Result;
/// Returns the set of store paths currently present in the local Nix store.
///
/// Enumerates the store directory on disk and collects every entry, except
/// for derivations (`.drv`), chroot helpers (`.chroot`), entries whose
/// extension is not valid UTF-8, and the special `.links` bookkeeping
/// directory. Paths are returned absolute (joined onto the store dir).
///
/// # Errors
///
/// Propagates any I/O error from reading the store directory.
pub async fn get_store_paths(store: &NixStore) -> Result<HashSet<PathBuf>> {
    // FIXME: use the Nix API.
    let store_dir = store.store_dir();
    let mut entries = tokio::fs::read_dir(store_dir).await?;
    let mut paths = HashSet::new();

    while let Some(entry) = entries.next_entry().await? {
        let name = entry.file_name();
        let name = Path::new(&name);

        // Skip entries that are never useful to cache: derivations,
        // chroot helpers, and anything with a non-UTF-8 extension.
        if let Some(extension) = name.extension() {
            if matches!(extension.to_str(), None | Some("drv") | Some("chroot")) {
                tracing::debug!(
                    "skipping file with weird or uninteresting extension {extension:?}"
                );
                continue;
            }
        }

        // Special paths (so far only `.links`)
        if name.to_str() == Some(".links") {
            continue;
        }

        paths.insert(store_dir.join(name));
    }

    Ok(paths)
}