Merge pull request #91 from DeterminateSystems/why-io-error
Why io error
This commit is contained in:
commit
91eef4416e
|
@ -75,8 +75,8 @@ pub enum Error {
|
||||||
info: ApiErrorInfo,
|
info: ApiErrorInfo,
|
||||||
},
|
},
|
||||||
|
|
||||||
#[error("I/O error: {0}")]
|
#[error("I/O error: {0}, context: {1}")]
|
||||||
IoError(#[from] std::io::Error),
|
IoError(std::io::Error, String),
|
||||||
|
|
||||||
#[error("Too many collisions")]
|
#[error("Too many collisions")]
|
||||||
TooManyCollisions,
|
TooManyCollisions,
|
||||||
|
@ -345,8 +345,9 @@ impl Api {
|
||||||
let mut futures = Vec::new();
|
let mut futures = Vec::new();
|
||||||
loop {
|
loop {
|
||||||
let buf = BytesMut::with_capacity(CHUNK_SIZE);
|
let buf = BytesMut::with_capacity(CHUNK_SIZE);
|
||||||
let chunk = read_chunk_async(&mut stream, buf).await?;
|
let chunk = read_chunk_async(&mut stream, buf)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::IoError(e, "Reading a chunk during upload".to_string()))?;
|
||||||
if chunk.is_empty() {
|
if chunk.is_empty() {
|
||||||
offset += chunk.len();
|
offset += chunk.len();
|
||||||
break;
|
break;
|
||||||
|
@ -368,7 +369,10 @@ impl Api {
|
||||||
let url = self.construct_url(&format!("caches/{}", allocation.0 .0));
|
let url = self.construct_url(&format!("caches/{}", allocation.0 .0));
|
||||||
|
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
let permit = concurrency_limit.acquire().await.unwrap();
|
let permit = concurrency_limit
|
||||||
|
.acquire()
|
||||||
|
.await
|
||||||
|
.expect("failed to acquire concurrency semaphore permit");
|
||||||
|
|
||||||
tracing::trace!(
|
tracing::trace!(
|
||||||
"Starting uploading chunk {}-{}",
|
"Starting uploading chunk {}-{}",
|
||||||
|
@ -410,7 +414,9 @@ impl Api {
|
||||||
future::join_all(futures)
|
future::join_all(futures)
|
||||||
.await
|
.await
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.try_for_each(|join_result| join_result.unwrap())?;
|
.try_for_each(|join_result| {
|
||||||
|
join_result.expect("failed collecting a join result during parallel upload")
|
||||||
|
})?;
|
||||||
|
|
||||||
tracing::debug!("Received all chunks for cache {:?}", allocation.0);
|
tracing::debug!("Received all chunks for cache {:?}", allocation.0);
|
||||||
|
|
||||||
|
|
|
@ -112,7 +112,8 @@ async fn workflow_finish(
|
||||||
|
|
||||||
// NOTE(cole-h): see `init_logging`
|
// NOTE(cole-h): see `init_logging`
|
||||||
if let Some(logfile) = &state.logfile {
|
if let Some(logfile) = &state.logfile {
|
||||||
let logfile_contents = std::fs::read_to_string(logfile)?;
|
let logfile_contents = std::fs::read_to_string(logfile)
|
||||||
|
.map_err(|e| crate::error::Error::Io(e, format!("Reading {}", logfile.display())))?;
|
||||||
println!("Every log line throughout the lifetime of the program:");
|
println!("Every log line throughout the lifetime of the program:");
|
||||||
println!("\n{logfile_contents}\n");
|
println!("\n{logfile_contents}\n");
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,8 +19,8 @@ pub enum Error {
|
||||||
#[error("Bad Request")]
|
#[error("Bad Request")]
|
||||||
BadRequest,
|
BadRequest,
|
||||||
|
|
||||||
#[error("I/O error: {0}")]
|
#[error("I/O error: {0}. Context: {1}")]
|
||||||
Io(#[from] std::io::Error),
|
Io(std::io::Error, String),
|
||||||
|
|
||||||
#[error("GHA cache is disabled")]
|
#[error("GHA cache is disabled")]
|
||||||
GHADisabled,
|
GHADisabled,
|
||||||
|
|
|
@ -72,7 +72,9 @@ impl GhaCache {
|
||||||
self.channel_tx
|
self.channel_tx
|
||||||
.send(Request::Shutdown)
|
.send(Request::Shutdown)
|
||||||
.expect("Cannot send shutdown message");
|
.expect("Cannot send shutdown message");
|
||||||
worker_result.await.unwrap()
|
worker_result
|
||||||
|
.await
|
||||||
|
.expect("failed to read result from gha worker")
|
||||||
} else {
|
} else {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -189,7 +191,7 @@ async fn upload_path(
|
||||||
|
|
||||||
let narinfo = path_info_to_nar_info(store.clone(), &path_info, format!("nar/{}", nar_path))
|
let narinfo = path_info_to_nar_info(store.clone(), &path_info, format!("nar/{}", nar_path))
|
||||||
.to_string()
|
.to_string()
|
||||||
.unwrap();
|
.expect("failed to convert path into to nar info");
|
||||||
|
|
||||||
tracing::debug!("Uploading '{}'", narinfo_path);
|
tracing::debug!("Uploading '{}'", narinfo_path);
|
||||||
|
|
||||||
|
@ -224,7 +226,17 @@ fn path_info_to_nar_info(store: Arc<NixStore>, path_info: &ValidPathInfo, url: S
|
||||||
references: path_info
|
references: path_info
|
||||||
.references
|
.references
|
||||||
.iter()
|
.iter()
|
||||||
.map(|r| r.file_name().unwrap().to_str().unwrap().to_owned())
|
.map(|r| {
|
||||||
|
r.file_name()
|
||||||
|
.and_then(|n| n.to_str())
|
||||||
|
.unwrap_or_else(|| {
|
||||||
|
panic!(
|
||||||
|
"failed to convert nar_info reference to string: {}",
|
||||||
|
r.display()
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.to_owned()
|
||||||
|
})
|
||||||
.collect(),
|
.collect(),
|
||||||
system: None,
|
system: None,
|
||||||
deriver: None,
|
deriver: None,
|
||||||
|
|
|
@ -392,8 +392,33 @@ async fn main_cli() -> Result<()> {
|
||||||
|
|
||||||
tracing::debug!("Startup notification via file at {startup_notification_file_path:?}");
|
tracing::debug!("Startup notification via file at {startup_notification_file_path:?}");
|
||||||
|
|
||||||
let mut notification_file = File::create(&startup_notification_file_path).await?;
|
if let Some(parent_dir) = startup_notification_file_path.parent() {
|
||||||
notification_file.write_all(file_contents).await?;
|
tokio::fs::create_dir_all(parent_dir)
|
||||||
|
.await
|
||||||
|
.with_context(|| {
|
||||||
|
format!(
|
||||||
|
"failed to create parent directory for startup notification file path: {}",
|
||||||
|
startup_notification_file_path.display()
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
let mut notification_file = File::create(&startup_notification_file_path)
|
||||||
|
.await
|
||||||
|
.with_context(|| {
|
||||||
|
format!(
|
||||||
|
"failed to create startup notification file to path: {}",
|
||||||
|
startup_notification_file_path.display()
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
notification_file
|
||||||
|
.write_all(file_contents)
|
||||||
|
.await
|
||||||
|
.with_context(|| {
|
||||||
|
format!(
|
||||||
|
"failed to write startup notification file to path: {}",
|
||||||
|
startup_notification_file_path.display()
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
tracing::debug!("Created startup notification file at {startup_notification_file_path:?}");
|
tracing::debug!("Created startup notification file at {startup_notification_file_path:?}");
|
||||||
}
|
}
|
||||||
|
@ -437,8 +462,16 @@ fn init_logging() -> Result<LogGuard> {
|
||||||
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
|
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
|
||||||
#[cfg(debug_assertions)]
|
#[cfg(debug_assertions)]
|
||||||
return EnvFilter::new("info")
|
return EnvFilter::new("info")
|
||||||
.add_directive("magic_nix_cache=debug".parse().unwrap())
|
.add_directive(
|
||||||
.add_directive("gha_cache=debug".parse().unwrap());
|
"magic_nix_cache=debug"
|
||||||
|
.parse()
|
||||||
|
.expect("failed to parse magix_nix_cache directive"),
|
||||||
|
)
|
||||||
|
.add_directive(
|
||||||
|
"gha_cache=debug"
|
||||||
|
.parse()
|
||||||
|
.expect("failed to parse gha_cahce directive"),
|
||||||
|
);
|
||||||
|
|
||||||
#[cfg(not(debug_assertions))]
|
#[cfg(not(debug_assertions))]
|
||||||
return EnvFilter::new("info");
|
return EnvFilter::new("info");
|
||||||
|
|
|
@ -164,9 +164,20 @@ pub async fn setup_legacy_post_build_hook(
|
||||||
&path.display().to_string(),
|
&path.display().to_string(),
|
||||||
])
|
])
|
||||||
.output()
|
.output()
|
||||||
.await?;
|
.await
|
||||||
|
.with_context(|| {
|
||||||
|
format!(
|
||||||
|
"Running nix to add the post-build-hook to the store from {}",
|
||||||
|
path.display()
|
||||||
|
)
|
||||||
|
})?;
|
||||||
if res.status.success() {
|
if res.status.success() {
|
||||||
tokio::fs::remove_file(path).await?;
|
tokio::fs::remove_file(&path).await.with_context(|| {
|
||||||
|
format!(
|
||||||
|
"Cleaning up the temporary post-build-hook at {}",
|
||||||
|
path.display()
|
||||||
|
)
|
||||||
|
})?;
|
||||||
PathBuf::from(String::from_utf8_lossy(&res.stdout).trim())
|
PathBuf::from(String::from_utf8_lossy(&res.stdout).trim())
|
||||||
} else {
|
} else {
|
||||||
path
|
path
|
||||||
|
|
|
@ -11,9 +11,19 @@ use crate::error::Result;
|
||||||
pub async fn get_store_paths(store: &NixStore) -> Result<HashSet<PathBuf>> {
|
pub async fn get_store_paths(store: &NixStore) -> Result<HashSet<PathBuf>> {
|
||||||
// FIXME: use the Nix API.
|
// FIXME: use the Nix API.
|
||||||
let store_dir = store.store_dir();
|
let store_dir = store.store_dir();
|
||||||
let mut listing = tokio::fs::read_dir(store_dir).await?;
|
let mut listing = tokio::fs::read_dir(store_dir).await.map_err(|e| {
|
||||||
|
crate::error::Error::Io(
|
||||||
|
e,
|
||||||
|
format!("Enumerating store paths in {}", store_dir.display()),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
let mut paths = HashSet::new();
|
let mut paths = HashSet::new();
|
||||||
while let Some(entry) = listing.next_entry().await? {
|
while let Some(entry) = listing.next_entry().await.map_err(|e| {
|
||||||
|
crate::error::Error::Io(
|
||||||
|
e,
|
||||||
|
format!("Reading existing store paths from {}", store_dir.display()),
|
||||||
|
)
|
||||||
|
})? {
|
||||||
let file_name = entry.file_name();
|
let file_name = entry.file_name();
|
||||||
let file_name = Path::new(&file_name);
|
let file_name = Path::new(&file_name);
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue