Invalidate negative narinfo cache entries
This commit is contained in:
parent
b53876db25
commit
9bf26f0680
|
@ -35,6 +35,7 @@ impl GhaCache {
|
||||||
cache_version: Option<String>,
|
cache_version: Option<String>,
|
||||||
store: Arc<NixStore>,
|
store: Arc<NixStore>,
|
||||||
metrics: Arc<telemetry::TelemetryReport>,
|
metrics: Arc<telemetry::TelemetryReport>,
|
||||||
|
narinfo_negative_cache: Arc<RwLock<HashSet<String>>>,
|
||||||
) -> Result<GhaCache> {
|
) -> Result<GhaCache> {
|
||||||
let mut api = Api::new(credentials)?;
|
let mut api = Api::new(credentials)?;
|
||||||
|
|
||||||
|
@ -48,8 +49,16 @@ impl GhaCache {
|
||||||
|
|
||||||
let api2 = api.clone();
|
let api2 = api.clone();
|
||||||
|
|
||||||
let worker_result =
|
let worker_result = tokio::task::spawn(async move {
|
||||||
tokio::task::spawn(async move { worker(&api2, store, channel_rx, metrics).await });
|
worker(
|
||||||
|
&api2,
|
||||||
|
store,
|
||||||
|
channel_rx,
|
||||||
|
metrics,
|
||||||
|
narinfo_negative_cache.clone(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
});
|
||||||
|
|
||||||
Ok(GhaCache {
|
Ok(GhaCache {
|
||||||
api,
|
api,
|
||||||
|
@ -100,6 +109,7 @@ async fn worker(
|
||||||
store: Arc<NixStore>,
|
store: Arc<NixStore>,
|
||||||
mut channel_rx: UnboundedReceiver<Request>,
|
mut channel_rx: UnboundedReceiver<Request>,
|
||||||
metrics: Arc<telemetry::TelemetryReport>,
|
metrics: Arc<telemetry::TelemetryReport>,
|
||||||
|
narinfo_negative_cache: Arc<RwLock<HashSet<String>>>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut done = HashSet::new();
|
let mut done = HashSet::new();
|
||||||
|
|
||||||
|
@ -113,7 +123,15 @@ async fn worker(
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Err(err) = upload_path(api, store.clone(), &path, metrics.clone()).await {
|
if let Err(err) = upload_path(
|
||||||
|
api,
|
||||||
|
store.clone(),
|
||||||
|
&path,
|
||||||
|
metrics.clone(),
|
||||||
|
narinfo_negative_cache.clone(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
tracing::error!(
|
tracing::error!(
|
||||||
"Upload of path '{}' failed: {}",
|
"Upload of path '{}' failed: {}",
|
||||||
store.get_full_path(&path).display(),
|
store.get_full_path(&path).display(),
|
||||||
|
@ -132,6 +150,7 @@ async fn upload_path(
|
||||||
store: Arc<NixStore>,
|
store: Arc<NixStore>,
|
||||||
path: &StorePath,
|
path: &StorePath,
|
||||||
metrics: Arc<telemetry::TelemetryReport>,
|
metrics: Arc<telemetry::TelemetryReport>,
|
||||||
|
narinfo_negative_cache: Arc<RwLock<HashSet<String>>>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let path_info = store.query_path_info(path.clone()).await?;
|
let path_info = store.query_path_info(path.clone()).await?;
|
||||||
|
|
||||||
|
@ -173,9 +192,14 @@ async fn upload_path(
|
||||||
.await?;
|
.await?;
|
||||||
metrics.narinfos_uploaded.incr();
|
metrics.narinfos_uploaded.incr();
|
||||||
|
|
||||||
|
narinfo_negative_cache
|
||||||
|
.write()
|
||||||
|
.await
|
||||||
|
.remove(&path.to_hash().to_string());
|
||||||
|
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
"Uploaded '{}' to the GitHub Action Cache",
|
"Uploaded '{}' to the GitHub Action Cache",
|
||||||
store.get_full_path(&path).display()
|
store.get_full_path(path).display()
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -123,7 +123,7 @@ struct StateInner {
|
||||||
shutdown_sender: Mutex<Option<oneshot::Sender<()>>>,
|
shutdown_sender: Mutex<Option<oneshot::Sender<()>>>,
|
||||||
|
|
||||||
/// Set of store path hashes that are not present in GHAC.
|
/// Set of store path hashes that are not present in GHAC.
|
||||||
narinfo_negative_cache: RwLock<HashSet<String>>,
|
narinfo_negative_cache: Arc<RwLock<HashSet<String>>>,
|
||||||
|
|
||||||
/// Metrics for sending to perf at shutdown
|
/// Metrics for sending to perf at shutdown
|
||||||
metrics: Arc<telemetry::TelemetryReport>,
|
metrics: Arc<telemetry::TelemetryReport>,
|
||||||
|
@ -154,6 +154,8 @@ async fn main_cli() -> Result<()> {
|
||||||
|
|
||||||
let store = Arc::new(NixStore::connect()?);
|
let store = Arc::new(NixStore::connect()?);
|
||||||
|
|
||||||
|
let narinfo_negative_cache = Arc::new(RwLock::new(HashSet::new()));
|
||||||
|
|
||||||
let flakehub_state = if args.use_flakehub {
|
let flakehub_state = if args.use_flakehub {
|
||||||
let flakehub_cache_server = args
|
let flakehub_cache_server = args
|
||||||
.flakehub_cache_server
|
.flakehub_cache_server
|
||||||
|
@ -234,6 +236,7 @@ async fn main_cli() -> Result<()> {
|
||||||
args.cache_version,
|
args.cache_version,
|
||||||
store.clone(),
|
store.clone(),
|
||||||
metrics.clone(),
|
metrics.clone(),
|
||||||
|
narinfo_negative_cache.clone(),
|
||||||
)
|
)
|
||||||
.with_context(|| "Failed to initialize GitHub Actions Cache API")?;
|
.with_context(|| "Failed to initialize GitHub Actions Cache API")?;
|
||||||
|
|
||||||
|
@ -301,7 +304,7 @@ async fn main_cli() -> Result<()> {
|
||||||
gha_cache,
|
gha_cache,
|
||||||
upstream: args.upstream.clone(),
|
upstream: args.upstream.clone(),
|
||||||
shutdown_sender: Mutex::new(Some(shutdown_sender)),
|
shutdown_sender: Mutex::new(Some(shutdown_sender)),
|
||||||
narinfo_negative_cache: RwLock::new(HashSet::new()),
|
narinfo_negative_cache,
|
||||||
metrics,
|
metrics,
|
||||||
store,
|
store,
|
||||||
flakehub_state: RwLock::new(flakehub_state),
|
flakehub_state: RwLock::new(flakehub_state),
|
||||||
|
|
Loading…
Reference in a new issue