Make NAR upload streaming
This commit is contained in:
parent
334bcc7df9
commit
f16e3c292a
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -4680,6 +4680,7 @@ checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
|
"futures-io",
|
||||||
"futures-sink",
|
"futures-sink",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
|
|
@ -17,7 +17,7 @@ serde = { version = "1.0.162", features = ["derive"] }
|
||||||
serde_json = { version = "1.0.96", default-features = false }
|
serde_json = { version = "1.0.96", default-features = false }
|
||||||
thiserror = "1.0.40"
|
thiserror = "1.0.40"
|
||||||
tokio-stream = { version = "0.1.14", default-features = false }
|
tokio-stream = { version = "0.1.14", default-features = false }
|
||||||
tokio-util = { version = "0.7.8", features = ["io"] }
|
tokio-util = { version = "0.7.8", features = ["io", "compat"] }
|
||||||
daemonize = "0.5.0"
|
daemonize = "0.5.0"
|
||||||
is_ci = "1.1.1"
|
is_ci = "1.1.1"
|
||||||
sha2 = { version = "0.10.6", default-features = false }
|
sha2 = { version = "0.10.6", default-features = false }
|
||||||
|
|
|
@ -5,12 +5,13 @@ use crate::telemetry;
|
||||||
use async_compression::tokio::bufread::ZstdEncoder;
|
use async_compression::tokio::bufread::ZstdEncoder;
|
||||||
use attic::nix_store::{NixStore, StorePath, ValidPathInfo};
|
use attic::nix_store::{NixStore, StorePath, ValidPathInfo};
|
||||||
use attic_server::narinfo::{Compression, NarInfo};
|
use attic_server::narinfo::{Compression, NarInfo};
|
||||||
use futures::stream::StreamExt;
|
use futures::stream::TryStreamExt;
|
||||||
use gha_cache::{Api, Credentials};
|
use gha_cache::{Api, Credentials};
|
||||||
use tokio::sync::{
|
use tokio::sync::{
|
||||||
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
|
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
|
||||||
RwLock,
|
RwLock,
|
||||||
};
|
};
|
||||||
|
use tokio_util::compat::FuturesAsyncReadCompatExt;
|
||||||
|
|
||||||
pub struct GhaCache {
|
pub struct GhaCache {
|
||||||
/// The GitHub Actions Cache API.
|
/// The GitHub Actions Cache API.
|
||||||
|
@ -139,18 +140,15 @@ async fn upload_path(
|
||||||
|
|
||||||
let nar_allocation = api.allocate_file_with_random_suffix(&nar_path).await?;
|
let nar_allocation = api.allocate_file_with_random_suffix(&nar_path).await?;
|
||||||
|
|
||||||
let mut nar_stream = store.nar_from_path(path.clone());
|
let nar_stream = store.nar_from_path(path.clone());
|
||||||
|
|
||||||
let mut nar: Vec<u8> = vec![];
|
let nar_reader = nar_stream
|
||||||
|
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
|
||||||
|
.into_async_read();
|
||||||
|
|
||||||
// FIXME: make this streaming.
|
let nar_compressor = ZstdEncoder::new(nar_reader.compat());
|
||||||
while let Some(data) = nar_stream.next().await {
|
|
||||||
nar.append(&mut data?);
|
|
||||||
}
|
|
||||||
|
|
||||||
let reader = ZstdEncoder::new(&nar[..]);
|
let compressed_nar_size = api.upload_file(nar_allocation, nar_compressor).await?;
|
||||||
|
|
||||||
let compressed_nar_size = api.upload_file(nar_allocation, reader).await?;
|
|
||||||
metrics.nars_uploaded.incr();
|
metrics.nars_uploaded.incr();
|
||||||
|
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
|
|
Loading…
Reference in a new issue