Skip to content
This repository has been archived by the owner on Jun 5, 2024. It is now read-only.

Commit

Permalink
feat: implement gzip stream calculation
Browse files Browse the repository at this point in the history
Signed-off-by: thesayyn <[email protected]>
  • Loading branch information
thesayyn committed Oct 11, 2023
1 parent cb033c6 commit e0653a7
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 35 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
go.opentelemetry.io/otel v1.19.0
go.opentelemetry.io/otel/trace v1.19.0
golang.org/x/build v0.0.0-20220928220451-9294235e16f5
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
golang.org/x/sync v0.4.0
golang.org/x/sys v0.13.0
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ golang.org/x/build v0.0.0-20220928220451-9294235e16f5 h1:g6/rDfRDvg9YmyUh+Gh15jX
golang.org/x/build v0.0.0-20220928220451-9294235e16f5/go.mod h1:09OhLJI8jZv4jqec7zqh+ZlRZKsSZgDyM5MV3pjurk4=
golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 h1:m64FZMko/V45gv0bNmrNYoDEq8U5YUhetc9cBWKS1TQ=
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63/go.mod h1:0v4NqG35kSWCMzLaMeX+IQrlSnVE/bqGSyC2cz/9Le8=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
56 changes: 56 additions & 0 deletions pkg/apk/implementation.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,62 @@ func (a *APK) ResolveWorld(ctx context.Context) (toInstall []*repository.Reposit
return
}

func (a *APK) ResolveAndCalculateWorld(ctx context.Context) ([]*APKResolved, error) {
a.logger.Infof("synchronizing with desired apk world")

ctx, span := otel.Tracer("go-apk").Start(ctx, "CalculateWorld")
defer span.End()

allpkgs, _, err := a.ResolveWorld(ctx)
if err != nil {
return nil, fmt.Errorf("error getting package dependencies: %w", err)
}

// TODO: Consider making this configurable option.
jobs := runtime.GOMAXPROCS(0)

g, gctx := errgroup.WithContext(ctx)
g.SetLimit(jobs + 1)

resolved := make([]*APKResolved, len(allpkgs))

// A slice of pseudo-promises that get closed when expanded[i] is ready.
done := make([]chan struct{}, len(allpkgs))
for i := range allpkgs {
done[i] = make(chan struct{})
}

// Meanwhile, concurrently fetch and expand all our APKs.
// We signal they are ready to be installed by closing done[i].
for i, pkg := range allpkgs {
i, pkg := i, pkg

g.Go(func() error {
r, err := a.FetchPackage(gctx, pkg)
if err != nil {
return fmt.Errorf("fetching %s: %w", pkg.Name, err)
}
res, err := ResolveApk(gctx, r)
if err != nil {
return fmt.Errorf("resolving %s: %w", pkg.Name, err)
}

res.Package = pkg
resolved[i] = res

close(done[i])

return nil
})
}

if err := g.Wait(); err != nil {
return nil, fmt.Errorf("installing packages: %w", err)
}

return resolved, nil
}

