From 03726dd5bab9bbfee29c1e500d0b0284f389315a Mon Sep 17 00:00:00 2001 From: Zhaofeng Li Date: Mon, 8 May 2023 12:59:57 -0600 Subject: [PATCH] Parallelize chunk uploads --- Cargo.lock | 29 +++++++++++++++++++ README.md | 2 +- gha-cache/Cargo.toml | 1 + gha-cache/src/api.rs | 69 +++++++++++++++++++++++++++++++++++--------- 4 files changed, 87 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 93697c7..4d4af7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -317,6 +317,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.28" @@ -324,6 +339,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -332,6 +348,17 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +[[package]] +name = "futures-executor" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.28" @@ -367,6 +394,7 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -407,6 +435,7 @@ dependencies = [ 
"async-trait", "bytes", "derivative", + "futures", "hex", "rand", "reqwest", diff --git a/README.md b/README.md index d154135..0dc8c1c 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ nix-store --store $PWD/test-root --extra-substituters 'http://localhost:3000' -- ## TODO - [ ] Make a GitHub Action and dogfood -- [ ] Parallelize upload +- [x] Parallelize upload - [ ] Make sure that the corresponding NAR exists before returning `.narinfo` request - [ ] Keep in-memory cache of what's present - [ ] Record what's accessed diff --git a/gha-cache/Cargo.toml b/gha-cache/Cargo.toml index 7dd31e8..088c881 100644 --- a/gha-cache/Cargo.toml +++ b/gha-cache/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" async-trait = "0.1.68" bytes = "1.4.0" derivative = "2.2.0" +futures = "0.3.28" hex = "0.4.3" rand = "0.8.5" reqwest = { version = "0.11.17", default-features = false, features = ["json", "rustls-tls-native-roots", "stream"] } diff --git a/gha-cache/src/api.rs b/gha-cache/src/api.rs index cf0c120..4a5f828 100644 --- a/gha-cache/src/api.rs +++ b/gha-cache/src/api.rs @@ -5,9 +5,11 @@ use std::fmt; #[cfg(debug_assertions)] use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; +use futures::future; use rand::{distributions::Alphanumeric, Rng}; use reqwest::{ header::{HeaderMap, HeaderValue, CONTENT_RANGE, CONTENT_TYPE}, @@ -16,7 +18,7 @@ use reqwest::{ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use sha2::{Digest, Sha256}; use thiserror::Error; -use tokio::io::AsyncRead; +use tokio::{io::AsyncRead, sync::Semaphore}; use crate::credentials::Credentials; use crate::util::read_chunk_async; @@ -39,6 +41,9 @@ const DEFAULT_VERSION: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_P /// We greedily read this much from the input stream at a time. const CHUNK_SIZE: usize = 8 * 1024 * 1024; +/// The number of chunks to upload at the same time. 
+const MAX_CONCURRENCY: usize = 5; + type Result<T> = std::result::Result<T, Error>; /// An API error. @@ -87,6 +92,9 @@ pub struct Api { /// The HTTP client for authenticated requests. client: Client, + /// The concurrent upload limit. + concurrency_limit: Arc<Semaphore>, + /// Backend request statistics. #[cfg(debug_assertions)] stats: RequestStats, @@ -254,6 +262,7 @@ impl Api { version: initial_version, version_hasher, client, + concurrency_limit: Arc::new(Semaphore::new(MAX_CONCURRENCY)), #[cfg(debug_assertions)] stats: Default::default(), }) @@ -316,6 +325,7 @@ impl Api { { // TODO: Parallelize let mut offset = 0; + let mut futures = Vec::new(); loop { let buf = BytesMut::with_capacity(CHUNK_SIZE); let chunk = read_chunk_async(&mut stream, buf).await?; @@ -330,22 +340,55 @@ impl Api { #[cfg(debug_assertions)] self.stats.patch.fetch_add(1, Ordering::SeqCst); - self.client - .patch(self.construct_url(&format!("caches/{}", allocation.0 .0))) - .header(CONTENT_TYPE, "application/octet-stream") - .header( - CONTENT_RANGE, - format!("bytes {}-{}/*", offset, offset + chunk.len() - 1), - ) - .body(chunk) - .send() - .await? - .check() - .await?; + futures.push({ + let client = self.client.clone(); + let concurrency_limit = self.concurrency_limit.clone(); + let url = self.construct_url(&format!("caches/{}", allocation.0 .0)); + + tokio::task::spawn(async move { + let permit = concurrency_limit.acquire().await.unwrap(); + + tracing::debug!( + "Starting uploading chunk {}-{}", + offset, + offset + chunk_len - 1 + ); + + let r = client + .patch(url) + .header(CONTENT_TYPE, "application/octet-stream") + .header( + CONTENT_RANGE, + format!("bytes {}-{}/*", offset, offset + chunk.len() - 1), + ) + .body(chunk) + .send() + .await? 
+ .check() + .await; + + tracing::debug!( + "Finished uploading chunk {}-{}: {:?}", + offset, + offset + chunk_len - 1, + r + ); + + drop(permit); + + r + }) + }); offset += chunk_len; } + future::join_all(futures) + .await + .into_iter() + .map(|join_result| join_result.unwrap()) + .collect::<Result<()>>()?; + self.commit_cache(allocation.0, offset).await?; Ok(())