diff --git a/magic-nix-cache/src/api.rs b/magic-nix-cache/src/api.rs index ae76120..7aa6c63 100644 --- a/magic-nix-cache/src/api.rs +++ b/magic-nix-cache/src/api.rs @@ -147,7 +147,7 @@ async fn post_enqueue_paths( Ok(Json(EnqueuePathsResponse {})) } -async fn enqueue_paths(state: &State, store_paths: Vec) -> Result<()> { +pub async fn enqueue_paths(state: &State, store_paths: Vec) -> Result<()> { if let Some(gha_cache) = &state.gha_cache { gha_cache .enqueue_paths(state.store.clone(), store_paths.clone()) diff --git a/magic-nix-cache/src/main.rs b/magic-nix-cache/src/main.rs index 20aa088..5e1a90e 100644 --- a/magic-nix-cache/src/main.rs +++ b/magic-nix-cache/src/main.rs @@ -300,13 +300,36 @@ async fn main_cli() -> Result<()> { None }; + let diagnostic_endpoint = match args.diagnostic_endpoint.as_str() { + "" => { + tracing::info!("Diagnostics disabled."); + None + } + url => Some(url), + }; + + let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + + let original_paths = args.diff_store.then_some(Mutex::new(HashSet::new())); + let state = Arc::new(StateInner { + gha_cache, + upstream: args.upstream.clone(), + shutdown_sender: Mutex::new(Some(shutdown_sender)), + narinfo_negative_cache, + metrics, + store, + flakehub_state: RwLock::new(flakehub_state), + logfile: guard.logfile, + original_paths, + }); + let dnixd_uds_socket_dir: &Path = Path::new(&DETERMINATE_STATE_DIR); let dnixd_uds_socket_path = dnixd_uds_socket_dir.join(DETERMINATE_NIXD_SOCKET_NAME); if dnixd_uds_socket_path.exists() { let stream = TokioIo::new(UnixStream::connect(dnixd_uds_socket_path).await?); - // let (mut sender, conn): (SendRequest, _) = - // hyper::client::conn::http1::handshake(stream).await?; + + // TODO(colemickens): loop retry/reconnect/etc let executor = TokioExecutor::new(); let (mut sender, conn): (hyper::client::conn::http2::SendRequest, _) = @@ -327,26 +350,23 @@ async fn main_cli() -> Result<()> { .body(axum::body::Body::empty())?; let response = sender.send_request(request).await?; - - // NOTE(to self): there was a note somewhere to use this _IF_ you wanted all data, including trialers etc - // so ... we'll see.. let mut data = response.into_data_stream(); while let Some(foo) = data.next().await { tracing::info!("got {:?}", foo); + + + let sp = String::from_utf8_lossy(&foo.unwrap()).to_string(); + let store_paths = vec![sp]; + + // TODO(colemickens): error handling::: + let store_paths = store_paths + .iter() + .map(|path| state.store.follow_store_path(path).map_err(|_| anyhow!("ahhhhh"))) + .collect::>>()?; + + crate::api::enqueue_paths(&state, store_paths).await?; } - - // while let Some(frame_result) = response.frame().await { - // let frame = frame_result?; - - // if let Some(segment) = frame.data_ref() { - // tokio::io::stdout() - // .write_all(segment.iter().as_slice()) - // .await?; - // } - // } - - // TODO: do something with these paths } else { // TODO: split into own function(s) @@ -411,29 +431,6 @@ async fn main_cli() -> Result<()> { drop(nix_conf); } - let diagnostic_endpoint = match args.diagnostic_endpoint.as_str() { - "" => { - tracing::info!("Diagnostics disabled."); - None - } - url => Some(url), - }; - - let (shutdown_sender, shutdown_receiver) = oneshot::channel(); - - let original_paths = args.diff_store.then_some(Mutex::new(HashSet::new())); - let state = Arc::new(StateInner { - gha_cache, - upstream: args.upstream.clone(), - shutdown_sender: Mutex::new(Some(shutdown_sender)), - narinfo_negative_cache, - metrics, - store, - flakehub_state: RwLock::new(flakehub_state), - logfile: guard.logfile, - original_paths, - }); - let app = Router::new() .route("/", get(root)) .merge(api::get_router())