Improve error handling in FlakeHub cache setup

Also update the token endpoint.
This commit is contained in:
Eelco Dolstra 2024-01-10 20:19:04 +01:00
parent e4bb70dba3
commit 9781bb8b6e
5 changed files with 91 additions and 55 deletions

19
Cargo.lock generated
View file

@ -125,6 +125,12 @@ dependencies = [
"syn 2.0.32", "syn 2.0.32",
] ]
[[package]]
name = "atomic"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba"
[[package]] [[package]]
name = "attic" name = "attic"
version = "0.2.0" version = "0.2.0"
@ -1330,6 +1336,7 @@ dependencies = [
"tower-http", "tower-http",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"uuid",
] ]
[[package]] [[package]]
@ -2530,6 +2537,18 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "uuid"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560"
dependencies = [
"atomic",
"getrandom",
"rand",
"serde",
]
[[package]] [[package]]
name = "valuable" name = "valuable"
version = "0.1.0" version = "0.1.0"

View file

@ -31,6 +31,7 @@ attic-client = { git = "ssh://git@github.com/DeterminateSystems/attic-priv", bra
indicatif = "0.17" indicatif = "0.17"
anyhow = "1.0.71" anyhow = "1.0.71"
tempfile = "3.9" tempfile = "3.9"
uuid = { version = "1.4.0", features = ["serde", "v7", "rand", "std"] }
[dependencies.tokio] [dependencies.tokio]
version = "1.28.0" version = "1.28.0"

View file

@ -29,7 +29,31 @@ pub enum Error {
GHADisabled, GHADisabled,
#[error("FlakeHub cache error: {0}")] #[error("FlakeHub cache error: {0}")]
FlakeHub(anyhow::Error), FlakeHub(#[from] anyhow::Error),
#[error("FlakeHub HTTP error: {0}")]
FlakeHubHttp(#[from] reqwest::Error),
#[error("Got HTTP response {1} getting FlakeHub binary cache creation token from {0}: {2}")]
CacheCreation(reqwest::Url, reqwest::StatusCode, String),
#[error("netrc parse error: {0}")]
Netrc(netrc_rs::Error),
#[error("Cannot find netrc credentials for {0}")]
MissingCreds(String),
#[error("Received bad JWT")]
BadJWT,
#[error("Received bad JWT token: {0}")]
JWTParsing(#[from] jwt::Error),
#[error("Attic error: {0}")]
Attic(#[from] attic::AtticError),
#[error("Bad URL")]
BadURL(reqwest::Url),
} }
impl IntoResponse for Error { impl IntoResponse for Error {

View file

@ -8,6 +8,7 @@ use attic_client::{
config::ServerConfig, config::ServerConfig,
push::{PushConfig, Pusher}, push::{PushConfig, Pusher},
}; };
use reqwest::Url;
use serde::Deserialize; use serde::Deserialize;
use std::env; use std::env;
use std::path::Path; use std::path::Path;
@ -15,20 +16,21 @@ use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use uuid::Uuid;
const JWT_PREFIX: &str = "flakehub1_"; const JWT_PREFIX: &str = "flakehub1_";
const USER_AGENT: &str = "magic-nix-cache"; const USER_AGENT: &str = "magic-nix-cache";
pub struct State { pub struct State {
pub substituter: String, pub substituter: Url,
pub push_session: PushSession, pub push_session: PushSession,
} }
pub async fn init_cache( pub async fn init_cache(
flakehub_api_server: &str, flakehub_api_server: &Url,
flakehub_api_server_netrc: &Path, flakehub_api_server_netrc: &Path,
flakehub_cache_server: &str, flakehub_cache_server: &Url,
store: Arc<NixStore>, store: Arc<NixStore>,
) -> Result<State> { ) -> Result<State> {
// Parse netrc to get the credentials for api.flakehub.com. // Parse netrc to get the credentials for api.flakehub.com.
@ -36,7 +38,7 @@ pub async fn init_cache(
let mut netrc_file = File::open(flakehub_api_server_netrc).await?; let mut netrc_file = File::open(flakehub_api_server_netrc).await?;
let mut netrc_contents = String::new(); let mut netrc_contents = String::new();
netrc_file.read_to_string(&mut netrc_contents).await?; netrc_file.read_to_string(&mut netrc_contents).await?;
netrc_rs::Netrc::parse(netrc_contents, false).unwrap() netrc_rs::Netrc::parse(netrc_contents, false).map_err(Error::Netrc)?
}; };
let netrc_entry = { let netrc_entry = {
@ -44,35 +46,28 @@ pub async fn init_cache(
.machines .machines
.iter() .iter()
.find(|machine| { .find(|machine| {
machine.name.as_ref().unwrap() machine.name.as_ref() == flakehub_api_server.host().map(|x| x.to_string()).as_ref()
== &reqwest::Url::parse(flakehub_api_server)
.unwrap()
.host()
.unwrap()
.to_string()
}) })
.unwrap() .ok_or_else(|| Error::MissingCreds(flakehub_api_server.to_string()))?
.to_owned() .to_owned()
}; };
let flakehub_cache_server_hostname = reqwest::Url::parse(flakehub_cache_server) let flakehub_cache_server_hostname = flakehub_cache_server
.unwrap()
.host() .host()
.unwrap() .ok_or_else(|| Error::BadURL(flakehub_cache_server.to_owned()))?
.to_string(); .to_string();
// Append an entry for the FlakeHub cache server to netrc. // Append an entry for the FlakeHub cache server to netrc.
if !netrc if !netrc
.machines .machines
.iter() .iter()
.any(|machine| machine.name.as_ref().unwrap() == &flakehub_cache_server_hostname) .any(|machine| machine.name.as_ref() == Some(&flakehub_cache_server_hostname))
{ {
let mut netrc_file = tokio::fs::OpenOptions::new() let mut netrc_file = tokio::fs::OpenOptions::new()
.create(false) .create(false)
.append(true) .append(true)
.open(flakehub_api_server_netrc) .open(flakehub_api_server_netrc)
.await .await?;
.unwrap();
netrc_file netrc_file
.write_all( .write_all(
format!( format!(
@ -82,8 +77,7 @@ pub async fn init_cache(
) )
.as_bytes(), .as_bytes(),
) )
.await .await?;
.unwrap();
} }
// Get the cache we're supposed to use. // Get the cache we're supposed to use.
@ -91,27 +85,28 @@ pub async fn init_cache(
let github_repo = env::var("GITHUB_REPOSITORY") let github_repo = env::var("GITHUB_REPOSITORY")
.expect("GITHUB_REPOSITORY environment variable is not set"); .expect("GITHUB_REPOSITORY environment variable is not set");
let url = format!("{}/project/{}", flakehub_api_server, github_repo,); let url = flakehub_api_server
.join(&format!("project/{}", github_repo))
.unwrap();
let response = reqwest::Client::new() let response = reqwest::Client::new()
.get(&url) .get(url.to_owned())
.header("User-Agent", USER_AGENT) .header("User-Agent", USER_AGENT)
.basic_auth( .basic_auth(
netrc_entry.login.as_ref().unwrap(), netrc_entry.login.as_ref().unwrap(),
netrc_entry.password.as_ref(), netrc_entry.password.as_ref(),
) )
.send() .send()
.await .await?;
.unwrap();
if response.status().is_success() { if response.status().is_success() {
#[derive(Deserialize)] #[derive(Deserialize)]
struct ProjectInfo { struct ProjectInfo {
organization_uuid_v7: String, organization_uuid_v7: Uuid,
project_uuid_v7: String, project_uuid_v7: Uuid,
} }
let project_info = response.json::<ProjectInfo>().await.unwrap(); let project_info = response.json::<ProjectInfo>().await?;
let expected_cache_name = format!( let expected_cache_name = format!(
"{}:{}", "{}:{}",
@ -133,24 +128,24 @@ pub async fn init_cache(
// Get a token for creating and pushing to the FlakeHub binary cache. // Get a token for creating and pushing to the FlakeHub binary cache.
let (known_caches, token) = { let (known_caches, token) = {
let url = format!("{}/token/create/cache", flakehub_api_server); let url = flakehub_api_server.join("cache/token").unwrap();
let request = reqwest::Client::new() let request = reqwest::Client::new()
.post(&url) .post(url.to_owned())
.header("User-Agent", USER_AGENT) .header("User-Agent", USER_AGENT)
.basic_auth( .basic_auth(
netrc_entry.login.as_ref().unwrap(), netrc_entry.login.as_ref().unwrap(),
netrc_entry.password.as_ref(), netrc_entry.password.as_ref(),
); );
let response = request.send().await.unwrap(); let response = request.send().await?;
if !response.status().is_success() { if !response.status().is_success() {
panic!( return Err(Error::CacheCreation(
"Failed to get FlakeHub binary cache creation token from {}: {}",
url, url,
response.status() response.status(),
); response.text().await?,
));
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -158,20 +153,20 @@ pub async fn init_cache(
token: String, token: String,
} }
let token = response.json::<Response>().await.unwrap().token; let token = response.json::<Response>().await?.token;
// Parse the JWT to get the list of caches to which we have access. // Parse the JWT to get the list of caches to which we have access.
let jwt = token.strip_prefix(JWT_PREFIX).unwrap(); let jwt = token.strip_prefix(JWT_PREFIX).ok_or(Error::BadJWT)?;
let jwt_parsed: jwt::Token<jwt::Header, serde_json::Map<String, serde_json::Value>, _> = let jwt_parsed: jwt::Token<jwt::Header, serde_json::Map<String, serde_json::Value>, _> =
jwt::Token::parse_unverified(jwt).unwrap(); jwt::Token::parse_unverified(jwt)?;
let known_caches = jwt_parsed let known_caches = jwt_parsed
.claims() .claims()
.get("https://cache.flakehub.com/v1") .get("https://cache.flakehub.com/v1")
.unwrap() .ok_or(Error::BadJWT)?
.get("caches") .get("caches")
.unwrap() .ok_or(Error::BadJWT)?
.as_object() .as_object()
.unwrap(); .ok_or(Error::BadJWT)?;
(known_caches.to_owned(), token) (known_caches.to_owned(), token)
}; };
@ -193,16 +188,16 @@ pub async fn init_cache(
} }
}; };
let cache = CacheSliceIdentifier::from_str(&cache_name).unwrap(); let cache = CacheSliceIdentifier::from_str(&cache_name)?;
tracing::info!("Using cache {}.", cache); tracing::info!("Using cache {}.", cache);
// Create the cache. // Create the cache.
let api = ApiClient::from_server_config(ServerConfig { let api = ApiClient::from_server_config(ServerConfig {
endpoint: flakehub_cache_server.to_owned(), endpoint: flakehub_cache_server.to_string(),
//token: netrc_entry.password.as_ref().cloned(),
token: Some(token.to_owned()), token: Some(token.to_owned()),
}) })?;
.unwrap();
let request = CreateCacheRequest { let request = CreateCacheRequest {
keypair: KeypairConfig::Generate, keypair: KeypairConfig::Generate,
@ -218,14 +213,14 @@ pub async fn init_cache(
tracing::info!("Cache {} already exists.", cache_name); tracing::info!("Cache {} already exists.", cache_name);
} }
_ => { _ => {
panic!("{:?}", err); return Err(Error::FlakeHub(err));
} }
} }
} else { } else {
tracing::info!("Created cache {} on {}.", cache_name, flakehub_cache_server); tracing::info!("Created cache {} on {}.", cache_name, flakehub_cache_server);
} }
let cache_config = api.get_cache_config(&cache).await.unwrap(); let cache_config = api.get_cache_config(&cache).await?;
let push_config = PushConfig { let push_config = PushConfig {
num_workers: 5, // FIXME: use number of CPUs? num_workers: 5, // FIXME: use number of CPUs?
@ -254,10 +249,7 @@ pub async fn init_cache(
} }
pub async fn enqueue_paths(state: &State, store_paths: Vec<StorePath>) -> Result<()> { pub async fn enqueue_paths(state: &State, store_paths: Vec<StorePath>) -> Result<()> {
state state.push_session.queue_many(store_paths)?;
.push_session
.queue_many(store_paths)
.map_err(Error::FlakeHub)?;
Ok(()) Ok(())
} }

View file

@ -81,7 +81,7 @@ struct Args {
/// The FlakeHub API server. /// The FlakeHub API server.
#[arg(long)] #[arg(long)]
flakehub_api_server: Option<String>, flakehub_api_server: Option<reqwest::Url>,
/// The path of the `netrc` file that contains the FlakeHub JWT token. /// The path of the `netrc` file that contains the FlakeHub JWT token.
#[arg(long)] #[arg(long)]
@ -89,7 +89,7 @@ struct Args {
/// The FlakeHub binary cache server. /// The FlakeHub binary cache server.
#[arg(long)] #[arg(long)]
flakehub_cache_server: Option<String>, flakehub_cache_server: Option<reqwest::Url>,
/// The location of `nix.conf`. /// The location of `nix.conf`.
#[arg(long)] #[arg(long)]
@ -186,16 +186,16 @@ async fn main_cli() {
) )
.expect("Writing to nix.conf"); .expect("Writing to nix.conf");
tracing::info!("Attic cache is enabled."); tracing::info!("FlakeHub cache is enabled.");
Some(state) Some(state)
} }
Err(err) => { Err(err) => {
tracing::error!("Attic cache initialization failed: {}", err); tracing::error!("FlakeHub cache initialization failed: {}", err);
None None
} }
} }
} else { } else {
tracing::info!("Attic cache is disabled."); tracing::info!("FlakeHub cache is disabled.");
None None
}; };