uds subscription: enqueue_paths
This commit is contained in:
parent
e5d5118022
commit
57eb3e75c0
|
@ -147,7 +147,7 @@ async fn post_enqueue_paths(
|
||||||
Ok(Json(EnqueuePathsResponse {}))
|
Ok(Json(EnqueuePathsResponse {}))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn enqueue_paths(state: &State, store_paths: Vec<StorePath>) -> Result<()> {
|
pub async fn enqueue_paths(state: &State, store_paths: Vec<StorePath>) -> Result<()> {
|
||||||
if let Some(gha_cache) = &state.gha_cache {
|
if let Some(gha_cache) = &state.gha_cache {
|
||||||
gha_cache
|
gha_cache
|
||||||
.enqueue_paths(state.store.clone(), store_paths.clone())
|
.enqueue_paths(state.store.clone(), store_paths.clone())
|
||||||
|
|
|
@ -300,13 +300,36 @@ async fn main_cli() -> Result<()> {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let diagnostic_endpoint = match args.diagnostic_endpoint.as_str() {
|
||||||
|
"" => {
|
||||||
|
tracing::info!("Diagnostics disabled.");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
url => Some(url),
|
||||||
|
};
|
||||||
|
|
||||||
|
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
|
||||||
|
|
||||||
|
let original_paths = args.diff_store.then_some(Mutex::new(HashSet::new()));
|
||||||
|
let state = Arc::new(StateInner {
|
||||||
|
gha_cache,
|
||||||
|
upstream: args.upstream.clone(),
|
||||||
|
shutdown_sender: Mutex::new(Some(shutdown_sender)),
|
||||||
|
narinfo_negative_cache,
|
||||||
|
metrics,
|
||||||
|
store,
|
||||||
|
flakehub_state: RwLock::new(flakehub_state),
|
||||||
|
logfile: guard.logfile,
|
||||||
|
original_paths,
|
||||||
|
});
|
||||||
|
|
||||||
let dnixd_uds_socket_dir: &Path = Path::new(&DETERMINATE_STATE_DIR);
|
let dnixd_uds_socket_dir: &Path = Path::new(&DETERMINATE_STATE_DIR);
|
||||||
let dnixd_uds_socket_path = dnixd_uds_socket_dir.join(DETERMINATE_NIXD_SOCKET_NAME);
|
let dnixd_uds_socket_path = dnixd_uds_socket_dir.join(DETERMINATE_NIXD_SOCKET_NAME);
|
||||||
|
|
||||||
if dnixd_uds_socket_path.exists() {
|
if dnixd_uds_socket_path.exists() {
|
||||||
let stream = TokioIo::new(UnixStream::connect(dnixd_uds_socket_path).await?);
|
let stream = TokioIo::new(UnixStream::connect(dnixd_uds_socket_path).await?);
|
||||||
// let (mut sender, conn): (SendRequest<Body>, _) =
|
|
||||||
// hyper::client::conn::http1::handshake(stream).await?;
|
// TODO(colemickens): loop retry/reconnect/etc
|
||||||
|
|
||||||
let executor = TokioExecutor::new();
|
let executor = TokioExecutor::new();
|
||||||
let (mut sender, conn): (hyper::client::conn::http2::SendRequest<Body>, _) =
|
let (mut sender, conn): (hyper::client::conn::http2::SendRequest<Body>, _) =
|
||||||
|
@ -327,26 +350,23 @@ async fn main_cli() -> Result<()> {
|
||||||
.body(axum::body::Body::empty())?;
|
.body(axum::body::Body::empty())?;
|
||||||
|
|
||||||
let response = sender.send_request(request).await?;
|
let response = sender.send_request(request).await?;
|
||||||
|
|
||||||
// NOTE(to self): there was a note somewhere to use this _IF_ you wanted all data, including trialers etc
|
|
||||||
// so ... we'll see..
|
|
||||||
let mut data = response.into_data_stream();
|
let mut data = response.into_data_stream();
|
||||||
|
|
||||||
while let Some(foo) = data.next().await {
|
while let Some(foo) = data.next().await {
|
||||||
tracing::info!("got {:?}", foo);
|
tracing::info!("got {:?}", foo);
|
||||||
|
|
||||||
|
|
||||||
|
let sp = String::from_utf8_lossy(&foo.unwrap()).to_string();
|
||||||
|
let store_paths = vec![sp];
|
||||||
|
|
||||||
|
// TODO(colemickens): error handling:::
|
||||||
|
let store_paths = store_paths
|
||||||
|
.iter()
|
||||||
|
.map(|path| state.store.follow_store_path(path).map_err(|_| anyhow!("ahhhhh")))
|
||||||
|
.collect::<Result<Vec<_>>>()?;
|
||||||
|
|
||||||
|
crate::api::enqueue_paths(&state, store_paths).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// while let Some(frame_result) = response.frame().await {
|
|
||||||
// let frame = frame_result?;
|
|
||||||
|
|
||||||
// if let Some(segment) = frame.data_ref() {
|
|
||||||
// tokio::io::stdout()
|
|
||||||
// .write_all(segment.iter().as_slice())
|
|
||||||
// .await?;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// TODO: do something with these paths
|
|
||||||
} else {
|
} else {
|
||||||
// TODO: split into own function(s)
|
// TODO: split into own function(s)
|
||||||
|
|
||||||
|
@ -411,29 +431,6 @@ async fn main_cli() -> Result<()> {
|
||||||
drop(nix_conf);
|
drop(nix_conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
let diagnostic_endpoint = match args.diagnostic_endpoint.as_str() {
|
|
||||||
"" => {
|
|
||||||
tracing::info!("Diagnostics disabled.");
|
|
||||||
None
|
|
||||||
}
|
|
||||||
url => Some(url),
|
|
||||||
};
|
|
||||||
|
|
||||||
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
|
|
||||||
|
|
||||||
let original_paths = args.diff_store.then_some(Mutex::new(HashSet::new()));
|
|
||||||
let state = Arc::new(StateInner {
|
|
||||||
gha_cache,
|
|
||||||
upstream: args.upstream.clone(),
|
|
||||||
shutdown_sender: Mutex::new(Some(shutdown_sender)),
|
|
||||||
narinfo_negative_cache,
|
|
||||||
metrics,
|
|
||||||
store,
|
|
||||||
flakehub_state: RwLock::new(flakehub_state),
|
|
||||||
logfile: guard.logfile,
|
|
||||||
original_paths,
|
|
||||||
});
|
|
||||||
|
|
||||||
let app = Router::new()
|
let app = Router::new()
|
||||||
.route("/", get(root))
|
.route("/", get(root))
|
||||||
.merge(api::get_router())
|
.merge(api::get_router())
|
||||||
|
|
Loading…
Reference in a new issue