Compare commits

..

No commits in common. "main" and "v0.1.3" have entirely different histories.
main ... v0.1.3

32 changed files with 1330 additions and 7402 deletions

3
.cargo/config Normal file
View file

@ -0,0 +1,3 @@
# For -Zbuild-std
[target.aarch64-unknown-linux-musl]
rustflags = ["-C", "target-feature=+crt-static", "-C", "link-arg=-lgcc"]

View file

@ -1,10 +0,0 @@
# https://editorconfig.org
root = true
[*]
indent_style = space
indent_size = 2
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true

6
.envrc
View file

@ -1 +1,5 @@
use flake
if ! has nix_direnv_version || ! nix_direnv_version 2.1.1; then
source_url "https://raw.githubusercontent.com/nix-community/nix-direnv/2.1.1/direnvrc" "sha256-b6qJ4r34rbE23yWjMqbmu3ia2z4b2wIlZUksBke/ol0="
fi
use_flake

View file

@ -5,49 +5,88 @@ on:
workflow_call:
jobs:
build-artifacts:
runs-on: ${{ matrix.systems.runner }}
permissions:
contents: read
id-token: write
env:
ARTIFACT_KEY: magic-nix-cache-${{ matrix.systems.system }}
ARCHIVE_NAME: magic-nix-cache.closure.xz
strategy:
matrix:
systems:
- nix-system: x86_64-linux
system: X64-Linux
runner: ubuntu-22.04
- nix-system: aarch64-linux
system: ARM64-Linux
runner: namespace-profile-default-arm64
- nix-system: x86_64-darwin
system: X64-macOS
runner: macos-14-large
- nix-system: aarch64-darwin
system: ARM64-macOS
runner: macos-latest-xlarge
build-artifacts-ARM64-macOS:
runs-on: macos-latest-xlarge
steps:
- uses: actions/checkout@v4
- name: Install Nix on ${{ matrix.systems.system }}
uses: DeterminateSystems/nix-installer-action@main
- name: Set up FlakeHub Cache
uses: DeterminateSystems/flakehub-cache-action@main
- uses: actions/checkout@v3
- name: Build and cache dev shell for ${{ matrix.systems.nix-system }}
run: |
nix build ".#devShells.${{ matrix.systems.nix-system }}.default"
- uses: DeterminateSystems/nix-installer-action@main
- name: Build package and create closure for ${{ matrix.systems.system }}
run: |
nix build .# -L --fallback && \
nix-store --export $(nix-store -qR ./result) | xz -9 > "${{ env.ARCHIVE_NAME }}"
- uses: DeterminateSystems/magic-nix-cache-action@main
- name: Upload magic-nix-cache closure for ${{ matrix.systems.system }}
uses: actions/upload-artifact@v4.6.0
- name: Build package
run: "nix build .# -L --fallback"
- name: Upload a Build Artifact
uses: actions/upload-artifact@v3.1.2
with:
# Artifact name
name: ${{ env.ARTIFACT_KEY }}
path: ${{ env.ARCHIVE_NAME }}
name: magic-nix-cache-ARM64-macOS
path: result/bin/magic-nix-cache
retention-days: 1
build-artifacts-X64-macOS:
runs-on: macos-12
steps:
- uses: actions/checkout@v3
- uses: DeterminateSystems/flake-checker-action@main
- uses: DeterminateSystems/nix-installer-action@main
- uses: DeterminateSystems/magic-nix-cache-action@main
- name: Build package
run: "nix build .# -L --fallback"
- name: Upload a Build Artifact
uses: actions/upload-artifact@v3.1.2
with:
# Artifact name
name: magic-nix-cache-X64-macOS
path: result/bin/magic-nix-cache
retention-days: 1
build-artifacts-X64-Linux:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3
- uses: DeterminateSystems/flake-checker-action@main
- uses: DeterminateSystems/nix-installer-action@main
- uses: DeterminateSystems/magic-nix-cache-action@main
- name: Build package
run: "nix build .# -L --fallback"
- name: Upload a Build Artifact
uses: actions/upload-artifact@v3.1.2
with:
# Artifact name
name: magic-nix-cache-X64-Linux
path: result/bin/magic-nix-cache
retention-days: 1
build-artifacts-ARM64-Linux:
runs-on: namespace-profile-default-arm64
steps:
- uses: actions/checkout@v3
- uses: DeterminateSystems/flake-checker-action@main
- uses: DeterminateSystems/nix-installer-action@main
- uses: DeterminateSystems/magic-nix-cache-action@main
- name: Build package
run: "nix build .# -L --fallback"
- name: Upload a Build Artifact
uses: actions/upload-artifact@v3.1.2
with:
# Artifact name
name: magic-nix-cache-ARM64-Linux
path: result/bin/magic-nix-cache
retention-days: 1

View file

@ -1,87 +0,0 @@
name: Run checks and integration test
on:
pull_request:
push:
branches: [main]
jobs:
checks:
name: Nix and Rust checks
runs-on: ubuntu-22.04
permissions:
contents: read
id-token: write
steps:
- uses: actions/checkout@v4
- name: Check health of flake.lock
uses: DeterminateSystems/flake-checker-action@main
with:
fail-mode: true
- name: Install Nix
uses: DeterminateSystems/nix-installer-action@main
- uses: DeterminateSystems/flakehub-cache-action@main
- name: Check Rust formatting
run: nix develop --command cargo fmt --check
- name: Clippy
run: nix develop --command cargo clippy
build:
name: Build artifacts
needs: checks
uses: ./.github/workflows/build.yaml
secrets: inherit
action-integration-test:
name: Integration test for magic-nix-cache-action
runs-on: ${{ matrix.systems.runner }}
needs: build
env:
ARTIFACT_KEY: magic-nix-cache-${{ matrix.systems.system }}
ARCHIVE_NAME: magic-nix-cache.closure.xz
strategy:
matrix:
systems:
- system: X64-Linux
runner: ubuntu-22.04
- system: ARM64-Linux
runner: namespace-profile-default-arm64
- system: X64-macOS
runner: macos-14-large
- system: ARM64-macOS
runner: macos-latest-xlarge
permissions:
contents: read
id-token: write
steps:
- uses: actions/checkout@v4
- name: Download closure for ${{ matrix.systems.system }}
uses: actions/download-artifact@v4.1.8
with:
name: ${{ env.ARTIFACT_KEY }}
path: ${{ env.ARTIFACT_KEY }}
- name: Install Nix on ${{ matrix.systems.system }}
uses: DeterminateSystems/nix-installer-action@main
- name: Test magic-nix-cache-action@main on ${{ matrix.systems.runner }}
uses: DeterminateSystems/magic-nix-cache-action@main
with:
source-binary: "${{ env.ARTIFACT_KEY }}/${{ env.ARCHIVE_NAME }}"
_internal-strict-mode: true
- name: Run nix to test magic-nix-cache-action
run: |
nix develop --command echo "just testing"
- name: Exhaust our GitHub Actions Cache tokens
# Generally skip this step since it is so intensive
if: ${{ false }}
run: |
date >> README.md
nix build .#veryLongChain -v

27
.github/workflows/checks.yaml vendored Normal file
View file

@ -0,0 +1,27 @@
name: Rust checks
on:
pull_request:
push:
branches: [main]
jobs:
checks:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3
- name: Install Nix
uses: DeterminateSystems/nix-installer-action@main
- uses: DeterminateSystems/magic-nix-cache-action@main
- name: Check health of flake.lock
uses: DeterminateSystems/flake-checker-action@main
with:
fail-mode: true
- name: Check Rust formatting
run: nix develop --command cargo fmt --check
- name: Clippy
run: nix develop --command cargo clippy

View file

@ -1,21 +0,0 @@
name: "Publish every Git push to main to FlakeHub"
on:
push:
branches:
- "main"
jobs:
flakehub-publish:
runs-on: "ubuntu-latest"
permissions:
id-token: "write"
contents: "read"
steps:
- uses: "actions/checkout@v4"
- uses: "DeterminateSystems/nix-installer-action@main"
- uses: "DeterminateSystems/flakehub-push@main"
with:
name: "DeterminateSystems/magic-nix-cache"
rolling: true
visibility: "public"

View file

@ -5,10 +5,10 @@ jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v3
- name: Install Nix
uses: DeterminateSystems/nix-installer-action@main
- uses: DeterminateSystems/flakehub-cache-action@main
- uses: DeterminateSystems/magic-nix-cache-action@main
- name: Expose GitHub Runtime
uses: crazy-max/ghaction-github-runtime@v2
- name: Dump credentials

View file

@ -10,7 +10,6 @@ on:
jobs:
build:
uses: ./.github/workflows/build.yaml
secrets: inherit
release:
needs: build
@ -22,7 +21,7 @@ jobs:
id-token: write # In order to request a JWT for AWS auth
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v3
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v2
with:
@ -32,33 +31,33 @@ jobs:
- name: Create the artifacts directory
run: rm -rf ./artifacts && mkdir ./artifacts
- uses: actions/download-artifact@v4.1.8
- uses: actions/download-artifact@v3
with:
name: magic-nix-cache-ARM64-macOS
path: cache-binary-ARM64-macOS
- name: Persist the cache binary
run: cp ./cache-binary-ARM64-macOS/magic-nix-cache.closure.xz ./artifacts/magic-nix-cache-ARM64-macOS
run: cp ./cache-binary-ARM64-macOS/magic-nix-cache ./artifacts/magic-nix-cache-ARM64-macOS
- uses: actions/download-artifact@v4.1.8
- uses: actions/download-artifact@v3
with:
name: magic-nix-cache-X64-macOS
path: cache-binary-X64-macOS
- name: Persist the cache binary
run: cp ./cache-binary-X64-macOS/magic-nix-cache.closure.xz ./artifacts/magic-nix-cache-X64-macOS
run: cp ./cache-binary-X64-macOS/magic-nix-cache ./artifacts/magic-nix-cache-X64-macOS
- uses: actions/download-artifact@v4.1.8
- uses: actions/download-artifact@v3
with:
name: magic-nix-cache-X64-Linux
path: cache-binary-X64-Linux
- name: Persist the cache binary
run: cp ./cache-binary-X64-Linux/magic-nix-cache.closure.xz ./artifacts/magic-nix-cache-X64-Linux
run: cp ./cache-binary-X64-Linux/magic-nix-cache ./artifacts/magic-nix-cache-X64-Linux
- uses: actions/download-artifact@v4.1.8
- uses: actions/download-artifact@v3
with:
name: magic-nix-cache-ARM64-Linux
path: cache-binary-ARM64-Linux
- name: Persist the cache binary
run: cp ./cache-binary-ARM64-Linux/magic-nix-cache.closure.xz ./artifacts/magic-nix-cache-ARM64-Linux
run: cp ./cache-binary-ARM64-Linux/magic-nix-cache ./artifacts/magic-nix-cache-ARM64-Linux
- name: Publish Release (Branch)
env:

View file

@ -10,7 +10,12 @@ on:
jobs:
build:
# We want to build artifacts only if the `upload to s3` label is applied
uses: ./.github/workflows/build.yaml
release:
needs: build
concurrency: release
# Only intra-repo PRs are allowed to have PR artifacts uploaded
# We only want to trigger once the upload once in the case the upload label is added, not when any label is added
if: |
@ -19,50 +24,44 @@ jobs:
(github.event.action == 'labeled' && github.event.label.name == 'upload to s3')
|| (github.event.action != 'labeled' && contains(github.event.pull_request.labels.*.name, 'upload to s3'))
)
uses: ./.github/workflows/build.yaml
secrets: inherit
release:
needs: build
concurrency: release
runs-on: ubuntu-latest
permissions:
id-token: write # In order to request a JWT for AWS auth
contents: read
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v3
- name: Create the artifacts directory
run: rm -rf ./artifacts && mkdir ./artifacts
- uses: actions/download-artifact@v4.1.8
- uses: actions/download-artifact@v3
with:
name: magic-nix-cache-ARM64-macOS
path: cache-binary-ARM64-macOS
- name: Persist the cache binary
run: cp ./cache-binary-ARM64-macOS/magic-nix-cache.closure.xz ./artifacts/magic-nix-cache-ARM64-macOS
run: cp ./cache-binary-ARM64-macOS/magic-nix-cache ./artifacts/magic-nix-cache-ARM64-macOS
- uses: actions/download-artifact@v4.1.8
- uses: actions/download-artifact@v3
with:
name: magic-nix-cache-X64-macOS
path: cache-binary-X64-macOS
- name: Persist the cache binary
run: cp ./cache-binary-X64-macOS/magic-nix-cache.closure.xz ./artifacts/magic-nix-cache-X64-macOS
run: cp ./cache-binary-X64-macOS/magic-nix-cache ./artifacts/magic-nix-cache-X64-macOS
- uses: actions/download-artifact@v4.1.8
- uses: actions/download-artifact@v3
with:
name: magic-nix-cache-X64-Linux
path: cache-binary-X64-Linux
- name: Persist the cache binary
run: cp ./cache-binary-X64-Linux/magic-nix-cache.closure.xz ./artifacts/magic-nix-cache-X64-Linux
run: cp ./cache-binary-X64-Linux/magic-nix-cache ./artifacts/magic-nix-cache-X64-Linux
- uses: actions/download-artifact@v4.1.8
- uses: actions/download-artifact@v3
with:
name: magic-nix-cache-ARM64-Linux
path: cache-binary-ARM64-Linux
- name: Persist the cache binary
run: cp ./cache-binary-ARM64-Linux/magic-nix-cache.closure.xz ./artifacts/magic-nix-cache-ARM64-Linux
run: cp ./cache-binary-ARM64-Linux/magic-nix-cache ./artifacts/magic-nix-cache-ARM64-Linux
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v2

View file

