mirror of
https://github.com/tailscale/tailscale.git
synced 2025-12-01 09:32:08 +00:00
.github,cmd/cigocacher: use cigocacher for windows
Implements a new disk put function for cigocacher that does not cause locking issues on Windows when there are multiple processes reading and writing the same files concurrently. Integrates cigocacher into test.yml for Windows where we are running on larger runners that support connecting to private Azure vnet resources where cigocached is hosted. Updates tailscale/corp#10808 Change-Id: I0d0e9b670e49e0f9abf01ff3d605cd660dd85ebb Signed-off-by: Tom Proctor <tomhjp@users.noreply.github.com>
This commit is contained in:
50
.github/actions/go-cache/action.sh
vendored
Executable file
50
.github/actions/go-cache/action.sh
vendored
Executable file
@@ -0,0 +1,50 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# This script sets up cigocacher, but should never fail the build if unsuccessful.
|
||||
# It expects to run on a GitHub-hosted runner, and connects to cigocached over a
|
||||
# private Azure network that is configured at the runner group level in GitHub.
|
||||
#
|
||||
# Usage: ./action.sh
|
||||
# Inputs:
|
||||
# URL: The cigocached server URL.
|
||||
# Outputs:
|
||||
# success: Whether cigocacher was set up successfully.
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
if [ -z "${GITHUB_ACTIONS:-}" ]; then
|
||||
echo "This script is intended to run within GitHub Actions"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [ -z "$URL" ]; then
|
||||
echo "No cigocached URL is set, skipping cigocacher setup"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
JWT="$(curl -sSL -H "Authorization: Bearer $ACTIONS_ID_TOKEN_REQUEST_TOKEN" "${ACTIONS_ID_TOKEN_REQUEST_URL}&audience=gocached" | jq -r .value)"
|
||||
# cigocached serves a TLS cert with an FQDN, but DNS is based on VM name.
|
||||
HOST_AND_PORT="${URL#http*://}"
|
||||
FIRST_LABEL="${HOST_AND_PORT/.*/}"
|
||||
BODY="$(jq -n --arg jwt "$JWT" '{"jwt": $jwt}')"
|
||||
CIGOCACHER_TOKEN="$(curl -sSL --connect-to "$HOST_AND_PORT:$FIRST_LABEL:" -H "Content-Type: application/json" "$URL/auth/exchange-token" -d "$BODY" | jq -r .access_token)"
|
||||
if [ -z "$CIGOCACHER_TOKEN" ]; then
|
||||
echo "Failed token exchange with cigocached, skipping cigocacher setup"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# Wait until we successfully auth before building cigocacher to ensure we know
|
||||
# it's worth building.
|
||||
# TODO(tomhjp): bake cigocacher into runner image and use it for auth.
|
||||
echo "Fetched cigocacher token successfully"
|
||||
echo "::add-mask::${CIGOCACHER_TOKEN}"
|
||||
|
||||
BIN_NAME="cigocacher"
|
||||
if [[ "${RUNNER_OS:-}" == "Windows" ]]; then
|
||||
BIN_NAME="cigocacher.exe"
|
||||
fi
|
||||
BIN_PATH="${RUNNER_TEMP:-/tmp}/${BIN_NAME}"
|
||||
|
||||
go build -o "${BIN_PATH}" ./cmd/cigocacher
|
||||
echo "GOCACHEPROG=${BIN_PATH} --cache-dir ${CACHE_DIR} --cigocached-url ${URL} --token ${CIGOCACHER_TOKEN}" >> "${GITHUB_ENV}"
|
||||
echo "success=true" >> "${GITHUB_OUTPUT}"
|
||||
30
.github/actions/go-cache/action.yml
vendored
Normal file
30
.github/actions/go-cache/action.yml
vendored
Normal file
@@ -0,0 +1,30 @@
|
||||
name: go-cache
|
||||
description: Set up build to use cigocacher
|
||||
|
||||
inputs:
|
||||
cigocached-url:
|
||||
description: URL of the cigocached server
|
||||
required: true
|
||||
checkout-path:
|
||||
description: Path to cloned repository
|
||||
required: true
|
||||
cache-dir:
|
||||
description: Directory to use for caching
|
||||
required: true
|
||||
|
||||
outputs:
|
||||
success:
|
||||
description: Whether cigocacher was set up successfully
|
||||
value: ${{ steps.setup.outputs.success }}
|
||||
|
||||
runs:
|
||||
using: composite
|
||||
steps:
|
||||
- name: Setup cigocacher
|
||||
id: setup
|
||||
shell: bash
|
||||
env:
|
||||
URL: ${{ inputs.cigocached-url }}
|
||||
CACHE_DIR: ${{ inputs.cache-dir }}
|
||||
working-directory: ${{ inputs.checkout-path }}
|
||||
run: .github/actions/go-cache/action.sh
|
||||
60
.github/workflows/test.yml
vendored
60
.github/workflows/test.yml
vendored
@@ -2,6 +2,10 @@
|
||||
# both PRs and merged commits, and for the latter reports failures to slack.
|
||||
name: CI
|
||||
|
||||
permissions:
|
||||
id-token: write # This is required for requesting the JWT
|
||||
contents: read # This is required for actions/checkout
|
||||
|
||||
env:
|
||||
# Our fuzz job, powered by OSS-Fuzz, fails periodically because we upgrade to
|
||||
# new Go versions very eagerly. OSS-Fuzz is a little more conservative, and
|
||||
@@ -211,7 +215,7 @@ jobs:
|
||||
# windows-8vpu is a 2022 GitHub-managed runner in our
|
||||
# org with 8 cores and 32 GB of RAM:
|
||||
# https://github.com/organizations/tailscale/settings/actions/github-hosted-runners/1
|
||||
runs-on: windows-8vcpu
|
||||
runs-on: ci-windows-github-1
|
||||
needs: gomod-cache
|
||||
name: Windows (${{ matrix.name || matrix.shard}})
|
||||
strategy:
|
||||
@@ -220,8 +224,6 @@ jobs:
|
||||
include:
|
||||
- key: "win-bench"
|
||||
name: "benchmarks"
|
||||
- key: "win-tool-go"
|
||||
name: "./tool/go"
|
||||
- key: "win-shard-1-2"
|
||||
shard: "1/2"
|
||||
- key: "win-shard-2-2"
|
||||
@@ -230,44 +232,31 @@ jobs:
|
||||
- name: checkout
|
||||
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
with:
|
||||
path: src
|
||||
path: ${{ github.workspace }}/src
|
||||
|
||||
- name: Install Go
|
||||
if: matrix.key != 'win-tool-go'
|
||||
uses: actions/setup-go@d35c59abb061a4a6fb18e82ac0862c26744d6ab5 # v5.5.0
|
||||
with:
|
||||
go-version-file: src/go.mod
|
||||
go-version-file: ${{ github.workspace }}/src/go.mod
|
||||
cache: false
|
||||
|
||||
- name: Restore Go module cache
|
||||
if: matrix.key != 'win-tool-go'
|
||||
uses: actions/cache/restore@0400d5f644dc74513175e3cd8d07132dd4860809 # v4.2.4
|
||||
with:
|
||||
path: gomodcache
|
||||
key: ${{ needs.gomod-cache.outputs.cache-key }}
|
||||
enableCrossOsArchive: true
|
||||
|
||||
- name: Restore Cache
|
||||
if: matrix.key != 'win-tool-go'
|
||||
uses: actions/cache@0400d5f644dc74513175e3cd8d07132dd4860809 # v4.2.4
|
||||
- name: Setup cigocacher
|
||||
id: cigocacher-setup
|
||||
uses: ./src/.github/actions/go-cache
|
||||
with:
|
||||
path: |
|
||||
~/.cache/go-build
|
||||
~\AppData\Local\go-build
|
||||
# The -2- here should be incremented when the scheme of data to be
|
||||
# cached changes (e.g. path above changes).
|
||||
key: ${{ github.job }}-${{ matrix.key }}-go-2-${{ hashFiles('**/go.sum') }}-${{ github.run_id }}
|
||||
restore-keys: |
|
||||
${{ github.job }}-${{ matrix.key }}-go-2-${{ hashFiles('**/go.sum') }}
|
||||
${{ github.job }}-${{ matrix.key }}-go-2-
|
||||
|
||||
- name: test-tool-go
|
||||
if: matrix.key == 'win-tool-go'
|
||||
working-directory: src
|
||||
run: ./tool/go version
|
||||
checkout-path: ${{ github.workspace }}/src
|
||||
cache-dir: ${{ github.workspace }}/cigocacher
|
||||
cigocached-url: ${{ vars.CIGOCACHED_AZURE_URL }}
|
||||
|
||||
- name: test
|
||||
if: matrix.key != 'win-bench' && matrix.key != 'win-tool-go' # skip on bench builder
|
||||
if: matrix.key != 'win-bench' # skip on bench builder
|
||||
working-directory: src
|
||||
run: go run ./cmd/testwrapper sharded:${{ matrix.shard }}
|
||||
|
||||
@@ -280,11 +269,24 @@ jobs:
|
||||
run: go test ./... -bench . -benchtime 1x -run "^$"
|
||||
|
||||
- name: Tidy cache
|
||||
if: matrix.key != 'win-tool-go'
|
||||
working-directory: src
|
||||
shell: bash
|
||||
shell: pwsh
|
||||
run: |
|
||||
find $(go env GOCACHE) -type f -mmin +90 -delete
|
||||
Get-ChildItem -Path cigocacher -File -Recurse |
|
||||
Where-Object { $_.LastAccessTime -lt (Get-Date).AddMinutes(-90) } |
|
||||
Remove-Item -Force
|
||||
|
||||
win-tool-go:
|
||||
runs-on: windows-latest
|
||||
needs: gomod-cache
|
||||
name: Windows (win-tool-go)
|
||||
steps:
|
||||
- name: checkout
|
||||
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
with:
|
||||
path: src
|
||||
- name: test-tool-go
|
||||
working-directory: src
|
||||
run: ./tool/go version
|
||||
|
||||
privileged:
|
||||
needs: gomod-cache
|
||||
|
||||
@@ -37,6 +37,7 @@ func main() {
|
||||
auth = flag.Bool("auth", false, "auth with cigocached and exit, printing the access token as output")
|
||||
token = flag.String("token", "", "the cigocached access token to use, as created using --auth")
|
||||
cigocachedURL = flag.String("cigocached-url", "", "optional cigocached URL (scheme, host, and port). empty means to not use one.")
|
||||
dir = flag.String("cache-dir", "", "cache directory; empty means automatic")
|
||||
verbose = flag.Bool("verbose", false, "enable verbose logging")
|
||||
)
|
||||
flag.Parse()
|
||||
@@ -55,22 +56,29 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
d, err := os.UserCacheDir()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
if *dir == "" {
|
||||
d, err := os.UserCacheDir()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
*dir = filepath.Join(d, "go-cacher")
|
||||
log.Printf("Defaulting to cache dir %v ...", *dir)
|
||||
}
|
||||
d = filepath.Join(d, "go-cacher")
|
||||
log.Printf("Defaulting to cache dir %v ...", d)
|
||||
if err := os.MkdirAll(d, 0750); err != nil {
|
||||
if err := os.MkdirAll(*dir, 0750); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
c := &cigocacher{
|
||||
disk: &cachers.DiskCache{Dir: d},
|
||||
disk: &cachers.DiskCache{
|
||||
Dir: *dir,
|
||||
Verbose: *verbose,
|
||||
},
|
||||
verbose: *verbose,
|
||||
}
|
||||
if *cigocachedURL != "" {
|
||||
log.Printf("Using cigocached at %s", *cigocachedURL)
|
||||
if *verbose {
|
||||
log.Printf("Using cigocached at %s", *cigocachedURL)
|
||||
}
|
||||
c.gocached = &gocachedClient{
|
||||
baseURL: *cigocachedURL,
|
||||
cl: httpClient(),
|
||||
@@ -81,8 +89,10 @@ func main() {
|
||||
var p *cacheproc.Process
|
||||
p = &cacheproc.Process{
|
||||
Close: func() error {
|
||||
log.Printf("gocacheprog: closing; %d gets (%d hits, %d misses, %d errors); %d puts (%d errors)",
|
||||
p.Gets.Load(), p.GetHits.Load(), p.GetMisses.Load(), p.GetErrors.Load(), p.Puts.Load(), p.PutErrors.Load())
|
||||
if c.verbose {
|
||||
log.Printf("gocacheprog: closing; %d gets (%d hits, %d misses, %d errors); %d puts (%d errors)",
|
||||
p.Gets.Load(), p.GetHits.Load(), p.GetMisses.Load(), p.GetErrors.Load(), p.Puts.Load(), p.PutErrors.Load())
|
||||
}
|
||||
return c.close()
|
||||
},
|
||||
Get: c.get,
|
||||
@@ -164,11 +174,7 @@ func (c *cigocacher) get(ctx context.Context, actionID string) (outputID, diskPa
|
||||
|
||||
defer res.Body.Close()
|
||||
|
||||
// TODO(tomhjp): make sure we timeout if cigocached disappears, but for some
|
||||
// reason, this seemed to tank network performance.
|
||||
// ctx, cancel := context.WithTimeout(ctx, httpTimeout(res.ContentLength))
|
||||
// defer cancel()
|
||||
diskPath, err = c.disk.Put(ctx, actionID, outputID, res.ContentLength, res.Body)
|
||||
diskPath, err = put(c.disk, actionID, outputID, res.ContentLength, res.Body)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("error filling disk cache from HTTP: %w", err)
|
||||
}
|
||||
@@ -184,7 +190,7 @@ func (c *cigocacher) put(ctx context.Context, actionID, outputID string, size in
|
||||
c.putNanos.Add(time.Since(t0).Nanoseconds())
|
||||
}()
|
||||
if c.gocached == nil {
|
||||
return c.disk.Put(ctx, actionID, outputID, size, r)
|
||||
return put(c.disk, actionID, outputID, size, r)
|
||||
}
|
||||
|
||||
c.putHTTP.Add(1)
|
||||
@@ -206,10 +212,6 @@ func (c *cigocacher) put(ctx context.Context, actionID, outputID string, size in
|
||||
}
|
||||
httpErrCh := make(chan error)
|
||||
go func() {
|
||||
// TODO(tomhjp): make sure we timeout if cigocached disappears, but for some
|
||||
// reason, this seemed to tank network performance.
|
||||
// ctx, cancel := context.WithTimeout(ctx, httpTimeout(size))
|
||||
// defer cancel()
|
||||
t0HTTP := time.Now()
|
||||
defer func() {
|
||||
c.putHTTPNanos.Add(time.Since(t0HTTP).Nanoseconds())
|
||||
@@ -217,7 +219,7 @@ func (c *cigocacher) put(ctx context.Context, actionID, outputID string, size in
|
||||
httpErrCh <- c.gocached.put(ctx, actionID, outputID, size, httpReader)
|
||||
}()
|
||||
|
||||
diskPath, err = c.disk.Put(ctx, actionID, outputID, size, diskReader)
|
||||
diskPath, err = put(c.disk, actionID, outputID, size, diskReader)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error writing to disk cache: %w", errors.Join(err, tee.err))
|
||||
}
|
||||
@@ -236,13 +238,14 @@ func (c *cigocacher) put(ctx context.Context, actionID, outputID string, size in
|
||||
}
|
||||
|
||||
func (c *cigocacher) close() error {
|
||||
log.Printf("cigocacher HTTP stats: %d gets (%.1fMiB, %.2fs, %d hits, %d misses, %d errors ignored); %d puts (%.1fMiB, %.2fs, %d errors ignored)",
|
||||
c.getHTTP.Load(), float64(c.getHTTPBytes.Load())/float64(1<<20), float64(c.getHTTPNanos.Load())/float64(time.Second), c.getHTTPHits.Load(), c.getHTTPMisses.Load(), c.getHTTPErrors.Load(),
|
||||
c.putHTTP.Load(), float64(c.putHTTPBytes.Load())/float64(1<<20), float64(c.putHTTPNanos.Load())/float64(time.Second), c.putHTTPErrors.Load())
|
||||
if !c.verbose || c.gocached == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Printf("cigocacher HTTP stats: %d gets (%.1fMiB, %.2fs, %d hits, %d misses, %d errors ignored); %d puts (%.1fMiB, %.2fs, %d errors ignored)",
|
||||
c.getHTTP.Load(), float64(c.getHTTPBytes.Load())/float64(1<<20), float64(c.getHTTPNanos.Load())/float64(time.Second), c.getHTTPHits.Load(), c.getHTTPMisses.Load(), c.getHTTPErrors.Load(),
|
||||
c.putHTTP.Load(), float64(c.putHTTPBytes.Load())/float64(1<<20), float64(c.putHTTPNanos.Load())/float64(time.Second), c.putHTTPErrors.Load())
|
||||
|
||||
stats, err := c.gocached.fetchStats()
|
||||
if err != nil {
|
||||
log.Printf("error fetching gocached stats: %v", err)
|
||||
|
||||
88
cmd/cigocacher/disk.go
Normal file
88
cmd/cigocacher/disk.go
Normal file
@@ -0,0 +1,88 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/bradfitz/go-tool-cache/cachers"
|
||||
)
|
||||
|
||||
// indexEntry is the metadata that DiskCache stores on disk for an ActionID.
|
||||
type indexEntry struct {
|
||||
Version int `json:"v"`
|
||||
OutputID string `json:"o"`
|
||||
Size int64 `json:"n"`
|
||||
TimeNanos int64 `json:"t"`
|
||||
}
|
||||
|
||||
func validHex(x string) bool {
|
||||
if len(x) < 4 || len(x) > 100 {
|
||||
return false
|
||||
}
|
||||
for _, b := range x {
|
||||
if b >= '0' && b <= '9' || b >= 'a' && b <= 'f' {
|
||||
continue
|
||||
}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// put is like dc.Put but refactored to support safe concurrent writes on Windows.
|
||||
// TODO(tomhjp): upstream these changes to go-tool-cache once they look stable.
|
||||
func put(dc *cachers.DiskCache, actionID, outputID string, size int64, body io.Reader) (diskPath string, _ error) {
|
||||
if len(actionID) < 4 || len(outputID) < 4 {
|
||||
return "", fmt.Errorf("actionID and outputID must be at least 4 characters long")
|
||||
}
|
||||
if !validHex(actionID) {
|
||||
log.Printf("diskcache: got invalid actionID %q", actionID)
|
||||
return "", errors.New("actionID must be hex")
|
||||
}
|
||||
if !validHex(outputID) {
|
||||
log.Printf("diskcache: got invalid outputID %q", outputID)
|
||||
return "", errors.New("outputID must be hex")
|
||||
}
|
||||
|
||||
actionFile := dc.ActionFilename(actionID)
|
||||
outputFile := dc.OutputFilename(outputID)
|
||||
actionDir := filepath.Dir(actionFile)
|
||||
outputDir := filepath.Dir(outputFile)
|
||||
|
||||
if err := os.MkdirAll(actionDir, 0755); err != nil {
|
||||
return "", fmt.Errorf("failed to create action directory: %w", err)
|
||||
}
|
||||
if err := os.MkdirAll(outputDir, 0755); err != nil {
|
||||
return "", fmt.Errorf("failed to create output directory: %w", err)
|
||||
}
|
||||
|
||||
wrote, err := writeOutputFile(outputFile, body, size, outputID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if wrote != size {
|
||||
return "", fmt.Errorf("wrote %d bytes, expected %d", wrote, size)
|
||||
}
|
||||
|
||||
ij, err := json.Marshal(indexEntry{
|
||||
Version: 1,
|
||||
OutputID: outputID,
|
||||
Size: size,
|
||||
TimeNanos: time.Now().UnixNano(),
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if err := writeActionFile(dc.ActionFilename(actionID), ij); err != nil {
|
||||
return "", fmt.Errorf("atomic write failed: %w", err)
|
||||
}
|
||||
return outputFile, nil
|
||||
}
|
||||
44
cmd/cigocacher/disk_notwindows.go
Normal file
44
cmd/cigocacher/disk_notwindows.go
Normal file
@@ -0,0 +1,44 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build !windows
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
func writeActionFile(dest string, b []byte) error {
|
||||
_, err := writeAtomic(dest, bytes.NewReader(b))
|
||||
return err
|
||||
}
|
||||
|
||||
func writeOutputFile(dest string, r io.Reader, _ int64, _ string) (int64, error) {
|
||||
return writeAtomic(dest, r)
|
||||
}
|
||||
|
||||
func writeAtomic(dest string, r io.Reader) (int64, error) {
|
||||
tf, err := os.CreateTemp(filepath.Dir(dest), filepath.Base(dest)+".*")
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
size, err := io.Copy(tf, r)
|
||||
if err != nil {
|
||||
tf.Close()
|
||||
os.Remove(tf.Name())
|
||||
return 0, err
|
||||
}
|
||||
if err := tf.Close(); err != nil {
|
||||
os.Remove(tf.Name())
|
||||
return 0, err
|
||||
}
|
||||
if err := os.Rename(tf.Name(), dest); err != nil {
|
||||
os.Remove(tf.Name())
|
||||
return 0, err
|
||||
}
|
||||
return size, nil
|
||||
}
|
||||
102
cmd/cigocacher/disk_windows.go
Normal file
102
cmd/cigocacher/disk_windows.go
Normal file
@@ -0,0 +1,102 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
// The functions in this file are based on go's own cache in
|
||||
// cmd/go/internal/cache/cache.go, particularly putIndexEntry and copyFile.
|
||||
|
||||
// writeActionFile writes the indexEntry metadata for an ActionID to disk. It
|
||||
// may be called for the same actionID concurrently from multiple processes,
|
||||
// and the outputID for a specific actionID may change from time to time due
|
||||
// to non-deterministic builds. It makes a best-effort to delete the file if
|
||||
// anything goes wrong.
|
||||
func writeActionFile(dest string, b []byte) (retErr error) {
|
||||
f, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE, 0o666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
cerr := f.Close()
|
||||
if retErr != nil || cerr != nil {
|
||||
retErr = errors.Join(retErr, cerr, os.Remove(dest))
|
||||
}
|
||||
}()
|
||||
|
||||
_, err = f.Write(b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Truncate the file only *after* writing it.
|
||||
// (This should be a no-op, but truncate just in case of previous corruption.)
|
||||
//
|
||||
// This differs from os.WriteFile, which truncates to 0 *before* writing
|
||||
// via os.O_TRUNC. Truncating only after writing ensures that a second write
|
||||
// of the same content to the same file is idempotent, and does not - even
|
||||
// temporarily! - undo the effect of the first write.
|
||||
return f.Truncate(int64(len(b)))
|
||||
}
|
||||
|
||||
// writeOutputFile writes content to be cached to disk. The outputID is the
|
||||
// sha256 hash of the content, and each file should only be written ~once,
|
||||
// assuming no sha256 hash collisions. It may be written multiple times if
|
||||
// concurrent processes are both populating the same output. The file is opened
|
||||
// with FILE_SHARE_READ|FILE_SHARE_WRITE, which means both processes can write
|
||||
// the same contents concurrently without conflict.
|
||||
//
|
||||
// It makes a best effort to clean up if anything goes wrong, but the file may
|
||||
// be left in an inconsistent state in the event of disk-related errors such as
|
||||
// another process taking file locks, or power loss etc.
|
||||
func writeOutputFile(dest string, r io.Reader, size int64, outputID string) (_ int64, retErr error) {
|
||||
info, err := os.Stat(dest)
|
||||
if err == nil && info.Size() == size {
|
||||
// Already exists, check the hash.
|
||||
if f, err := os.Open(dest); err == nil {
|
||||
h := sha256.New()
|
||||
io.Copy(h, f)
|
||||
f.Close()
|
||||
if fmt.Sprintf("%x", h.Sum(nil)) == outputID {
|
||||
// Still drain the reader to ensure associated resources are released.
|
||||
return io.Copy(io.Discard, r)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Didn't successfully find the pre-existing file, write it.
|
||||
mode := os.O_WRONLY | os.O_CREATE
|
||||
if err == nil && info.Size() > size {
|
||||
mode |= os.O_TRUNC // Should never happen, but self-heal.
|
||||
}
|
||||
f, err := os.OpenFile(dest, mode, 0644)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to open output file %q: %w", dest, err)
|
||||
}
|
||||
defer func() {
|
||||
cerr := f.Close()
|
||||
if retErr != nil || cerr != nil {
|
||||
retErr = errors.Join(retErr, cerr, os.Remove(dest))
|
||||
}
|
||||
}()
|
||||
|
||||
// Copy file to f, but also into h to double-check hash.
|
||||
h := sha256.New()
|
||||
w := io.MultiWriter(f, h)
|
||||
n, err := io.Copy(w, r)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if fmt.Sprintf("%x", h.Sum(nil)) != outputID {
|
||||
return 0, errors.New("file content changed underfoot")
|
||||
}
|
||||
|
||||
return n, nil
|
||||
}
|
||||
Reference in New Issue
Block a user