Merge pull request #10 from DeterminateSystems/eelcodolstra/fh-159-magic-nix-cache-handle-flakehub-errors-more-gracefully
Improve error handling in FlakeHub cache setup
This commit is contained in:
commit
ac64bcd221
65
Cargo.lock
generated
65
Cargo.lock
generated
|
@ -125,13 +125,19 @@ dependencies = [
|
|||
"syn 2.0.32",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic"
|
||||
version = "0.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba"
|
||||
|
||||
[[package]]
|
||||
name = "attic"
|
||||
version = "0.2.0"
|
||||
source = "git+ssh://git@github.com/DeterminateSystems/attic-priv?branch=main#a16c0f4cf1abe471ac69731bf3cfe9a8d2eedd5e"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"base64 0.21.2",
|
||||
"base64",
|
||||
"bytes",
|
||||
"cxx",
|
||||
"cxx-build",
|
||||
|
@ -249,12 +255,6 @@ dependencies = [
|
|||
"syn 2.0.32",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.13.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.21.2"
|
||||
|
@ -617,7 +617,6 @@ checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
|
|||
dependencies = [
|
||||
"block-buffer",
|
||||
"crypto-common",
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -924,15 +923,6 @@ version = "0.4.3"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
|
||||
|
||||
[[package]]
|
||||
name = "hmac"
|
||||
version = "0.12.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
|
||||
dependencies = [
|
||||
"digest",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hostname"
|
||||
version = "0.3.1"
|
||||
|
@ -1203,21 +1193,6 @@ dependencies = [
|
|||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jwt"
|
||||
version = "0.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6204285f77fe7d9784db3fdc449ecce1a0114927a51d5a41c4c7a292011c015f"
|
||||
dependencies = [
|
||||
"base64 0.13.1",
|
||||
"crypto-common",
|
||||
"digest",
|
||||
"hmac",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "kqueue"
|
||||
version = "1.0.8"
|
||||
|
@ -1316,7 +1291,6 @@ dependencies = [
|
|||
"gha-cache",
|
||||
"indicatif",
|
||||
"is_ci",
|
||||
"jwt",
|
||||
"netrc-rs",
|
||||
"reqwest",
|
||||
"serde",
|
||||
|
@ -1330,6 +1304,7 @@ dependencies = [
|
|||
"tower-http",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1659,7 +1634,7 @@ version = "0.11.18"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55"
|
||||
dependencies = [
|
||||
"base64 0.21.2",
|
||||
"base64",
|
||||
"bytes",
|
||||
"encoding_rs",
|
||||
"futures-core",
|
||||
|
@ -1792,7 +1767,7 @@ version = "1.0.2"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b"
|
||||
dependencies = [
|
||||
"base64 0.21.2",
|
||||
"base64",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1947,7 +1922,7 @@ version = "3.4.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "64cd236ccc1b7a29e7e2739f27c0b2dd199804abc4290e32f59f3b68d6405c23"
|
||||
dependencies = [
|
||||
"base64 0.21.2",
|
||||
"base64",
|
||||
"chrono",
|
||||
"hex",
|
||||
"indexmap 1.9.3",
|
||||
|
@ -2058,12 +2033,6 @@ version = "0.10.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
|
||||
|
||||
[[package]]
|
||||
name = "subtle"
|
||||
version = "2.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "1.0.109"
|
||||
|
@ -2530,6 +2499,18 @@ version = "0.2.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
|
||||
|
||||
[[package]]
|
||||
name = "uuid"
|
||||
version = "1.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560"
|
||||
dependencies = [
|
||||
"atomic",
|
||||
"getrandom",
|
||||
"rand",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "valuable"
|
||||
version = "0.1.0"
|
||||
|
|
|
@ -23,7 +23,6 @@ is_ci = "1.1.1"
|
|||
sha2 = { version = "0.10.6", default-features = false }
|
||||
reqwest = { version = "0.11.17", default-features = false, features = ["blocking", "rustls-tls-native-roots", "trust-dns"] }
|
||||
netrc-rs = "0.1.2"
|
||||
jwt = { version = "0.16" }
|
||||
attic = { git = "ssh://git@github.com/DeterminateSystems/attic-priv", branch = "main" }
|
||||
#attic = { path = "../../attic-priv/attic" }
|
||||
attic-client = { git = "ssh://git@github.com/DeterminateSystems/attic-priv", branch = "main" }
|
||||
|
@ -31,6 +30,7 @@ attic-client = { git = "ssh://git@github.com/DeterminateSystems/attic-priv", bra
|
|||
indicatif = "0.17"
|
||||
anyhow = "1.0.71"
|
||||
tempfile = "3.9"
|
||||
uuid = { version = "1.4.0", features = ["serde", "v7", "rand", "std"] }
|
||||
|
||||
[dependencies.tokio]
|
||||
version = "1.28.0"
|
||||
|
|
|
@ -29,7 +29,25 @@ pub enum Error {
|
|||
GHADisabled,
|
||||
|
||||
#[error("FlakeHub cache error: {0}")]
|
||||
FlakeHub(anyhow::Error),
|
||||
FlakeHub(#[from] anyhow::Error),
|
||||
|
||||
#[error("FlakeHub HTTP error: {0}")]
|
||||
FlakeHubHttp(#[from] reqwest::Error),
|
||||
|
||||
#[error("Got HTTP response {0} getting the cache name from FlakeHub: {1}")]
|
||||
GetCacheName(reqwest::StatusCode, String),
|
||||
|
||||
#[error("netrc parse error: {0}")]
|
||||
Netrc(netrc_rs::Error),
|
||||
|
||||
#[error("Cannot find netrc credentials for {0}")]
|
||||
MissingCreds(String),
|
||||
|
||||
#[error("Attic error: {0}")]
|
||||
Attic(#[from] attic::AtticError),
|
||||
|
||||
#[error("Bad URL")]
|
||||
BadUrl(reqwest::Url),
|
||||
}
|
||||
|
||||
impl IntoResponse for Error {
|
||||
|
|
|
@ -8,6 +8,7 @@ use attic_client::{
|
|||
config::ServerConfig,
|
||||
push::{PushConfig, Pusher},
|
||||
};
|
||||
use reqwest::Url;
|
||||
use serde::Deserialize;
|
||||
use std::env;
|
||||
use std::path::Path;
|
||||
|
@ -15,20 +16,20 @@ use std::str::FromStr;
|
|||
use std::sync::Arc;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use uuid::Uuid;
|
||||
|
||||
const JWT_PREFIX: &str = "flakehub1_";
|
||||
const USER_AGENT: &str = "magic-nix-cache";
|
||||
|
||||
pub struct State {
|
||||
pub substituter: String,
|
||||
pub substituter: Url,
|
||||
|
||||
pub push_session: PushSession,
|
||||
}
|
||||
|
||||
pub async fn init_cache(
|
||||
flakehub_api_server: &str,
|
||||
flakehub_api_server: &Url,
|
||||
flakehub_api_server_netrc: &Path,
|
||||
flakehub_cache_server: &str,
|
||||
flakehub_cache_server: &Url,
|
||||
store: Arc<NixStore>,
|
||||
) -> Result<State> {
|
||||
// Parse netrc to get the credentials for api.flakehub.com.
|
||||
|
@ -36,7 +37,7 @@ pub async fn init_cache(
|
|||
let mut netrc_file = File::open(flakehub_api_server_netrc).await?;
|
||||
let mut netrc_contents = String::new();
|
||||
netrc_file.read_to_string(&mut netrc_contents).await?;
|
||||
netrc_rs::Netrc::parse(netrc_contents, false).unwrap()
|
||||
netrc_rs::Netrc::parse(netrc_contents, false).map_err(Error::Netrc)?
|
||||
};
|
||||
|
||||
let netrc_entry = {
|
||||
|
@ -44,35 +45,28 @@ pub async fn init_cache(
|
|||
.machines
|
||||
.iter()
|
||||
.find(|machine| {
|
||||
machine.name.as_ref().unwrap()
|
||||
== &reqwest::Url::parse(flakehub_api_server)
|
||||
.unwrap()
|
||||
.host()
|
||||
.unwrap()
|
||||
.to_string()
|
||||
machine.name.as_ref() == flakehub_api_server.host().map(|x| x.to_string()).as_ref()
|
||||
})
|
||||
.unwrap()
|
||||
.ok_or_else(|| Error::MissingCreds(flakehub_api_server.to_string()))?
|
||||
.to_owned()
|
||||
};
|
||||
|
||||
let flakehub_cache_server_hostname = reqwest::Url::parse(flakehub_cache_server)
|
||||
.unwrap()
|
||||
let flakehub_cache_server_hostname = flakehub_cache_server
|
||||
.host()
|
||||
.unwrap()
|
||||
.ok_or_else(|| Error::BadUrl(flakehub_cache_server.to_owned()))?
|
||||
.to_string();
|
||||
|
||||
// Append an entry for the FlakeHub cache server to netrc.
|
||||
if !netrc
|
||||
.machines
|
||||
.iter()
|
||||
.any(|machine| machine.name.as_ref().unwrap() == &flakehub_cache_server_hostname)
|
||||
.any(|machine| machine.name.as_ref() == Some(&flakehub_cache_server_hostname))
|
||||
{
|
||||
let mut netrc_file = tokio::fs::OpenOptions::new()
|
||||
.create(false)
|
||||
.append(true)
|
||||
.open(flakehub_api_server_netrc)
|
||||
.await
|
||||
.unwrap();
|
||||
.await?;
|
||||
netrc_file
|
||||
.write_all(
|
||||
format!(
|
||||
|
@ -82,127 +76,58 @@ pub async fn init_cache(
|
|||
)
|
||||
.as_bytes(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Get the cache we're supposed to use.
|
||||
let expected_cache_name = {
|
||||
// Get the cache UUID for this project.
|
||||
let cache_name = {
|
||||
let github_repo = env::var("GITHUB_REPOSITORY")
|
||||
.expect("GITHUB_REPOSITORY environment variable is not set");
|
||||
|
||||
let url = format!("{}/project/{}", flakehub_api_server, github_repo,);
|
||||
let url = flakehub_api_server
|
||||
.join(&format!("project/{}", github_repo))
|
||||
.unwrap();
|
||||
|
||||
let response = reqwest::Client::new()
|
||||
.get(&url)
|
||||
.get(url.to_owned())
|
||||
.header("User-Agent", USER_AGENT)
|
||||
.basic_auth(
|
||||
netrc_entry.login.as_ref().unwrap(),
|
||||
netrc_entry.password.as_ref(),
|
||||
)
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
if response.status().is_success() {
|
||||
#[derive(Deserialize)]
|
||||
struct ProjectInfo {
|
||||
organization_uuid_v7: String,
|
||||
project_uuid_v7: String,
|
||||
}
|
||||
|
||||
let project_info = response.json::<ProjectInfo>().await.unwrap();
|
||||
|
||||
let expected_cache_name = format!(
|
||||
"{}:{}",
|
||||
project_info.organization_uuid_v7, project_info.project_uuid_v7,
|
||||
);
|
||||
|
||||
tracing::info!("Want to use cache {:?}.", expected_cache_name);
|
||||
|
||||
Some(expected_cache_name)
|
||||
} else {
|
||||
tracing::error!(
|
||||
"Failed to get project info from {}: {}",
|
||||
url,
|
||||
response.status()
|
||||
);
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// Get a token for creating and pushing to the FlakeHub binary cache.
|
||||
let (known_caches, token) = {
|
||||
let url = format!("{}/token/create/cache", flakehub_api_server);
|
||||
|
||||
let request = reqwest::Client::new()
|
||||
.post(&url)
|
||||
.header("User-Agent", USER_AGENT)
|
||||
.basic_auth(
|
||||
netrc_entry.login.as_ref().unwrap(),
|
||||
netrc_entry.password.as_ref(),
|
||||
);
|
||||
|
||||
let response = request.send().await.unwrap();
|
||||
.await?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
panic!(
|
||||
"Failed to get FlakeHub binary cache creation token from {}: {}",
|
||||
url,
|
||||
response.status()
|
||||
);
|
||||
return Err(Error::GetCacheName(
|
||||
response.status(),
|
||||
response.text().await?,
|
||||
));
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct Response {
|
||||
token: String,
|
||||
struct ProjectInfo {
|
||||
organization_uuid_v7: Uuid,
|
||||
project_uuid_v7: Uuid,
|
||||
}
|
||||
|
||||
let token = response.json::<Response>().await.unwrap().token;
|
||||
let project_info = response.json::<ProjectInfo>().await?;
|
||||
|
||||
// Parse the JWT to get the list of caches to which we have access.
|
||||
let jwt = token.strip_prefix(JWT_PREFIX).unwrap();
|
||||
let jwt_parsed: jwt::Token<jwt::Header, serde_json::Map<String, serde_json::Value>, _> =
|
||||
jwt::Token::parse_unverified(jwt).unwrap();
|
||||
let known_caches = jwt_parsed
|
||||
.claims()
|
||||
.get("https://cache.flakehub.com/v1")
|
||||
.unwrap()
|
||||
.get("caches")
|
||||
.unwrap()
|
||||
.as_object()
|
||||
.unwrap();
|
||||
|
||||
(known_caches.to_owned(), token)
|
||||
format!(
|
||||
"{}:{}",
|
||||
project_info.organization_uuid_v7, project_info.project_uuid_v7,
|
||||
)
|
||||
};
|
||||
|
||||
// Use the expected cache if we have access to it, otherwise use
|
||||
// the oldest cache to which we have access.
|
||||
let cache_name = {
|
||||
if expected_cache_name
|
||||
.as_ref()
|
||||
.map_or(false, |x| known_caches.get(x).is_some())
|
||||
{
|
||||
expected_cache_name.unwrap().to_owned()
|
||||
} else {
|
||||
let mut keys: Vec<_> = known_caches.keys().collect();
|
||||
keys.sort();
|
||||
keys.first()
|
||||
.expect("FlakeHub did not return any cache for the calling user.")
|
||||
.to_string()
|
||||
}
|
||||
};
|
||||
tracing::info!("Using cache {:?}.", cache_name);
|
||||
|
||||
let cache = CacheSliceIdentifier::from_str(&cache_name).unwrap();
|
||||
|
||||
tracing::info!("Using cache {}.", cache);
|
||||
let cache = CacheSliceIdentifier::from_str(&cache_name)?;
|
||||
|
||||
// Create the cache.
|
||||
let api = ApiClient::from_server_config(ServerConfig {
|
||||
endpoint: flakehub_cache_server.to_owned(),
|
||||
token: Some(token.to_owned()),
|
||||
})
|
||||
.unwrap();
|
||||
endpoint: flakehub_cache_server.to_string(),
|
||||
token: netrc_entry.password.as_ref().cloned(),
|
||||
})?;
|
||||
|
||||
let request = CreateCacheRequest {
|
||||
keypair: KeypairConfig::Generate,
|
||||
|
@ -218,14 +143,14 @@ pub async fn init_cache(
|
|||
tracing::info!("Cache {} already exists.", cache_name);
|
||||
}
|
||||
_ => {
|
||||
panic!("{:?}", err);
|
||||
return Err(Error::FlakeHub(err));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tracing::info!("Created cache {} on {}.", cache_name, flakehub_cache_server);
|
||||
}
|
||||
|
||||
let cache_config = api.get_cache_config(&cache).await.unwrap();
|
||||
let cache_config = api.get_cache_config(&cache).await?;
|
||||
|
||||
let push_config = PushConfig {
|
||||
num_workers: 5, // FIXME: use number of CPUs?
|
||||
|
@ -254,10 +179,7 @@ pub async fn init_cache(
|
|||
}
|
||||
|
||||
pub async fn enqueue_paths(state: &State, store_paths: Vec<StorePath>) -> Result<()> {
|
||||
state
|
||||
.push_session
|
||||
.queue_many(store_paths)
|
||||
.map_err(Error::FlakeHub)?;
|
||||
state.push_session.queue_many(store_paths)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -81,7 +81,7 @@ struct Args {
|
|||
|
||||
/// The FlakeHub API server.
|
||||
#[arg(long)]
|
||||
flakehub_api_server: Option<String>,
|
||||
flakehub_api_server: Option<reqwest::Url>,
|
||||
|
||||
/// The path of the `netrc` file that contains the FlakeHub JWT token.
|
||||
#[arg(long)]
|
||||
|
@ -89,7 +89,7 @@ struct Args {
|
|||
|
||||
/// The FlakeHub binary cache server.
|
||||
#[arg(long)]
|
||||
flakehub_cache_server: Option<String>,
|
||||
flakehub_cache_server: Option<reqwest::Url>,
|
||||
|
||||
/// The location of `nix.conf`.
|
||||
#[arg(long)]
|
||||
|
@ -186,16 +186,16 @@ async fn main_cli() {
|
|||
)
|
||||
.expect("Writing to nix.conf");
|
||||
|
||||
tracing::info!("Attic cache is enabled.");
|
||||
tracing::info!("FlakeHub cache is enabled.");
|
||||
Some(state)
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("Attic cache initialization failed: {}", err);
|
||||
tracing::error!("FlakeHub cache initialization failed: {}", err);
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tracing::info!("Attic cache is disabled.");
|
||||
tracing::info!("FlakeHub cache is disabled.");
|
||||
None
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in a new issue