From 334bcc7df98079fa1e0a82cde9df52e7f990f5c5 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Thu, 29 Feb 2024 20:43:25 +0100 Subject: [PATCH] Compress --- Cargo.lock | 1 + gha-cache/src/api.rs | 6 +++--- magic-nix-cache/Cargo.toml | 1 + magic-nix-cache/src/gha.rs | 22 +++++++++++++++------- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 55920ec..58e94d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2467,6 +2467,7 @@ name = "magic-nix-cache" version = "0.1.1" dependencies = [ "anyhow", + "async-compression", "attic", "attic-client", "attic-server", diff --git a/gha-cache/src/api.rs b/gha-cache/src/api.rs index bab198a..a04a10b 100644 --- a/gha-cache/src/api.rs +++ b/gha-cache/src/api.rs @@ -319,8 +319,8 @@ impl Api { Err(Error::TooManyCollisions) } - /// Uploads a file. - pub async fn upload_file(&self, allocation: FileAllocation, mut stream: S) -> Result<()> + /// Uploads a file. Returns the size of the file. + pub async fn upload_file(&self, allocation: FileAllocation, mut stream: S) -> Result where S: AsyncRead + Unpin + Send, { @@ -396,7 +396,7 @@ impl Api { self.commit_cache(allocation.0, offset).await?; - Ok(()) + Ok(offset) } /// Downloads a file based on a list of key prefixes. diff --git a/magic-nix-cache/Cargo.toml b/magic-nix-cache/Cargo.toml index ed33a36..c7df38c 100644 --- a/magic-nix-cache/Cargo.toml +++ b/magic-nix-cache/Cargo.toml @@ -34,6 +34,7 @@ anyhow = "1.0.71" tempfile = "3.9" uuid = { version = "1.4.0", features = ["serde", "v7", "rand", "std"] } futures = "0.3" +async-compression = "0.4" [dependencies.tokio] version = "1.28.0" diff --git a/magic-nix-cache/src/gha.rs b/magic-nix-cache/src/gha.rs index 6ede0f0..32ae036 100644 --- a/magic-nix-cache/src/gha.rs +++ b/magic-nix-cache/src/gha.rs @@ -2,6 +2,7 @@ use std::{collections::HashSet, sync::Arc}; use crate::error::{Error, Result}; use crate::telemetry; +use async_compression::tokio::bufread::ZstdEncoder; use attic::nix_store::{NixStore, StorePath, ValidPathInfo}; use attic_server::narinfo::{Compression, NarInfo}; use futures::stream::StreamExt; @@ -134,7 +135,7 @@ async fn upload_path( let path_info = store.query_path_info(path.clone()).await?; // Upload the NAR. - let nar_path = format!("nar/{}.nar", path_info.nar_hash.to_base32()); + let nar_path = format!("{}.nar.zstd", path_info.nar_hash.to_base32()); let nar_allocation = api.allocate_file_with_random_suffix(&nar_path).await?; @@ -142,26 +143,33 @@ async fn upload_path( let mut nar: Vec = vec![]; - // FIXME: make this streaming and compress. + // FIXME: make this streaming. while let Some(data) = nar_stream.next().await { nar.append(&mut data?); } - tracing::info!("Uploading NAR {} (size {})", nar_path, nar.len()); + let reader = ZstdEncoder::new(&nar[..]); - api.upload_file(nar_allocation, &nar[..]).await?; + let compressed_nar_size = api.upload_file(nar_allocation, reader).await?; metrics.nars_uploaded.incr(); + tracing::info!( + "Uploaded '{}' (size {} -> {})", + nar_path, + path_info.nar_size, + compressed_nar_size + ); + // Upload the narinfo. let narinfo_path = format!("{}.narinfo", path.to_hash()); let narinfo_allocation = api.allocate_file_with_random_suffix(&narinfo_path).await?; - let narinfo = path_info_to_nar_info(store.clone(), &path_info, nar_path) + let narinfo = path_info_to_nar_info(store.clone(), &path_info, format!("nar/{}", nar_path)) .to_string() .unwrap(); - tracing::info!("Uploading {}: {}", narinfo_path, narinfo); + tracing::debug!("Uploading '{}'", narinfo_path); api.upload_file(narinfo_allocation, narinfo.as_bytes()) .await?; @@ -175,7 +183,7 @@ fn path_info_to_nar_info(store: Arc, path_info: &ValidPathInfo, url: S NarInfo { store_path: store.get_full_path(&path_info.path), url, - compression: Compression::None, + compression: Compression::Zstd, file_hash: None, file_size: None, nar_hash: path_info.nar_hash.clone(),