tree: upgrade axum, hyper, etc

This commit is contained in:
Cole Mickens 2024-08-07 17:44:18 -07:00
parent 97a583df58
commit 4c0a2510c1
6 changed files with 588 additions and 408 deletions

940
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -11,7 +11,7 @@ derivative = { version = "2.2.0", default-features = false }
futures = { version = "0.3.28", default-features = false, features = ["alloc"] } futures = { version = "0.3.28", default-features = false, features = ["alloc"] }
hex = "0.4.3" hex = "0.4.3"
rand = { version = "0.8.5", default-features = false, features = ["std", "std_rng"] } rand = { version = "0.8.5", default-features = false, features = ["std", "std_rng"] }
reqwest = { version = "0.11.17", default-features = false, features = ["json", "rustls-tls-native-roots", "stream", "trust-dns"] } reqwest = { version = "0.12.5", default-features = false, features = ["json", "rustls-tls-native-roots", "stream", "trust-dns"] }
serde = { version = "1.0.162", default-features = false, features = ["derive"] } serde = { version = "1.0.162", default-features = false, features = ["derive"] }
serde_json = { version = "1.0.96", default-features = false } serde_json = { version = "1.0.96", default-features = false }
sha2 = { version = "0.10.6", default-features = false } sha2 = { version = "0.10.6", default-features = false }

View file