@ -19,38 +19,38 @@ jobs:
id-token: write # In order to request a JWT for AWS auth
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v3
- name: Create the artifacts directory
run: rm -rf ./artifacts && mkdir ./artifacts
- uses: actions/download-artifact@v4.1.8
- uses: actions/download-artifact@v3
with:
name: magic-nix-cache-ARM64-macOS
path: cache-binary-ARM64-macOS
- name: Persist the cache binary
run: cp ./cache-binary-ARM64-macOS/magic-nix-cache.closure.xz ./artifacts/magic-nix-cache-ARM64-macOS
run: cp ./cache-binary-ARM64-macOS/magic-nix-cache ./artifacts/magic-nix-cache-ARM64-macOS
- uses: actions/download-artifact@v4.1.8
- uses: actions/download-artifact@v3
with:
name: magic-nix-cache-X64-macOS
path: cache-binary-X64-macOS
- name: Persist the cache binary
run: cp ./cache-binary-X64-macOS/magic-nix-cache.closure.xz ./artifacts/magic-nix-cache-X64-macOS
run: cp ./cache-binary-X64-macOS/magic-nix-cache ./artifacts/magic-nix-cache-X64-macOS
- uses: actions/download-artifact@v4.1.8
- uses: actions/download-artifact@v3
with:
name: magic-nix-cache-X64-Linux
path: cache-binary-X64-Linux
- name: Persist the cache binary
run: cp ./cache-binary-X64-Linux/magic-nix-cache.closure.xz ./artifacts/magic-nix-cache-X64-Linux
run: cp ./cache-binary-X64-Linux/magic-nix-cache ./artifacts/magic-nix-cache-X64-Linux
- uses: actions/download-artifact@v4.1.8
- uses: actions/download-artifact@v3
with:
name: magic-nix-cache-ARM64-Linux
path: cache-binary-ARM64-Linux
- name: Persist the cache binary
run: cp ./cache-binary-ARM64-Linux/magic-nix-cache.closure.xz ./artifacts/magic-nix-cache-ARM64-Linux
run: cp ./cache-binary-ARM64-Linux/magic-nix-cache ./artifacts/magic-nix-cache-ARM64-Linux
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v2

View file

@ -1,20 +0,0 @@
name: update-flake-lock
on:
workflow_dispatch: # enable manual triggering
schedule:
- cron: "0 0 * * 0" # every Sunday at midnight
jobs:
lockfile:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: DeterminateSystems/nix-installer-action@main
- uses: DeterminateSystems/flakehub-cache-action@main
- uses: DeterminateSystems/update-flake-lock@main
with:
pr-title: Update flake.lock
pr-labels: |
dependencies
automated

5546
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -3,7 +3,6 @@ members = [
"gha-cache",
"magic-nix-cache",
]
resolver = "2"
[profile.release]
opt-level = 'z'

View file

