Eelco Dolstra 2024-02-29 20:43:25 +01:00
parent 625e95f484
commit 334bcc7df9
4 changed files with 20 additions and 10 deletions

Cargo.lock (generated)

@@ -2467,6 +2467,7 @@ name = "magic-nix-cache"
 version = "0.1.1"
 dependencies = [
  "anyhow",
+ "async-compression",
  "attic",
  "attic-client",
  "attic-server",


@@ -319,8 +319,8 @@ impl Api {
         Err(Error::TooManyCollisions)
     }

-    /// Uploads a file.
-    pub async fn upload_file<S>(&self, allocation: FileAllocation, mut stream: S) -> Result<()>
+    /// Uploads a file. Returns the size of the file.
+    pub async fn upload_file<S>(&self, allocation: FileAllocation, mut stream: S) -> Result<usize>
     where
         S: AsyncRead + Unpin + Send,
     {
@@ -396,7 +396,7 @@ impl Api {

         self.commit_cache(allocation.0, offset).await?;

-        Ok(())
+        Ok(offset)
     }

     /// Downloads a file based on a list of key prefixes.
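
The change above makes upload_file report how many bytes it actually committed (the final offset), which the caller later uses to log the compressed NAR size. A minimal, hypothetical sketch of that pattern follows; upload_all is not part of the gha-cache crate, and a real implementation would send each chunk to the GitHub Actions cache instead of discarding it.

use tokio::io::{AsyncRead, AsyncReadExt};

// Hypothetical stand-in for the upload loop: consume an AsyncRead stream,
// track how many bytes were sent, and return that total to the caller.
async fn upload_all<S>(mut stream: S) -> std::io::Result<usize>
where
    S: AsyncRead + Unpin + Send,
{
    let mut offset = 0;
    let mut buf = vec![0u8; 64 * 1024];
    loop {
        let n = stream.read(&mut buf).await?;
        if n == 0 {
            break; // end of stream
        }
        // A real implementation would upload buf[..n] here.
        offset += n;
    }
    Ok(offset)
}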


@@ -34,6 +34,7 @@ anyhow = "1.0.71"
 tempfile = "3.9"
 uuid = { version = "1.4.0", features = ["serde", "v7", "rand", "std"] }
 futures = "0.3"
+async-compression = "0.4"

 [dependencies.tokio]
 version = "1.28.0"


@@ -2,6 +2,7 @@ use std::{collections::HashSet, sync::Arc};

 use crate::error::{Error, Result};
 use crate::telemetry;
+use async_compression::tokio::bufread::ZstdEncoder;
 use attic::nix_store::{NixStore, StorePath, ValidPathInfo};
 use attic_server::narinfo::{Compression, NarInfo};
 use futures::stream::StreamExt;
@@ -134,7 +135,7 @@ async fn upload_path(
     let path_info = store.query_path_info(path.clone()).await?;

     // Upload the NAR.
-    let nar_path = format!("nar/{}.nar", path_info.nar_hash.to_base32());
+    let nar_path = format!("{}.nar.zstd", path_info.nar_hash.to_base32());

     let nar_allocation = api.allocate_file_with_random_suffix(&nar_path).await?;
@@ -142,26 +143,33 @@ async fn upload_path(
     let mut nar: Vec<u8> = vec![];

-    // FIXME: make this streaming and compress.
+    // FIXME: make this streaming.
     while let Some(data) = nar_stream.next().await {
         nar.append(&mut data?);
     }

-    tracing::info!("Uploading NAR {} (size {})", nar_path, nar.len());
+    let reader = ZstdEncoder::new(&nar[..]);

-    api.upload_file(nar_allocation, &nar[..]).await?;
+    let compressed_nar_size = api.upload_file(nar_allocation, reader).await?;
     metrics.nars_uploaded.incr();

+    tracing::info!(
+        "Uploaded '{}' (size {} -> {})",
+        nar_path,
+        path_info.nar_size,
+        compressed_nar_size
+    );
+
     // Upload the narinfo.
     let narinfo_path = format!("{}.narinfo", path.to_hash());
     let narinfo_allocation = api.allocate_file_with_random_suffix(&narinfo_path).await?;

-    let narinfo = path_info_to_nar_info(store.clone(), &path_info, nar_path)
+    let narinfo = path_info_to_nar_info(store.clone(), &path_info, format!("nar/{}", nar_path))
         .to_string()
         .unwrap();

-    tracing::info!("Uploading {}: {}", narinfo_path, narinfo);
+    tracing::debug!("Uploading '{}'", narinfo_path);

     api.upload_file(narinfo_allocation, narinfo.as_bytes())
         .await?;
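
For context, a minimal self-contained sketch of what the ZstdEncoder wrapping above does, assuming async-compression is built with its "tokio" and "zstd" features and a tokio runtime is available: a &[u8] implements AsyncBufRead, the encoder wraps it, and reading from the encoder yields the zstd-compressed bytes, whose total length is what upload_file now reports.

use async_compression::tokio::bufread::ZstdEncoder;
use tokio::io::AsyncReadExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Stand-in for an in-memory NAR.
    let nar: Vec<u8> = vec![0u8; 1024 * 1024];

    // &[u8] implements AsyncBufRead, so it can feed the encoder directly.
    let mut reader = ZstdEncoder::new(&nar[..]);

    // Reading from the encoder produces the compressed stream.
    let mut compressed = Vec::new();
    reader.read_to_end(&mut compressed).await?;

    println!("{} bytes -> {} bytes", nar.len(), compressed.len());
    Ok(())
}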
@@ -175,7 +183,7 @@ fn path_info_to_nar_info(store: Arc<NixStore>, path_info: &ValidPathInfo, url: S
     NarInfo {
         store_path: store.get_full_path(&path_info.path),
         url,
-        compression: Compression::None,
+        compression: Compression::Zstd,
         file_hash: None,
         file_size: None,
         nar_hash: path_info.nar_hash.clone(),