@ -7,11 +7,13 @@ license = "Apache-2.0"
[dependencies] [dependencies]
gha-cache = { path = "../gha-cache" } gha-cache = { path = "../gha-cache" }
axum = { version = "0.6.18", default-features = false, features = [ axum = { version = "0.7.5", default-features = false, features = [
"json", "json",
"tokio", "tokio",
"http2",
"macros"
] } ] }
axum-macros = "0.3.7" # axum-macros = "0.3.7"
clap = { version = "4.2.7", default-features = false, features = [ clap = { version = "4.2.7", default-features = false, features = [
"std", "std",
"derive", "derive",
@ -26,19 +28,20 @@ tracing-subscriber = { version = "0.3.17", default-features = false, features =
"tracing-log", "tracing-log",
"smallvec", "smallvec",
] } ] }
tower-http = { version = "0.4.0", features = ["trace"] } tower-http = { version = "0.5.2", features = ["trace"] }
serde = { version = "1.0.162", features = ["derive"] } serde = { version = "1.0.162", features = ["derive"] }
serde_json = { version = "1.0.96", default-features = false } serde_json = { version = "1.0.96", default-features = false }
thiserror = "1.0.40" thiserror = "1.0.40"
tokio-stream = { version = "0.1.14", default-features = false } tokio-stream = { version = "0.1.15", default-features = false }
tokio-util = { version = "0.7.8", features = ["io", "compat"] } tokio-util = { version = "0.7.11", features = ["io", "compat"] }
daemonize = "0.5.0" daemonize = "0.5.0"
is_ci = "1.1.1" is_ci = "1.1.1"
sha2 = { version = "0.10.6", default-features = false } sha2 = { version = "0.10.6", default-features = false }
reqwest = { version = "0.11.17", default-features = false, features = [ reqwest = { version = "0.12.5", default-features = false, features = [
"blocking", "blocking",
"rustls-tls-native-roots", "rustls-tls-native-roots",
"trust-dns", "trust-dns",
"json"
] } ] }
netrc-rs = "0.1.2" netrc-rs = "0.1.2"
attic = { git = "https://github.com/DeterminateSystems/attic", branch = "fixups-for-magic-nix-cache" } attic = { git = "https://github.com/DeterminateSystems/attic", branch = "fixups-for-magic-nix-cache" }
@ -51,6 +54,10 @@ uuid = { version = "1.4.0", features = ["serde", "v7", "rand", "std"] }
futures = "0.3" futures = "0.3"
async-compression = "0.4" async-compression = "0.4"
tracing-appender = "0.2.3" tracing-appender = "0.2.3"
http = "1.0"
http-body-util = "0.1"
hyper = { version = "1.0.0", features = ["full"] }
hyper-util = { version = "0.1", features = ["tokio", "server-auto", "http1"] }
[dependencies.tokio] [dependencies.tokio]
version = "1.28.0" version = "1.28.0"

View file

@ -4,7 +4,6 @@
use attic::nix_store::StorePath; use attic::nix_store::StorePath;
use axum::{extract::Extension, routing::post, Json, Router}; use axum::{extract::Extension, routing::post, Json, Router};
use axum_macros::debug_handler;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use super::State; use super::State;
@ -30,7 +29,6 @@ pub fn get_router() -> Router {
} }
/// Record existing paths. /// Record existing paths.
#[debug_handler]
async fn workflow_start(Extension(state): Extension<State>) -> Result<Json<WorkflowStartResponse>> { async fn workflow_start(Extension(state): Extension<State>) -> Result<Json<WorkflowStartResponse>> {
tracing::info!("Workflow started"); tracing::info!("Workflow started");
let reply = if let Some(original_paths) = &state.original_paths { let reply = if let Some(original_paths) = &state.original_paths {

View file

@ -1,14 +1,12 @@
//! Binary Cache API. //! Binary Cache API.
use std::io;
use axum::{ use axum::{
extract::{BodyStream, Extension, Path}, extract::{Extension, Path},
response::Redirect, response::Redirect,
routing::{get, put}, routing::{get, put},
Router, Router,
}; };
use tokio_stream::StreamExt; use futures::StreamExt as _;
use tokio_util::io::StreamReader; use tokio_util::io::StreamReader;
use super::State; use super::State;
@ -76,10 +74,11 @@ async fn get_narinfo(
pull_through(&state, &path) pull_through(&state, &path)
} }
#[axum::debug_handler]
async fn put_narinfo( async fn put_narinfo(
Extension(state): Extension<State>, Extension(state): Extension<State>,
Path(path): Path<String>, Path(path): Path<String>,
body: BodyStream, body: axum::body::Body,
) -> Result<()> { ) -> Result<()> {
let components: Vec<&str> = path.splitn(2, '.').collect(); let components: Vec<&str> = path.splitn(2, '.').collect();
@ -96,9 +95,13 @@ async fn put_narinfo(
let store_path_hash = components[0].to_string(); let store_path_hash = components[0].to_string();
let key = format!("{}.narinfo", store_path_hash); let key = format!("{}.narinfo", store_path_hash);
let allocation = gha_cache.api.allocate_file_with_random_suffix(&key).await?; let allocation = gha_cache.api.allocate_file_with_random_suffix(&key).await?;
let body_stream = body.into_data_stream();
let stream = StreamReader::new( let stream = StreamReader::new(
body.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))), body_stream
.map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))),
); );
gha_cache.api.upload_file(allocation, stream).await?; gha_cache.api.upload_file(allocation, stream).await?;
state.metrics.narinfos_uploaded.incr(); state.metrics.narinfos_uploaded.incr();
@ -135,7 +138,7 @@ async fn get_nar(Extension(state): Extension<State>, Path(path): Path<String>) -
async fn put_nar( async fn put_nar(
Extension(state): Extension<State>, Extension(state): Extension<State>,
Path(path): Path<String>, Path(path): Path<String>,
body: BodyStream, body: axum::body::Body,
) -> Result<()> { ) -> Result<()> {
let gha_cache = state.gha_cache.as_ref().ok_or(Error::GHADisabled)?; let gha_cache = state.gha_cache.as_ref().ok_or(Error::GHADisabled)?;
@ -143,9 +146,13 @@ async fn put_nar(
.api .api
.allocate_file_with_random_suffix(&path) .allocate_file_with_random_suffix(&path)
.await?; .await?;
let body_stream = body.into_data_stream();
let stream = StreamReader::new( let stream = StreamReader::new(
body.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))), body_stream
.map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))),
); );
gha_cache.api.upload_file(allocation, stream).await?; gha_cache.api.upload_file(allocation, stream).await?;
state.metrics.nars_uploaded.incr(); state.metrics.nars_uploaded.incr();

View file

@ -430,8 +430,8 @@ async fn main_cli() -> Result<()> {
tracing::debug!("Created startup notification file at {startup_notification_file_path:?}"); tracing::debug!("Created startup notification file at {startup_notification_file_path:?}");
} }
let ret = axum::Server::bind(&args.listen) let listener = tokio::net::TcpListener::bind(&args.listen).await?;
.serve(app.into_make_service()) let ret = axum::serve(listener, app.into_make_service())
.with_graceful_shutdown(async move { .with_graceful_shutdown(async move {
shutdown_receiver.await.ok(); shutdown_receiver.await.ok();
tracing::info!("Shutting down"); tracing::info!("Shutting down");
@ -568,10 +568,10 @@ fn init_logging() -> Result<LogGuard> {
} }
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
async fn dump_api_stats<B>( async fn dump_api_stats(
Extension(state): Extension<State>, Extension(state): Extension<State>,
request: axum::http::Request<B>, request: axum::http::Request<axum::body::Body>,
next: axum::middleware::Next<B>, next: axum::middleware::Next,
) -> axum::response::Response { ) -> axum::response::Response {
if let Some(gha_cache) = &state.gha_cache { if let Some(gha_cache) = &state.gha_cache {
gha_cache.api.dump_stats(); gha_cache.api.dump_stats();