From e0653a72ad446c51b637c96abb485e7e0eb49f05 Mon Sep 17 00:00:00 2001
From: thesayyn
Date: Thu, 28 Sep 2023 16:08:26 -0700
Subject: [PATCH] feat: implement gzip stream calculation

Signed-off-by: thesayyn
---
Note: the hunks below were revised during review, so the blob ids on the
"index" lines no longer match the post-image contents.

 go.mod                    |   1 +
 go.sum                    |   2 +
 pkg/apk/implementation.go |  55 +++++++++
 pkg/apk/installed.go      |  44 ++------
 pkg/apk/resolveapk.go     | 228 +++++++++++++++++++++++++++++++++++++
 pkg/apk/util.go           |  55 +++++++++
 6 files changed, 349 insertions(+), 36 deletions(-)
 create mode 100644 pkg/apk/resolveapk.go

diff --git a/go.mod b/go.mod
index c77a4ad..981b2c2 100644
--- a/go.mod
+++ b/go.mod
@@ -13,6 +13,7 @@ require (
 	go.opentelemetry.io/otel v1.19.0
 	go.opentelemetry.io/otel/trace v1.19.0
 	golang.org/x/build v0.0.0-20220928220451-9294235e16f5
+	golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
 	golang.org/x/sync v0.4.0
 	golang.org/x/sys v0.13.0
 )
diff --git a/go.sum b/go.sum
index 2082adf..c939a9b 100644
--- a/go.sum
+++ b/go.sum
@@ -44,6 +44,8 @@ golang.org/x/build v0.0.0-20220928220451-9294235e16f5 h1:g6/rDfRDvg9YmyUh+Gh15jX
 golang.org/x/build v0.0.0-20220928220451-9294235e16f5/go.mod h1:09OhLJI8jZv4jqec7zqh+ZlRZKsSZgDyM5MV3pjurk4=
 golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
 golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
+golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 h1:m64FZMko/V45gv0bNmrNYoDEq8U5YUhetc9cBWKS1TQ=
+golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63/go.mod h1:0v4NqG35kSWCMzLaMeX+IQrlSnVE/bqGSyC2cz/9Le8=
 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
 golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
diff --git a/pkg/apk/implementation.go b/pkg/apk/implementation.go
index dccd0f2..ab7983f 100644
--- a/pkg/apk/implementation.go
+++ b/pkg/apk/implementation.go
@@ -457,6 +457,61 @@ func (a *APK) ResolveWorld(ctx context.Context) (toInstall []*repository.Reposit
 	return
 }
 