@ -1,26 +1,12 @@
# Magic Nix Cache
> [!WARNING]
> The [Magic Nix Cache will will stop working](https://determinate.systems/posts/magic-nix-cache-free-tier-eol) on **February 1st, 2025** unless you're on [GitHub Enterprise Server](https://github.com/enterprise).
>
> You can upgrade to [FlakeHub Cache](https://flakehub.com/cache) and get **one month free** using the coupon code **`FHC`**.
>
> For more information, read [this blog post](https://determinate.systems/posts/magic-nix-cache-free-tier-eol/).
Save 30-50%+ of CI time without any effort or cost.
Use Magic Nix Cache, a totally free and zero-configuration binary cache for Nix on GitHub Actions.
Add our [GitHub Action][action] after installing Nix, in your workflow, like this:
```yaml
permissions:
contents: read
id-token: write
steps:
- uses: actions/checkout@v4
- uses: DeterminateSystems/nix-installer-action@main
- uses: DeterminateSystems/magic-nix-cache-action@main
- run: nix flake check
- uses: DeterminateSystems/magic-nix-cache-action@main
```
See [Usage](#usage) for a detailed example.
@ -55,11 +41,8 @@ on:
jobs:
check:
runs-on: ubuntu-22.04
permissions:
contents: read
id-token: write
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v3
- uses: DeterminateSystems/nix-installer-action@main
- uses: DeterminateSystems/magic-nix-cache-action@main
- run: nix flake check
@ -91,8 +74,8 @@ For local development, see `gha-cache/README.md` for more details on how to obta
```shell
cargo run -- -c creds.json --upstream https://cache.nixos.org
cargo build --release --target x86_64-unknown-linux-gnu
cargo build --release --target aarch64-unknown-linux-gnu
cargo build --release --target x86_64-unknown-linux-musl
cargo build --release --target aarch64-unknown-linux-musl
nix copy --to 'http://127.0.0.1:3000' $(which bash)
nix-store --store $PWD/test-root --extra-substituters 'http://localhost:3000' --option require-sigs false -r $(which bash)
```
@ -144,3 +127,4 @@ You can read the full privacy policy for [Determinate Systems][detsys], the crea
[attic]: https://github.com/zhaofengli/attic
[colmena]: https://github.com/zhaofengli/colmena
[z2n]: https://zero-to-nix.com

116
crane.nix Normal file
View file

@ -0,0 +1,116 @@
{ stdenv
, pkgs
, lib
, crane
, rust
, rust-bin
, nix-gitignore
, supportedSystems
}:
let
inherit (stdenv.hostPlatform) system;
nightlyVersion = "2023-05-01";
rustNightly = (pkgs.rust-bin.nightly.${nightlyVersion}.default.override {
extensions = [ "rust-src" "rust-analyzer-preview" ];
targets = cargoTargets;
}).overrideAttrs (old: {
# Remove the propagated libiconv since we want to add our static version
depsTargetTargetPropagated = lib.filter (d: d.pname != "libiconv")
(lib.flatten (old.depsTargetTargetPropagated or [ ]));
});
# For easy cross-compilation in devShells
# We are just composing the pkgsCross.*.stdenv.cc together
crossPlatforms =
let
makeCrossPlatform = crossSystem:
let
pkgsCross =
if crossSystem == system then pkgs
else
import pkgs.path {
inherit system crossSystem;
overlays = [ ];
};
rustTargetSpec = rust.toRustTargetSpec pkgsCross.pkgsStatic.stdenv.hostPlatform;
rustTargetSpecUnderscored = builtins.replaceStrings [ "-" ] [ "_" ] rustTargetSpec;
cargoLinkerEnv = lib.strings.toUpper "CARGO_TARGET_${rustTargetSpecUnderscored}_LINKER";
cargoCcEnv = "CC_${rustTargetSpecUnderscored}"; # for ring
ccbin = "${pkgsCross.stdenv.cc}/bin/${pkgsCross.stdenv.cc.targetPrefix}cc";
in
{
name = crossSystem;
value = {
inherit rustTargetSpec;
cc = pkgsCross.stdenv.cc;
pkgs = pkgsCross;
buildInputs = makeBuildInputs pkgsCross;
env = {
"${cargoLinkerEnv}" = ccbin;
"${cargoCcEnv}" = ccbin;
};
};
};
systems = lib.filter (s: s == system || lib.hasInfix "linux" s) supportedSystems
# Cross from aarch64-darwin -> x86_64-darwin doesn't work yet
# Hopefully the situation will improve with the SDK bumps
++ lib.optional (system == "x86_64-darwin") "aarch64-darwin";
in
builtins.listToAttrs (map makeCrossPlatform systems);
cargoTargets = lib.mapAttrsToList (_: p: p.rustTargetSpec) crossPlatforms;
cargoCrossEnvs = lib.foldl (acc: p: acc // p.env) { } (builtins.attrValues crossPlatforms);
makeBuildInputs = pkgs: with pkgs; [ ]
++ lib.optionals pkgs.stdenv.isDarwin [
darwin.apple_sdk.frameworks.Security
(libiconv.override { enableStatic = true; enableShared = false; })
];
buildFor = system:
let
crossPlatform = crossPlatforms.${system};
inherit (crossPlatform) pkgs;
craneLib = (crane.mkLib pkgs).overrideToolchain rustNightly;
crateName = craneLib.crateNameFromCargoToml {
cargoToml = ./magic-nix-cache/Cargo.toml;
};
src = nix-gitignore.gitignoreSource [ ] ./.;
commonArgs = {
inherit (crateName) pname version;
inherit src;
buildInputs = makeBuildInputs pkgs;
cargoExtraArgs = "--target ${crossPlatform.rustTargetSpec}";
cargoVendorDir = craneLib.vendorMultipleCargoDeps {
inherit (craneLib.findCargoFiles src) cargoConfigs;
cargoLockList = [
./Cargo.lock
"${rustNightly.passthru.availableComponents.rust-src}/lib/rustlib/src/rust/Cargo.lock"
];
};
} // crossPlatform.env;
crate = craneLib.buildPackage (commonArgs // {
cargoArtifacts = craneLib.buildDepsOnly commonArgs;
# The resulting executable must be standalone
allowedRequisites = [ ];
});
in
crate;
in
{
inherit crossPlatforms cargoTargets cargoCrossEnvs rustNightly;
magic-nix-cache = buildFor system;
}

View file

@ -1,176 +1,175 @@
{
"nodes": {
"crane": {
"inputs": {
"flake-compat": [
"flake-compat"
],
"flake-utils": "flake-utils",
"nixpkgs": [
"nixpkgs"
],
"rust-overlay": "rust-overlay"
},
"locked": {
"lastModified": 1741479724,
"narHash": "sha256-fnyETBKSVRa5abjOiRG/IAzKZq5yX8U6oRrHstPl4VM=",
"rev": "60202a2e3597a3d91f5e791aab03f45470a738b5",
"revCount": 709,
"lastModified": 1695511445,
"narHash": "sha256-mnE14re43v3/Jc50Jv0BKPMtEk7FEtDSligP6B5HwlI=",
"rev": "3de322e06fc88ada5e3589dc8a375b73e749f512",
"revCount": 411,
"type": "tarball",
"url": "https://api.flakehub.com/f/pinned/ipetkov/crane/0.20.2/0195784b-915b-7d2d-915d-ab02d1112ef9/source.tar.gz"
"url": "https://api.flakehub.com/f/pinned/ipetkov/crane/0.14.1/018ac45c-ff5e-7076-b956-d478a0336516/source.tar.gz"
},
"original": {
"type": "tarball",
"url": "https://flakehub.com/f/ipetkov/crane/%2A"
"url": "https://flakehub.com/f/ipetkov/crane/0.14.1.tar.gz"
}
},
"flake-compat": {
"flake": false,
"locked": {
"lastModified": 1733328505,
"narHash": "sha256-NeCCThCEP3eCl2l/+27kNNK7QrwZB1IJCrXfrbv5oqU=",
"owner": "edolstra",
"repo": "flake-compat",
"rev": "ff81ac966bb2cae68946d5ed5fc4994f96d0ffec",
"type": "github"
},
"original": {
"owner": "edolstra",
"repo": "flake-compat",
"type": "github"
}
},
"flake-parts": {
"inputs": {
"nixpkgs-lib": [
"nix",
"nixpkgs"
]
},
"locked": {
"lastModified": 1733312601,
"narHash": "sha256-4pDvzqnegAfRkPwO3wmwBhVi/Sye1mzps0zHWYnP88c=",
"owner": "hercules-ci",
"repo": "flake-parts",
"rev": "205b12d8b7cd4802fbcb8e8ef6a0f1408781a4f9",
"type": "github"
},
"original": {
"owner": "hercules-ci",
"repo": "flake-parts",
"type": "github"
}
},
"git-hooks-nix": {
"inputs": {
"flake-compat": [
"nix"
],
"gitignore": [
"nix"
],
"nixpkgs": [
"nix",
"nixpkgs"
],
"nixpkgs-stable": [
"nix",
"nixpkgs"
]
},
"locked": {
"lastModified": 1734279981,
"narHash": "sha256-NdaCraHPp8iYMWzdXAt5Nv6sA3MUzlCiGiR586TCwo0=",
"owner": "cachix",
"repo": "git-hooks.nix",
"rev": "aa9f40c906904ebd83da78e7f328cd8aeaeae785",
"type": "github"
},
"original": {
"owner": "cachix",
"repo": "git-hooks.nix",
"type": "github"
}
},
"nix": {
"inputs": {
"flake-compat": "flake-compat",
"flake-parts": "flake-parts",
"git-hooks-nix": "git-hooks-nix",
"nixpkgs": "nixpkgs",
"nixpkgs-23-11": "nixpkgs-23-11",
"nixpkgs-regression": "nixpkgs-regression"
},
"locked": {
"lastModified": 1742824067,
"narHash": "sha256-rBPulEBpn4IiqkPsetuh7BRzT2iGCzZYnogTAsbrvhU=",
"rev": "9cb662df7442a1e2c4600fb8ecb2ad613ebc5a95",
"revCount": 19496,
"lastModified": 1696426674,
"narHash": "sha256-kvjfFW7WAETZlt09AgDn1MrtKzP7t90Vf7vypd3OL1U=",
"rev": "0f9255e01c2351cc7d116c072cb317785dd33b33",
"revCount": 57,
"type": "tarball",
"url": "https://api.flakehub.com/f/pinned/NixOS/nix/2.27.1/0195c8c5-1964-7a31-b025-ebf9bfeef991/source.tar.gz"
"url": "https://api.flakehub.com/f/pinned/edolstra/flake-compat/1.0.1/018afb31-abd1-7bff-a5e4-cff7e18efb7a/source.tar.gz"
},
"original": {
"type": "tarball",
"url": "https://flakehub.com/f/NixOS/nix/2"
"url": "https://flakehub.com/f/edolstra/flake-compat/1.0.1.tar.gz"
}
},
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1685518550,
"narHash": "sha256-o2d0KcvaXzTrPRIo0kOLV0/QXHhDQ5DTi+OxcjO8xqY=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "a1720a10a6cfe8234c0e93907ffe81be440f4cef",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"flake-utils_2": {
"inputs": {
"systems": "systems_2"
},
"locked": {
"lastModified": 1681202837,
"narHash": "sha256-H+Rh19JDwRtpVPAWp64F+rlEtxUWBAQW28eAi3SRSzg=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "cfacdce06f30d2b68473a46042957675eebb3401",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1734359947,
"narHash": "sha256-1Noao/H+N8nFB4Beoy8fgwrcOQLVm9o4zKW1ODaqK9E=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "48d12d5e70ee91fe8481378e540433a7303dbf6a",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "release-24.11",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-23-11": {
"locked": {
"lastModified": 1717159533,
"narHash": "sha256-oamiKNfr2MS6yH64rUn99mIZjc45nGJlj9eGth/3Xuw=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "a62e6edd6d5e1fa0329b8653c801147986f8d446",
"type": "github"
},
"original": {
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "a62e6edd6d5e1fa0329b8653c801147986f8d446",
"type": "github"
}
},
"nixpkgs-regression": {
"locked": {
"lastModified": 1643052045,
"narHash": "sha256-uGJ0VXIhWKGXxkeNnq4TvV3CIOkUJ3PAoLZ3HMzNVMw=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "215d4d0fd80ca5163643b03a33fde804a29cc1e2",
"type": "github"
},
"original": {
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "215d4d0fd80ca5163643b03a33fde804a29cc1e2",
"type": "github"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 1742422364,
"narHash": "sha256-mNqIplmEohk5jRkqYqG19GA8MbQ/D4gQSK0Mu4LvfRQ=",
"rev": "a84ebe20c6bc2ecbcfb000a50776219f48d134cc",
"revCount": 770807,
"lastModified": 1696604326,
"narHash": "sha256-YXUNI0kLEcI5g8lqGMb0nh67fY9f2YoJsILafh6zlMo=",
"rev": "87828a0e03d1418e848d3dd3f3014a632e4a4f64",
"revCount": 533189,
"type": "tarball",
"url": "https://api.flakehub.com/f/pinned/NixOS/nixpkgs/0.1.770807%2Brev-a84ebe20c6bc2ecbcfb000a50776219f48d134cc/0195b626-8c1d-7fb9-9282-563af3d37ab9/source.tar.gz"
"url": "https://api.flakehub.com/f/pinned/NixOS/nixpkgs/0.1.533189%2Brev-87828a0e03d1418e848d3dd3f3014a632e4a4f64/018b0dc8-e84f-7c59-b5d6-16849c3b2074/source.tar.gz"
},
"original": {
"type": "tarball",
"url": "https://flakehub.com/f/NixOS/nixpkgs/0.1"
"url": "https://flakehub.com/f/NixOS/nixpkgs/0.1.533189.tar.gz"
}
},
"root": {
"inputs": {
"crane": "crane",
"nix": "nix",
"nixpkgs": "nixpkgs_2"
"flake-compat": "flake-compat",
"nixpkgs": "nixpkgs",
"rust-overlay": "rust-overlay_2"
}
},
"rust-overlay": {
"inputs": {
"flake-utils": [
"crane",
"flake-utils"
],
"nixpkgs": [
"crane",
"nixpkgs"
]
},
"locked": {
"lastModified": 1685759304,
"narHash": "sha256-I3YBH6MS3G5kGzNuc1G0f9uYfTcNY9NYoRc3QsykLk4=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "c535b4f3327910c96dcf21851bbdd074d0760290",
"type": "github"
},
"original": {
"owner": "oxalica",
"repo": "rust-overlay",
"type": "github"
}
},
"rust-overlay_2": {
"inputs": {
"flake-utils": "flake-utils_2",
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1687400833,
"narHash": "sha256-rVENiSupjAE8o1+ZXNRIqewUzM2brm+aeme8MUrwl0U=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "fc0a266e836c079a9131108f4334e5af219dbb93",
"type": "github"
},
"original": {
"owner": "oxalica",
"repo": "rust-overlay",
"type": "github"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
},
"systems_2": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},

191
flake.nix
View file

@ -2,122 +2,139 @@
description = "GitHub Actions-powered Nix binary cache";
inputs = {
nixpkgs.url = "https://flakehub.com/f/NixOS/nixpkgs/0.1";
nixpkgs.url = "https://flakehub.com/f/NixOS/nixpkgs/0.1.533189.tar.gz";
crane.url = "https://flakehub.com/f/ipetkov/crane/*";
rust-overlay = {
url = "github:oxalica/rust-overlay";
inputs.nixpkgs.follows = "nixpkgs";
};
nix.url = "https://flakehub.com/f/NixOS/nix/2";
crane = {
url = "https://flakehub.com/f/ipetkov/crane/0.14.1.tar.gz";
inputs.nixpkgs.follows = "nixpkgs";
inputs.flake-compat.follows = "flake-compat";
};
flake-compat.url = "https://flakehub.com/f/edolstra/flake-compat/1.0.1.tar.gz";
};
outputs = inputs:
outputs = { self, nixpkgs, ... }@inputs:
let
overlays = [ inputs.rust-overlay.overlays.default ];
supportedSystems = [
"aarch64-linux"
"x86_64-linux"
"aarch64-darwin"
"x86_64-darwin"
];
forEachSupportedSystem = f: inputs.nixpkgs.lib.genAttrs supportedSystems (system: f rec {
pkgs = import inputs.nixpkgs {
inherit system;
overlays = [
inputs.self.overlays.default
];
forEachSupportedSystem = f: nixpkgs.lib.genAttrs supportedSystems (system: f rec {
pkgs = import nixpkgs { inherit overlays system; };
cranePkgs = pkgs.callPackage ./crane.nix {
inherit supportedSystems;
inherit (inputs) crane;
};
inherit system;
inherit (pkgs) lib;
});
in
{
overlays.default = final: prev:
let
craneLib = inputs.crane.mkLib final;
crateName = craneLib.crateNameFromCargoToml {
cargoToml = ./magic-nix-cache/Cargo.toml;
};
commonArgs = {
inherit (crateName) pname version;
src = inputs.self;
nativeBuildInputs = with final; [
pkg-config
];
buildInputs = [
inputs.nix.packages.${final.stdenv.system}.default
final.boost
];
};
cargoArtifacts = craneLib.buildDepsOnly commonArgs;
in
{
magic-nix-cache = craneLib.buildPackage (commonArgs // {
inherit cargoArtifacts;
});
};
packages = forEachSupportedSystem ({ pkgs, ... }: rec {
magic-nix-cache = pkgs.magic-nix-cache;
packages = forEachSupportedSystem ({ pkgs, cranePkgs, ... }: rec {
inherit (cranePkgs) magic-nix-cache;
default = magic-nix-cache;
veryLongChain =
let
ctx = ./README.md;
# Function to write the current date to a file
startFile =
pkgs.stdenv.mkDerivation {
name = "start-file";
buildCommand = ''
cat ${ctx} > $out
'';
};
# Recursive function to create a chain of derivations
createChain = n: startFile:
pkgs.stdenv.mkDerivation {
name = "chain-${toString n}";
src =
if n == 0 then
startFile
else createChain (n - 1) startFile;
buildCommand = ''
echo $src > $out
'';
};
in
# Starting point of the chain
createChain 200 startFile;
});
devShells = forEachSupportedSystem ({ system, pkgs }: {
default = pkgs.mkShell {
devShells = forEachSupportedSystem ({ pkgs, cranePkgs, lib }: {
default = pkgs.mkShell ({
inputsFrom = [ cranePkgs.magic-nix-cache ];
packages = with pkgs; [
rustc
cargo
clippy
rustfmt
rust-analyzer
inputs.nix.packages.${stdenv.system}.default # for linking attic
boost # for linking attic
bashInteractive
pkg-config
cranePkgs.rustNightly
cargo-bloat
cargo-edit
cargo-udeps
cargo-watch
bacon
age
];
shellHook =
let
crossSystems = lib.filter (s: s != pkgs.system) (builtins.attrNames cranePkgs.crossPlatforms);
in
''
# Returns compiler environment variables for a platform
#
# getTargetFlags "suffixSalt" "nativeBuildInputs" "buildInputs"
getTargetFlags() {
# Here we only call the setup-hooks of nativeBuildInputs.
#
# What's off-limits for us:
#
# - findInputs
# - activatePackage
# - Other functions in stdenv setup that depend on the private accumulator variables
(
suffixSalt="$1"
nativeBuildInputs="$2"
buildInputs="$3"
RUST_SRC_PATH = "${pkgs.rustPlatform.rustcSrc}/library";
# Offsets for the nativeBuildInput (e.g., gcc)
hostOffset=-1
targetOffset=0
# In stdenv, the hooks are first accumulated before being called.
# Here we call them immediately
addEnvHooks() {
local depHostOffset="$1"
# For simplicity, we only call the hook on buildInputs
for pkg in $buildInputs; do
depTargetOffset=1
$2 $pkg
done
}
unset _PATH
unset NIX_CFLAGS_COMPILE
unset NIX_LDFLAGS
# For simplicity, we only call the setup-hooks of nativeBuildInputs
for nbi in $nativeBuildInputs; do
addToSearchPath _PATH "$nbi/bin"
if [ -e "$nbi/nix-support/setup-hook" ]; then
source "$nbi/nix-support/setup-hook"
fi
done
echo "export NIX_CFLAGS_COMPILE_''${suffixSalt}='$NIX_CFLAGS_COMPILE'"
echo "export NIX_LDFLAGS_''${suffixSalt}='$NIX_LDFLAGS'"
echo "export PATH=$PATH''${_PATH+:$_PATH}"
)
}
target_flags=$(mktemp)
${lib.concatMapStrings (system: let
crossPlatform = cranePkgs.crossPlatforms.${system};
in ''
getTargetFlags \
"${crossPlatform.cc.suffixSalt}" \
"${crossPlatform.cc} ${crossPlatform.cc.bintools}" \
"${builtins.concatStringsSep " " (crossPlatform.buildInputs ++ crossPlatform.pkgs.stdenv.defaultBuildInputs)}" >$target_flags
. $target_flags
'') crossSystems}
rm $target_flags
# Suffix flags for current system as well
export NIX_CFLAGS_COMPILE_${pkgs.stdenv.cc.suffixSalt}="$NIX_CFLAGS_COMPILE"
export NIX_LDFLAGS_${pkgs.stdenv.cc.suffixSalt}="$NIX_LDFLAGS"
unset NIX_CFLAGS_COMPILE
unset NIX_LDFLAGS
'';
} // cranePkgs.cargoCrossEnvs);
keygen = pkgs.mkShellNoCC {
packages = with pkgs; [
age
];
};
});
};

View file

@ -11,12 +11,12 @@ derivative = { version = "2.2.0", default-features = false }
futures = { version = "0.3.28", default-features = false, features = ["alloc"] }
hex = "0.4.3"
rand = { version = "0.8.5", default-features = false, features = ["std", "std_rng"] }
reqwest = { version = "0.12.5", default-features = false, features = ["json", "rustls-tls-native-roots", "stream", "trust-dns"] }
reqwest = { version = "0.11.17", default-features = false, features = ["json", "rustls-tls-native-roots", "stream", "trust-dns"] }
serde = { version = "1.0.162", default-features = false, features = ["derive"] }
serde_json = { version = "1.0.96", default-features = false }
sha2 = { version = "0.10.6", default-features = false }
thiserror = "1.0.40"
tokio = { version = "1.44.2", default-features = false, features = ["io-util"] }
tokio = { version = "1.28.0", default-features = false, features = ["io-util"] }
tracing = { version = "0.1.37", default-features = false }
unicode-bom = "2.0.2"

View file

@ -32,5 +32,5 @@ We should contribute support for the latter to [Octocrab](https://github.com/XAM
Since GHAC uses private APIs that use special tokens for authentication, we need to get them from a workflow run.
The easiest way is with the `keygen` workflow in this repo.
Generate an `age` encryption key with `nix shell nixpkgs#age --command age-keygen -o key.txt`, and add the Public Key as a repository secret named `AGE_PUBLIC_KEY`.
Generate an `age` encryption key with `age-keygen -o key.txt`, and add the Public Key as a repository secret named `AGE_PUBLIC_KEY`.
Then, trigger the `keygen` workflow which will print out a command that will let you decrypt the credentials.

View file

@ -4,8 +4,7 @@
use std::fmt;
#[cfg(debug_assertions)]
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
@ -48,19 +47,12 @@ const MAX_CONCURRENCY: usize = 4;
type Result<T> = std::result::Result<T, Error>;
pub type CircuitBreakerTrippedCallback = Arc<Box<dyn Fn() + Send + Sync>>;
/// An API error.
#[derive(Error, Debug)]
pub enum Error {
#[error("Failed to initialize the client: {0}")]
InitError(Box<dyn std::error::Error + Send + Sync>),
#[error(
"GitHub Actions Cache throttled Magic Nix Cache. Not trying to use it again on this run."
)]
CircuitBreakerTripped,
#[error("Request error: {0}")]
RequestError(#[from] reqwest::Error), // TODO: Better errors
@ -77,13 +69,14 @@ pub enum Error {
info: ApiErrorInfo,
},
#[error("I/O error: {0}, context: {1}")]
IoError(std::io::Error, String),
#[error("I/O error: {0}")]
IoError(#[from] std::io::Error),
#[error("Too many collisions")]
TooManyCollisions,
}
#[derive(Debug)]
pub struct Api {
/// Credentials to access the cache.
credentials: Credentials,
@ -103,10 +96,6 @@ pub struct Api {
/// The concurrent upload limit.
concurrency_limit: Arc<Semaphore>,
circuit_breaker_429_tripped: Arc<AtomicBool>,
circuit_breaker_429_tripped_callback: CircuitBreakerTrippedCallback,
/// Backend request statistics.
#[cfg(debug_assertions)]
stats: RequestStats,
@ -119,7 +108,7 @@ pub struct FileAllocation(CacheId);
/// The ID of a cache.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(transparent)]
struct CacheId(pub i64);
struct CacheId(pub i32);
/// An API error.
#[derive(Debug, Clone)]
@ -245,10 +234,7 @@ impl fmt::Display for ApiErrorInfo {
}
impl Api {
pub fn new(
credentials: Credentials,
circuit_breaker_429_tripped_callback: CircuitBreakerTrippedCallback,
) -> Result<Self> {
pub fn new(credentials: Credentials) -> Result<Self> {
let mut headers = HeaderMap::new();
let auth_header = {
let mut h = HeaderValue::from_str(&format!("Bearer {}", credentials.runtime_token))
@ -278,17 +264,11 @@ impl Api {
version_hasher,
client,
concurrency_limit: Arc::new(Semaphore::new(MAX_CONCURRENCY)),
circuit_breaker_429_tripped: Arc::new(AtomicBool::from(false)),
circuit_breaker_429_tripped_callback,
#[cfg(debug_assertions)]
stats: Default::default(),
})
}
pub fn circuit_breaker_tripped(&self) -> bool {
self.circuit_breaker_429_tripped.load(Ordering::Relaxed)
}
/// Mutates the cache version/namespace.
pub fn mutate_version(&mut self, data: &[u8]) {
self.version_hasher.update(data);
@ -339,22 +319,17 @@ impl Api {
Err(Error::TooManyCollisions)
}
/// Uploads a file. Returns the size of the file.
pub async fn upload_file<S>(&self, allocation: FileAllocation, mut stream: S) -> Result<usize>
/// Uploads a file.
pub async fn upload_file<S>(&self, allocation: FileAllocation, mut stream: S) -> Result<()>
where
S: AsyncRead + Unpin + Send,
{
if self.circuit_breaker_tripped() {
return Err(Error::CircuitBreakerTripped);
}
let mut offset = 0;
let mut futures = Vec::new();
loop {
let buf = BytesMut::with_capacity(CHUNK_SIZE);
let chunk = read_chunk_async(&mut stream, buf)
.await
.map_err(|e| Error::IoError(e, "Reading a chunk during upload".to_string()))?;
let chunk = read_chunk_async(&mut stream, buf).await?;
if chunk.is_empty() {
offset += chunk.len();
break;
@ -372,16 +347,10 @@ impl Api {
futures.push({
let client = self.client.clone();
let concurrency_limit = self.concurrency_limit.clone();
let circuit_breaker_429_tripped = self.circuit_breaker_429_tripped.clone();
let circuit_breaker_429_tripped_callback =
self.circuit_breaker_429_tripped_callback.clone();
let url = self.construct_url(&format!("caches/{}", allocation.0 .0));
tokio::task::spawn(async move {
let permit = concurrency_limit
.acquire()
.await
.expect("failed to acquire concurrency semaphore permit");
let permit = concurrency_limit.acquire().await.unwrap();
tracing::trace!(
"Starting uploading chunk {}-{}",
@ -411,9 +380,6 @@ impl Api {
drop(permit);
circuit_breaker_429_tripped
.check_result(&r, &circuit_breaker_429_tripped_callback);
r
})
});
@ -424,23 +390,17 @@ impl Api {
future::join_all(futures)
.await
.into_iter()
.try_for_each(|join_result| {
join_result.expect("failed collecting a join result during parallel upload")
})?;
.try_for_each(|join_result| join_result.unwrap())?;
tracing::debug!("Received all chunks for cache {:?}", allocation.0);
self.commit_cache(allocation.0, offset).await?;
Ok(offset)
Ok(())
}
/// Downloads a file based on a list of key prefixes.
pub async fn get_file_url(&self, keys: &[&str]) -> Result<Option<String>> {
if self.circuit_breaker_tripped() {
return Err(Error::CircuitBreakerTripped);
}
Ok(self
.get_cache_entry(keys)
.await?
@ -459,10 +419,6 @@ impl Api {
/// Retrieves a cache based on a list of key prefixes.
async fn get_cache_entry(&self, keys: &[&str]) -> Result<Option<ArtifactCacheEntry>> {
if self.circuit_breaker_tripped() {
return Err(Error::CircuitBreakerTripped);
}
#[cfg(debug_assertions)]
self.stats.get.fetch_add(1, Ordering::SeqCst);
@ -475,9 +431,6 @@ impl Api {
.check_json()
.await;
self.circuit_breaker_429_tripped
.check_result(&res, &self.circuit_breaker_429_tripped_callback);
match res {
Ok(entry) => Ok(Some(entry)),
Err(Error::DecodeError { status, .. }) if status == StatusCode::NO_CONTENT => Ok(None),
@ -495,10 +448,6 @@ impl Api {
key: &str,
cache_size: Option<usize>,
) -> Result<ReserveCacheResponse> {
if self.circuit_breaker_tripped() {
return Err(Error::CircuitBreakerTripped);
}
tracing::debug!("Reserving cache for {}", key);
let req = ReserveCacheRequest {
@ -517,20 +466,13 @@ impl Api {
.send()
.await?
.check_json()
.await;
.await?;
self.circuit_breaker_429_tripped
.check_result(&res, &self.circuit_breaker_429_tripped_callback);
res
Ok(res)
}
/// Finalizes uploading to a cache.
async fn commit_cache(&self, cache_id: CacheId, size: usize) -> Result<()> {
if self.circuit_breaker_tripped() {
return Err(Error::CircuitBreakerTripped);
}
tracing::debug!("Commiting cache {:?}", cache_id);
let req = CommitCacheRequest { size };
@ -538,31 +480,22 @@ impl Api {
#[cfg(debug_assertions)]
self.stats.post.fetch_add(1, Ordering::SeqCst);
if let Err(e) = self
.client
self.client
.post(self.construct_url(&format!("caches/{}", cache_id.0)))
.json(&req)
.send()
.await?
.check()
.await
{
self.circuit_breaker_429_tripped
.check_err(&e, &self.circuit_breaker_429_tripped_callback);
return Err(e);
}
.await?;
Ok(())
}
fn construct_url(&self, resource: &str) -> String {
let mut url = self.credentials.cache_url.clone();
if !url.ends_with('/') {
url.push('/');
}
url.push_str("_apis/artifactcache/");
url.push_str(resource);
url
format!(
"{}/_apis/artifactcache/{}",
self.credentials.cache_url, resource
)
}
}
@ -621,36 +554,3 @@ async fn handle_error(res: reqwest::Response) -> Error {
Error::ApiError { status, info }
}
trait AtomicCircuitBreaker {
fn check_err(&self, e: &Error, callback: &CircuitBreakerTrippedCallback);
fn check_result<T>(
&self,
r: &std::result::Result<T, Error>,
callback: &CircuitBreakerTrippedCallback,
);
}
impl AtomicCircuitBreaker for AtomicBool {
fn check_result<T>(
&self,
r: &std::result::Result<T, Error>,
callback: &CircuitBreakerTrippedCallback,
) {
if let Err(ref e) = r {
self.check_err(e, callback)
}
}
fn check_err(&self, e: &Error, callback: &CircuitBreakerTrippedCallback) {
if let Error::ApiError {
status: reqwest::StatusCode::TOO_MANY_REQUESTS,
..
} = e
{
tracing::info!("Disabling GitHub Actions Cache due to 429: Too Many Requests");
self.store(true, Ordering::Relaxed);
callback();
}
}
}

View file

@ -1,65 +1,35 @@
[package]
name = "magic-nix-cache"
version = "0.2.0"
version = "0.1.2"
edition = "2021"
license = "Apache-2.0"
[dependencies]
gha-cache = { path = "../gha-cache" }
axum = { version = "0.7.5", default-features = false, features = [
"json",
"tokio",
"http2",
"macros"
] }
clap = { version = "4.2.7", default-features = false, features = [
"std",
"derive",
"error-context",
"wrap_help",
] }
axum = { version = "0.6.18", default-features = false, features = ["json", "tokio"] }
axum-macros = "0.3.7"
clap = { version = "4.2.7", default-features = false, features = ["std", "derive", "error-context", "wrap_help"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", default-features = false, features = [
"ansi",
"env-filter",
"fmt",
"tracing-log",
"smallvec",
] }
tower-http = { version = "0.5.2", features = ["trace"] }
tracing-subscriber = { version = "0.3.17", default-features = false, features = ["ansi", "env-filter", "fmt", "tracing-log", "smallvec"] }
tower-http = { version = "0.4.0", features = ["trace"] }
serde = { version = "1.0.162", features = ["derive"] }
serde_json = { version = "1.0.96", default-features = false }
thiserror = "1.0.40"
tokio-stream = { version = "0.1.15", default-features = false }
tokio-util = { version = "0.7.11", features = ["io", "compat"] }
tokio-stream = { version = "0.1.14", default-features = false }
tokio-util = { version = "0.7.8", features = ["io"] }
daemonize = "0.5.0"
is_ci = "1.1.1"
sha2 = { version = "0.10.6", default-features = false }
reqwest = { version = "0.12.5", default-features = false, features = [
"blocking",
"rustls-tls-native-roots",
"trust-dns",
"json"
] }
netrc-rs = "0.1.2"
attic = { git = "https://github.com/DeterminateSystems/attic", branch = "fixups-for-magic-nix-cache" }
attic-client = { git = "https://github.com/DeterminateSystems/attic", branch = "fixups-for-magic-nix-cache" }
attic-server = { git = "https://github.com/DeterminateSystems/attic", branch = "fixups-for-magic-nix-cache" }
indicatif = "0.17"
anyhow = "1.0.71"
tempfile = "3.9"
uuid = { version = "1.4.0", features = ["serde", "v7", "rand", "std"] }
futures = "0.3"
async-compression = "0.4"
tracing-appender = "0.2.3"
http = "1.0"
http-body-util = "0.1"
hyper = { version = "1.0.0", features = ["full"] }
hyper-util = { version = "0.1", features = ["tokio", "server-auto", "http1"] }
xdg = { version = "2.5.2" }
reqwest = { version = "0.11.17", default-features = false, features = ["blocking", "rustls-tls-native-roots", "trust-dns"] }
[dependencies.tokio]
version = "1.44.2"
version = "1.28.0"
default-features = false
features = ["fs", "macros", "process", "rt", "rt-multi-thread", "sync"]
features = [
"fs",
"process",
"rt",
"rt-multi-thread",
"sync",
]

View file

@ -2,53 +2,45 @@
//!
//! This API is intended to be used by nix-installer-action.
use attic::nix_store::StorePath;
use axum::{extract::Extension, routing::post, Json, Router};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use axum::{extract::Extension, http::uri::Uri, routing::post, Json, Router};
use axum_macros::debug_handler;
use serde::Serialize;
use super::State;
use crate::error::{Error, Result};
use crate::error::Result;
use crate::util::{get_store_paths, upload_paths};
#[derive(Debug, Clone, Serialize)]
struct WorkflowStartResponse {
num_original_paths: Option<usize>,
num_original_paths: usize,
}
#[derive(Debug, Clone, Serialize)]
struct WorkflowFinishResponse {
num_original_paths: Option<usize>,
num_final_paths: Option<usize>,
num_new_paths: Option<usize>,
num_original_paths: usize,
num_final_paths: usize,
num_new_paths: usize,
}
pub fn get_router() -> Router {
Router::new()
.route("/api/workflow-start", post(workflow_start))
.route("/api/workflow-finish", post(workflow_finish))
.route("/api/enqueue-paths", post(post_enqueue_paths))
}
/// Record existing paths.
#[debug_handler]
async fn workflow_start(Extension(state): Extension<State>) -> Result<Json<WorkflowStartResponse>> {
tracing::info!("Workflow started");
let reply = if let Some(original_paths) = &state.original_paths {
let mut original_paths = original_paths.lock().await;
*original_paths = crate::util::get_store_paths(&state.store).await?;
let reply = WorkflowStartResponse {
num_original_paths: Some(original_paths.len()),
};
let mut original_paths = state.original_paths.lock().await;
*original_paths = get_store_paths().await?;
state.metrics.num_original_paths.set(original_paths.len());
reply
} else {
WorkflowStartResponse {
num_original_paths: None,
}
};
Ok(Json(reply))
Ok(Json(WorkflowStartResponse {
num_original_paths: original_paths.len(),
}))
}
/// Push new paths and shut down.
@ -57,113 +49,42 @@ async fn workflow_finish(
) -> Result<Json<WorkflowFinishResponse>> {
tracing::info!("Workflow finished");
let response = if let Some(original_paths) = &state.original_paths {
let original_paths = original_paths.lock().await;
let final_paths = crate::util::get_store_paths(&state.store).await?;
let new_paths = final_paths
.difference(&original_paths)
.cloned()
.map(|path| state.store.follow_store_path(path).map_err(Error::Attic))
.collect::<Result<Vec<_>>>()?;
let original_paths = state.original_paths.lock().await;
let final_paths = get_store_paths().await?;
let new_paths = final_paths
.difference(&original_paths)
.cloned()
.collect::<Vec<_>>();
let num_original_paths = original_paths.len();
let num_final_paths = final_paths.len();
let num_new_paths = new_paths.len();
tracing::info!("Pushing {} new paths", new_paths.len());
let store_uri = make_store_uri(&state.self_endpoint);
upload_paths(new_paths.clone(), &store_uri).await?;
let reply = WorkflowFinishResponse {
num_original_paths: Some(num_original_paths),
num_final_paths: Some(num_final_paths),
num_new_paths: Some(num_new_paths),
};
let sender = state.shutdown_sender.lock().await.take().unwrap();
sender.send(()).unwrap();
state.metrics.num_original_paths.set(num_original_paths);
state.metrics.num_final_paths.set(num_final_paths);
state.metrics.num_new_paths.set(num_new_paths);
// NOTE(cole-h): If we're substituting from an upstream cache, those paths won't have the
// post-build-hook run on it, so we diff the store to ensure we cache everything we can.
tracing::info!("Diffing the store and uploading any new paths before we shut down");
enqueue_paths(&state, new_paths).await?;
reply
} else {
WorkflowFinishResponse {
num_original_paths: None,
num_final_paths: None,
num_new_paths: None,
}
let reply = WorkflowFinishResponse {
num_original_paths: original_paths.len(),
num_final_paths: final_paths.len(),
num_new_paths: new_paths.len(),
};
if let Some(gha_cache) = &state.gha_cache {
tracing::info!("Waiting for GitHub action cache uploads to finish");
gha_cache.shutdown().await?;
}
state
.metrics
.num_original_paths
.set(reply.num_original_paths);
state.metrics.num_final_paths.set(reply.num_final_paths);
state.metrics.num_new_paths.set(reply.num_new_paths);
if let Some(attic_state) = state.flakehub_state.write().await.take() {
tracing::info!("Waiting for FlakeHub cache uploads to finish");
let paths = attic_state.push_session.wait().await?;
let paths = paths.keys().map(|s| s.name()).collect::<Vec<_>>();
tracing::info!(?paths, "FlakeHub Cache uploads completed");
} else {
tracing::info!("FlakeHub cache is not enabled, not uploading anything to it");
}
if let Some(sender) = state.shutdown_sender.lock().await.take() {
sender
.send(())
.map_err(|_| Error::Internal("Sending shutdown server message".to_owned()))?;
}
// NOTE(cole-h): see `init_logging`
if let Some(logfile) = &state.logfile {
let logfile_contents = std::fs::read_to_string(logfile)
.map_err(|e| crate::error::Error::Io(e, format!("Reading {}", logfile.display())))?;
println!("Every log line throughout the lifetime of the program:");
println!("\n{logfile_contents}\n");
}
Ok(Json(response))
Ok(Json(reply))
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnqueuePathsRequest {
pub store_paths: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnqueuePathsResponse {}
/// Schedule paths in the local Nix store for uploading.
#[tracing::instrument(skip_all)]
async fn post_enqueue_paths(
Extension(state): Extension<State>,
Json(req): Json<EnqueuePathsRequest>,
) -> Result<Json<EnqueuePathsResponse>> {
tracing::info!("Enqueueing {:?}", req.store_paths);
let store_paths = req
.store_paths
.iter()
.map(|path| state.store.follow_store_path(path).map_err(Error::Attic))
.collect::<Result<Vec<_>>>()?;
enqueue_paths(&state, store_paths).await?;
Ok(Json(EnqueuePathsResponse {}))
}
pub async fn enqueue_paths(state: &State, store_paths: Vec<StorePath>) -> Result<()> {
if let Some(gha_cache) = &state.gha_cache {
gha_cache
.enqueue_paths(state.store.clone(), store_paths.clone())
.await?;
}
if let Some(flakehub_state) = &*state.flakehub_state.read().await {
crate::flakehub::enqueue_paths(flakehub_state, store_paths).await?;
}
Ok(())
fn make_store_uri(self_endpoint: &SocketAddr) -> String {
Uri::builder()
.scheme("http")
.authority(self_endpoint.to_string())
.path_and_query("/?compression=zstd&parallel-compression=true")
.build()
.unwrap()
.to_string()
}

View file

@ -1,12 +1,14 @@
//! Binary Cache API.
use std::io;
use axum::{
extract::{Extension, Path},
extract::{BodyStream, Extension, Path},
response::Redirect,
routing::{get, put},
Router,
};
use futures::StreamExt as _;
use tokio_stream::StreamExt;
use tokio_util::io::StreamReader;
use super::State;
@ -49,7 +51,7 @@ async fn get_narinfo(
let key = format!("{}.narinfo", store_path_hash);
if state
.narinfo_negative_cache
.narinfo_nagative_cache
.read()
.await
.contains(&store_path_hash)
@ -59,25 +61,22 @@ async fn get_narinfo(
return pull_through(&state, &path);
}
if let Some(gha_cache) = &state.gha_cache {
if let Some(url) = gha_cache.api.get_file_url(&[&key]).await? {
state.metrics.narinfos_served.incr();
return Ok(Redirect::temporary(&url));
}
if let Some(url) = state.api.get_file_url(&[&key]).await? {
state.metrics.narinfos_served.incr();
return Ok(Redirect::temporary(&url));
}
let mut negative_cache = state.narinfo_negative_cache.write().await;
let mut negative_cache = state.narinfo_nagative_cache.write().await;
negative_cache.insert(store_path_hash);
state.metrics.narinfos_sent_upstream.incr();
state.metrics.narinfos_negative_cache_misses.incr();
pull_through(&state, &path)
}
async fn put_narinfo(
Extension(state): Extension<State>,
Path(path): Path<String>,
body: axum::body::Body,
body: BodyStream,
) -> Result<()> {
let components: Vec<&str> = path.splitn(2, '.').collect();
@ -89,23 +88,17 @@ async fn put_narinfo(
return Err(Error::BadRequest);
}
let gha_cache = state.gha_cache.as_ref().ok_or(Error::GHADisabled)?;
let store_path_hash = components[0].to_string();
let key = format!("{}.narinfo", store_path_hash);
let allocation = gha_cache.api.allocate_file_with_random_suffix(&key).await?;
let body_stream = body.into_data_stream();
let allocation = state.api.allocate_file_with_random_suffix(&key).await?;
let stream = StreamReader::new(
body_stream
.map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))),
body.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))),
);
gha_cache.api.upload_file(allocation, stream).await?;
state.api.upload_file(allocation, stream).await?;
state.metrics.narinfos_uploaded.incr();
state
.narinfo_negative_cache
.narinfo_nagative_cache
.write()
.await
.remove(&store_path_hash);
@ -114,14 +107,7 @@ async fn put_narinfo(
}
async fn get_nar(Extension(state): Extension<State>, Path(path): Path<String>) -> Result<Redirect> {
if let Some(url) = state
.gha_cache
.as_ref()
.ok_or(Error::GHADisabled)?
.api
.get_file_url(&[&path])
.await?
{
if let Some(url) = state.api.get_file_url(&[&path]).await? {
state.metrics.nars_served.incr();
return Ok(Redirect::temporary(&url));
}
@ -133,26 +119,16 @@ async fn get_nar(Extension(state): Extension<State>, Path(path): Path<String>) -
Err(Error::NotFound)
}
}
async fn put_nar(
Extension(state): Extension<State>,
Path(path): Path<String>,
body: axum::body::Body,
body: BodyStream,
) -> Result<()> {
let gha_cache = state.gha_cache.as_ref().ok_or(Error::GHADisabled)?;
let allocation = gha_cache
.api
.allocate_file_with_random_suffix(&path)
.await?;
let body_stream = body.into_data_stream();
let allocation = state.api.allocate_file_with_random_suffix(&path).await?;
let stream = StreamReader::new(
body_stream
.map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))),
body.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))),
);
gha_cache.api.upload_file(allocation, stream).await?;
state.api.upload_file(allocation, stream).await?;
state.metrics.nars_uploaded.incr();
Ok(())

View file

@ -1,50 +0,0 @@
use std::fmt::{self, Display};
#[derive(Clone, Copy)]
pub enum Environment {
GitHubActions,
GitLabCI,
Other,
}
impl Environment {
pub fn determine() -> Self {
if env_var_is_true("GITHUB_ACTIONS") {
return Environment::GitHubActions;
}
if env_var_is_true("GITLAB_CI") {
return Environment::GitLabCI;
}
Environment::Other
}
pub fn is_github_actions(&self) -> bool {
matches!(self, Self::GitHubActions)
}
pub fn is_gitlab_ci(&self) -> bool {
matches!(self, Self::GitLabCI)
}
}
impl Display for Environment {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use Environment::*;
write!(
f,
"{}",
match self {
GitHubActions => "GitHub Actions",
GitLabCI => "GitLab CI",
Other => "an unspecified environment",
}
)
}
}
fn env_var_is_true(e: &str) -> bool {
std::env::var(e).is_ok_and(|v| v == "true")
}

View file

@ -19,38 +19,11 @@ pub enum Error {
#[error("Bad Request")]
BadRequest,
#[error("I/O error: {0}. Context: {1}")]
Io(std::io::Error, String),
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("GHA cache is disabled")]
GHADisabled,
#[error("FlakeHub cache error: {0}")]
FlakeHub(#[from] anyhow::Error),
#[error("FlakeHub HTTP error: {0}")]
FlakeHubHttp(#[from] reqwest::Error),
#[error("Got HTTP response {0} getting the cache name from FlakeHub: {1}")]
GetCacheName(reqwest::StatusCode, String),
#[error("netrc parse error: {0}")]
Netrc(netrc_rs::Error),
#[error("Cannot find netrc credentials for {0}")]
MissingCreds(String),
#[error("Attic error: {0}")]
Attic(#[from] attic::AtticError),
#[error("Bad URL")]
BadUrl(reqwest::Url),
#[error("Configuration error: {0}")]
Config(String),
#[error("Internal error: {0}")]
Internal(String),
#[error("Failed to upload paths")]
FailedToUpload,
}
impl IntoResponse for Error {

View file

@ -1,492 +0,0 @@
use crate::env::Environment;
use crate::error::{Error, Result};
use crate::DETERMINATE_NETRC_PATH;
use anyhow::Context;
use attic::cache::CacheName;
use attic::nix_store::{NixStore, StorePath};
use attic_client::push::{PushSession, PushSessionConfig};
use attic_client::{
api::ApiClient,
config::ServerConfig,
push::{PushConfig, Pusher},
};
use reqwest::header::HeaderValue;
use reqwest::Url;
use serde::Deserialize;
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::RwLock;
use uuid::Uuid;
const USER_AGENT: &str = "magic-nix-cache";
pub struct State {
#[allow(dead_code)]
pub substituter: Url,
pub push_session: PushSession,
}
pub async fn init_cache(
environment: Environment,
flakehub_api_server: &Url,
flakehub_cache_server: &Url,
flakehub_flake_name: &Option<String>,
store: Arc<NixStore>,
auth_method: &super::FlakeHubAuthSource,
) -> Result<State> {
// Parse netrc to get the credentials for api.flakehub.com.
let netrc_path = auth_method.as_path_buf();
let NetrcInfo {
netrc,
flakehub_cache_server_hostname,
flakehub_login,
flakehub_password,
} = extract_info_from_netrc(&netrc_path, flakehub_api_server, flakehub_cache_server).await?;
if let super::FlakeHubAuthSource::Netrc(netrc_path) = auth_method {
// Append an entry for the FlakeHub cache server to netrc.
if !netrc
.machines
.iter()
.any(|machine| machine.name.as_ref() == Some(&flakehub_cache_server_hostname))
{
let mut netrc_file = tokio::fs::OpenOptions::new()
.create(false)
.append(true)
.open(netrc_path)
.await
.map_err(|e| {
Error::Internal(format!(
"Failed to open {} for appending: {}",
netrc_path.display(),
e
))
})?;
netrc_file
.write_all(
format!(
"\nmachine {} login {} password {}\n\n",
flakehub_cache_server_hostname, flakehub_login, flakehub_password,
)
.as_bytes(),
)
.await
.map_err(|e| {
Error::Internal(format!(
"Failed to write credentials to {}: {}",
netrc_path.display(),
e
))
})?;
}
}
let server_config = ServerConfig {
endpoint: flakehub_cache_server.to_string(),
token: Some(attic_client::config::ServerTokenConfig::Raw {
token: flakehub_password.clone(),
}),
};
let api_inner = ApiClient::from_server_config(server_config)?;
let api = Arc::new(RwLock::new(api_inner));
// Periodically refresh JWT in GitHub Actions environment
if environment.is_github_actions() {
match auth_method {
super::FlakeHubAuthSource::Netrc(path) => {
let netrc_path_clone = path.to_path_buf();
let initial_github_jwt_clone = flakehub_password.clone();
let flakehub_cache_server_clone = flakehub_cache_server.to_string();
let api_clone = api.clone();
tokio::task::spawn(refresh_github_actions_jwt_worker(
netrc_path_clone,
initial_github_jwt_clone,
flakehub_cache_server_clone,
api_clone,
));
}
crate::FlakeHubAuthSource::DeterminateNixd => {
let api_clone = api.clone();
let netrc_file = PathBuf::from(DETERMINATE_NETRC_PATH);
let flakehub_api_server_clone = flakehub_api_server.clone();
let flakehub_cache_server_clone = flakehub_cache_server.clone();
let initial_meta = tokio::fs::metadata(&netrc_file).await.map_err(|e| {
Error::Io(e, format!("getting metadata of {}", netrc_file.display()))
})?;
let initial_inode = initial_meta.ino();
tokio::task::spawn(refresh_determinate_token_worker(
netrc_file,
initial_inode,
flakehub_api_server_clone,
flakehub_cache_server_clone,
api_clone,
));
}
}
}
// Get the cache UUID for this project.
let cache_name = {
let mut url = flakehub_api_server
.join("project")
.map_err(|_| Error::Config(format!("bad URL '{}'", flakehub_api_server)))?;
if let Some(flakehub_flake_name) = flakehub_flake_name {
if !flakehub_flake_name.is_empty() {
url = flakehub_api_server
.join(&format!("project/{}", flakehub_flake_name))
.map_err(|_| Error::Config(format!("bad URL '{}'", flakehub_api_server)))?;
}
}
let response = reqwest::Client::new()
.get(url.to_owned())
.header("User-Agent", USER_AGENT)
.basic_auth(flakehub_login, Some(&flakehub_password))
.send()
.await?;
if !response.status().is_success() {
return Err(Error::GetCacheName(
response.status(),
response.text().await?,
));
}
#[derive(Deserialize)]
struct ProjectInfo {
organization_uuid_v7: Uuid,
project_uuid_v7: Uuid,
}
let project_info = response.json::<ProjectInfo>().await?;
format!(
"{}:{}",
project_info.organization_uuid_v7, project_info.project_uuid_v7,
)
};
tracing::info!("Using cache {:?}", cache_name);
let cache = unsafe { CacheName::new_unchecked(cache_name) };
let cache_config = api.read().await.get_cache_config(&cache).await?;
let push_config = PushConfig {
num_workers: 5, // FIXME: use number of CPUs?
force_preamble: false,
};
let mp = indicatif::MultiProgress::new();
let push_session = Pusher::new(
store.clone(),
api.clone(),
cache.to_owned(),
cache_config,
mp,
push_config,
)
.into_push_session(PushSessionConfig {
no_closure: false,
ignore_upstream_cache_filter: false,
});
let state = State {
substituter: flakehub_cache_server.to_owned(),
push_session,
};
Ok(state)
}
#[derive(Debug)]
struct NetrcInfo {
netrc: netrc_rs::Netrc,
flakehub_cache_server_hostname: String,
flakehub_login: String,
flakehub_password: String,
}
#[tracing::instrument]
async fn extract_info_from_netrc(
netrc_path: &Path,
flakehub_api_server: &Url,
flakehub_cache_server: &Url,
) -> Result<NetrcInfo> {
let netrc = {
let mut netrc_file = File::open(netrc_path).await.map_err(|e| {
Error::Internal(format!("Failed to open {}: {}", netrc_path.display(), e))
})?;
let mut netrc_contents = String::new();
netrc_file
.read_to_string(&mut netrc_contents)
.await
.map_err(|e| {
Error::Internal(format!(
"Failed to read {} contents: {}",
netrc_path.display(),
e
))
})?;
netrc_rs::Netrc::parse(netrc_contents, false).map_err(Error::Netrc)?
};
let flakehub_netrc_entry = netrc
.machines
.iter()
.find(|machine| {
machine.name.as_ref() == flakehub_api_server.host().map(|x| x.to_string()).as_ref()
})
.ok_or_else(|| Error::MissingCreds(flakehub_api_server.to_string()))?
.to_owned();
let flakehub_cache_server_hostname = flakehub_cache_server
.host()
.ok_or_else(|| Error::BadUrl(flakehub_cache_server.to_owned()))?
.to_string();
let flakehub_login = flakehub_netrc_entry.login.ok_or_else(|| {
Error::Config(format!(
"netrc file does not contain a login for '{}'",
flakehub_api_server
))
})?;
let flakehub_password = flakehub_netrc_entry.password.ok_or_else(|| {
Error::Config(format!(
"netrc file does not contain a password for '{}'",
flakehub_api_server
))
})?;
Ok(NetrcInfo {
netrc,
flakehub_cache_server_hostname,
flakehub_login,
flakehub_password,
})
}
pub async fn enqueue_paths(state: &State, store_paths: Vec<StorePath>) -> Result<()> {
state.push_session.queue_many(store_paths)?;
Ok(())
}
/// Refresh the GitHub Actions JWT every 2 minutes (slightly less than half of the default validity
/// period) to ensure pushing / pulling doesn't stop working.
#[tracing::instrument(skip_all)]
async fn refresh_github_actions_jwt_worker(
netrc_path: std::path::PathBuf,
mut github_jwt: String,
flakehub_cache_server_clone: String,
api: Arc<RwLock<ApiClient>>,
) -> Result<()> {
// NOTE(cole-h): This is a workaround -- at the time of writing, GitHub Actions JWTs are only
// valid for 5 minutes after being issued. FlakeHub uses these JWTs for authentication, which
// means that after those 5 minutes have passed and the token is expired, FlakeHub (and by
// extension FlakeHub Cache) will no longer allow requests using this token. However, GitHub
// gives us a way to repeatedly request new tokens, so we utilize that and refresh the token
// every 2 minutes (less than half of the lifetime of the token).
// TODO(cole-h): this should probably be half of the token's lifetime ((exp - iat) / 2), but
// getting this is nontrivial so I'm not going to do it until GitHub changes the lifetime and
// breaks this.
let next_refresh = std::time::Duration::from_secs(2 * 60);
// NOTE(cole-h): we sleep until the next refresh at first because we already got a token from
// GitHub recently, don't need to try again until we actually might need to get a new one.
tokio::time::sleep(next_refresh).await;
// NOTE(cole-h): https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/configuring-openid-connect-in-cloud-providers#requesting-the-jwt-using-environment-variables
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
reqwest::header::ACCEPT,
HeaderValue::from_static("application/json;api-version=2.0"),
);
headers.insert(
reqwest::header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
);
let github_client = reqwest::Client::builder()
.user_agent(USER_AGENT)
.default_headers(headers)
.build()?;
loop {
match rewrite_github_actions_token(&github_client, &netrc_path, &github_jwt).await {
Ok(new_github_jwt) => {
github_jwt = new_github_jwt;
let server_config = ServerConfig {
endpoint: flakehub_cache_server_clone.clone(),
token: Some(attic_client::config::ServerTokenConfig::Raw {
token: github_jwt.clone(),
}),
};
let new_api = ApiClient::from_server_config(server_config)?;
{
let mut api_client = api.write().await;
*api_client = new_api;
}
tracing::debug!(
"Stored new token in netrc and API client, sleeping for {next_refresh:?}"
);
tokio::time::sleep(next_refresh).await;
}
Err(e) => {
tracing::error!(
?e,
"Failed to get a new JWT from GitHub, trying again in 10 seconds"
);
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
}
}
}
#[tracing::instrument(skip_all)]
async fn rewrite_github_actions_token(
client: &reqwest::Client,
netrc_path: &Path,
old_github_jwt: &str,
) -> Result<String> {
// NOTE(cole-h): https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/configuring-openid-connect-in-cloud-providers#requesting-the-jwt-using-environment-variables
let runtime_token = std::env::var("ACTIONS_ID_TOKEN_REQUEST_TOKEN").map_err(|e| {
Error::Internal(format!(
"ACTIONS_ID_TOKEN_REQUEST_TOKEN was invalid unicode: {e}"
))
})?;
let runtime_url = std::env::var("ACTIONS_ID_TOKEN_REQUEST_URL").map_err(|e| {
Error::Internal(format!(
"ACTIONS_ID_TOKEN_REQUEST_URL was invalid unicode: {e}"
))
})?;
let token_request_url = format!("{runtime_url}&audience=api.flakehub.com");
let token_response = client
.request(reqwest::Method::GET, &token_request_url)
.bearer_auth(runtime_token)
.send()
.await
.with_context(|| format!("sending request to {token_request_url}"))?;
if let Err(e) = token_response.error_for_status_ref() {
tracing::error!(?e, "Got error response when requesting token");
return Err(e)?;
}
#[derive(serde::Deserialize)]
struct TokenResponse {
value: String,
}
let token_response: TokenResponse = token_response
.json()
.await
.with_context(|| "converting response into json")?;
let new_github_jwt_string = token_response.value;
let netrc_contents = tokio::fs::read_to_string(netrc_path)
.await
.with_context(|| format!("failed to read {netrc_path:?} to string"))?;
let new_netrc_contents = netrc_contents.replace(old_github_jwt, &new_github_jwt_string);
// NOTE(cole-h): create the temporary file right next to the real one so we don't run into
// cross-device linking issues when renaming
let netrc_path_tmp = netrc_path.with_extension("tmp");
tokio::fs::write(&netrc_path_tmp, new_netrc_contents)
.await
.with_context(|| format!("writing new JWT to {netrc_path_tmp:?}"))?;
tokio::fs::rename(&netrc_path_tmp, &netrc_path)
.await
.with_context(|| format!("renaming {netrc_path_tmp:?} to {netrc_path:?}"))?;
Ok(new_github_jwt_string)
}
#[tracing::instrument(skip_all)]
async fn refresh_determinate_token_worker(
netrc_file: PathBuf,
mut inode: u64,
flakehub_api_server: Url,
flakehub_cache_server: Url,
api_clone: Arc<RwLock<ApiClient>>,
) {
// NOTE(cole-h): This is a workaround -- at the time of writing, determinate-nixd handles the
// GitHub Actions JWT refreshing for us, which means we don't know when this will happen. At the
// moment, it does it roughly every 2 minutes (less than half of the total lifetime of the
// issued token).
loop {
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
let meta = tokio::fs::metadata(&netrc_file)
.await
.map_err(|e| Error::Io(e, format!("getting metadata of {}", netrc_file.display())));
let Ok(meta) = meta else {
tracing::error!(e = ?meta);
continue;
};
let current_inode = meta.ino();
if current_inode == inode {
tracing::debug!("current inode is the same, file didn't change");
continue;
}
tracing::debug!("current inode is different, file changed");
inode = current_inode;
let flakehub_password = match extract_info_from_netrc(
&netrc_file,
&flakehub_api_server,
&flakehub_cache_server,
)
.await
{
Ok(NetrcInfo {
flakehub_password, ..
}) => flakehub_password,
Err(e) => {
tracing::error!(?e, "Failed to extract auth info from netrc");
continue;
}
};
let server_config = ServerConfig {
endpoint: flakehub_cache_server.to_string(),
token: Some(attic_client::config::ServerTokenConfig::Raw {
token: flakehub_password,
}),
};
let new_api = ApiClient::from_server_config(server_config.clone());
let Ok(new_api) = new_api else {
tracing::error!(e = ?new_api, "Failed to construct new ApiClient");
continue;
};
{
let mut api_client = api_clone.write().await;
*api_client = new_api;
}
tracing::debug!("Stored new token in API client, sleeping for 30s");
}
}

View file

@ -1,254 +0,0 @@
use std::{collections::HashSet, sync::Arc};
use crate::error::{Error, Result};
use crate::telemetry;
use async_compression::tokio::bufread::ZstdEncoder;
use attic::nix_store::{NixStore, StorePath, ValidPathInfo};
use attic_server::narinfo::{Compression, NarInfo};
use futures::stream::TryStreamExt;
use gha_cache::{Api, Credentials};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
RwLock,
};
use tokio_util::compat::FuturesAsyncReadCompatExt;
pub struct GhaCache {
/// The GitHub Actions Cache API.
pub api: Arc<Api>,
/// The future from the completion of the worker.
worker_result: RwLock<Option<tokio::task::JoinHandle<Result<()>>>>,
channel_tx: UnboundedSender<Request>,
}
#[derive(Debug)]
enum Request {
Shutdown,
Upload(StorePath),
}
impl GhaCache {
pub fn new(
credentials: Credentials,
cache_version: Option<String>,
store: Arc<NixStore>,
metrics: Arc<telemetry::TelemetryReport>,
narinfo_negative_cache: Arc<RwLock<HashSet<String>>>,
) -> Result<GhaCache> {
let cb_metrics = metrics.clone();
let mut api = Api::new(
credentials,
Arc::new(Box::new(move || {
cb_metrics
.tripped_429
.store(true, std::sync::atomic::Ordering::Relaxed);
})),
)?;
if let Some(cache_version) = &cache_version {
api.mutate_version(cache_version.as_bytes());
}
let (channel_tx, channel_rx) = unbounded_channel();
let api = Arc::new(api);
let api2 = api.clone();
let worker_result = tokio::task::spawn(async move {
worker(
&api2,
store,
channel_rx,
metrics,
narinfo_negative_cache.clone(),
)
.await
});
Ok(GhaCache {
api,
worker_result: RwLock::new(Some(worker_result)),
channel_tx,
})
}
pub async fn shutdown(&self) -> Result<()> {
if let Some(worker_result) = self.worker_result.write().await.take() {
self.channel_tx
.send(Request::Shutdown)
.expect("Cannot send shutdown message");
worker_result
.await
.expect("failed to read result from gha worker")
} else {
Ok(())
}
}
pub async fn enqueue_paths(
&self,
store: Arc<NixStore>,
store_paths: Vec<StorePath>,
) -> Result<()> {
// FIXME: make sending the closure optional. We might want to
// only send the paths that have been built by the user, under
// the assumption that everything else is already in a binary
// cache.
// FIXME: compute_fs_closure_multi doesn't return a
// toposort, though it doesn't really matter for the GHA
// cache.
let closure = store
.compute_fs_closure_multi(store_paths, false, false, false)
.await?;
for p in closure {
self.channel_tx
.send(Request::Upload(p))
.map_err(|_| Error::Internal("Cannot send upload message".to_owned()))?;
}
Ok(())
}
}
async fn worker(
api: &Api,
store: Arc<NixStore>,
mut channel_rx: UnboundedReceiver<Request>,
metrics: Arc<telemetry::TelemetryReport>,
narinfo_negative_cache: Arc<RwLock<HashSet<String>>>,
) -> Result<()> {
let mut done = HashSet::new();
while let Some(req) = channel_rx.recv().await {
match req {
Request::Shutdown => {
break;
}
Request::Upload(path) => {
if api.circuit_breaker_tripped() {
tracing::trace!("GitHub Actions gave us a 429, so we're done.",);
continue;
}
if !done.insert(path.clone()) {
continue;
}
if let Err(err) = upload_path(
api,
store.clone(),
&path,
metrics.clone(),
narinfo_negative_cache.clone(),
)
.await
{
tracing::error!(
"Upload of path '{}' failed: {}",
store.get_full_path(&path).display(),
err
);
}
}
}
}
Ok(())
}
async fn upload_path(
api: &Api,
store: Arc<NixStore>,
path: &StorePath,
metrics: Arc<telemetry::TelemetryReport>,
narinfo_negative_cache: Arc<RwLock<HashSet<String>>>,
) -> Result<()> {
let path_info = store.query_path_info(path.clone()).await?;
// Upload the NAR.
let nar_path = format!("{}.nar.zstd", path_info.nar_hash.to_base32());
let nar_allocation = api.allocate_file_with_random_suffix(&nar_path).await?;
let nar_stream = store.nar_from_path(path.clone());
let nar_reader = nar_stream
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
.into_async_read();
let nar_compressor = ZstdEncoder::new(nar_reader.compat());
let compressed_nar_size = api.upload_file(nar_allocation, nar_compressor).await?;
metrics.nars_uploaded.incr();
tracing::debug!(
"Uploaded '{}' (size {} -> {})",
nar_path,
path_info.nar_size,
compressed_nar_size
);
// Upload the narinfo.
let narinfo_path = format!("{}.narinfo", path.to_hash().as_str());
let narinfo_allocation = api.allocate_file_with_random_suffix(&narinfo_path).await?;
let narinfo = path_info_to_nar_info(store.clone(), &path_info, format!("nar/{}", nar_path))
.to_string()
.expect("failed to convert path into to nar info");
tracing::debug!("Uploading '{}'", narinfo_path);
api.upload_file(narinfo_allocation, narinfo.as_bytes())
.await?;
metrics.narinfos_uploaded.incr();
narinfo_negative_cache
.write()
.await
.remove(&path.to_hash().to_string());
tracing::info!(
"Uploaded '{}' to the GitHub Action Cache",
store.get_full_path(path).display()
);
Ok(())
}
// FIXME: move to attic.
fn path_info_to_nar_info(store: Arc<NixStore>, path_info: &ValidPathInfo, url: String) -> NarInfo {
NarInfo {
store_path: store.get_full_path(&path_info.path),
url,
compression: Compression::Zstd,
file_hash: None,
file_size: None,
nar_hash: path_info.nar_hash.clone(),
nar_size: path_info.nar_size as usize,
references: path_info
.references
.iter()
.map(|r| {
r.file_name()
.and_then(|n| n.to_str())
.unwrap_or_else(|| {
panic!(
"failed to convert nar_info reference to string: {}",
r.display()
)
})
.to_owned()
})
.collect(),
system: None,
deriver: None,
signature: None,
ca: path_info.ca.clone(),
}
}

View file

@ -2,6 +2,7 @@
asm_sub_register,
deprecated,
missing_abi,
unsafe_code,
unused_macros,
unused_must_use,
unused_unsafe
@ -14,52 +15,40 @@
mod api;
mod binary_cache;
mod env;
mod error;
mod flakehub;
mod gha;
mod pbh;
mod telemetry;
mod util;
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::io::Write;
use std::fs::{self, File};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::os::fd::OwnedFd;
use std::path::PathBuf;
use std::sync::Arc;
use ::attic::nix_store::NixStore;
use anyhow::{anyhow, Context, Result};
use axum::{extract::Extension, routing::get, Router};
use clap::Parser;
use serde::{Deserialize, Serialize};
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::sync::{oneshot, Mutex, RwLock};
use daemonize::Daemonize;
use tokio::{
runtime::Runtime,
sync::{oneshot, Mutex, RwLock},
};
use tracing_subscriber::filter::EnvFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use gha_cache::Credentials;
const DETERMINATE_STATE_DIR: &str = "/nix/var/determinate";
const DETERMINATE_NIXD_SOCKET_NAME: &str = "determinate-nixd.socket";
const DETERMINATE_NETRC_PATH: &str = "/nix/var/determinate/netrc";
// TODO(colemickens): refactor, move with other UDS stuff (or all PBH stuff) to new file
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "c", rename_all = "kebab-case")]
pub struct BuiltPathResponseEventV1 {
pub drv: PathBuf,
pub outputs: Vec<PathBuf>,
}
use gha_cache::{Api, Credentials};
type State = Arc<StateInner>;
/// GitHub Actions-powered Nix binary cache
#[derive(Parser, Debug)]
struct Args {
/// JSON file containing credentials.
///
/// If this is not specified, credentials will be loaded
/// from the environment.
#[arg(short = 'c', long)]
credentials_file: Option<PathBuf>,
/// Address to listen on.
///
/// FIXME: IPv6
@ -90,118 +79,18 @@ struct Args {
)]
diagnostic_endpoint: String,
/// The FlakeHub API server.
#[arg(long, default_value = "https://api.flakehub.com")]
flakehub_api_server: reqwest::Url,
/// The path of the `netrc` file that contains the FlakeHub JWT token.
#[arg(long)]
flakehub_api_server_netrc: Option<PathBuf>,
/// The FlakeHub binary cache server.
#[arg(long, default_value = "https://cache.flakehub.com")]
flakehub_cache_server: reqwest::Url,
#[arg(long)]
flakehub_flake_name: Option<String>,
/// The location of `nix.conf`.
#[arg(long, default_value_os_t = default_nix_conf())]
nix_conf: PathBuf,
/// Whether to use the GHA cache.
#[arg(long)]
use_gha_cache: Option<Option<CacheTrinary>>,
/// Whether to use the FlakeHub binary cache.
#[arg(long)]
use_flakehub: Option<Option<CacheTrinary>>,
/// URL to which to post startup notification.
#[arg(long)]
startup_notification_url: Option<reqwest::Url>,
/// File to write to when indicating startup.
#[arg(long)]
startup_notification_file: Option<PathBuf>,
/// Whether or not to diff the store before and after Magic Nix Cache runs
#[arg(long, default_value_t = false)]
diff_store: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, clap::ValueEnum)]
pub enum CacheTrinary {
NoPreference,
Enabled,
Disabled,
}
impl From<Option<Option<CacheTrinary>>> for CacheTrinary {
fn from(b: Option<Option<CacheTrinary>>) -> Self {
match b {
None => CacheTrinary::NoPreference,
Some(None) => CacheTrinary::Enabled,
Some(Some(v)) => v,
}
}
}
#[derive(PartialEq, Clone, Copy)]
pub enum Dnixd {
Available,
Missing,
}
impl From<bool> for Dnixd {
fn from(b: bool) -> Self {
if b {
Dnixd::Available
} else {
Dnixd::Missing
}
}
}
impl Args {
fn validate(&self, environment: env::Environment) -> Result<(), error::Error> {
if environment.is_gitlab_ci() && self.github_cache_preference() == CacheTrinary::Enabled {
return Err(error::Error::Config(String::from(
"the --use-gha-cache flag should not be applied in GitLab CI",
)));
}
if environment.is_gitlab_ci() && self.flakehub_preference() != CacheTrinary::Enabled {
return Err(error::Error::Config(String::from(
"you must set --use-flakehub in GitLab CI",
)));
}
Ok(())
}
fn github_cache_preference(&self) -> CacheTrinary {
self.use_gha_cache.into()
}
fn flakehub_preference(&self) -> CacheTrinary {
self.use_flakehub.into()
}
}
fn default_nix_conf() -> PathBuf {
xdg::BaseDirectories::new()
.with_context(|| "identifying XDG base directories")
.expect(
"Could not identify your home directory. Try setting the HOME environment variable.",
)
.get_config_file("nix/nix.conf")
/// Daemonize the server.
///
/// This is for use in the GitHub Action only.
#[arg(long, hide = true)]
daemon_dir: Option<PathBuf>,
}
/// The global server state.
#[derive(Debug)]
struct StateInner {
/// State for uploading to the GHA cache.
gha_cache: Option<gha::GhaCache>,
/// The GitHub Actions Cache API.
api: Api,
/// The upstream cache.
upstream: Option<String>,
@ -209,203 +98,35 @@ struct StateInner {
/// The sender half of the oneshot channel to trigger a shutdown.
shutdown_sender: Mutex<Option<oneshot::Sender<()>>>,
/// List of store paths originally present.
original_paths: Mutex<HashSet<PathBuf>>,
/// Set of store path hashes that are not present in GHAC.
narinfo_negative_cache: Arc<RwLock<HashSet<String>>>,
narinfo_nagative_cache: RwLock<HashSet<String>>,
/// Endpoint of ourselves.
///
/// This is used by our Action API to invoke `nix copy` to upload new paths.
self_endpoint: SocketAddr,
/// Metrics for sending to perf at shutdown
metrics: Arc<telemetry::TelemetryReport>,
/// Connection to the local Nix store.
store: Arc<NixStore>,
/// FlakeHub cache state.
flakehub_state: RwLock<Option<flakehub::State>>,
/// Where all of tracing will log to when GitHub Actions is run in debug mode
logfile: Option<PathBuf>,
/// The paths in the Nix store when Magic Nix Cache started, if store diffing is enabled.
original_paths: Option<Mutex<HashSet<PathBuf>>>,
metrics: telemetry::TelemetryReport,
}
#[derive(Debug, Clone)]
pub(crate) enum FlakeHubAuthSource {
DeterminateNixd,
Netrc(PathBuf),
}
impl FlakeHubAuthSource {
pub(crate) fn as_path_buf(&self) -> PathBuf {
match &self {
Self::Netrc(path) => path.clone(),
Self::DeterminateNixd => {
let mut path = PathBuf::from(DETERMINATE_STATE_DIR);
path.push("netrc");
path
}
}
}
}
async fn main_cli() -> Result<()> {
let guard = init_logging()?;
let _tracing_guard = guard.appender_guard;
fn main() {
init_logging();
let args = Args::parse();
let environment = env::Environment::determine();
tracing::debug!("Running in {}", environment.to_string());
args.validate(environment)?;
let metrics = Arc::new(telemetry::TelemetryReport::new());
let credentials = if let Some(credentials_file) = &args.credentials_file {
tracing::info!("Loading credentials from {:?}", credentials_file);
let bytes = fs::read(credentials_file).expect("Failed to read credentials file");
let dnixd_uds_socket_dir: &Path = Path::new(&DETERMINATE_STATE_DIR);
let dnixd_uds_socket_path = dnixd_uds_socket_dir.join(DETERMINATE_NIXD_SOCKET_NAME);
let dnixd_available: Dnixd = dnixd_uds_socket_path.exists().into();
let nix_conf_path: PathBuf = args.nix_conf.clone();
// NOTE: we expect this to point to a user nix.conf
// we always open/append to it to be able to append the extra-substituter for github-actions cache
// but we don't write to it for initializing flakehub_cache unless dnixd is unavailable
if let Some(parent) = Path::new(&nix_conf_path).parent() {
create_dir_all(parent).with_context(|| "Creating parent directories of nix.conf")?;
}
let mut nix_conf = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&nix_conf_path)
.with_context(|| "Creating nix.conf")?;
// always enable fallback, first
nix_conf
.write_all(b"fallback = true\n")
.with_context(|| "Setting fallback in nix.conf")?;
let store = Arc::new(NixStore::connect()?);
let narinfo_negative_cache = Arc::new(RwLock::new(HashSet::new()));
let flakehub_auth_method: Option<FlakeHubAuthSource> = match (
args.flakehub_preference(),
&args.flakehub_api_server_netrc,
dnixd_available,
) {
// User has explicitly pyassed --use-flakehub=disabled, so just straight up don't
(CacheTrinary::Disabled, _, _) => {
tracing::info!("Disabling FlakeHub cache.");
None
}
// User has no preference, did not pass a netrc, and determinate-nixd is not available
(CacheTrinary::NoPreference, None, Dnixd::Missing) => None,
// Use it when determinate-nixd is available, and let the user know what's going on
(pref, user_netrc_path, Dnixd::Available) => {
if pref == CacheTrinary::NoPreference {
tracing::info!("Enabling FlakeHub cache because determinate-nixd is available.");
}
if user_netrc_path.is_some() {
tracing::info!("Ignoring the user-specified --flakehub-api-server-netrc, in favor of the determinate-nixd netrc");
}
Some(FlakeHubAuthSource::DeterminateNixd)
}
// When determinate-nixd is not available, but the user specified a netrc
(_, Some(path), Dnixd::Missing) => {
if path.exists() {
Some(FlakeHubAuthSource::Netrc(path.to_owned()))
} else {
tracing::debug!(path = %path.display(), "User-provided netrc does not exist");
None
}
}
// User explicitly turned on flakehub cache, but we have no netrc and determinate-nixd is not present
(CacheTrinary::Enabled, None, Dnixd::Missing) => {
return Err(anyhow!(
"--flakehub-api-server-netrc is required when determinate-nixd is unavailable"
));
}
};
let flakehub_state = if let Some(auth_method) = flakehub_auth_method {
let flakehub_cache_server = &args.flakehub_cache_server;
let flakehub_api_server = &args.flakehub_api_server;
let flakehub_flake_name = &args.flakehub_flake_name;
match flakehub::init_cache(
environment,
flakehub_api_server,
flakehub_cache_server,
flakehub_flake_name,
store.clone(),
&auth_method,
)
.await
{
Ok(state) => {
if let FlakeHubAuthSource::Netrc(ref path) = auth_method {
nix_conf
.write_all(
format!(
"extra-substituters = {}?trusted=1\nnetrc-file = {}\n",
&flakehub_cache_server,
path.display()
)
.as_bytes(),
)
.with_context(|| "Writing to nix.conf")?;
}
tracing::info!("FlakeHub cache is enabled.");
Some(state)
}
Err(err) => {
tracing::error!("FlakeHub cache initialization failed: {}. Unable to authenticate to FlakeHub. Individuals must register at FlakeHub.com; Organizations must create an organization at FlakeHub.com.", err);
println!("::error title={{FlakeHub: Unauthenticated}}::{{Unable to authenticate to FlakeHub. Individuals must register at FlakeHub.com; Organizations must create an organization at FlakeHub.com.}}");
None
}
}
serde_json::from_slice(&bytes).expect("Failed to deserialize credentials file")
} else {
tracing::info!("FlakeHub cache is disabled.");
None
};
let gha_cache = if (args.github_cache_preference() == CacheTrinary::Enabled)
|| (args.github_cache_preference() == CacheTrinary::NoPreference
&& flakehub_state.is_none())
{
tracing::info!("Loading credentials from environment");
let credentials = Credentials::load_from_env()
.with_context(|| "Failed to load credentials from environment (see README.md)")?;
let gha_cache = gha::GhaCache::new(
credentials,
args.cache_version,
store.clone(),
metrics.clone(),
narinfo_negative_cache.clone(),
)
.with_context(|| "Failed to initialize GitHub Actions Cache API")?;
nix_conf
.write_all(format!("extra-substituters = http://{}?trusted=1&compression=zstd&parallel-compression=true&priority=1\n", args.listen).as_bytes())
.with_context(|| "Writing to nix.conf")?;
tracing::info!("Native GitHub Action cache is enabled.");
Some(gha_cache)
} else {
if environment.is_github_actions() {
tracing::info!("Native GitHub Action cache is disabled.");
}
None
Credentials::load_from_env()
.expect("Failed to load credentials from environment (see README.md)")
};
let diagnostic_endpoint = match args.diagnostic_endpoint.as_str() {
@ -416,30 +137,23 @@ async fn main_cli() -> Result<()> {
url => Some(url),
};
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let mut api = Api::new(credentials).expect("Failed to initialize GitHub Actions Cache API");
let original_paths = args.diff_store.then_some(Mutex::new(HashSet::new()));
let state = Arc::new(StateInner {
gha_cache,
upstream: args.upstream.clone(),
shutdown_sender: Mutex::new(Some(shutdown_sender)),
narinfo_negative_cache,
metrics,
store,
flakehub_state: RwLock::new(flakehub_state),
logfile: guard.logfile,
original_paths,
});
if dnixd_available == Dnixd::Available {
tracing::info!("Subscribing to Determinate Nixd build events.");
crate::pbh::subscribe_uds_post_build_hook(dnixd_uds_socket_path, state.clone()).await?;
} else {
tracing::info!("Patching nix.conf to use a post-build-hook.");
crate::pbh::setup_legacy_post_build_hook(&args.listen, &mut nix_conf).await?;
if let Some(cache_version) = &args.cache_version {
api.mutate_version(cache_version.as_bytes());
}
drop(nix_conf);
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let state = Arc::new(StateInner {
api,
upstream: args.upstream.clone(),
shutdown_sender: Mutex::new(Some(shutdown_sender)),
original_paths: Mutex::new(HashSet::new()),
narinfo_nagative_cache: RwLock::new(HashSet::new()),
self_endpoint: args.listen.to_owned(),
metrics: telemetry::TelemetryReport::new(),
});
let app = Router::new()
.route("/", get(root))
@ -453,180 +167,60 @@ async fn main_cli() -> Result<()> {
let app = app.layer(Extension(state.clone()));
tracing::info!("Listening on {}", args.listen);
if args.daemon_dir.is_some() {
let dir = args.daemon_dir.as_ref().unwrap();
let logfile: OwnedFd = File::create(dir.join("daemon.log")).unwrap().into();
let daemon = Daemonize::new()
.pid_file(dir.join("daemon.pid"))
.stdout(File::from(logfile.try_clone().unwrap()))
.stderr(File::from(logfile));
// Notify of startup via HTTP
if let Some(startup_notification_url) = args.startup_notification_url {
tracing::debug!("Startup notification via HTTP POST to {startup_notification_url}");
tracing::info!("Forking into the background");
daemon.start().expect("Failed to fork into the background");
}
let response = reqwest::Client::new()
.post(startup_notification_url)
.header(reqwest::header::CONTENT_TYPE, "application/json")
.body("{}")
.send()
let rt = Runtime::new().unwrap();
rt.block_on(async move {
tracing::info!("Listening on {}", args.listen);
let ret = axum::Server::bind(&args.listen)
.serve(app.into_make_service())
.with_graceful_shutdown(async move {
shutdown_receiver.await.ok();
tracing::info!("Shutting down");
})
.await;
match response {
Ok(response) => {
if !response.status().is_success() {
Err(anyhow!(
"Startup notification returned an error: {}\n{}",
response.status(),
response
.text()
.await
.unwrap_or_else(|_| "<no response text>".to_owned())
))?;
}
}
err @ Err(_) => {
err.with_context(|| "Startup notification failed")?;
}
if let Some(diagnostic_endpoint) = diagnostic_endpoint {
state.metrics.send(diagnostic_endpoint);
}
}
// Notify of startup by writing "1" to the specified file
if let Some(startup_notification_file_path) = args.startup_notification_file {
let file_contents: &[u8] = b"1";
tracing::debug!("Startup notification via file at {startup_notification_file_path:?}");
if let Some(parent_dir) = startup_notification_file_path.parent() {
tokio::fs::create_dir_all(parent_dir)
.await
.with_context(|| {
format!(
"failed to create parent directory for startup notification file path: {}",
startup_notification_file_path.display()
)
})?;
}
let mut notification_file = File::create(&startup_notification_file_path)
.await
.with_context(|| {
format!(
"failed to create startup notification file to path: {}",
startup_notification_file_path.display()
)
})?;
notification_file
.write_all(file_contents)
.await
.with_context(|| {
format!(
"failed to write startup notification file to path: {}",
startup_notification_file_path.display()
)
})?;
tracing::debug!("Created startup notification file at {startup_notification_file_path:?}");
}
let listener = tokio::net::TcpListener::bind(&args.listen).await?;
let ret = axum::serve(listener, app.into_make_service())
.with_graceful_shutdown(async move {
shutdown_receiver.await.ok();
tracing::info!("Shutting down");
})
.await;
// Notify diagnostics endpoint
if let Some(diagnostic_endpoint) = diagnostic_endpoint {
state.metrics.send(diagnostic_endpoint).await;
}
ret?;
Ok(())
ret.unwrap()
});
}
#[tokio::main]
async fn main() -> Result<()> {
match std::env::var("OUT_PATHS") {
Ok(out_paths) => pbh::handle_legacy_post_build_hook(&out_paths).await,
Err(_) => main_cli().await,
}
}
pub(crate) fn debug_logfile() -> PathBuf {
std::env::temp_dir().join("magic-nix-cache-tracing.log")
}
pub struct LogGuard {
appender_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
logfile: Option<PathBuf>,
}
fn init_logging() -> Result<LogGuard> {
fn init_logging() {
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
#[cfg(debug_assertions)]
return EnvFilter::new("info")
.add_directive(
"magic_nix_cache=debug"
.parse()
.expect("failed to parse magix_nix_cache directive"),
)
.add_directive(
"gha_cache=debug"
.parse()
.expect("failed to parse gha_cahce directive"),
);
.add_directive("magic_nix_cache=debug".parse().unwrap())
.add_directive("gha_cache=debug".parse().unwrap());
#[cfg(not(debug_assertions))]
return EnvFilter::new("info");
});
let stderr_layer = tracing_subscriber::fmt::layer()
.with_writer(std::io::stderr)
.pretty();
let (guard, file_layer) = match std::env::var("RUNNER_DEBUG") {
Ok(val) if val == "1" => {
let logfile = debug_logfile();
let file = std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&logfile)?;
let (nonblocking, guard) = tracing_appender::non_blocking(file);
let file_layer = tracing_subscriber::fmt::layer()
.with_writer(nonblocking)
.pretty();
(
LogGuard {
appender_guard: Some(guard),
logfile: Some(logfile),
},
Some(file_layer),
)
}
_ => (
LogGuard {
appender_guard: None,
logfile: None,
},
None,
),
};
tracing_subscriber::registry()
.with(filter)
.with(stderr_layer)
.with(file_layer)
tracing_subscriber::fmt()
.pretty()
.with_env_filter(filter)
.init();
Ok(guard)
}
#[cfg(debug_assertions)]
async fn dump_api_stats(
async fn dump_api_stats<B>(
Extension(state): Extension<State>,
request: axum::http::Request<axum::body::Body>,
next: axum::middleware::Next,
request: axum::http::Request<B>,
next: axum::middleware::Next<B>,
) -> axum::response::Response {
if let Some(gha_cache) = &state.gha_cache {
gha_cache.api.dump_stats();
}
state.api.dump_stats();
next.run(request).await
}

View file

@ -1,241 +0,0 @@
use std::io::Write as _;
use std::net::SocketAddr;
use std::os::unix::fs::PermissionsExt as _;
use std::path::PathBuf;
use anyhow::anyhow;
use anyhow::Context as _;
use anyhow::Result;
use clap::Parser;
use futures::StreamExt as _;
use http_body_util::BodyExt as _;
use hyper_util::rt::TokioExecutor;
use hyper_util::rt::TokioIo;
use tempfile::NamedTempFile;
use tokio::net::UnixStream;
use tokio::process::Command;
use crate::BuiltPathResponseEventV1;
use crate::State;
pub async fn subscribe_uds_post_build_hook(
dnixd_uds_socket_path: PathBuf,
state: State,
) -> Result<()> {
tokio::spawn(async move {
let dnixd_uds_socket_path = &dnixd_uds_socket_path;
loop {
let Ok(socket_conn) = UnixStream::connect(dnixd_uds_socket_path).await else {
tracing::error!("built-paths: failed to connect to determinate-nixd's socket");
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
continue;
};
let stream = TokioIo::new(socket_conn);
let executor: TokioExecutor = TokioExecutor::new();
let sender_conn = hyper::client::conn::http2::handshake(executor, stream).await;
let Ok((mut sender, conn)) = sender_conn else {
tracing::error!("built-paths: failed to http2 handshake");
continue;
};
// NOTE(colemickens): for now we just drop the joinhandle and let it keep running
let _join_handle = tokio::task::spawn(async move {
if let Err(err) = conn.await {
tracing::error!("Connection failed: {:?}", err);
}
});
let request = http::Request::builder()
.method(http::Method::GET)
.uri("http://localhost/events")
.body(axum::body::Body::empty());
let Ok(request) = request else {
tracing::error!("built-paths: failed to create request to subscribe");
continue;
};
let response = sender.send_request(request).await;
let response = match response {
Ok(r) => r,
Err(e) => {
tracing::error!("buit-paths: failed to send subscription request: {:?}", e);
continue;
}
};
let mut data = response.into_data_stream();
while let Some(event_str) = data.next().await {
let event_str = match event_str {
Ok(event) => event,
Err(e) => {
tracing::error!("built-paths: error while receiving: {}", e);
break;
}
};
let Some(event_str) = event_str.strip_prefix("data: ".as_bytes()) else {
tracing::debug!("built-paths subscription: ignoring non-data frame");
continue;
};
let Ok(event): core::result::Result<BuiltPathResponseEventV1, _> =
serde_json::from_slice(event_str)
else {
tracing::error!(
"failed to decode built-path response as BuiltPathResponseEventV1"
);
continue;
};
let maybe_store_paths = event
.outputs
.iter()
.map(|path| {
state
.store
.follow_store_path(path)
.map_err(|_| anyhow!("failed to collect store paths"))
})
.collect::<Result<Vec<_>>>();
let Ok(store_paths) = maybe_store_paths else {
tracing::error!(
"built-paths: encountered an error aggregating build store paths"
);
continue;
};
tracing::debug!("about to enqueue paths: {:?}", store_paths);
if let Err(e) = crate::api::enqueue_paths(&state, store_paths).await {
tracing::error!(
"built-paths: failed to enqueue paths for drv ({}): {}",
event.drv.display(),
e
);
continue;
}
}
}
});
Ok(())
}
pub async fn setup_legacy_post_build_hook(
listen: &SocketAddr,
nix_conf: &mut std::fs::File,
) -> Result<()> {
/* Write the post-build hook script. Note that the shell script
* ignores errors, to avoid the Nix build from failing. */
let post_build_hook_script = {
let mut file = NamedTempFile::with_prefix("magic-nix-cache-build-hook-")
.with_context(|| "Creating a temporary file for the post-build hook")?;
file.write_all(
format!(
// NOTE(cole-h): We want to exit 0 even if the hook failed, otherwise it'll fail the
// build itself
"#! /bin/sh\nRUST_LOG=trace RUST_BACKTRACE=full {} --server {} || :\n",
std::env::current_exe()
.with_context(|| "Getting the path of magic-nix-cache")?
.display(),
listen
)
.as_bytes(),
)
.with_context(|| "Writing the post-build hook")?;
let path = file
.keep()
.with_context(|| "Keeping the post-build hook")?
.1;
std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o755))
.with_context(|| "Setting permissions on the post-build hook")?;
/* Copy the script to the Nix store so we know for sure that
* it's accessible to the Nix daemon, which might have a
* different /tmp from us. */
let res = Command::new("nix")
.args([
"--extra-experimental-features",
"nix-command",
"store",
"add-path",
&path.display().to_string(),
])
.output()
.await
.with_context(|| {
format!(
"Running nix to add the post-build-hook to the store from {}",
path.display()
)
})?;
if res.status.success() {
tokio::fs::remove_file(&path).await.with_context(|| {
format!(
"Cleaning up the temporary post-build-hook at {}",
path.display()
)
})?;
PathBuf::from(String::from_utf8_lossy(&res.stdout).trim())
} else {
path
}
};
/* Update nix.conf. */
nix_conf
.write_all(format!("post-build-hook = {}\n", post_build_hook_script.display()).as_bytes())
.with_context(|| "Writing to nix.conf")?;
Ok(())
}
pub async fn handle_legacy_post_build_hook(out_paths: &str) -> Result<()> {
#[derive(Parser, Debug)]
struct Args {
/// `magic-nix-cache` daemon to connect to.
#[arg(short = 'l', long, default_value = "127.0.0.1:3000")]
server: SocketAddr,
}
let args = Args::parse();
let store_paths: Vec<_> = out_paths
.split_whitespace()
.map(|s| s.trim().to_owned())
.collect();
let request = crate::api::EnqueuePathsRequest { store_paths };
let response = reqwest::Client::new()
.post(format!("http://{}/api/enqueue-paths", &args.server))
.header(reqwest::header::CONTENT_TYPE, "application/json")
.body(
serde_json::to_string(&request)
.with_context(|| "Decoding the response from the magic-nix-cache server")?,
)
.send()
.await;
match response {
Ok(response) if !response.status().is_success() => Err(anyhow!(
"magic-nix-cache server failed to enqueue the push request: {}\n{}",
response.status(),
response
.text()
.await
.unwrap_or_else(|_| "<no response text>".to_owned()),
))?,
Ok(response) => response
.json::<crate::api::EnqueuePathsResponse>()
.await
.with_context(|| "magic-nix-cache-server didn't return a valid response")?,
Err(err) => {
Err(err).with_context(|| "magic-nix-cache server failed to send the enqueue request")?
}
};
Ok(())
}

View file

@ -1,6 +1,7 @@
use std::env;
use std::time::SystemTime;
use is_ci;
use sha2::{Digest, Sha256};
/// A telemetry report to measure the effectiveness of the Magic Nix Cache
@ -28,18 +29,16 @@ pub struct TelemetryReport {
pub num_original_paths: Metric,
pub num_final_paths: Metric,
pub num_new_paths: Metric,
pub tripped_429: std::sync::atomic::AtomicBool,
}
#[derive(Debug, Default, serde::Serialize)]
pub struct Metric(std::sync::atomic::AtomicUsize);
impl Metric {
pub fn incr(&self) {
pub fn incr(&self) -> () {
self.0.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
pub fn set(&self, val: usize) {
pub fn set(&self, val: usize) -> () {
self.0.store(val, std::sync::atomic::Ordering::Relaxed);
}
}
@ -47,9 +46,7 @@ impl Metric {
impl TelemetryReport {
pub fn new() -> TelemetryReport {
TelemetryReport {
distinct_id: env::var("DETSYS_CORRELATION")
.ok()
.or_else(|| calculate_opaque_id().ok()),
distinct_id: calculate_opaque_id().ok(),
version: env!("CARGO_PKG_VERSION").to_string(),
is_ci: is_ci::cached(),
@ -60,7 +57,7 @@ impl TelemetryReport {
}
}
pub async fn send(&self, endpoint: &str) {
pub fn send(&self, endpoint: &str) {
if let Some(start_time) = self.start_time {
self.elapsed_seconds.set(
SystemTime::now()
@ -73,13 +70,12 @@ impl TelemetryReport {
}
if let Ok(serialized) = serde_json::to_string_pretty(&self) {
let _ = reqwest::Client::new()
let _ = reqwest::blocking::Client::new()
.post(endpoint)
.body(serialized)
.header("Content-Type", "application/json")
.timeout(std::time::Duration::from_millis(3000))
.send()
.await;
.send();
}
}
}

View file

@ -3,36 +3,23 @@
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use attic::nix_store::NixStore;
use tokio::{fs, process::Command};
use crate::error::Result;
use crate::error::{Error, Result};
/// Returns the list of store paths that are currently present.
pub async fn get_store_paths(store: &NixStore) -> Result<HashSet<PathBuf>> {
// FIXME: use the Nix API.
let store_dir = store.store_dir();
let mut listing = tokio::fs::read_dir(store_dir).await.map_err(|e| {
crate::error::Error::Io(
e,
format!("Enumerating store paths in {}", store_dir.display()),
)
})?;
pub async fn get_store_paths() -> Result<HashSet<PathBuf>> {
let store_dir = Path::new("/nix/store");
let mut listing = fs::read_dir(store_dir).await?;
let mut paths = HashSet::new();
while let Some(entry) = listing.next_entry().await.map_err(|e| {
crate::error::Error::Io(
e,
format!("Reading existing store paths from {}", store_dir.display()),
)
})? {
while let Some(entry) = listing.next_entry().await? {
let file_name = entry.file_name();
let file_name = Path::new(&file_name);
if let Some(extension) = file_name.extension() {
match extension.to_str() {
None | Some("drv") | Some("chroot") => {
tracing::debug!(
"skipping file with weird or uninteresting extension {extension:?}"
);
None | Some("drv") | Some("lock") => {
// Malformed or not interesting
continue;
}
_ => {}
@ -40,8 +27,13 @@ pub async fn get_store_paths(store: &NixStore) -> Result<HashSet<PathBuf>> {
}
if let Some(s) = file_name.to_str() {
// Let's not push any sources
if s.ends_with("-source") {
continue;
}
// Special paths (so far only `.links`)
if s == ".links" {
if s.starts_with('.') {
continue;
}
}
@ -50,3 +42,44 @@ pub async fn get_store_paths(store: &NixStore) -> Result<HashSet<PathBuf>> {
}
Ok(paths)
}
/// Uploads a list of store paths to a store URI.
pub async fn upload_paths(mut paths: Vec<PathBuf>, store_uri: &str) -> Result<()> {
// When the daemon started Nix may not have been installed
let env_path = Command::new("sh")
.args(["-lc", "echo $PATH"])
.output()
.await?
.stdout;
let env_path = String::from_utf8(env_path).expect("PATH contains invalid UTF-8");
while !paths.is_empty() {
let mut batch = Vec::new();
let mut total_len = 0;
while !paths.is_empty() && total_len < 1024 * 1024 {
let p = paths.pop().unwrap();
total_len += p.as_os_str().len() + 1;
batch.push(p);
}
tracing::debug!("{} paths in this batch", batch.len());
let status = Command::new("nix")
.args(["--extra-experimental-features", "nix-command"])
.args(["copy", "--to", store_uri])
.args(&batch)
.env("PATH", &env_path)
.status()
.await?;
if status.success() {
tracing::debug!("Uploaded batch");
} else {
tracing::error!("Failed to upload batch: {:?}", status);
return Err(Error::FailedToUpload);
}
}
Ok(())
}