diff --git a/Cargo.lock b/Cargo.lock
index 58e94d2..9c061d7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4680,6 +4680,7 @@ checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
 dependencies = [
  "bytes",
  "futures-core",
+ "futures-io",
  "futures-sink",
  "pin-project-lite",
  "tokio",
diff --git a/magic-nix-cache/Cargo.toml b/magic-nix-cache/Cargo.toml
index c7df38c..90c1d2c 100644
--- a/magic-nix-cache/Cargo.toml
+++ b/magic-nix-cache/Cargo.toml
@@ -17,7 +17,7 @@ serde = { version = "1.0.162", features = ["derive"] }
 serde_json = { version = "1.0.96", default-features = false }
 thiserror = "1.0.40"
 tokio-stream = { version = "0.1.14", default-features = false }
-tokio-util = { version = "0.7.8", features = ["io"] }
+tokio-util = { version = "0.7.8", features = ["io", "compat"] }
 daemonize = "0.5.0"
 is_ci = "1.1.1"
 sha2 = { version = "0.10.6", default-features = false }
diff --git a/magic-nix-cache/src/gha.rs b/magic-nix-cache/src/gha.rs
index 32ae036..68865d8 100644
--- a/magic-nix-cache/src/gha.rs
+++ b/magic-nix-cache/src/gha.rs
@@ -5,12 +5,13 @@ use crate::telemetry;
 use async_compression::tokio::bufread::ZstdEncoder;
 use attic::nix_store::{NixStore, StorePath, ValidPathInfo};
 use attic_server::narinfo::{Compression, NarInfo};
-use futures::stream::StreamExt;
+use futures::stream::TryStreamExt;
 use gha_cache::{Api, Credentials};
 use tokio::sync::{
     mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
     RwLock,
 };
+use tokio_util::compat::FuturesAsyncReadCompatExt;
 
 pub struct GhaCache {
     /// The GitHub Actions Cache API.
@@ -139,18 +140,15 @@ async fn upload_path(
     let nar_allocation = api.allocate_file_with_random_suffix(&nar_path).await?;
 
-    let mut nar_stream = store.nar_from_path(path.clone());
+    let nar_stream = store.nar_from_path(path.clone());
 
-    let mut nar: Vec<u8> = vec![];
+    let nar_reader = nar_stream
+        .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
+        .into_async_read();
 
-    // FIXME: make this streaming.
-    while let Some(data) = nar_stream.next().await {
-        nar.append(&mut data?);
-    }
+    let nar_compressor = ZstdEncoder::new(nar_reader.compat());
 
-    let reader = ZstdEncoder::new(&nar[..]);
-
-    let compressed_nar_size = api.upload_file(nar_allocation, reader).await?;
+    let compressed_nar_size = api.upload_file(nar_allocation, nar_compressor).await?;
 
     metrics.nars_uploaded.incr();
 
     tracing::info!(