+// ResolveAndCalculateWorld resolves the packages required by the desired apk
+// world, then fetches every package concurrently and runs ResolveApk on it.
+// The returned slice has one entry per resolved package, in the order
+// reported by ResolveWorld. If any fetch or resolve step fails, no results
+// are returned and the error reported by the errgroup is wrapped.
+func (a *APK) ResolveAndCalculateWorld(ctx context.Context) ([]*APKResolved, error) {
+	a.logger.Infof("synchronizing with desired apk world")
+
+	ctx, span := otel.Tracer("go-apk").Start(ctx, "CalculateWorld")
+	defer span.End()
+
+	allpkgs, _, err := a.ResolveWorld(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("error getting package dependencies: %w", err)
+	}
+
+	// TODO: Consider making this configurable option.
+	jobs := runtime.GOMAXPROCS(0)
+
+	g, gctx := errgroup.WithContext(ctx)
+	g.SetLimit(jobs + 1)
+
+	// Every goroutine writes only its own index, so no locking is needed;
+	// g.Wait() publishes the writes before the slice is returned.
+	resolved := make([]*APKResolved, len(allpkgs))
+
+	for i, pkg := range allpkgs {
+		i, pkg := i, pkg
+
+		g.Go(func() error {
+			// NOTE(review): if FetchPackage returns an io.ReadCloser, r
+			// should be closed here once resolved -- confirm its type.
+			r, err := a.FetchPackage(gctx, pkg)
+			if err != nil {
+				return fmt.Errorf("fetching %s: %w", pkg.Name, err)
+			}
+			res, err := ResolveApk(gctx, r)
+			if err != nil {
+				return fmt.Errorf("resolving %s: %w", pkg.Name, err)
+			}
+
+			res.Package = pkg
+			resolved[i] = res
+
+			return nil
+		})
+	}
+
+	if err := g.Wait(); err != nil {
+		return nil, fmt.Errorf("resolving packages: %w", err)
+	}
+
+	return resolved, nil
+}
+
 // FixateWorld force apk's resolver to re-resolve the requested dependencies in /etc/apk/world.
 func (a *APK) FixateWorld(ctx context.Context, sourceDateEpoch *time.Time) error {
 	/*
diff --git a/pkg/apk/installed.go b/pkg/apk/installed.go
index 2d113e2..9f35915 100644
--- a/pkg/apk/installed.go
+++ b/pkg/apk/installed.go
@@ -191,43 +191,15 @@ func (a *APK) controlValue(controlTarGz io.Reader, want string) ([]string, error
 	defer gz.Close()
 
-	tr := tar.NewReader(gz)
-	values := []string{}
-	for {
-		header, err := tr.Next()
-		if errors.Is(err, io.EOF) {
-			break
-		}
-		if err != nil {
-			return nil, err
-		}
-
-		// ignore .PKGINFO as it is not a script
-		if header.Name != ".PKGINFO" {
-			continue
-		}
-
-		b, err := io.ReadAll(tr)
-		if err != nil {
-			return nil, fmt.Errorf("unable to read .PKGINFO from control tar.gz file: %w", err)
-		}
-		lines := strings.Split(string(b), "\n")
-		for _, line := range lines {
-			parts := strings.Split(line, "=")
-			if len(parts) != 2 {
-				continue
-			}
-			key := strings.TrimSpace(parts[0])
-			if key != want {
-				continue
-			}
-
-			value := strings.TrimSpace(parts[1])
-			values = append(values, value)
-		}
-
-		break
+	// controlValue opens the tar stream itself, so hand it the gunzipped bytes.
+	mapping, err := controlValue(gz, want)
+	if err != nil {
+		return nil, err
 	}
 
+	values, ok := mapping[want]
+	if !ok {
+		return []string{}, nil
+	}
 	return values, nil
 }
 
diff --git a/pkg/apk/resolveapk.go b/pkg/apk/resolveapk.go
new file mode 100644
index 0000000..180d8d3
--- /dev/null
+++ b/pkg/apk/resolveapk.go
@@ -0,0 +1,228 @@
+//nolint:all
+package apk
+
+import (
+	"archive/tar"
+	"bytes"
+	"context"
+	"crypto/sha1"
+	"encoding/hex"
+	"errors"
+	"fmt"
+	"hash"
+	"io"
+	"strconv"
+	"strings"
+
+	"github.com/klauspost/compress/gzip"
+	"gitlab.alpinelinux.org/alpine/go/repository"
+
+	"go.opentelemetry.io/otel"
+)
+
+// noReadAheadApkReader is an io.Reader designed specifically for use in
+// ResolveApk. When used in combination with the expandApkWriter (based on
+// os.File) in an io.TeeReader, the Go stdlib optimizes the write, causing
+// readahead, even if the actual stream size is less than the size of the
+// incoming buffer. To work around this, Read hands out at most a single byte
+// per call.
+type noReadAheadApkReader struct {
+	io.Reader
+}
+
+// newNoReadAheadApkReader wraps r so that it is consumed one byte at a time.
+func newNoReadAheadApkReader(r io.Reader) *noReadAheadApkReader {
+	return &noReadAheadApkReader{
+		Reader: r,
+	}
+}
+
+// Read reads at most one byte from the underlying reader into b, however
+// large b is. io.EOF is passed through unchanged; any other error is wrapped.
+func (r *noReadAheadApkReader) Read(b []byte) (int, error) {
+	if len(b) == 0 {
+		return 0, nil
+	}
+
+	// Reading straight into b[:1] delivers the byte exactly when the
+	// underlying reader reports one, including alongside an error.
+	n, err := r.Reader.Read(b[:1])
+	if err != nil && !errors.Is(err, io.EOF) {
+		err = fmt.Errorf("expandApkReader.Read: %w", err)
+	}
+	return n, err
+}
+
+// APKResolved describes the sections of a single APK as seen by ResolveApk.
+type APKResolved struct {
+	// Package is not set by ResolveApk; callers attach it themselves.
+	Package *repository.RepositoryPackage
+
+	// SignatureSize is the number of source bytes consumed once the signature
+	// stream has been read, and SignatureHash the SHA-1 of the source bytes
+	// read while decoding it. Both stay zero for unsigned packages.
+	SignatureSize int
+	SignatureHash []byte
+
+	// ControlSize is the number of source bytes consumed once the control
+	// stream has been read, counted from the start of the source, and
+	// ControlHash the SHA-1 of the source bytes read while decoding it.
+	ControlSize int
+	ControlHash []byte
+
+	// DataSize and DataHash are the "size" and "datahash" values declared in
+	// the control section's .PKGINFO; the data stream is not decompressed.
+	DataSize int
+	DataHash []byte
+}
+
+// countingWriter is an io.Writer that discards its input and keeps a running
+// total of the bytes it was handed.
+type countingWriter struct {
+	bytesWritten int
+}
+
+// Write adds len(p) to the running total. It reports the whole slice as
+// written, which io.Writer requires whenever the returned error is nil.
+func (r *countingWriter) Write(p []byte) (int, error) {
+	r.bytesWritten += len(p)
+	return len(p), nil
+}
+
+// dataSectionInfo parses the "size" and "datahash" entries out of the
+// .PKGINFO found in controlTar, an uncompressed control section. Each entry
+// must appear exactly once.
+func dataSectionInfo(controlTar io.Reader) (int, []byte, error) {
+	mapping, err := controlValue(controlTar, "datahash", "size")
+	if err != nil {
+		return 0, nil, fmt.Errorf("reading datahash and size from control: %w", err)
+	}
+
+	sizes := mapping["size"]
+	if len(sizes) != 1 {
+		return 0, nil, fmt.Errorf("saw %d size values", len(sizes))
+	}
+	size, err := strconv.Atoi(sizes[0])
+	if err != nil {
+		return 0, nil, fmt.Errorf("parsing size from control: %w", err)
+	}
+
+	datahashes := mapping["datahash"]
+	if len(datahashes) != 1 {
+		return 0, nil, fmt.Errorf("saw %d datahash values", len(datahashes))
+	}
+	datahash, err := hex.DecodeString(datahashes[0])
+	if err != nil {
+		return 0, nil, fmt.Errorf("reading datahash from control: %w", err)
+	}
+
+	return size, datahash, nil
+}
+
+// ResolveApk walks the gzip streams at the front of the APK read from source
+// and reports their sizes and SHA-1 hashes without unpacking any files.
+//
+// The signature stream, if there is one, and the control stream are read in
+// full. The data stream is not: its size and hash are taken from .PKGINFO.
+// An empty source yields a zero-valued result and no error.
+func ResolveApk(ctx context.Context, source io.Reader) (*APKResolved, error) {
+	_, span := otel.Tracer("go-apk").Start(ctx, "ResolveApk")
+	defer span.End()
+
+	gzipStreamSizes := make([]int, 3)
+	hashes := make([][]byte, 3)
+	streamID := 0
+	controlIdx := 0
+	signed := false
+
+	var gzi *gzip.Reader
+	// cr counts every byte taken from source; the single-byte reader keeps the
+	// gzip decoder from pulling in bytes that belong to the next stream.
+	cr := &countingWriter{}
+	norar := newNoReadAheadApkReader(io.TeeReader(source, cr))
+
+	for {
+		// Each stream gets its own hash, fed with the raw bytes read for it.
+		var h hash.Hash = sha1.New() //nolint:gosec
+
+		hr := io.TeeReader(norar, h)
+
+		var err error
+
+		if gzi == nil {
+			gzi, err = gzip.NewReader(hr)
+		} else {
+			err = gzi.Reset(hr)
+		}
+
+		if errors.Is(err, io.EOF) {
+			break
+		} else if err != nil {
+			return nil, fmt.Errorf("creating gzip reader: %w", err)
+		}
+		// Stop at the end of this stream instead of continuing into the next.
+		gzi.Multistream(false)
+
+		// Everything after the control stream is described by .PKGINFO, so
+		// there is nothing further to read.
+		if streamID > controlIdx {
+			gzi.Close()
+			break
+		}
+
+		var control io.Reader = gzi
+		if streamID == 0 {
+			// The first stream is the signature when its first entry is a
+			// .SIGN.* file; otherwise the package is unsigned and this is
+			// already the control section. Buffer the stream so that, in the
+			// latter case, .PKGINFO can still be read after peeking at the
+			// first entry.
+			raw, err := io.ReadAll(gzi)
+			if err != nil {
+				return nil, fmt.Errorf("ResolveApk error 1: %w", err)
+			}
+			hdr, err := tar.NewReader(bytes.NewReader(raw)).Next()
+			if err != nil {
+				return nil, fmt.Errorf("ResolveApk error 1: %w", err)
+			}
+			if strings.HasPrefix(hdr.Name, ".SIGN.") {
+				controlIdx = 1
+				signed = true
+			}
+			control = bytes.NewReader(raw)
+		}
+
+		if streamID == controlIdx {
+			size, datahash, err := dataSectionInfo(control)
+			if err != nil {
+				return nil, err
+			}
+			gzipStreamSizes[controlIdx+1] = size
+			hashes[controlIdx+1] = datahash
+		}
+
+		// Drain the rest so the hash and byte count cover the whole stream.
+		if _, err := io.Copy(io.Discard, gzi); err != nil {
+			return nil, fmt.Errorf("ResolveApk error 2: %w", err)
+		}
+
+		hashes[streamID] = h.Sum(nil)
+		gzipStreamSizes[streamID] = cr.bytesWritten
+
+		streamID++
+	}
+
+	resolved := &APKResolved{
+		ControlSize: gzipStreamSizes[controlIdx],
+		ControlHash: hashes[controlIdx],
+		DataSize:    gzipStreamSizes[controlIdx+1],
+		DataHash:    hashes[controlIdx+1],
+	}
+
+	if signed {
+		resolved.SignatureSize = gzipStreamSizes[0]
+		resolved.SignatureHash = hashes[0]
+	}
+
+	return resolved, nil
+}
diff --git a/pkg/apk/util.go b/pkg/apk/util.go
index e8634e2..72e04b2 100644
--- a/pkg/apk/util.go
+++ b/pkg/apk/util.go
@@ -14,6 +14,16 @@
 
 package apk
 
+import (
+	"archive/tar"
+	"errors"
+	"fmt"
+	"io"
+	"strings"
+
+	"golang.org/x/exp/slices"
+)
+
 func uniqify[T comparable](s []T) []T {
 	seen := make(map[T]struct{}, len(s))
 	uniq := make([]T, 0, len(s))
@@ -28,3 +38,48 @@ func uniqify[T comparable](s []T) []T {
 
 	return uniq
 }
+
+// controlValue scans the uncompressed control tar for its .PKGINFO entry and
+// collects the values of the requested keys. The result maps each key that
+// was found to its values in file order; keys that never appear are absent
+// from the map. A tar without .PKGINFO yields an empty map.
+func controlValue(controlTar io.Reader, want ...string) (map[string][]string, error) {
+	tr := tar.NewReader(controlTar)
+	mapping := map[string][]string{}
+	for {
+		header, err := tr.Next()
+		if errors.Is(err, io.EOF) {
+			break
+		}
+		if err != nil {
+			return nil, err
+		}
+
+		// Only .PKGINFO carries key/value metadata; skip every other entry.
+		if header.Name != ".PKGINFO" {
+			continue
+		}
+
+		b, err := io.ReadAll(tr)
+		if err != nil {
+			return nil, fmt.Errorf("unable to read .PKGINFO from control tar.gz file: %w", err)
+		}
+		for _, line := range strings.Split(string(b), "\n") {
+			// Split on the first "=" only, so that values which themselves
+			// contain "=" are kept instead of being dropped.
+			key, value, ok := strings.Cut(line, "=")
+			if !ok {
+				continue
+			}
+			key = strings.TrimSpace(key)
+			if !slices.Contains(want, key) {
+				continue
+			}
+
+			mapping[key] = append(mapping[key], strings.TrimSpace(value))
+		}
+
+		break
+	}
+	return mapping, nil
+}