Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cache: Fix walkBlob to call callback only for unlazied contents #5595

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispecs.Descriptor,
descHandlers := descHandlersOf(opts...)
if desc.Digest != "" && (descHandlers == nil || descHandlers[desc.Digest] == nil) {
if _, err := cm.ContentStore.Info(ctx, desc.Digest); errors.Is(err, cerrdefs.ErrNotFound) {
return nil, NeedsRemoteProviderError([]digest.Digest{desc.Digest})
return nil, NeedsRemoteProviderError([]DigestDescriptionPair{{Digest: desc.Digest}})
} else if err != nil {
return nil, err
}
Expand Down Expand Up @@ -396,7 +396,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt
if isLazy, err := cr.isLazy(ctx); err != nil {
return err
} else if isLazy && dhs[blob] == nil {
missing = append(missing, blob)
missing = append(missing, DigestDescriptionPair{Digest: blob, Description: cr.GetDescription()})
}
return nil
}); err != nil {
Expand Down
13 changes: 11 additions & 2 deletions cache/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,19 @@ func descHandlersOf(opts ...RefOption) DescHandlers {

type DescHandlerKey digest.Digest

type NeedsRemoteProviderError []digest.Digest //nolint:errname
type NeedsRemoteProviderError []DigestDescriptionPair //nolint:errname

type DigestDescriptionPair struct {
Digest digest.Digest
Description string
}

// String renders the pair as "<digest>: <description>".
func (d DigestDescriptionPair) String() string {
	return string(d.Digest) + ": " + d.Description
}

// Error implements the error interface. It lists every lazy blob that is
// missing a descriptor handler, including each blob's digest and the
// description of its cache record.
//
// NOTE: the scraped diff kept both the old and new return statements; only
// the new one (using DigestDescriptionPair) is retained, since the old line
// referenced the removed []digest.Digest element type and was unreachable.
func (m NeedsRemoteProviderError) Error() string {
	return fmt.Sprintf("missing descriptor handlers for lazy blobs %+v", []DigestDescriptionPair(m))
}

type Unlazy session.Group
Expand Down
20 changes: 20 additions & 0 deletions cache/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Ref interface {
RefMetadata
Release(context.Context) error
IdentityMapping() *idtools.IdentityMapping
DescHandlers() DescHandlers
DescHandler(digest.Digest) *DescHandler
}

Expand Down Expand Up @@ -612,6 +613,13 @@ func (sr *immutableRef) LayerChain() RefList {
return l
}

// DescHandlers returns a copy of this ref's descriptor handlers. A copy is
// returned so callers cannot mutate the ref's internal state.
func (sr *immutableRef) DescHandlers() DescHandlers {
	copied := make(DescHandlers, len(sr.descHandlers))
	for dgst, dh := range sr.descHandlers {
		copied[dgst] = dh
	}
	return copied
}

// DescHandler returns the descriptor handler registered for dgst, or nil if
// none is registered.
func (sr *immutableRef) DescHandler(dgst digest.Digest) *DescHandler {
	dh := sr.descHandlers[dgst]
	return dh
}
Expand Down Expand Up @@ -640,6 +648,13 @@ func (sr *mutableRef) traceLogFields() logrus.Fields {
return m
}

// DescHandlers returns a copy of this ref's descriptor handlers. A copy is
// returned so callers cannot mutate the ref's internal state.
func (sr *mutableRef) DescHandlers() DescHandlers {
	copied := make(DescHandlers, len(sr.descHandlers))
	for dgst, dh := range sr.descHandlers {
		copied[dgst] = dh
	}
	return copied
}

// DescHandler returns the descriptor handler registered for dgst, or nil if
// none is registered.
func (sr *mutableRef) DescHandler(dgst digest.Digest) *DescHandler {
	dh := sr.descHandlers[dgst]
	return dh
}
Expand Down Expand Up @@ -833,6 +848,11 @@ func getBlobWithCompression(ctx context.Context, cs content.Store, desc ocispecs
}

func walkBlob(ctx context.Context, cs content.Store, desc ocispecs.Descriptor, f func(ocispecs.Descriptor) bool) error {
if _, err := cs.Info(ctx, desc.Digest); errors.Is(err, cerrdefs.ErrNotFound) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know a more defined call path for this case? I assume the blob is missing because it was not pulled and is missing the descHandlers. Is this expected, or did we forget to pass the descHandlers? Or is this a call path where we would want to skip a blob if we only have access to it remotely and not locally?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is called from getAvailableBlobs and, in this context, this function doesn't have access to the refs (and descHandlers) of the compression variants, thus GetRemotes currently treats only local blobs as available compression variants. To change GetRemote to return all remotely accessible variants, we would need to change getAvailableBlobs so that it can access every compression variant's refs and descHandlers in order to trigger unlazying of them.

return nil // this blob doesn't exist in the content store. Don't call the callback.
} else if err != nil {
return err
}
if !f(desc) {
return nil
}
Expand Down
242 changes: 242 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ var allTests = []func(t *testing.T, sb integration.Sandbox){
testLayerLimitOnMounts,
testFrontendVerifyPlatforms,
testRunValidExitCodes,
testSameChainIDWithLazyBlobsCacheExport,
testSameChainIDWithLazyBlobsCacheMountBase,
}

func TestIntegration(t *testing.T) {
Expand Down Expand Up @@ -10895,3 +10897,243 @@ func testRunValidExitCodes(t *testing.T, sb integration.Sandbox) {
require.Error(t, err)
require.ErrorContains(t, err, "exit code: 0")
}

// testSameChainIDWithLazyBlobsCacheExport exercises cache export/import when
// lazy (never-pulled) gzip blobs share a chainID with non-lazy zstd blobs of
// the same rootfs. It builds gzip and zstd variants of busybox, creates
// non-lazy refs for the zstd variant, then runs a build on the gzip variant
// whose refs get deduped by chainID with the zstd ones, and checks that a
// random file produced in the first build is reproduced from cache in the
// second build after a prune + registry cache import.
func testSameChainIDWithLazyBlobsCacheExport(t *testing.T, sb integration.Sandbox) {
	requiresLinux(t)
	workers.CheckFeatureCompat(t, sb,
		workers.FeatureCacheExport,
		workers.FeatureCacheImport,
		workers.FeatureCacheBackendRegistry,
	)

	c, err := New(sb.Context(), sb.Address())
	require.NoError(t, err)
	defer c.Close()

	registry, err := sb.NewRegistry()
	if errors.Is(err, integration.ErrRequirements) {
		t.Skip(err.Error())
	}
	require.NoError(t, err)

	// push the base busybox image, ensuring it uses gzip

	def, err := llb.Image("busybox:latest").
		Marshal(sb.Context())
	require.NoError(t, err)
	busyboxGzipRef := registry + "/buildkit/busyboxgzip:latest"
	_, err = c.Solve(sb.Context(), def, SolveOpt{
		Exports: []ExportEntry{
			{
				Type: ExporterImage,
				Attrs: map[string]string{
					"name":              busyboxGzipRef,
					"push":              "true",
					"compression":       "gzip",
					"force-compression": "true",
				},
			},
		},
	}, nil)
	require.NoError(t, err)

	// push the base busybox image plus an extra layer, ensuring it uses zstd
	// the extra layer allows us to avoid edge-merge/cache-load later
	def, err = llb.Image("busybox:latest").
		Run(llb.Shlex(`touch /foo`)).Root().
		Marshal(sb.Context())
	require.NoError(t, err)
	busyboxZstdRef := registry + "/buildkit/busyboxzstd:latest"
	_, err = c.Solve(sb.Context(), def, SolveOpt{
		Exports: []ExportEntry{
			{
				Type: ExporterImage,
				Attrs: map[string]string{
					"name":              busyboxZstdRef,
					"push":              "true",
					"compression":       "zstd",
					"force-compression": "true",
				},
			},
		},
	}, nil)
	require.NoError(t, err)

	// start from an empty cache so subsequent pulls are lazy
	ensurePruneAll(t, c, sb)

	// create non-lazy cache refs for the zstd image
	def, err = llb.Image(busyboxZstdRef).
		Run(llb.Shlex(`true`)).Root().
		Marshal(sb.Context())
	require.NoError(t, err)
	_, err = c.Solve(sb.Context(), def, SolveOpt{}, nil)
	require.NoError(t, err)

	// Create lazy cache refs for the gzip layers that will be deduped by chainID with
	// the zstd layers made in the previous solve.
	// Put a random file in the rootfs, run a cache invalidation step and then copy
	// the random file to a r/w mnt.
	def, err = llb.Image(busyboxGzipRef).
		Run(llb.Shlex(`sh -c "cat /dev/urandom | head -c 100 > /rand"`)).Root().
		Run(llb.Shlex(`echo `+identity.NewID())).Root().
		Run(llb.Shlex(`cp /rand /mnt/rand`)).AddMount("/mnt", llb.Scratch()).
		Marshal(sb.Context())
	require.NoError(t, err)

	outDir := t.TempDir()
	_, err = c.Solve(sb.Context(), def, SolveOpt{
		Exports: []ExportEntry{
			{
				Type:      ExporterLocal,
				OutputDir: outDir,
			},
		},
		CacheExports: []CacheOptionsEntry{
			{
				Type: "registry",
				Attrs: map[string]string{
					"ref":  registry + "/buildkit/idc:latest",
					"mode": "max",
				},
			},
		},
	}, nil)
	require.NoError(t, err)

	rand1, err := os.ReadFile(filepath.Join(outDir, "rand"))
	require.NoError(t, err)

	// wipe local state again so the second build must rely on the imported cache
	ensurePruneAll(t, c, sb)

	// Run the same steps as before but with a different cache invalidation step in the middle
	// The random file should still be cached from earlier and thus the output should be the same
	def, err = llb.Image(busyboxGzipRef).
		Run(llb.Shlex(`sh -c "cat /dev/urandom | head -c 100 > /rand"`)).Root().
		Run(llb.Shlex(`echo `+identity.NewID())).Root().
		Run(llb.Shlex(`cp /rand /mnt/rand`)).AddMount("/mnt", llb.Scratch()).
		Marshal(sb.Context())
	require.NoError(t, err)

	outDir = t.TempDir()
	_, err = c.Solve(sb.Context(), def, SolveOpt{
		Exports: []ExportEntry{
			{
				Type:      ExporterLocal,
				OutputDir: outDir,
			},
		},
		CacheImports: []CacheOptionsEntry{
			{
				Type: "registry",
				Attrs: map[string]string{
					"ref":  registry + "/buildkit/idc:latest",
					"mode": "max",
				},
			},
		},
	}, nil)
	require.NoError(t, err)

	rand2, err := os.ReadFile(filepath.Join(outDir, "rand"))
	require.NoError(t, err)

	// If the cached step was correctly reused, the random file from the first
	// build must be byte-identical in the second build's output.
	require.Equal(t, string(rand1), string(rand2))
}

// testSameChainIDWithLazyBlobsCacheMountBase exercises persistent cache
// mounts whose base image blobs are lazy. It builds gzip and zstd variants
// of busybox so the gzip refs get deduped by chainID with non-lazy zstd
// refs, uses the gzip image as a cache mount base, writes a file into the
// mount, and then verifies a second solve sees the same mount contents.
func testSameChainIDWithLazyBlobsCacheMountBase(t *testing.T, sb integration.Sandbox) {
	requiresLinux(t)
	workers.CheckFeatureCompat(t, sb,
		workers.FeatureCacheExport,
		workers.FeatureCacheImport,
		workers.FeatureCacheBackendRegistry,
	)

	c, err := New(sb.Context(), sb.Address())
	require.NoError(t, err)
	defer c.Close()

	registry, err := sb.NewRegistry()
	if errors.Is(err, integration.ErrRequirements) {
		t.Skip(err.Error())
	}
	require.NoError(t, err)

	// push the base busybox image, ensuring it uses gzip

	def, err := llb.Image("busybox:latest").
		Marshal(sb.Context())
	require.NoError(t, err)
	busyboxGzipRef := registry + "/buildkit/busyboxgzip:latest"
	_, err = c.Solve(sb.Context(), def, SolveOpt{
		Exports: []ExportEntry{
			{
				Type: ExporterImage,
				Attrs: map[string]string{
					"name":              busyboxGzipRef,
					"push":              "true",
					"compression":       "gzip",
					"force-compression": "true",
				},
			},
		},
	}, nil)
	require.NoError(t, err)

	// push the base busybox image plus an extra layer, ensuring it uses zstd
	// the extra layer allows us to avoid edge-merge/cache-load later
	def, err = llb.Image("busybox:latest").
		Run(llb.Shlex(`touch /foo`)).Root().
		Marshal(sb.Context())
	require.NoError(t, err)
	busyboxZstdRef := registry + "/buildkit/busyboxzstd:latest"
	_, err = c.Solve(sb.Context(), def, SolveOpt{
		Exports: []ExportEntry{
			{
				Type: ExporterImage,
				Attrs: map[string]string{
					"name":              busyboxZstdRef,
					"push":              "true",
					"compression":       "zstd",
					"force-compression": "true",
				},
			},
		},
	}, nil)
	require.NoError(t, err)

	// start from an empty cache so subsequent pulls are lazy
	ensurePruneAll(t, c, sb)

	// create non-lazy cache refs for the zstd image
	def, err = llb.Image(busyboxZstdRef).
		Run(llb.Shlex(`true`)).Root().
		Marshal(sb.Context())
	require.NoError(t, err)
	_, err = c.Solve(sb.Context(), def, SolveOpt{}, nil)
	require.NoError(t, err)

	// use the gzip image as a cache mount base, the cache ref will be deduped by
	// chainID with the zstd layers made in the previous solve
	def, err = llb.Image(busyboxZstdRef).Run(
		llb.Shlex(`touch /mnt/bar`),
		llb.AddMount("/mnt",
			llb.Image(busyboxGzipRef),
			llb.AsPersistentCacheDir("idc", llb.CacheMountShared),
		),
	).Root().Marshal(sb.Context())
	require.NoError(t, err)
	_, err = c.Solve(sb.Context(), def, SolveOpt{}, nil)
	require.NoError(t, err)

	// try to re-use the cache mount from before, ensure we successfully get
	// the same one written to in previous step
	def, err = llb.Image(busyboxZstdRef).Run(
		llb.Shlex(`stat /mnt/bar`),
		llb.AddMount("/mnt",
			llb.Image(busyboxGzipRef),
			llb.AsPersistentCacheDir("idc", llb.CacheMountShared),
		),
	).Root().Marshal(sb.Context())
	require.NoError(t, err)
	// stat fails (and the solve errors) if /mnt/bar is missing, i.e. if a
	// fresh cache mount was created instead of re-using the earlier one.
	_, err = c.Solve(sb.Context(), def, SolveOpt{}, nil)
	require.NoError(t, err)
}
Loading
Loading