Bring back store diffing

parent 507095f7ea
commit 938f17dd2d
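The idea behind the commit, in miniature: snapshot the set of store paths when the workflow starts, snapshot again when it finishes, and upload the set difference. A minimal, self-contained sketch of that flow (plain directory listing and a stubbed-out build step; not the crate's real State or upload queue, and it assumes a Nix store exists at /nix/store):

use std::collections::HashSet;
use std::path::PathBuf;

/// Stand-in for the commit's crate::util::get_store_paths:
/// just list the direct children of the store directory.
fn snapshot(store_dir: &str) -> std::io::Result<HashSet<PathBuf>> {
    let mut paths = HashSet::new();
    for entry in std::fs::read_dir(store_dir)? {
        paths.insert(entry?.path());
    }
    Ok(paths)
}

fn main() -> std::io::Result<()> {
    let store_dir = "/nix/store"; // assumption: running on a machine with a Nix store
    let original = snapshot(store_dir)?; // taken at /api/workflow-start
    // ... builds and substitutions add paths to the store here ...
    let final_paths = snapshot(store_dir)?; // taken at /api/workflow-finish
    // The paths worth uploading are exactly the ones that appeared in between.
    let new_paths: Vec<_> = final_paths.difference(&original).cloned().collect();
    println!("{} new paths to cache", new_paths.len());
    Ok(())
}

The diff below does the same thing, but keeps the first snapshot in shared server state between the two HTTP calls and filters out paths that are never worth caching.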
magic-nix-cache/src/api.rs

@@ -2,6 +2,7 @@
 //!
 //! This API is intended to be used by nix-installer-action.
 
+use attic::nix_store::StorePath;
 use axum::{extract::Extension, routing::post, Json, Router};
 use axum_macros::debug_handler;
 use serde::{Deserialize, Serialize};
@@ -10,28 +11,41 @@ use super::State;
 use crate::error::{Error, Result};
 
 #[derive(Debug, Clone, Serialize)]
-struct WorkflowStartResponse {}
+struct WorkflowStartResponse {
+    num_original_paths: usize,
+}
 
 #[derive(Debug, Clone, Serialize)]
 struct WorkflowFinishResponse {
-    //num_new_paths: usize,
+    num_original_paths: usize,
+    num_final_paths: usize,
+    num_new_paths: usize,
 }
 
 pub fn get_router() -> Router {
     Router::new()
         .route("/api/workflow-start", post(workflow_start))
         .route("/api/workflow-finish", post(workflow_finish))
-        .route("/api/enqueue-paths", post(enqueue_paths))
+        .route("/api/enqueue-paths", post(post_enqueue_paths))
 }
 
 /// Record existing paths.
 #[debug_handler]
-async fn workflow_start(
-    Extension(_state): Extension<State>,
-) -> Result<Json<WorkflowStartResponse>> {
+async fn workflow_start(Extension(state): Extension<State>) -> Result<Json<WorkflowStartResponse>> {
     tracing::info!("Workflow started");
+    let mut original_paths = state.original_paths.lock().await;
+    *original_paths = crate::util::get_store_paths(&state.store).await?;
 
-    Ok(Json(WorkflowStartResponse {}))
+    let reply = WorkflowStartResponse {
+        num_original_paths: original_paths.len(),
+    };
+
+    state
+        .metrics
+        .num_original_paths
+        .set(reply.num_original_paths);
+
+    Ok(Json(reply))
 }
 
 /// Push new paths and shut down.
@@ -40,6 +54,23 @@ async fn workflow_finish(
 ) -> Result<Json<WorkflowFinishResponse>> {
     tracing::info!("Workflow finished");
 
+    let original_paths = state.original_paths.lock().await;
+    let final_paths = crate::util::get_store_paths(&state.store).await?;
+    let new_paths = final_paths
+        .difference(&original_paths)
+        .cloned()
+        .map(|path| state.store.follow_store_path(path).map_err(Error::Attic))
+        .collect::<Result<Vec<_>>>()?;
+
+    let num_original_paths = original_paths.len();
+    let num_final_paths = final_paths.len();
+    let num_new_paths = new_paths.len();
+
+    // NOTE(cole-h): If we're substituting from an upstream cache, those paths won't have the
+    // post-build-hook run on it, so we diff the store to ensure we cache everything we can.
+    tracing::info!("Diffing the store and uploading any new paths before we shut down");
+    enqueue_paths(&state, new_paths).await?;
+
     if let Some(gha_cache) = &state.gha_cache {
         tracing::info!("Waiting for GitHub action cache uploads to finish");
         gha_cache.shutdown().await?;
@@ -63,9 +94,18 @@ async fn workflow_finish(
     println!("Every log line throughout the lifetime of the program:");
     println!("\n{logfile_contents}\n");
 
-    let reply = WorkflowFinishResponse {};
+    let reply = WorkflowFinishResponse {
+        num_original_paths,
+        num_final_paths,
+        num_new_paths,
+    };
 
-    //state.metrics.num_new_paths.set(num_new_paths);
+    state
+        .metrics
+        .num_original_paths
+        .set(reply.num_original_paths);
+    state.metrics.num_final_paths.set(reply.num_final_paths);
+    state.metrics.num_new_paths.set(reply.num_new_paths);
 
     Ok(Json(reply))
 }
@@ -80,7 +120,7 @@ pub struct EnqueuePathsResponse {}
 
 /// Schedule paths in the local Nix store for uploading.
 #[tracing::instrument(skip_all)]
-async fn enqueue_paths(
+async fn post_enqueue_paths(
     Extension(state): Extension<State>,
     Json(req): Json<EnqueuePathsRequest>,
 ) -> Result<Json<EnqueuePathsResponse>> {
@@ -92,6 +132,12 @@ async fn enqueue_paths(
         .map(|path| state.store.follow_store_path(path).map_err(Error::Attic))
         .collect::<Result<Vec<_>>>()?;
 
+    enqueue_paths(&state, store_paths).await?;
+
+    Ok(Json(EnqueuePathsResponse {}))
+}
+
+async fn enqueue_paths(state: &State, store_paths: Vec<StorePath>) -> Result<()> {
     if let Some(gha_cache) = &state.gha_cache {
         gha_cache
             .enqueue_paths(state.store.clone(), store_paths.clone())
@@ -103,5 +149,5 @@ async fn enqueue_paths(
         crate::flakehub::enqueue_paths(flakehub_state, store_paths).await?;
     }
 
-    Ok(Json(EnqueuePathsResponse {}))
+    Ok(())
 }
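A small refactor rides along with the diffing: the HTTP handler becomes post_enqueue_paths, and the actual queueing moves into a plain enqueue_paths helper, so workflow_finish can push the diffed paths through the same code path as the HTTP endpoint. A sketch of that shape (stub State and error types, assuming tokio as the async runtime; not the crate's real signatures):

use std::path::PathBuf;

struct State; // stand-in for the crate's real shared state
type Result<T> = std::result::Result<T, String>;

/// HTTP-facing handler: decode the request, then delegate to the shared helper.
async fn post_enqueue_paths(state: &State, req_paths: Vec<PathBuf>) -> Result<()> {
    enqueue_paths(state, req_paths).await
}

/// Shared helper: also callable directly from workflow_finish with the diffed paths.
async fn enqueue_paths(_state: &State, paths: Vec<PathBuf>) -> Result<()> {
    for path in paths {
        println!("would enqueue {}", path.display()); // stub for the GHA/FlakeHub queues
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    post_enqueue_paths(&State, vec![PathBuf::from("/nix/store/example")]).await
}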
magic-nix-cache/src/main.rs

@@ -18,6 +18,7 @@ mod error;
 mod flakehub;
 mod gha;
 mod telemetry;
+mod util;
 
 use std::collections::HashSet;
 use std::fs::{self, create_dir_all};
@@ -136,6 +137,9 @@ struct StateInner {
 
     /// FlakeHub cache state.
     flakehub_state: RwLock<Option<flakehub::State>>,
+
+    /// The paths in the Nix store when Magic Nix Cache started.
+    original_paths: Mutex<HashSet<PathBuf>>,
 }
 
 async fn main_cli() -> Result<()> {
@@ -324,6 +328,7 @@ async fn main_cli() -> Result<()> {
         metrics,
         store,
         flakehub_state: RwLock::new(flakehub_state),
+        original_paths: Mutex::new(HashSet::new()),
     });
 
     let app = Router::new()
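The new original_paths field is written by the workflow-start handler and read back by workflow-finish, so it lives in StateInner behind a Mutex; given the .lock().await calls in the diff, this is presumably tokio's async Mutex. A sketch of that shared-snapshot pattern (simplified State, assuming tokio; not the crate's real struct):

use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Clone)]
struct State {
    original_paths: Arc<Mutex<HashSet<PathBuf>>>,
}

#[tokio::main]
async fn main() {
    let state = State {
        original_paths: Arc::new(Mutex::new(HashSet::new())),
    };

    // workflow-start: overwrite the snapshot in place.
    {
        let mut original = state.original_paths.lock().await;
        *original = HashSet::from([PathBuf::from("/nix/store/aaa-foo")]);
    }

    // workflow-finish: read it back to diff against the final store contents.
    let original = state.original_paths.lock().await;
    println!("recorded {} original paths", original.len());
}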
magic-nix-cache/src/util.rs (new file, +47)
@@ -0,0 +1,47 @@
+//! Utilities.
+
+use std::collections::HashSet;
+use std::path::{Path, PathBuf};
+
+use attic::nix_store::NixStore;
+
+use crate::error::Result;
+
+/// Returns the list of store paths that are currently present.
+pub async fn get_store_paths(store: &NixStore) -> Result<HashSet<PathBuf>> {
+    // FIXME: use the Nix API.
+    let store_dir = store.store_dir();
+    let mut listing = tokio::fs::read_dir(store_dir).await?;
+    let mut paths = HashSet::new();
+    while let Some(entry) = listing.next_entry().await? {
+        let file_name = entry.file_name();
+        let file_name = Path::new(&file_name);
+
+        if let Some(extension) = file_name.extension() {
+            match extension.to_str() {
+                None | Some("drv") | Some("chroot") => {
+                    tracing::debug!(
+                        "skipping file with weird or uninteresting extension {extension:?}"
+                    );
+                    continue;
+                }
+                _ => {}
+            }
+        }
+
+        if let Some(s) = file_name.to_str() {
+            // Let's not push any sources
+            if s.ends_with("-source") {
+                continue;
+            }
+
+            // Special paths (so far only `.links`)
+            if s.starts_with(".links") {
+                continue;
+            }
+        }
+
+        paths.insert(store_dir.join(file_name));
+    }
+    Ok(paths)
+}
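The heart of util.rs is the filename filter: derivation files (.drv), chroot files, fetched -source paths, and the special .links hard-link directory are skipped because they are never worth uploading. The same predicate, pulled out as a standalone synchronous sketch (hypothetical helper name; logic lifted from the hunk above):

use std::path::Path;

/// Mirrors the filter in the new get_store_paths: returns false for
/// store entries that should never be uploaded.
fn is_interesting(file_name: &Path) -> bool {
    if let Some(ext) = file_name.extension() {
        match ext.to_str() {
            None | Some("drv") | Some("chroot") => return false,
            _ => {}
        }
    }
    if let Some(s) = file_name.to_str() {
        if s.ends_with("-source") || s.starts_with(".links") {
            return false;
        }
    }
    true
}

fn main() {
    for name in [
        "aaaa-hello-2.12",     // kept: a normal output path
        "bbbb-hello-2.12.drv", // skipped: derivation
        "cccc-nixpkgs-source", // skipped: fetched source
        ".links",              // skipped: special hard-link directory
    ] {
        println!("{name}: {}", is_interesting(Path::new(name)));
    }
}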