Various S3 improvements #16

Open · wants to merge 10 commits into main
3 changes: 2 additions & 1 deletion .github/workflows/go.yml
@@ -17,9 +17,10 @@ jobs:

build:
runs-on: ubuntu-latest
name: Build with Go ${{ matrix.go }}
name: Build with Go ${{ matrix.go }} ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
go: [ '1.23', '1.24.0-rc.1' ]
steps:
- uses: actions/checkout@v4
15 changes: 11 additions & 4 deletions README.md
@@ -42,11 +42,18 @@ cacher: closing; 808 gets (808 hits, 0 misses, 0 errors); 0 puts (0 errors)
```

## S3 Support

We support an S3 backend for caching.

You can connect to the S3 backend by setting the following environment variables:
- `GOCACHE_S3_BUCKET` - Name of S3 bucket
- `GOCACHE_AWS_REGION` - AWS Region of bucket
- `GOCACHE_AWS_ACCESS_KEY` + `GOCACHE_AWS_SECRET_KEY` / `GOCACHE_AWS_CREDS_PROFILE` - Direct credentials or creds profile to use.
- `GOCACHE_CACHE_KEY` - (Optional, default `v1`) Unique key
- `GOCACHE_S3_BUCKET` - Name of S3 bucket (required)
- `GOCACHE_S3_ACCESS_KEY` + `GOCACHE_S3_SECRET_KEY` - Use static credentials.
- `GOCACHE_S3_SESSION_TOKEN` - Session token for temporary credentials.
- `GOCACHE_S3_REGION` - AWS Region of bucket. Default is `us-east-1`.
- `GOCACHE_S3_URL` - Specify a custom endpoint. Switches to path-style requests.
- `GOCACHE_S3_PREFIX` - Use a custom prefix for all entries. Default is `go-cacher`.

- `GOCACHE_AWS_CREDS_PROFILE` - Use a named AWS credentials profile instead of static credentials.

The cache is stored under `s3://<bucket>/cache/<cache_key>/<architecture>/<os>/<go-version>`
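
Below is a minimal sketch of how these variables might be wired into an S3 client, assuming aws-sdk-go-v2. The actual flag and env handling lives in cmd/go-cacher and may differ; `newClientFromEnv` is an illustrative name, not part of this repo.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"os"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func newClientFromEnv(ctx context.Context) (*s3.Client, error) {
	region := os.Getenv("GOCACHE_S3_REGION")
	if region == "" {
		region = "us-east-1" // documented default
	}
	opts := []func(*config.LoadOptions) error{config.WithRegion(region)}
	if ak := os.Getenv("GOCACHE_S3_ACCESS_KEY"); ak != "" {
		// Static credentials; otherwise the default chain (env, profile,
		// instance role) applies.
		opts = append(opts, config.WithCredentialsProvider(
			credentials.NewStaticCredentialsProvider(
				ak,
				os.Getenv("GOCACHE_S3_SECRET_KEY"),
				os.Getenv("GOCACHE_S3_SESSION_TOKEN"))))
	}
	cfg, err := config.LoadDefaultConfig(ctx, opts...)
	if err != nil {
		return nil, err
	}
	return s3.NewFromConfig(cfg, func(o *s3.Options) {
		if u := os.Getenv("GOCACHE_S3_URL"); u != "" {
			o.BaseEndpoint = aws.String(u) // custom endpoint, e.g. MinIO
			o.UsePathStyle = true          // path-style requests, per the README
		}
	}), nil
}

func main() {
	client, err := newClientFromEnv(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("bucket:", os.Getenv("GOCACHE_S3_BUCKET"), client != nil)
}
```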
9 changes: 5 additions & 4 deletions cacheproc/cacheproc.go
@@ -8,7 +8,6 @@ package cacheproc

import (
"bufio"
"bytes"
"context"
"encoding/hex"
"encoding/json"
@@ -20,6 +19,7 @@ import (
"sync"

"github.com/bradfitz/go-tool-cache/cachers"
"github.com/bradfitz/go-tool-cache/internal/sbytes"
"golang.org/x/sync/errgroup"

"github.com/bradfitz/go-tool-cache/wire"
@@ -93,7 +93,7 @@ func (p *Process) Run(ctx context.Context) error {
if int64(len(bodyb)) != req.BodySize {
log.Fatalf("only got %d bytes of declared %d", len(bodyb), req.BodySize)
}
req.Body = bytes.NewReader(bodyb)
req.Body = sbytes.NewBuffer(bodyb)
}
wg.Go(func() error {
res := &wire.Response{ID: req.ID}
@@ -151,7 +151,8 @@ func (p *Process) handleGet(ctx context.Context, req *wire.Request, res *wire.Re
return fmt.Errorf("not a regular file")
}
res.Size = fi.Size()
res.TimeNanos = fi.ModTime().UnixNano()
mt := fi.ModTime()
res.Time = &mt
res.DiskPath = diskPath
return nil
}
@@ -165,7 +166,7 @@ func (p *Process) handlePut(ctx context.Context, req *wire.Request, res *wire.Re
}()
var body = req.Body
if body == nil {
body = bytes.NewReader(nil)
body = sbytes.NewBuffer(nil)
}
diskPath, err := p.cache.Put(ctx, actionID, outputID, req.BodySize, body)
if err != nil {
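
The new `internal/sbytes` package is imported here but not included in this PR's visible diff. Judging from its call sites (`NewBuffer`, `Bytes`, `Len`), it looks like a small `bytes.Buffer`-style type whose distinct concrete type lets downstream caches detect an in-memory body and take its bytes without copying. A purely hypothetical minimal shape:

```go
// Hypothetical sketch of internal/sbytes; the package is not shown in this
// PR, so everything beyond the NewBuffer/Bytes/Len call sites is a guess.
package sbytes

import "bytes"

// Buffer is a thin wrapper around bytes.Buffer. Having a distinct concrete
// type is what lets CombinedCache.Put type-assert the request body and take
// a zero-copy fast path instead of streaming through an io.Pipe.
type Buffer struct{ bytes.Buffer }

// NewBuffer returns a Buffer whose initial contents are b.
func NewBuffer(b []byte) *Buffer {
	return &Buffer{Buffer: *bytes.NewBuffer(b)}
}
```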
29 changes: 26 additions & 3 deletions cachers/combined.go
@@ -1,13 +1,13 @@
package cachers

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log"

"github.com/bradfitz/go-tool-cache/internal/sbytes"
"golang.org/x/sync/errgroup"
)

@@ -81,12 +81,15 @@ func (l *CombinedCache) Get(ctx context.Context, actionID string) (string, strin
}

func (l *CombinedCache) Put(ctx context.Context, actionID, outputID string, size int64, body io.Reader) (diskPath string, err error) {
if br, ok := body.(*sbytes.Buffer); ok {
return l.putBytes(ctx, actionID, outputID, size, br.Bytes())
}
pr, pw := io.Pipe()
wg, _ := errgroup.WithContext(ctx)
wg.Go(func() error {
var putBody io.Reader = pr
if size == 0 {
putBody = bytes.NewReader(nil)
putBody = sbytes.NewBuffer(nil)
}
var err2 error
diskPath, err2 = l.localCache.Put(ctx, actionID, outputID, size, putBody)
@@ -98,7 +101,7 @@ func (l *CombinedCache) Put(ctx context.Context, actionID, outputID string, size
// Special case the empty file so NewRequest sets "Content-Length: 0",
// as opposed to thinking we didn't set it and not being able to sniff its size
// from the type.
putBody = bytes.NewReader(nil)
putBody = sbytes.NewBuffer(nil)
} else {

putBody = io.TeeReader(body, pw)
@@ -114,7 +117,27 @@
return "", err
}
return diskPath, nil
}

func (l *CombinedCache) putBytes(ctx context.Context, actionID, outputID string, size int64, body []byte) (diskPath string, err error) {
wg, _ := errgroup.WithContext(ctx)
wg.Go(func() error {
var err2 error
diskPath, err2 = l.localCache.Put(ctx, actionID, outputID, size, sbytes.NewBuffer(body))
return err2
})

// tolerate remote write errors
_, _ = l.putsMetrics.DoWithMeasure(size, func() (string, error) {
e := l.remoteCache.Put(ctx, actionID, outputID, size, sbytes.NewBuffer(body))
return "", e
})

if err := wg.Wait(); err != nil {
log.Printf("[%s]\terror: %v", l.localCache.Kind(), err)
return "", err
}
return diskPath, nil
}

func (l *CombinedCache) Close() error {
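
Why the `*sbytes.Buffer` fast path in `Put` is worth the type assertion: a plain `io.Reader` can be drained only once, so the streaming path must fan the body out to the local and remote caches through an `io.Pipe` plus `io.TeeReader`, while an in-memory buffer can cheaply back two independent readers. A standard-library-only illustration:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

func main() {
	// A streaming reader is one-shot: after the first drain, it is empty.
	stream := strings.NewReader("payload")
	io.ReadAll(stream)
	rest, _ := io.ReadAll(stream)
	fmt.Printf("second read of a stream: %q\n", rest) // ""

	// An in-memory byte slice can back independent readers for the local
	// and remote puts, with no pipe and no extra copy of the data.
	data := []byte("payload")
	local, remote := bytes.NewReader(data), bytes.NewReader(data)
	a, _ := io.ReadAll(local)
	b, _ := io.ReadAll(remote)
	fmt.Printf("local: %q remote: %q\n", a, b)
}
```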
11 changes: 8 additions & 3 deletions cachers/disk.go
@@ -5,6 +5,7 @@ import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"log"
@@ -60,14 +61,18 @@ func (dc *SimpleDiskCache) Get(_ context.Context, actionID string) (outputID, di
return "", "", nil
}
if _, err := hex.DecodeString(ie.OutputID); err != nil {
log.Printf("Warning: Output ID for action %q: %v", actionID, err)
// Protect against malicious non-hex OutputID on disk
return "", "", nil
}
return ie.OutputID, filepath.Join(dc.dir, fmt.Sprintf("o-%v", ie.OutputID)), nil
}

func (dc *SimpleDiskCache) Put(_ context.Context, actionID, objectID string, size int64, body io.Reader) (diskPath string, _ error) {
file := filepath.Join(dc.dir, fmt.Sprintf("o-%s", objectID))
func (dc *SimpleDiskCache) Put(_ context.Context, actionID, outputID string, size int64, body io.Reader) (diskPath string, _ error) {
if outputID == "" {
return "", errors.New("empty outputID")
}
Comment on lines +72 to +74:

Owner: Is this backward compatible?

Author: Didn't test with EXPERIMENT option. For 1.24 it is.

file := filepath.Join(dc.dir, fmt.Sprintf("o-%s", outputID))

// Special case empty files; they're both common and easier to do race-free.
if size == 0 {
@@ -88,7 +93,7 @@ func (dc *SimpleDiskCache) Put(_ context.Context, actionID, objectID string, siz

ij, err := json.Marshal(indexEntry{
Version: 1,
OutputID: objectID,
OutputID: outputID,
Size: size,
TimeNanos: time.Now().UnixNano(),
})
88 changes: 72 additions & 16 deletions cachers/s3.go
@@ -1,23 +1,29 @@
package cachers

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log"
"net/url"
"os"
"path"
"runtime"

"github.com/aws/smithy-go"
"strconv"
"strings"
"sync"

"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/smithy-go"
"github.com/bradfitz/go-tool-cache/internal/sbytes"
"github.com/klauspost/compress/s2"
)

const (
outputIDMetadataKey = "outputid"
outputIDMetadataKey = "outputid"
compressedMetadataKey = "compressed"
decompSizeMetadataKey = "decomp-size"
)

// s3Client represents the functions we need from the S3 client
@@ -30,6 +36,7 @@ type s3Client interface {
type S3Cache struct {
bucket string
prefix string
tags string
// verbose optionally specifies whether to log verbose messages.
verbose bool
s3Client s3Client
@@ -52,6 +59,9 @@ func (s *S3Cache) Get(ctx context.Context, actionID string) (outputID string, si
Bucket: &s.bucket,
Key: &actionKey,
})
if s.verbose {
log.Printf("[%s]\t GetObject: s3://%s/%s ok:%v", s.Kind(), s.bucket, actionKey, getOutputErr == nil)
}
if isNotFoundError(getOutputErr) {
// handle object not found
return "", 0, nil, nil
@@ -63,25 +73,61 @@
}
contentSize := outputResult.ContentLength
outputID, ok := outputResult.Metadata[outputIDMetadataKey]
if !ok || outputID == "" {
return "", 0, nil, fmt.Errorf("outputId not found in metadata")
if !ok || outputID == "" || contentSize == nil {
return "", 0, nil, fmt.Errorf("outputId or contentSize not found in metadata")
}
return outputID, contentSize, outputResult.Body, nil
if outputResult.Metadata[compressedMetadataKey] == "s2" {
sz, err := strconv.Atoi(outputResult.Metadata[decompSizeMetadataKey])
if err != nil {
return "", 0, nil, err
}
*contentSize = int64(sz)
outputResult.Body = struct {
io.Reader
io.Closer
}{Reader: s2.NewReader(outputResult.Body), Closer: outputResult.Body}
}
return outputID, *contentSize, outputResult.Body, nil
}

var s2Encoders = sync.Pool{
New: func() interface{} { return s2.NewWriter(nil, s2.WriterBlockSize(1<<20), s2.WriterBetterCompression()) },
}

func (s *S3Cache) Put(ctx context.Context, actionID, outputID string, size int64, body io.Reader) (err error) {
if size == 0 {
body = bytes.NewReader(nil)
body = sbytes.NewBuffer(nil)
}

actionKey := s.actionKey(actionID)
if s.verbose {
log.Printf("[%s]\t PutObject: s3://%s/%s", s.Kind(), s.bucket, actionKey)
}
metadata := map[string]string{
outputIDMetadataKey: outputID,
}

if bb, ok := body.(*sbytes.Buffer); size > 8<<10 && ok {
dst := sbytes.NewBuffer(make([]byte, 0, size/2))
enc := s2Encoders.Get().(*s2.Writer)
enc.Reset(dst)
enc.EncodeBuffer(bb.Bytes())
enc.Close()
metadata[compressedMetadataKey] = "s2"
metadata[decompSizeMetadataKey] = strconv.Itoa(int(size))
enc.Reset(nil)
s2Encoders.Put(enc)
body = dst
size = int64(dst.Len())
}

_, err = s.s3Client.PutObject(ctx, &s3.PutObjectInput{
Bucket: &s.bucket,
Key: &actionKey,
Body: body,
ContentLength: size,
Metadata: map[string]string{
outputIDMetadataKey: outputID,
},
ContentLength: &size,
Metadata: metadata,
Tagging: &s.tags,
}, func(options *s3.Options) {
options.RetryMaxAttempts = 1 // We cannot perform seek in Body
})
@@ -95,7 +141,7 @@ func (s *S3Cache) Close() error {
return nil
}

func NewS3Cache(client s3Client, bucketName string, cacheKey string, verbose bool) *S3Cache {
func NewS3Cache(client s3Client, bucketName, prefix string, verbose bool) *S3Cache {
// get target architecture
goarch := os.Getenv("GOARCH")
if goarch == "" {
@@ -106,12 +152,16 @@ func NewS3Cache(client s3Client, bucketName string, cacheKey string, verbose boo
if goos == "" {
goos = runtime.GOOS
}
prefix := path.Join("cache", cacheKey, goarch, goos)
tags := make(url.Values, 2)
tags.Add("GOARCH", goarch)
tags.Add("GOOS", goos)

cache := &S3Cache{
s3Client: client,
bucket: bucketName,
prefix: prefix,
prefix: strings.Trim(prefix, "/.\\"),
verbose: verbose,
tags: tags.Encode(),
}
return cache
}
@@ -128,5 +178,11 @@ func isNotFoundError(err error) bool {
}

func (s *S3Cache) actionKey(actionID string) string {
return fmt.Sprintf("%s/%s", s.prefix, actionID)
objPre := ""
if len(actionID) > 3 {
// 4096 prefixes.
objPre = actionID[:3]
actionID = actionID[3:]
}
return path.Join(s.prefix, objPre, actionID)
}
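
The new `actionKey` layout peels off the first three hex characters of the action ID into a sub-prefix, fanning entries across 16^3 = 4096 key prefixes (which helps S3 partition request load). A worked example of the same logic, using the README's default `go-cacher` prefix:

```go
package main

import (
	"fmt"
	"path"
)

// actionKey mirrors the sharding above: first 3 hex chars become a prefix.
func actionKey(prefix, actionID string) string {
	objPre := ""
	if len(actionID) > 3 {
		objPre = actionID[:3] // 16^3 = 4096 possible prefixes for hex IDs
		actionID = actionID[3:]
	}
	return path.Join(prefix, objPre, actionID)
}

func main() {
	fmt.Println(actionKey("go-cacher", "abc123def456"))
	// Output: go-cacher/abc/123def456
}
```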
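
The `Put`/`Get` changes in this file add transparent s2 compression for bodies over 8 KiB, storing `compressed: s2` and the original size in object metadata so `Get` can decode and report the true size. Here is a self-contained round trip of the same scheme; the threshold and writer options mirror the diff, and the plain map stands in for S3 object metadata:

```go
package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/klauspost/compress/s2"
)

func main() {
	payload := bytes.Repeat([]byte("go build output "), 1024) // > 8 KiB

	// Put side: s2-encode into an in-memory buffer.
	var compressed bytes.Buffer
	enc := s2.NewWriter(&compressed, s2.WriterBlockSize(1<<20), s2.WriterBetterCompression())
	if err := enc.EncodeBuffer(payload); err != nil {
		panic(err)
	}
	if err := enc.Close(); err != nil {
		panic(err)
	}

	// Metadata the Put path would attach alongside the object.
	meta := map[string]string{
		"compressed":  "s2",
		"decomp-size": fmt.Sprint(len(payload)),
	}

	// Get side: wrap the body in an s2 reader when metadata says "s2".
	var body io.Reader = &compressed
	if meta["compressed"] == "s2" {
		body = s2.NewReader(body)
	}
	out, _ := io.ReadAll(body)
	fmt.Println(len(out) == len(payload)) // true
}
```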
8 changes: 2 additions & 6 deletions cachers/timekeeper.go
@@ -32,11 +32,11 @@ func newTimeKeeper() *timeKeeper {

func (c *timeKeeper) Start(ctx context.Context) {
c.wg, _ = errgroup.WithContext(ctx)
start := time.Now()
c.wg.Go(func() error {
for m := range c.metricsChan {
c.TotalBytes += m.bytes
speed := float64(m.bytes) / m.duration.Seconds()
c.AvgBytesPerSecond = newAverage(c.AvgBytesPerSecond, c.Count, speed)
c.AvgBytesPerSecond = float64(c.TotalBytes) / time.Since(start).Seconds()
c.Count++
}
return nil
@@ -53,10 +53,6 @@ func (c *timeKeeper) Summary() string {
formatBytes(float64(c.TotalBytes)), formatBytes(c.AvgBytesPerSecond))
}

func newAverage(oldAverage float64, count int64, newValue float64) float64 {
return (oldAverage*float64(count) + newValue) / float64(count+1)
}

func (c *timeKeeper) DoWithMeasure(bytesCount int64, f func() (string, error)) (string, error) {
start := time.Now()
s, err := f()
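
The averaging change in `Start` replaces a running mean of per-operation speeds with total bytes over wall-clock time since start. The two disagree whenever transfer sizes vary, because a mean of speeds weighs a tiny transfer the same as a huge one. A toy comparison, assuming sequential operations so elapsed time is just the sum of durations:

```go
package main

import "fmt"

func main() {
	// Two ops: 1 KiB in 1s, 100 MiB in 10s.
	sizes := []float64{1 << 10, 100 << 20}
	secs := []float64{1, 10}

	var oldAvg, total, elapsed float64
	for i := range sizes {
		speed := sizes[i] / secs[i]
		oldAvg = (oldAvg*float64(i) + speed) / float64(i+1) // removed newAverage
		total += sizes[i]
		elapsed += secs[i]
	}
	fmt.Printf("mean of speeds:  %.0f bytes/s\n", oldAvg)        // ~5.2e6
	fmt.Printf("total / elapsed: %.0f bytes/s\n", total/elapsed) // ~9.5e6
}
```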
2 changes: 1 addition & 1 deletion cmd/go-cacher-server/cacher-server.go
@@ -193,7 +193,7 @@ func OutputFilename(dir, outputID string) string {
if len(outputID) < 4 || len(outputID) > 1000 {
return ""
}
for i := range outputID {
for _, b := range outputID {
b := outputID[i]
if b >= '0' && b <= '9' || b >= 'a' && b <= 'f' {
continue