// FixateWorld force apk's resolver to re-resolve the requested dependencies in /etc/apk/world.
func (a *APK) FixateWorld(ctx context.Context, sourceDateEpoch *time.Time) error {
/*
Expand Down
42 changes: 7 additions & 35 deletions pkg/apk/installed.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,43 +191,15 @@ func (a *APK) controlValue(controlTarGz io.Reader, want string) ([]string, error
defer gz.Close()
tr := tar.NewReader(gz)

values := []string{}
for {
header, err := tr.Next()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, err
}

// ignore .PKGINFO as it is not a script
if header.Name != ".PKGINFO" {
continue
}

b, err := io.ReadAll(tr)
if err != nil {
return nil, fmt.Errorf("unable to read .PKGINFO from control tar.gz file: %w", err)
}
lines := strings.Split(string(b), "\n")
for _, line := range lines {
parts := strings.Split(line, "=")
if len(parts) != 2 {
continue
}
key := strings.TrimSpace(parts[0])
if key != want {
continue
}

value := strings.TrimSpace(parts[1])
values = append(values, value)
}

break
mapping, err := controlValue(tr, want)
if err != nil {
return nil, err
}

values, ok := mapping[want]
if !ok {
return []string{}, nil
}
return values, nil
}

Expand Down
172 changes: 172 additions & 0 deletions pkg/apk/resolveapk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
//nolint:all
package apk

import (
"archive/tar"
"context"
"crypto/sha1"
"encoding/hex"
"fmt"
"hash"
"io"
"strconv"
"strings"

"github.com/klauspost/compress/gzip"
"gitlab.alpinelinux.org/alpine/go/repository"

"go.opentelemetry.io/otel"
)

// An implementation of io.Reader designed specifically for use in the resolveApk() method.
// When used in combination with the expandApkWrier (based on os.File) in a io.TeeReader,
// the Go stdlib optimizes the write, causing readahead, even if the actual stream size
// is less than the size of the incoming buffer. To fix this, the Read() method on this
// Reader has been modified to read only a single byte at a time to workaround the issue.
type noReadAheadApkReader struct {
io.Reader
}

func newNoReadAheadApkReader(r io.Reader) *noReadAheadApkReader {
return &noReadAheadApkReader{
Reader: r,
}
}

func (r *noReadAheadApkReader) Read(b []byte) (int, error) {
buf := make([]byte, 1)
n, err := r.Reader.Read(buf)
if err != nil && err != io.EOF {
err = fmt.Errorf("expandApkReader.Read: %w", err)
} else {
b[0] = buf[0]
}
return n, err
}

type APKResolved struct {
Package *repository.RepositoryPackage

SignatureSize int
SignatureHash []byte

ControlSize int
ControlHash []byte

DataSize int
DataHash []byte
}

type countingWriter struct {
bytesWritten int
}

func (r *countingWriter) Write(p []byte) (n int, err error) {
r.bytesWritten += len(p)
return n, nil
}

func ResolveApk(ctx context.Context, source io.Reader) (*APKResolved, error) {
ctx, span := otel.Tracer("go-apk").Start(ctx, "ResolveApk")
defer span.End()

gzipStreamSizes := make([]int, 3)
hashes := make([][]byte, 3)
maxStreams := 2
streamId := 0
controlIdx := 0
signed := false

var gzi *gzip.Reader
cr := &countingWriter{}
tr := io.TeeReader(source, cr)
norar := newNoReadAheadApkReader(tr)

for {
var h hash.Hash = sha1.New() //nolint:gosec

hr := io.TeeReader(norar, h)

var err error

if gzi == nil {
gzi, err = gzip.NewReader(hr)
} else {
err = gzi.Reset(hr)
}

if err == io.EOF {
break
} else if err != nil {
return nil, fmt.Errorf("creating gzip reader: %w", err)
}
gzi.Multistream(false)

if streamId == 0 {
tr := tar.NewReader(gzi)
hdr, err := tr.Next()
if err != nil {
return nil, fmt.Errorf("ResolveApk error 1: %v", err)
}
if strings.HasPrefix(hdr.Name, ".SIGN.") {
maxStreams = 3
controlIdx = 1
signed = true
}
} else if controlIdx == streamId {
mapping, err := controlValue(gzi, "datahash", "size")
if err != nil {
return nil, fmt.Errorf("reading datahash and size from control: %w", err)
}

if sizes, ok := mapping["size"]; !ok {
return nil, fmt.Errorf("reading size from control: %w", err)
} else if len(sizes) != 1 {
return nil, fmt.Errorf("saw %d size values", len(sizes))
} else if size, err := strconv.Atoi(sizes[0]); err != nil {
return nil, fmt.Errorf("parsing size from control: %w", err)
} else {
gzipStreamSizes[maxStreams-1] = size
}

if datahashes, ok := mapping["datahash"]; !ok {
return nil, fmt.Errorf("reading datahash from control: %w", err)
} else if len(datahashes) != 1 {
return nil, fmt.Errorf("saw %d datahash values", len(datahashes))
} else if hash, err := hex.DecodeString(datahashes[0]); err != nil {
return nil, fmt.Errorf("reading datahash from control: %w", err)
} else {
hashes[maxStreams-1] = hash
}
}

if streamId <= controlIdx {
if _, err := io.Copy(io.Discard, gzi); err != nil {
return nil, fmt.Errorf("ResolveApk error 2: %v", err)
}

hashes[streamId] = h.Sum(nil)
gzipStreamSizes[streamId] = cr.bytesWritten
} else {
gzi.Close()
break
}

streamId++
}

resolved := &APKResolved{
SignatureSize: 0,
ControlSize: gzipStreamSizes[controlIdx],
ControlHash: hashes[controlIdx],
DataSize: gzipStreamSizes[controlIdx+1],
DataHash: hashes[controlIdx+1],
}

if signed {
resolved.SignatureSize = gzipStreamSizes[0]
resolved.SignatureHash = hashes[0]
}

return resolved, nil
}
58 changes: 58 additions & 0 deletions pkg/apk/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@

package apk

import (
"archive/tar"
"errors"
"fmt"
"io"
"strings"

"golang.org/x/exp/slices"
)

func uniqify[T comparable](s []T) []T {
seen := make(map[T]struct{}, len(s))
uniq := make([]T, 0, len(s))
Expand All @@ -28,3 +38,51 @@ func uniqify[T comparable](s []T) []T {

return uniq
}

func controlValue(controlTar io.Reader, want ...string) (map[string][]string, error) {
tr := tar.NewReader(controlTar)
mapping := map[string][]string{}
for {
header, err := tr.Next()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, err
}

// ignore .PKGINFO as it is not a script
if header.Name != ".PKGINFO" {
continue
}

b, err := io.ReadAll(tr)
if err != nil {
return nil, fmt.Errorf("unable to read .PKGINFO from control tar.gz file: %w", err)
}
lines := strings.Split(string(b), "\n")
for _, line := range lines {
parts := strings.Split(line, "=")
if len(parts) != 2 {
continue
}
key := strings.TrimSpace(parts[0])
if !slices.Contains(want, key) {
continue
}

values, ok := mapping[key]
if !ok {
values = []string{}
}

value := strings.TrimSpace(parts[1])
values = append(values, value)

mapping[key] = values
}

break
}
return mapping, nil
}

0 comments on commit e0653a7

Please sign in to comment.