Merge remote-tracking branch 'origin/eelcodolstra/fh-224-magic-nix-cache-use-post-build-hook-for-the-gha-cache-as' into merge-against-upstream

Cole Helbling 2024-03-01 08:40:20 -08:00
commit 619a6346c0
14 changed files with 2840 additions and 237 deletions

Cargo.lock (generated, 2522 changed lines): diff suppressed because it is too large.


@@ -90,54 +90,54 @@
 "type": "github"
 }
 },
-"lowdown-src": {
+"libgit2": {
 "flake": false,
 "locked": {
-"lastModified": 1633514407,
-"narHash": "sha256-Dw32tiMjdK9t3ETl5fzGrutQTzh2rufgZV4A/BbxuD4=",
-"owner": "kristapsdz",
-"repo": "lowdown",
-"rev": "d2c2b44ff6c27b936ec27358a2653caaef8f73b8",
+"lastModified": 1709118350,
+"narHash": "sha256-JptPxlhXS3VHDYNKWTud7ubs1qweF6g0Vo6Epojporc=",
+"owner": "libgit2",
+"repo": "libgit2",
+"rev": "4ab9c401ab36080707ab27aafd0c247af4399ef6",
 "type": "github"
 },
 "original": {
-"owner": "kristapsdz",
-"repo": "lowdown",
+"owner": "libgit2",
+"repo": "libgit2",
 "type": "github"
 }
 },
 "nix": {
 "inputs": {
 "flake-compat": "flake-compat_2",
-"lowdown-src": "lowdown-src",
+"libgit2": "libgit2",
 "nixpkgs": "nixpkgs",
 "nixpkgs-regression": "nixpkgs-regression"
 },
 "locked": {
-"lastModified": 1701122567,
-"narHash": "sha256-iA8DqS+W2fWTfR+nNJSvMHqQ+4NpYMRT3b+2zS6JTvE=",
-"rev": "50f8f1c8bc019a4c0fd098b9ac674b94cfc6af0d",
-"revCount": 15434,
+"lastModified": 1708517151,
+"narHash": "sha256-s7QTMxLzVA5UF80sFCv8jwaTMBLA8/110YFkZNkNsCk=",
+"rev": "8a8172cd2b5ef2f6dd2d9673a6379447d780ff17",
+"revCount": 16129,
 "type": "tarball",
-"url": "https://api.flakehub.com/f/pinned/NixOS/nix/2.19.2/018c1be0-1b88-7682-b3bf-948ec82d0a0b/source.tar.gz"
+"url": "https://api.flakehub.com/f/pinned/NixOS/nix/2.20.3/018dcc43-c784-772a-8da1-64165044e9cd/source.tar.gz"
 },
 "original": {
 "type": "tarball",
-"url": "https://flakehub.com/f/NixOS/nix/2.19.tar.gz"
+"url": "https://flakehub.com/f/NixOS/nix/2.20.tar.gz"
 }
 },
 "nixpkgs": {
 "locked": {
-"lastModified": 1698876495,
-"narHash": "sha256-nsQo2/mkDUFeAjuu92p0dEqhRvHHiENhkKVIV1y0/Oo=",
+"lastModified": 1705033721,
+"narHash": "sha256-K5eJHmL1/kev6WuqyqqbS1cdNnSidIZ3jeqJ7GbrYnQ=",
 "owner": "NixOS",
 "repo": "nixpkgs",
-"rev": "9eb24edd6a0027fed010ccfe300a9734d029983c",
+"rev": "a1982c92d8980a0114372973cbdfe0a307f1bdea",
 "type": "github"
 },
 "original": {
 "owner": "NixOS",
-"ref": "release-23.05",
+"ref": "nixos-23.05-small",
 "repo": "nixpkgs",
 "type": "github"
 }


@@ -17,7 +17,7 @@
 flake-compat.url = "https://flakehub.com/f/edolstra/flake-compat/1.0.1.tar.gz";
-nix.url = "https://flakehub.com/f/NixOS/nix/2.19.tar.gz";
+nix.url = "https://flakehub.com/f/NixOS/nix/2.20.tar.gz";
 };
 outputs = { self, nixpkgs, nix, ... }@inputs:
@@ -46,6 +46,7 @@
 default = magic-nix-cache;
 });
+/*
 devShells = forEachSupportedSystem ({ pkgs, cranePkgs, lib }: {
 default = pkgs.mkShell {
 inputsFrom = [ cranePkgs.magic-nix-cache ];
@@ -156,5 +157,6 @@
 ];
 };
 });
+*/
 };
 }


@@ -32,5 +32,5 @@ We should contribute support for the latter to [Octocrab](https://github.com/XAM
 Since GHAC uses private APIs that use special tokens for authentication, we need to get them from a workflow run.
 The easiest way is with the `keygen` workflow in this repo.
-Generate an `age` encryption key with `age-keygen -o key.txt`, and add the Public Key as a repository secret named `AGE_PUBLIC_KEY`.
+Generate an `age` encryption key with `nix shell nixpkgs#age --command age-keygen -o key.txt`, and add the Public Key as a repository secret named `AGE_PUBLIC_KEY`.
 Then, trigger the `keygen` workflow which will print out a command that will let you decrypt the credentials.


@@ -319,8 +319,8 @@ impl Api {
 Err(Error::TooManyCollisions)
 }
-/// Uploads a file.
-pub async fn upload_file<S>(&self, allocation: FileAllocation, mut stream: S) -> Result<()>
+/// Uploads a file. Returns the size of the file.
+pub async fn upload_file<S>(&self, allocation: FileAllocation, mut stream: S) -> Result<usize>
 where
 S: AsyncRead + Unpin + Send,
 {
@@ -396,7 +396,7 @@ impl Api {
 self.commit_cache(allocation.0, offset).await?;
-Ok(())
+Ok(offset)
 }
 /// Downloads a file based on a list of key prefixes.


@@ -17,7 +17,7 @@ serde = { version = "1.0.162", features = ["derive"] }
 serde_json = { version = "1.0.96", default-features = false }
 thiserror = "1.0.40"
 tokio-stream = { version = "0.1.14", default-features = false }
-tokio-util = { version = "0.7.8", features = ["io"] }
+tokio-util = { version = "0.7.8", features = ["io", "compat"] }
 daemonize = "0.5.0"
 is_ci = "1.1.1"
 sha2 = { version = "0.10.6", default-features = false }
@@ -25,10 +25,13 @@ reqwest = { version = "0.11.17", default-features = false, features = ["blocking
 netrc-rs = "0.1.2"
 attic = { git = "https://github.com/DeterminateSystems/attic", branch = "fixups-for-magic-nix-cache" }
 attic-client = { git = "https://github.com/DeterminateSystems/attic", branch = "fixups-for-magic-nix-cache" }
+attic-server = { git = "https://github.com/DeterminateSystems/attic", branch = "fixups-for-magic-nix-cache" }
 indicatif = "0.17"
 anyhow = "1.0.71"
 tempfile = "3.9"
 uuid = { version = "1.4.0", features = ["serde", "v7", "rand", "std"] }
+futures = "0.3"
+async-compression = "0.4"
 [dependencies.tokio]
 version = "1.28.0"


@@ -2,26 +2,19 @@
 //!
 //! This API is intended to be used by nix-installer-action.
-use std::net::SocketAddr;
-use axum::{extract::Extension, http::uri::Uri, routing::post, Json, Router};
+use axum::{extract::Extension, routing::post, Json, Router};
 use axum_macros::debug_handler;
 use serde::{Deserialize, Serialize};
 use super::State;
 use crate::error::{Error, Result};
-use crate::util::{get_store_paths, upload_paths};
 #[derive(Debug, Clone, Serialize)]
-struct WorkflowStartResponse {
-num_original_paths: usize,
-}
+struct WorkflowStartResponse {}
 #[derive(Debug, Clone, Serialize)]
 struct WorkflowFinishResponse {
-num_original_paths: usize,
-num_final_paths: usize,
-num_new_paths: usize,
+//num_new_paths: usize,
 }
 pub fn get_router() -> Router {
@@ -33,15 +26,12 @@ pub fn get_router() -> Router {
 /// Record existing paths.
 #[debug_handler]
-async fn workflow_start(Extension(state): Extension<State>) -> Result<Json<WorkflowStartResponse>> {
+async fn workflow_start(
+Extension(_state): Extension<State>,
+) -> Result<Json<WorkflowStartResponse>> {
 tracing::info!("Workflow started");
-let mut original_paths = state.original_paths.lock().await;
-*original_paths = get_store_paths(&state.store).await?;
-Ok(Json(WorkflowStartResponse {
-num_original_paths: original_paths.len(),
-}))
+Ok(Json(WorkflowStartResponse {}))
 }
 /// Push new paths and shut down.
@@ -49,17 +39,10 @@ async fn workflow_finish(
 Extension(state): Extension<State>,
 ) -> Result<Json<WorkflowFinishResponse>> {
 tracing::info!("Workflow finished");
-let original_paths = state.original_paths.lock().await;
-let final_paths = get_store_paths(&state.store).await?;
-let new_paths = final_paths
-.difference(&original_paths)
-.cloned()
-.collect::<Vec<_>>();
-if state.api.is_some() {
-tracing::info!("Pushing {} new paths to GHA cache", new_paths.len());
-let store_uri = make_store_uri(&state.self_endpoint);
-upload_paths(new_paths.clone(), &store_uri).await?;
+if let Some(gha_cache) = &state.gha_cache {
+tracing::info!("Waiting for GitHub action cache uploads to finish");
+gha_cache.shutdown().await?;
 }
 if let Some(sender) = state.shutdown_sender.lock().await.take() {
@@ -69,36 +52,18 @@ async fn workflow_finish(
 // Wait for the Attic push workers to finish.
 if let Some(attic_state) = state.flakehub_state.write().await.take() {
+tracing::info!("Waiting for FlakeHub cache uploads to finish");
 attic_state.push_session.wait().await?;
 }
 }
-let reply = WorkflowFinishResponse {
-num_original_paths: original_paths.len(),
-num_final_paths: final_paths.len(),
-num_new_paths: new_paths.len(),
-};
+let reply = WorkflowFinishResponse {};
-state
-.metrics
-.num_original_paths
-.set(reply.num_original_paths);
-state.metrics.num_final_paths.set(reply.num_final_paths);
-state.metrics.num_new_paths.set(reply.num_new_paths);
+//state.metrics.num_new_paths.set(num_new_paths);
 Ok(Json(reply))
 }
-fn make_store_uri(self_endpoint: &SocketAddr) -> String {
-Uri::builder()
-.scheme("http")
-.authority(self_endpoint.to_string())
-.path_and_query("/?compression=zstd&parallel-compression=true")
-.build()
-.expect("Cannot construct URL to self")
-.to_string()
-}
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct EnqueuePathsRequest {
 pub store_paths: Vec<String>,
@@ -120,6 +85,12 @@ async fn enqueue_paths(
 .map(|path| state.store.follow_store_path(path).map_err(Error::Attic))
 .collect::<Result<Vec<_>>>()?;
+if let Some(gha_cache) = &state.gha_cache {
+gha_cache
+.enqueue_paths(state.store.clone(), store_paths.clone())
+.await?;
+}
 if let Some(flakehub_state) = &*state.flakehub_state.read().await {
 crate::flakehub::enqueue_paths(flakehub_state, store_paths).await?;
 }
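
The Action API above is what nix-installer-action and the post-build hook talk to: `workflow_start` and `workflow_finish` now return empty responses, while `enqueue_paths` hands each batch of store paths to the GHA cache worker and the FlakeHub push session. As a rough illustration of the wire format, the sketch below serializes the same `EnqueuePathsRequest` shape shown in this hunk; the client-side use of `serde_json` and the example store path are assumptions, and the route the handler is mounted on is not visible in this excerpt.

use serde::{Deserialize, Serialize};

// Mirrors the EnqueuePathsRequest struct defined in the hunk above.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnqueuePathsRequest {
    pub store_paths: Vec<String>,
}

fn main() -> serde_json::Result<()> {
    // A client would POST a JSON body of this shape to the enqueue endpoint.
    let request = EnqueuePathsRequest {
        store_paths: vec!["/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-example".to_string()],
    };
    // Produces: {"store_paths":["/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-example"]}
    println!("{}", serde_json::to_string(&request)?);
    Ok(())
}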


@@ -51,7 +51,7 @@ async fn get_narinfo(
 let key = format!("{}.narinfo", store_path_hash);
 if state
-.narinfo_nagative_cache
+.narinfo_negative_cache
 .read()
 .await
 .contains(&store_path_hash)
@@ -61,20 +61,21 @@
 return pull_through(&state, &path);
 }
-if let Some(api) = &state.api {
-if let Some(url) = api.get_file_url(&[&key]).await? {
+if let Some(gha_cache) = &state.gha_cache {
+if let Some(url) = gha_cache.api.get_file_url(&[&key]).await? {
 state.metrics.narinfos_served.incr();
 return Ok(Redirect::temporary(&url));
 }
 }
-let mut negative_cache = state.narinfo_nagative_cache.write().await;
+let mut negative_cache = state.narinfo_negative_cache.write().await;
 negative_cache.insert(store_path_hash);
 state.metrics.narinfos_sent_upstream.incr();
 state.metrics.narinfos_negative_cache_misses.incr();
 pull_through(&state, &path)
 }
 async fn put_narinfo(
 Extension(state): Extension<State>,
 Path(path): Path<String>,
@@ -90,19 +91,19 @@ async fn put_narinfo(
 return Err(Error::BadRequest);
 }
-let api = state.api.as_ref().ok_or(Error::GHADisabled)?;
+let gha_cache = state.gha_cache.as_ref().ok_or(Error::GHADisabled)?;
 let store_path_hash = components[0].to_string();
 let key = format!("{}.narinfo", store_path_hash);
-let allocation = api.allocate_file_with_random_suffix(&key).await?;
+let allocation = gha_cache.api.allocate_file_with_random_suffix(&key).await?;
 let stream = StreamReader::new(
 body.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))),
 );
-api.upload_file(allocation, stream).await?;
+gha_cache.api.upload_file(allocation, stream).await?;
 state.metrics.narinfos_uploaded.incr();
 state
-.narinfo_nagative_cache
+.narinfo_negative_cache
 .write()
 .await
 .remove(&store_path_hash);
@@ -112,9 +113,10 @@ async fn put_narinfo(
 async fn get_nar(Extension(state): Extension<State>, Path(path): Path<String>) -> Result<Redirect> {
 if let Some(url) = state
-.api
+.gha_cache
 .as_ref()
 .ok_or(Error::GHADisabled)?
+.api
 .get_file_url(&[&path])
 .await?
 {
@@ -129,18 +131,22 @@ async fn get_nar(Extension(state): Extension<State>, Path(path): Path<String>) -
 Err(Error::NotFound)
 }
 }
 async fn put_nar(
 Extension(state): Extension<State>,
 Path(path): Path<String>,
 body: BodyStream,
 ) -> Result<()> {
-let api = state.api.as_ref().ok_or(Error::GHADisabled)?;
-let allocation = api.allocate_file_with_random_suffix(&path).await?;
+let gha_cache = state.gha_cache.as_ref().ok_or(Error::GHADisabled)?;
+let allocation = gha_cache
+.api
+.allocate_file_with_random_suffix(&path)
+.await?;
 let stream = StreamReader::new(
 body.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))),
 );
-api.upload_file(allocation, stream).await?;
+gha_cache.api.upload_file(allocation, stream).await?;
 state.metrics.nars_uploaded.incr();
 Ok(())


@@ -22,9 +22,6 @@ pub enum Error {
 #[error("I/O error: {0}")]
 Io(#[from] std::io::Error),
-#[error("Failed to upload paths")]
-FailedToUpload,
 #[error("GHA cache is disabled")]
 GHADisabled,


@@ -9,7 +9,6 @@ use attic_client::{
 };
 use reqwest::Url;
 use serde::Deserialize;
-use std::env;
 use std::path::Path;
 use std::sync::Arc;
 use tokio::fs::File;
@@ -28,6 +27,7 @@ pub async fn init_cache(
 flakehub_api_server: &Url,
 flakehub_api_server_netrc: &Path,
 flakehub_cache_server: &Url,
+flakehub_flake_name: &str,
 store: Arc<NixStore>,
 ) -> Result<State> {
 // Parse netrc to get the credentials for api.flakehub.com.
@@ -92,12 +92,8 @@ pub async fn init_cache(
 // Get the cache UUID for this project.
 let cache_name = {
-let github_repo = env::var("GITHUB_REPOSITORY").map_err(|_| {
-Error::Config("GITHUB_REPOSITORY environment variable is not set".to_owned())
-})?;
 let url = flakehub_api_server
-.join(&format!("project/{}", github_repo))
+.join(&format!("project/{}", flakehub_flake_name))
 .map_err(|_| Error::Config(format!("bad URL '{}'", flakehub_api_server)))?;
 let response = reqwest::Client::new()

magic-nix-cache/src/gha.rs (new file, 228 lines)

@@ -0,0 +1,228 @@
use std::{collections::HashSet, sync::Arc};
use crate::error::{Error, Result};
use crate::telemetry;
use async_compression::tokio::bufread::ZstdEncoder;
use attic::nix_store::{NixStore, StorePath, ValidPathInfo};
use attic_server::narinfo::{Compression, NarInfo};
use futures::stream::TryStreamExt;
use gha_cache::{Api, Credentials};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
RwLock,
};
use tokio_util::compat::FuturesAsyncReadCompatExt;
pub struct GhaCache {
/// The GitHub Actions Cache API.
pub api: Arc<Api>,
/// The future from the completion of the worker.
worker_result: RwLock<Option<tokio::task::JoinHandle<Result<()>>>>,
channel_tx: UnboundedSender<Request>,
}
#[derive(Debug)]
enum Request {
Shutdown,
Upload(StorePath),
}
impl GhaCache {
pub fn new(
credentials: Credentials,
cache_version: Option<String>,
store: Arc<NixStore>,
metrics: Arc<telemetry::TelemetryReport>,
narinfo_negative_cache: Arc<RwLock<HashSet<String>>>,
) -> Result<GhaCache> {
let mut api = Api::new(credentials)?;
if let Some(cache_version) = &cache_version {
api.mutate_version(cache_version.as_bytes());
}
let (channel_tx, channel_rx) = unbounded_channel();
let api = Arc::new(api);
let api2 = api.clone();
let worker_result = tokio::task::spawn(async move {
worker(
&api2,
store,
channel_rx,
metrics,
narinfo_negative_cache.clone(),
)
.await
});
Ok(GhaCache {
api,
worker_result: RwLock::new(Some(worker_result)),
channel_tx,
})
}
pub async fn shutdown(&self) -> Result<()> {
if let Some(worker_result) = self.worker_result.write().await.take() {
self.channel_tx
.send(Request::Shutdown)
.expect("Cannot send shutdown message");
worker_result.await.unwrap()
} else {
Ok(())
}
}
pub async fn enqueue_paths(
&self,
store: Arc<NixStore>,
store_paths: Vec<StorePath>,
) -> Result<()> {
// FIXME: make sending the closure optional. We might want to
// only send the paths that have been built by the user, under
// the assumption that everything else is already in a binary
// cache.
// FIXME: compute_fs_closure_multi doesn't return a
// toposort, though it doesn't really matter for the GHA
// cache.
let closure = store
.compute_fs_closure_multi(store_paths, false, false, false)
.await?;
for p in closure {
self.channel_tx
.send(Request::Upload(p))
.map_err(|_| Error::Internal("Cannot send upload message".to_owned()))?;
}
Ok(())
}
}
async fn worker(
api: &Api,
store: Arc<NixStore>,
mut channel_rx: UnboundedReceiver<Request>,
metrics: Arc<telemetry::TelemetryReport>,
narinfo_negative_cache: Arc<RwLock<HashSet<String>>>,
) -> Result<()> {
let mut done = HashSet::new();
while let Some(req) = channel_rx.recv().await {
match req {
Request::Shutdown => {
break;
}
Request::Upload(path) => {
if !done.insert(path.clone()) {
continue;
}
if let Err(err) = upload_path(
api,
store.clone(),
&path,
metrics.clone(),
narinfo_negative_cache.clone(),
)
.await
{
tracing::error!(
"Upload of path '{}' failed: {}",
store.get_full_path(&path).display(),
err
);
}
}
}
}
Ok(())
}
async fn upload_path(
api: &Api,
store: Arc<NixStore>,
path: &StorePath,
metrics: Arc<telemetry::TelemetryReport>,
narinfo_negative_cache: Arc<RwLock<HashSet<String>>>,
) -> Result<()> {
let path_info = store.query_path_info(path.clone()).await?;
// Upload the NAR.
let nar_path = format!("{}.nar.zstd", path_info.nar_hash.to_base32());
let nar_allocation = api.allocate_file_with_random_suffix(&nar_path).await?;
let nar_stream = store.nar_from_path(path.clone());
let nar_reader = nar_stream
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
.into_async_read();
let nar_compressor = ZstdEncoder::new(nar_reader.compat());
let compressed_nar_size = api.upload_file(nar_allocation, nar_compressor).await?;
metrics.nars_uploaded.incr();
tracing::debug!(
"Uploaded '{}' (size {} -> {})",
nar_path,
path_info.nar_size,
compressed_nar_size
);
// Upload the narinfo.
let narinfo_path = format!("{}.narinfo", path.to_hash().as_str());
let narinfo_allocation = api.allocate_file_with_random_suffix(&narinfo_path).await?;
let narinfo = path_info_to_nar_info(store.clone(), &path_info, format!("nar/{}", nar_path))
.to_string()
.unwrap();
tracing::debug!("Uploading '{}'", narinfo_path);
api.upload_file(narinfo_allocation, narinfo.as_bytes())
.await?;
metrics.narinfos_uploaded.incr();
narinfo_negative_cache
.write()
.await
.remove(&path.to_hash().to_string());
tracing::info!(
"Uploaded '{}' to the GitHub Action Cache",
store.get_full_path(path).display()
);
Ok(())
}
// FIXME: move to attic.
fn path_info_to_nar_info(store: Arc<NixStore>, path_info: &ValidPathInfo, url: String) -> NarInfo {
NarInfo {
store_path: store.get_full_path(&path_info.path),
url,
compression: Compression::Zstd,
file_hash: None,
file_size: None,
nar_hash: path_info.nar_hash.clone(),
nar_size: path_info.nar_size as usize,
references: path_info
.references
.iter()
.map(|r| r.file_name().unwrap().to_str().unwrap().to_owned())
.collect(),
system: None,
deriver: None,
signature: None,
ca: path_info.ca.clone(),
}
}
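
The new module above is the heart of the change: instead of invoking `nix copy` against the daemon's own HTTP endpoint, `GhaCache::new` spawns a background worker, `enqueue_paths` sends the closure of each requested path over an unbounded channel, and `shutdown` drains the queue before the workflow ends. A minimal usage sketch follows, assuming the crate-internal types above are in scope; it mirrors the calls made from `main.rs` and `api.rs` elsewhere in this commit, with setup details abbreviated.

use std::{collections::HashSet, sync::Arc};
use tokio::sync::RwLock;

// Sketch only: Credentials, NixStore, TelemetryReport, GhaCache, and the
// crate's Result/Error types are assumed to be imported as in this commit.
async fn example(
    credentials: gha_cache::Credentials,
    store: Arc<attic::nix_store::NixStore>,
    metrics: Arc<telemetry::TelemetryReport>,
) -> Result<()> {
    let narinfo_negative_cache = Arc::new(RwLock::new(HashSet::new()));

    // Constructing the cache spawns the background upload worker.
    let gha_cache = GhaCache::new(
        credentials,
        None, // optional cache version salt
        store.clone(),
        metrics,
        narinfo_negative_cache,
    )?;

    // As builds complete, enqueue their outputs; the worker uploads the
    // closure of each path as a zstd-compressed NAR plus a narinfo.
    let paths = vec![store
        .follow_store_path("/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-example")
        .map_err(Error::Attic)?];
    gha_cache.enqueue_paths(store.clone(), paths).await?;

    // At workflow finish, wait for every queued upload to complete.
    gha_cache.shutdown().await?;
    Ok(())
}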


@@ -16,8 +16,8 @@ mod api;
 mod binary_cache;
 mod error;
 mod flakehub;
+mod gha;
 mod telemetry;
-mod util;
 use std::collections::HashSet;
 use std::fs::{self, create_dir_all, OpenOptions};
@@ -35,7 +35,7 @@ use tempfile::NamedTempFile;
 use tokio::sync::{oneshot, Mutex, RwLock};
 use tracing_subscriber::filter::EnvFilter;
-use gha_cache::{Api, Credentials};
+use gha_cache::Credentials;
 type State = Arc<StateInner>;
@@ -91,6 +91,9 @@ struct Args {
 #[arg(long)]
 flakehub_cache_server: Option<reqwest::Url>,
+#[arg(long)]
+flakehub_flake_name: Option<String>,
 /// The location of `nix.conf`.
 #[arg(long)]
 nix_conf: PathBuf,
@@ -110,8 +113,8 @@ struct Args {
 /// The global server state.
 struct StateInner {
-/// The GitHub Actions Cache API.
-api: Option<Api>,
+/// State for uploading to the GHA cache.
+gha_cache: Option<gha::GhaCache>,
 /// The upstream cache.
 upstream: Option<String>,
@@ -119,19 +122,11 @@ struct StateInner {
 /// The sender half of the oneshot channel to trigger a shutdown.
 shutdown_sender: Mutex<Option<oneshot::Sender<()>>>,
-/// List of store paths originally present.
-original_paths: Mutex<HashSet<PathBuf>>,
 /// Set of store path hashes that are not present in GHAC.
-narinfo_nagative_cache: RwLock<HashSet<String>>,
+narinfo_negative_cache: Arc<RwLock<HashSet<String>>>,
-/// Endpoint of ourselves.
-///
-/// This is used by our Action API to invoke `nix copy` to upload new paths.
-self_endpoint: SocketAddr,
 /// Metrics for sending to perf at shutdown
-metrics: telemetry::TelemetryReport,
+metrics: Arc<telemetry::TelemetryReport>,
 /// Connection to the local Nix store.
 store: Arc<NixStore>,
@@ -145,6 +140,8 @@ async fn main_cli() -> Result<()> {
 let args = Args::parse();
+let metrics = Arc::new(telemetry::TelemetryReport::new());
 if let Some(parent) = Path::new(&args.nix_conf).parent() {
 create_dir_all(parent).with_context(|| "Creating parent directories of nix.conf")?;
 }
@@ -157,6 +154,8 @@ async fn main_cli() -> Result<()> {
 let store = Arc::new(NixStore::connect()?);
+let narinfo_negative_cache = Arc::new(RwLock::new(HashSet::new()));
 let flakehub_state = if args.use_flakehub {
 let flakehub_cache_server = args
 .flakehub_cache_server
@@ -164,6 +163,15 @@ async fn main_cli() -> Result<()> {
 let flakehub_api_server_netrc = args
 .flakehub_api_server_netrc
 .ok_or_else(|| anyhow!("--flakehub-api-server-netrc is required"))?;
+let flakehub_flake_name = args
+.flakehub_flake_name
+.ok_or_else(|| {
+tracing::debug!(
+"--flakehub-flake-name was not set, inferring from $GITHUB_REPOSITORY env var"
+);
+std::env::var("GITHUB_REPOSITORY")
+})
+.map_err(|_| anyhow!("--flakehub-flake-name and $GITHUB_REPOSITORY were both unset"))?;
 match flakehub::init_cache(
 &args
@@ -171,6 +179,7 @@ async fn main_cli() -> Result<()> {
 .ok_or_else(|| anyhow!("--flakehub-api-server is required"))?,
 &flakehub_api_server_netrc,
 &flakehub_cache_server,
+&flakehub_flake_name,
 store.clone(),
 )
 .await
@@ -200,7 +209,7 @@ async fn main_cli() -> Result<()> {
 None
 };
-let api = if args.use_gha_cache {
+let gha_cache = if args.use_gha_cache {
 let credentials = if let Some(credentials_file) = &args.credentials_file {
 tracing::info!("Loading credentials from {:?}", credentials_file);
 let bytes = fs::read(credentials_file).with_context(|| {
@@ -222,19 +231,21 @@ async fn main_cli() -> Result<()> {
 .with_context(|| "Failed to load credentials from environment (see README.md)")?
 };
-let mut api = Api::new(credentials)
+let gha_cache = gha::GhaCache::new(
+credentials,
+args.cache_version,
+store.clone(),
+metrics.clone(),
+narinfo_negative_cache.clone(),
+)
 .with_context(|| "Failed to initialize GitHub Actions Cache API")?;
-if let Some(cache_version) = &args.cache_version {
-api.mutate_version(cache_version.as_bytes());
-}
 nix_conf
 .write_all(format!("extra-substituters = http://{}?trusted=1&compression=zstd&parallel-compression=true&priority=1\n", args.listen).as_bytes())
 .with_context(|| "Writing to nix.conf")?;
 tracing::info!("Native GitHub Action cache is enabled.");
-Some(api)
+Some(gha_cache)
 } else {
 tracing::info!("Native GitHub Action cache is disabled.");
 None
@@ -290,13 +301,11 @@ async fn main_cli() -> Result<()> {
 let (shutdown_sender, shutdown_receiver) = oneshot::channel();
 let state = Arc::new(StateInner {
-api,
+gha_cache,
 upstream: args.upstream.clone(),
 shutdown_sender: Mutex::new(Some(shutdown_sender)),
-original_paths: Mutex::new(HashSet::new()),
-narinfo_nagative_cache: RwLock::new(HashSet::new()),
-self_endpoint: args.listen.to_owned(),
-metrics: telemetry::TelemetryReport::new(),
+narinfo_negative_cache,
+metrics,
 store,
 flakehub_state: RwLock::new(flakehub_state),
 });
@@ -438,8 +447,8 @@ async fn dump_api_stats<B>(
 request: axum::http::Request<B>,
 next: axum::middleware::Next<B>,
 ) -> axum::response::Response {
-if let Some(api) = &state.api {
+if let Some(gha_cache) = &state.gha_cache {
-api.dump_stats();
+gha_cache.api.dump_stats();
 }
 next.run(request).await
 }


@@ -1,91 +0,0 @@
//! Utilities.
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use attic::nix_store::NixStore;
use tokio::{fs, process::Command};
use crate::error::{Error, Result};
/// Returns the list of store paths that are currently present.
pub async fn get_store_paths(store: &NixStore) -> Result<HashSet<PathBuf>> {
// FIXME: use the Nix API.
let store_dir = store.store_dir();
let mut listing = fs::read_dir(store_dir).await?;
let mut paths = HashSet::new();
while let Some(entry) = listing.next_entry().await? {
let file_name = entry.file_name();
let file_name = Path::new(&file_name);
if let Some(extension) = file_name.extension() {
match extension.to_str() {
None | Some("drv") | Some("lock") | Some("chroot") => {
// Malformed or not interesting
continue;
}
_ => {}
}
}
if let Some(s) = file_name.to_str() {
// Let's not push any sources
if s.ends_with("-source") {
continue;
}
// Special paths (so far only `.links`)
if s.starts_with('.') {
continue;
}
}
paths.insert(store_dir.join(file_name));
}
Ok(paths)
}
/// Uploads a list of store paths to a store URI.
pub async fn upload_paths(mut paths: Vec<PathBuf>, store_uri: &str) -> Result<()> {
// When the daemon started Nix may not have been installed
let env_path = Command::new("sh")
.args(["-lc", "echo $PATH"])
.output()
.await?
.stdout;
let env_path = String::from_utf8(env_path)
.map_err(|_| Error::Config("PATH contains invalid UTF-8".to_owned()))?;
while !paths.is_empty() {
let mut batch = Vec::new();
let mut total_len = 0;
while total_len < 1024 * 1024 {
if let Some(p) = paths.pop() {
total_len += p.as_os_str().len() + 1;
batch.push(p);
} else {
break;
}
}
tracing::debug!("{} paths in this batch", batch.len());
let status = Command::new("nix")
.args(["--extra-experimental-features", "nix-command"])
.args(["copy", "--to", store_uri])
.args(&batch)
.env("PATH", &env_path)
.status()
.await?;
if status.success() {
tracing::debug!("Uploaded batch");
} else {
tracing::error!("Failed to upload batch: {:?}", status);
return Err(Error::FailedToUpload);
}
}
Ok(())
}


@@ -4,6 +4,9 @@
 , nix
 , boost
 , darwin
+, rust-analyzer
+, clippy
+, rustfmt
 }:
 let
@@ -21,6 +24,9 @@ in rustPlatform.buildRustPackage rec {
 nativeBuildInputs = [
 pkg-config
 installShellFiles
+rust-analyzer
+clippy
+rustfmt
 ];
 buildInputs = [