Parallelize chunk uploads

Zhaofeng Li 2023-05-08 12:59:57 -06:00
parent f698478b4f
commit 03726dd5ba
4 changed files with 87 additions and 14 deletions

Cargo.lock (generated), 29 changed lines

@@ -317,6 +317,21 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.28"
@@ -324,6 +339,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@@ -332,6 +348,17 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
[[package]]
name = "futures-executor"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.28"
@@ -367,6 +394,7 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
@@ -407,6 +435,7 @@ dependencies = [
"async-trait",
"bytes",
"derivative",
"futures",
"hex",
"rand",
"reqwest",


@@ -19,7 +19,7 @@ nix-store --store $PWD/test-root --extra-substituters 'http://localhost:3000' --
## TODO
- [ ] Make a GitHub Action and dogfood
- [ ] Parallelize upload
- [x] Parallelize upload
- [ ] Make sure that the corresponding NAR exists before returning `.narinfo` request
- [ ] Keep in-memory cache of what's present
- [ ] Record what's accessed


@@ -7,6 +7,7 @@ edition = "2021"
async-trait = "0.1.68"
bytes = "1.4.0"
derivative = "2.2.0"
futures = "0.3.28"
hex = "0.4.3"
rand = "0.8.5"
reqwest = { version = "0.11.17", default-features = false, features = ["json", "rustls-tls-native-roots", "stream"] }
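
The new `futures = "0.3.28"` entry in this manifest hunk is what provides `futures::future::join_all`, which the upload loop in the source change below uses to await every spawned chunk upload. As a rough illustration of the error-flattening idiom used there (unwrap the `JoinError` from each task handle, then `collect()` the inner `Result`s so the first failure short-circuits), here is a small hypothetical example; the numeric tasks and the `merged` name are stand-ins, not code from this repository:

use futures::future;

#[tokio::main]
async fn main() {
    // Hypothetical tasks standing in for the spawned chunk uploads.
    let handles: Vec<_> = (0..3u32)
        .map(|i| tokio::task::spawn(async move { Ok::<u32, String>(i * 10) }))
        .collect();

    // join_all waits for every JoinHandle. unwrap() turns a task panic
    // (JoinError) into a panic here, and collect() folds the per-task
    // Results into one Result, stopping at the first Err.
    let merged: Result<Vec<u32>, String> = future::join_all(handles)
        .await
        .into_iter()
        .map(|join_result| join_result.unwrap())
        .collect();

    assert_eq!(merged, Ok(vec![0, 10, 20]));
}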


@@ -5,9 +5,11 @@
use std::fmt;
#[cfg(debug_assertions)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use futures::future;
use rand::{distributions::Alphanumeric, Rng};
use reqwest::{
header::{HeaderMap, HeaderValue, CONTENT_RANGE, CONTENT_TYPE},
@@ -16,7 +18,7 @@ use reqwest::{
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sha2::{Digest, Sha256};
use thiserror::Error;
use tokio::io::AsyncRead;
use tokio::{io::AsyncRead, sync::Semaphore};
use crate::credentials::Credentials;
use crate::util::read_chunk_async;
@@ -39,6 +41,9 @@ const DEFAULT_VERSION: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_P
/// We greedily read this much from the input stream at a time.
const CHUNK_SIZE: usize = 8 * 1024 * 1024;
/// The number of chunks to upload at the same time.
const MAX_CONCURRENCY: usize = 5;
type Result<T> = std::result::Result<T, Error>;
/// An API error.
@@ -87,6 +92,9 @@ pub struct Api {
/// The HTTP client for authenticated requests.
client: Client,
/// The concurrent upload limit.
concurrency_limit: Arc<Semaphore>,
/// Backend request statistics.
#[cfg(debug_assertions)]
stats: RequestStats,
@@ -254,6 +262,7 @@ impl Api {
version: initial_version,
version_hasher,
client,
concurrency_limit: Arc::new(Semaphore::new(MAX_CONCURRENCY)),
#[cfg(debug_assertions)]
stats: Default::default(),
})
@@ -316,6 +325,7 @@
{
// TODO: Parallelize
let mut offset = 0;
let mut futures = Vec::new();
loop {
let buf = BytesMut::with_capacity(CHUNK_SIZE);
let chunk = read_chunk_async(&mut stream, buf).await?;
@@ -330,22 +340,55 @@ impl Api {
#[cfg(debug_assertions)]
self.stats.patch.fetch_add(1, Ordering::SeqCst);
self.client
.patch(self.construct_url(&format!("caches/{}", allocation.0 .0)))
.header(CONTENT_TYPE, "application/octet-stream")
.header(
CONTENT_RANGE,
format!("bytes {}-{}/*", offset, offset + chunk.len() - 1),
)
.body(chunk)
.send()
.await?
.check()
.await?;
futures.push({
let client = self.client.clone();
let concurrency_limit = self.concurrency_limit.clone();
let url = self.construct_url(&format!("caches/{}", allocation.0 .0));
tokio::task::spawn(async move {
let permit = concurrency_limit.acquire().await.unwrap();
tracing::debug!(
"Starting uploading chunk {}-{}",
offset,
offset + chunk_len - 1
);
let r = client
.patch(url)
.header(CONTENT_TYPE, "application/octet-stream")
.header(
CONTENT_RANGE,
format!("bytes {}-{}/*", offset, offset + chunk.len() - 1),
)
.body(chunk)
.send()
.await?
.check()
.await;
tracing::debug!(
"Finished uploading chunk {}-{}: {:?}",
offset,
offset + chunk_len - 1,
r
);
drop(permit);
r
})
});
offset += chunk_len;
}
future::join_all(futures)
.await
.into_iter()
.map(|join_result| join_result.unwrap())
.collect::<Result<()>>()?;
self.commit_cache(allocation.0, offset).await?;
Ok(())
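
Taken together, the change above follows a simple shape: each chunk is handed to `tokio::task::spawn` so the reader can keep pulling data off the input stream, a permit from the shared `Arc<Semaphore>` caps in-flight uploads at `MAX_CONCURRENCY` (5), and `future::join_all` waits for every task before `commit_cache` runs. Below is a minimal self-contained sketch of that pattern; `upload_chunk`, the inline chunk data, and the `String` error type are hypothetical stand-ins for the authenticated reqwest `PATCH` and the crate's own `Error` type, and it assumes the `futures` crate plus `tokio` with the `rt-multi-thread` and `macros` features:

use std::sync::Arc;

use futures::future;
use tokio::sync::Semaphore;

/// Mirrors MAX_CONCURRENCY above: at most this many chunks are in flight.
const MAX_CONCURRENCY: usize = 5;

/// Hypothetical stand-in for the real Content-Range PATCH request.
async fn upload_chunk(offset: usize, chunk: Vec<u8>) -> Result<(), String> {
    println!("uploading bytes {}-{}/*", offset, offset + chunk.len() - 1);
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let concurrency_limit = Arc::new(Semaphore::new(MAX_CONCURRENCY));
    let mut futures = Vec::new();
    let mut offset = 0usize;

    // Pretend each element is one chunk read off the input stream.
    for chunk in [vec![0u8; 4], vec![1u8; 8], vec![2u8; 2]] {
        let chunk_len = chunk.len();
        let concurrency_limit = concurrency_limit.clone();

        // Spawn the upload so the loop can keep reading; the permit held for
        // the duration of the task bounds the number of in-flight uploads.
        futures.push(tokio::task::spawn(async move {
            let _permit = concurrency_limit.acquire().await.unwrap();
            upload_chunk(offset, chunk).await
        }));

        offset += chunk_len;
    }

    // Wait for every spawned upload; unwrap() surfaces task panics, and
    // collect() short-circuits on the first upload error.
    future::join_all(futures)
        .await
        .into_iter()
        .map(|join_result| join_result.unwrap())
        .collect::<Result<(), String>>()?;

    println!("uploaded {} bytes total", offset);
    Ok(())
}

One property worth noting about the design: because the uploads are spawned tasks rather than plain futures collected for later, each chunk starts uploading as soon as it has been read and a permit is free, instead of only once `join_all` begins polling.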