From f9c1e276593517250f116aec5fcedb4ac826e498 Mon Sep 17 00:00:00 2001 From: Gergely Brautigam <182850+Skarlso@users.noreply.github.com> Date: Thu, 19 Dec 2024 12:25:43 +0100 Subject: [PATCH 1/4] fix: deal with internal error coming from docker registry (#1203) #### What this PR does / why we need it #### Which issue(s) this PR fixes --------- Co-authored-by: jakobmoellerdev --- .../repositories/ocireg/repository.go | 13 +- api/tech/oras/client.go | 144 ++---------------- api/tech/oras/delayed_reader.go | 57 ------- api/tech/oras/fetcher.go | 64 ++++++++ api/tech/oras/lister.go | 31 ++++ api/tech/oras/pusher.go | 70 +++++++++ 6 files changed, 187 insertions(+), 192 deletions(-) delete mode 100644 api/tech/oras/delayed_reader.go create mode 100644 api/tech/oras/fetcher.go create mode 100644 api/tech/oras/lister.go create mode 100644 api/tech/oras/pusher.go diff --git a/api/oci/extensions/repositories/ocireg/repository.go b/api/oci/extensions/repositories/ocireg/repository.go index 1f1ede981a..9393cbffa7 100644 --- a/api/oci/extensions/repositories/ocireg/repository.go +++ b/api/oci/extensions/repositories/ocireg/repository.go @@ -164,14 +164,21 @@ func (r *RepositoryImpl) getResolver(comp string) (oras.Resolver, error) { } authClient := &auth.Client{ - Client: client, - Cache: auth.NewCache(), - Credential: auth.StaticCredential(r.info.HostPort(), authCreds), + Client: client, + Cache: auth.NewCache(), + Credential: auth.CredentialFunc(func(ctx context.Context, hostport string) (auth.Credential, error) { + if strings.Contains(hostport, r.info.HostPort()) { + return authCreds, nil + } + logger.Warn("no credentials for host", "host", hostport) + return auth.EmptyCredential, nil + }), } return oras.New(oras.ClientOptions{ Client: authClient, PlainHTTP: r.info.Scheme == "http", + Logger: logger, }), nil } diff --git a/api/tech/oras/client.go b/api/tech/oras/client.go index a73e0003a9..60f4fede67 100644 --- a/api/tech/oras/client.go +++ b/api/tech/oras/client.go @@ -4,11 +4,9 @@ import ( "context" "errors" "fmt" - "io" - "strings" - "sync" "github.com/containerd/containerd/errdefs" + "github.com/mandelsoft/logging" ociv1 "github.com/opencontainers/image-spec/specs-go/v1" oraserr "oras.land/oras-go/v2/errdef" "oras.land/oras-go/v2/registry/remote" @@ -18,55 +16,35 @@ import ( type ClientOptions struct { Client *auth.Client PlainHTTP bool + Logger logging.Logger } type Client struct { client *auth.Client plainHTTP bool - ref string - mu sync.RWMutex + logger logging.Logger } -var ( - _ Resolver = &Client{} - _ Fetcher = &Client{} - _ Pusher = &Client{} - _ Lister = &Client{} -) +var _ Resolver = &Client{} func New(opts ClientOptions) *Client { - return &Client{client: opts.Client, plainHTTP: opts.PlainHTTP} + return &Client{client: opts.Client, plainHTTP: opts.PlainHTTP, logger: opts.Logger} } func (c *Client) Fetcher(ctx context.Context, ref string) (Fetcher, error) { - c.mu.Lock() - defer c.mu.Unlock() - - c.ref = ref - return c, nil + return &OrasFetcher{client: c.client, ref: ref, plainHTTP: c.plainHTTP}, nil } func (c *Client) Pusher(ctx context.Context, ref string) (Pusher, error) { - c.mu.Lock() - defer c.mu.Unlock() - - c.ref = ref - return c, nil + return &OrasPusher{client: c.client, ref: ref, plainHTTP: c.plainHTTP}, nil } func (c *Client) Lister(ctx context.Context, ref string) (Lister, error) { - c.mu.Lock() - defer c.mu.Unlock() - - c.ref = ref - return c, nil + return &OrasLister{client: c.client, ref: ref, plainHTTP: c.plainHTTP}, nil } func (c *Client) Resolve(ctx context.Context, ref string) (string, ociv1.Descriptor, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - src, err := c.createRepository(ref) + src, err := createRepository(ref, c.client, c.plainHTTP) if err != nil { return "", ociv1.Descriptor{}, err } @@ -88,114 +66,16 @@ func (c *Client) Resolve(ctx context.Context, ref string) (string, ociv1.Descrip return "", desc, nil } -func (c *Client) Push(ctx context.Context, d ociv1.Descriptor, src Source) error { - c.mu.RLock() - defer c.mu.RUnlock() - - reader, err := src.Reader() - if err != nil { - return err - } - - repository, err := c.createRepository(c.ref) - if err != nil { - return err - } - - if split := strings.Split(c.ref, ":"); len(split) == 2 { - // Once we get a reference that contains a tag, we need to re-push that - // layer with the reference included. PushReference then will tag - // that layer resulting in the created tag pointing to the right - // blob data. - if err := repository.PushReference(ctx, d, reader, c.ref); err != nil { - return fmt.Errorf("failed to push tag: %w", err) - } - - return nil - } - - // We have a digest, so we use plain push for the digest. - // Push here decides if it's a Manifest or a Blob. - if err := repository.Push(ctx, d, reader); err != nil { - return fmt.Errorf("failed to push: %w, %s", err, c.ref) - } - - return nil -} - -func (c *Client) Fetch(ctx context.Context, desc ociv1.Descriptor) (io.ReadCloser, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - src, err := c.createRepository(c.ref) - if err != nil { - return nil, fmt.Errorf("failed to resolve ref %q: %w", c.ref, err) - } - - // oras requires a Resolve to happen before a fetch because - // -1 is an invalid size and results in a content-length mismatch error by design. - // This is a security consideration on ORAS' side. - // manifest is not set in the descriptor - // We explicitly call resolve on manifest first because it might be - // that the mediatype is not set at this point so we don't want ORAS to try to - // select the wrong layer to fetch from. - rdesc, err := src.Manifests().Resolve(ctx, desc.Digest.String()) - if errors.Is(err, oraserr.ErrNotFound) { - rdesc, err = src.Blobs().Resolve(ctx, desc.Digest.String()) - if err != nil { - return nil, fmt.Errorf("failed to resolve fetch blob %q: %w", desc.Digest.String(), err) - } - - delayer := func() (io.ReadCloser, error) { - return src.Blobs().Fetch(ctx, rdesc) - } - - return newDelayedReader(delayer) - } - - if err != nil { - return nil, fmt.Errorf("failed to resolve fetch manifest %q: %w", desc.Digest.String(), err) - } - - // lastly, try a manifest fetch. - fetch, err := src.Fetch(ctx, rdesc) - if err != nil { - return nil, fmt.Errorf("failed to fetch manifest: %w", err) - } - - return fetch, err -} - -func (c *Client) List(ctx context.Context) ([]string, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - src, err := c.createRepository(c.ref) - if err != nil { - return nil, fmt.Errorf("failed to resolve ref %q: %w", c.ref, err) - } - - var result []string - if err := src.Tags(ctx, "", func(tags []string) error { - result = append(result, tags...) - return nil - }); err != nil { - return nil, fmt.Errorf("failed to list tags: %w", err) - } - - return result, nil -} - // createRepository creates a new repository representation using the passed in ref. // This is a cheap operation. -func (c *Client) createRepository(ref string) (*remote.Repository, error) { +func createRepository(ref string, client *auth.Client, plain bool) (*remote.Repository, error) { src, err := remote.NewRepository(ref) if err != nil { return nil, fmt.Errorf("failed to create new repository: %w", err) } - src.Client = c.client // set up authenticated client. - src.PlainHTTP = c.plainHTTP + src.Client = client // set up authenticated client. + src.PlainHTTP = plain return src, nil } diff --git a/api/tech/oras/delayed_reader.go b/api/tech/oras/delayed_reader.go deleted file mode 100644 index d07a6f870b..0000000000 --- a/api/tech/oras/delayed_reader.go +++ /dev/null @@ -1,57 +0,0 @@ -package oras - -import ( - "io" -) - -// delayedReader sets up a reader that only fetches a blob -// upon explicit reading request, otherwise, it stores the -// way of getting the reader. -type delayedReader struct { - open func() (io.ReadCloser, error) - rc io.ReadCloser - closed bool -} - -func newDelayedReader(open func() (io.ReadCloser, error)) (*delayedReader, error) { - return &delayedReader{ - open: open, - }, nil -} - -func (d *delayedReader) Read(p []byte) (n int, err error) { - if d.closed { - return 0, io.EOF - } - - reader, err := d.reader() - if err != nil { - return 0, err - } - - return reader.Read(p) -} - -func (d *delayedReader) reader() (io.ReadCloser, error) { - if d.rc != nil { - return d.rc, nil - } - - rc, err := d.open() - if err != nil { - return nil, err - } - - d.rc = rc - return rc, nil -} - -func (d *delayedReader) Close() error { - if d.closed { - return nil - } - - // we close regardless of an error - d.closed = true - return d.rc.Close() -} diff --git a/api/tech/oras/fetcher.go b/api/tech/oras/fetcher.go new file mode 100644 index 0000000000..33c1bf1915 --- /dev/null +++ b/api/tech/oras/fetcher.go @@ -0,0 +1,64 @@ +package oras + +import ( + "context" + "errors" + "fmt" + "io" + + ociv1 "github.com/opencontainers/image-spec/specs-go/v1" + "oras.land/oras-go/v2/registry/remote/auth" +) + +type OrasFetcher struct { + client *auth.Client + ref string + plainHTTP bool +} + +func (c *OrasFetcher) Fetch(ctx context.Context, desc ociv1.Descriptor) (io.ReadCloser, error) { + src, err := createRepository(c.ref, c.client, c.plainHTTP) + if err != nil { + return nil, fmt.Errorf("failed to resolve ref %q: %w", c.ref, err) + } + + // oras requires a Resolve to happen before a fetch because + // -1 or 0 are invalid sizes and result in a content-length mismatch error by design. + // This is a security consideration on ORAS' side. + // For more information (https://github.com/oras-project/oras-go/issues/822#issuecomment-2325622324) + // We explicitly call resolve on manifest first because it might be + // that the mediatype is not set at this point so we don't want ORAS to try to + // select the wrong layer to fetch from. + if desc.Size < 1 || desc.Digest == "" { + rdesc, err := src.Manifests().Resolve(ctx, desc.Digest.String()) + if err != nil { + var berr error + rdesc, berr = src.Blobs().Resolve(ctx, desc.Digest.String()) + if berr != nil { + // also display the first manifest resolve error + err = errors.Join(err, berr) + + return nil, fmt.Errorf("failed to resolve fetch blob %q: %w", desc.Digest.String(), err) + } + + reader, err := src.Blobs().Fetch(ctx, rdesc) + if err != nil { + return nil, fmt.Errorf("failed to fetch blob: %w", err) + } + + return reader, nil + } + + // no error + desc = rdesc + } + + // manifest resolve succeeded return the reader directly + // mediatype of the descriptor should now be set to the correct type. + fetch, err := src.Fetch(ctx, desc) + if err != nil { + return nil, fmt.Errorf("failed to fetch manifest: %w", err) + } + + return fetch, nil +} diff --git a/api/tech/oras/lister.go b/api/tech/oras/lister.go new file mode 100644 index 0000000000..ba74568e45 --- /dev/null +++ b/api/tech/oras/lister.go @@ -0,0 +1,31 @@ +package oras + +import ( + "context" + "fmt" + + "oras.land/oras-go/v2/registry/remote/auth" +) + +type OrasLister struct { + client *auth.Client + ref string + plainHTTP bool +} + +func (c *OrasLister) List(ctx context.Context) ([]string, error) { + src, err := createRepository(c.ref, c.client, c.plainHTTP) + if err != nil { + return nil, fmt.Errorf("failed to resolve ref %q: %w", c.ref, err) + } + + var result []string + if err := src.Tags(ctx, "", func(tags []string) error { + result = append(result, tags...) + return nil + }); err != nil { + return nil, fmt.Errorf("failed to list tags: %w", err) + } + + return result, nil +} diff --git a/api/tech/oras/pusher.go b/api/tech/oras/pusher.go new file mode 100644 index 0000000000..784b2ae99f --- /dev/null +++ b/api/tech/oras/pusher.go @@ -0,0 +1,70 @@ +package oras + +import ( + "context" + "fmt" + + "github.com/containerd/errdefs" + ociv1 "github.com/opencontainers/image-spec/specs-go/v1" + "oras.land/oras-go/v2/registry" + "oras.land/oras-go/v2/registry/remote/auth" + + "ocm.software/ocm/api/oci/ociutils" +) + +type OrasPusher struct { + client *auth.Client + ref string + plainHTTP bool +} + +func (c *OrasPusher) Push(ctx context.Context, d ociv1.Descriptor, src Source) (retErr error) { + reader, err := src.Reader() + if err != nil { + return err + } + + repository, err := createRepository(c.ref, c.client, c.plainHTTP) + if err != nil { + return err + } + + ref, err := registry.ParseReference(c.ref) + if err != nil { + return fmt.Errorf("failed to parse reference %q: %w", c.ref, err) + } + + vers, err := ociutils.ParseVersion(ref.Reference) + if err != nil { + return fmt.Errorf("failed to parse version %q: %w", ref.Reference, err) + } + + if vers.IsTagged() { + // Once we get a reference that contains a tag, we need to re-push that + // layer with the reference included. PushReference then will tag + // that layer resulting in the created tag pointing to the right + // blob data. + if err := repository.PushReference(ctx, d, reader, c.ref); err != nil { + return fmt.Errorf("failed to push tag: %w", err) + } + + return nil + } + + ok, err := repository.Exists(ctx, d) + if err != nil { + return fmt.Errorf("failed to check if repository %q exists: %w", ref.Repository, err) + } + + if ok { + return errdefs.ErrAlreadyExists + } + + // We have a digest, so we use plain push for the digest. + // Push here decides if it's a Manifest or a Blob. + if err := repository.Push(ctx, d, reader); err != nil { + return fmt.Errorf("failed to push: %w, %s", err, c.ref) + } + + return nil +} From 285a20a66ca166fed3672998a25cdbce706ef51f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakob=20M=C3=B6ller?= Date: Fri, 20 Dec 2024 13:33:30 +0100 Subject: [PATCH 2/4] fix: correct fetch order for manifests and blobs with hints on media type (#1209) #### What this PR does / why we need it This PR changes the OCI fetch logic to: 1. Fetch Blobs before Manifests in case there is no media type 2. Fetch Manifests before Blobs or Blobs before Manifests if there is a media type hint #### Which issue(s) this PR fixes --------- Signed-off-by: Gergely Brautigam <182850+Skarlso@users.noreply.github.com> Co-authored-by: Gergely Brautigam <182850+Skarlso@users.noreply.github.com> --- .../repositories/ocireg/repository.go | 2 + api/tech/oras/client.go | 7 +- api/tech/oras/fetcher.go | 78 ++++++++++++------- api/tech/oras/manifest.go | 27 +++++++ api/tech/oras/pusher.go | 10 ++- go.mod | 2 +- 6 files changed, 94 insertions(+), 32 deletions(-) create mode 100644 api/tech/oras/manifest.go diff --git a/api/oci/extensions/repositories/ocireg/repository.go b/api/oci/extensions/repositories/ocireg/repository.go index 9393cbffa7..12fe7276de 100644 --- a/api/oci/extensions/repositories/ocireg/repository.go +++ b/api/oci/extensions/repositories/ocireg/repository.go @@ -11,6 +11,7 @@ import ( "github.com/containerd/errdefs" "github.com/mandelsoft/goutils/errors" "github.com/mandelsoft/logging" + "github.com/moby/locker" "oras.land/oras-go/v2/registry/remote/auth" "oras.land/oras-go/v2/registry/remote/retry" @@ -179,6 +180,7 @@ func (r *RepositoryImpl) getResolver(comp string) (oras.Resolver, error) { Client: authClient, PlainHTTP: r.info.Scheme == "http", Logger: logger, + Lock: locker.New(), }), nil } diff --git a/api/tech/oras/client.go b/api/tech/oras/client.go index 60f4fede67..3cde633d6a 100644 --- a/api/tech/oras/client.go +++ b/api/tech/oras/client.go @@ -7,6 +7,7 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/mandelsoft/logging" + "github.com/moby/locker" ociv1 "github.com/opencontainers/image-spec/specs-go/v1" oraserr "oras.land/oras-go/v2/errdef" "oras.land/oras-go/v2/registry/remote" @@ -17,18 +18,20 @@ type ClientOptions struct { Client *auth.Client PlainHTTP bool Logger logging.Logger + Lock *locker.Locker } type Client struct { client *auth.Client plainHTTP bool logger logging.Logger + lock *locker.Locker } var _ Resolver = &Client{} func New(opts ClientOptions) *Client { - return &Client{client: opts.Client, plainHTTP: opts.PlainHTTP, logger: opts.Logger} + return &Client{client: opts.Client, plainHTTP: opts.PlainHTTP, logger: opts.Logger, lock: opts.Lock} } func (c *Client) Fetcher(ctx context.Context, ref string) (Fetcher, error) { @@ -36,7 +39,7 @@ func (c *Client) Fetcher(ctx context.Context, ref string) (Fetcher, error) { } func (c *Client) Pusher(ctx context.Context, ref string) (Pusher, error) { - return &OrasPusher{client: c.client, ref: ref, plainHTTP: c.plainHTTP}, nil + return &OrasPusher{client: c.client, ref: ref, plainHTTP: c.plainHTTP, lock: c.lock}, nil } func (c *Client) Lister(ctx context.Context, ref string) (Lister, error) { diff --git a/api/tech/oras/fetcher.go b/api/tech/oras/fetcher.go index 33c1bf1915..a12df685fc 100644 --- a/api/tech/oras/fetcher.go +++ b/api/tech/oras/fetcher.go @@ -5,8 +5,10 @@ import ( "errors" "fmt" "io" + "sync" ociv1 "github.com/opencontainers/image-spec/specs-go/v1" + "oras.land/oras-go/v2/registry/remote" "oras.land/oras-go/v2/registry/remote/auth" ) @@ -14,51 +16,73 @@ type OrasFetcher struct { client *auth.Client ref string plainHTTP bool + mu sync.Mutex } func (c *OrasFetcher) Fetch(ctx context.Context, desc ociv1.Descriptor) (io.ReadCloser, error) { + c.mu.Lock() + defer c.mu.Unlock() + src, err := createRepository(c.ref, c.client, c.plainHTTP) if err != nil { return nil, fmt.Errorf("failed to resolve ref %q: %w", c.ref, err) } - // oras requires a Resolve to happen before a fetch because + // oras requires a Resolve to happen in some cases before a fetch because // -1 or 0 are invalid sizes and result in a content-length mismatch error by design. // This is a security consideration on ORAS' side. // For more information (https://github.com/oras-project/oras-go/issues/822#issuecomment-2325622324) - // We explicitly call resolve on manifest first because it might be - // that the mediatype is not set at this point so we don't want ORAS to try to - // select the wrong layer to fetch from. - if desc.Size < 1 || desc.Digest == "" { - rdesc, err := src.Manifests().Resolve(ctx, desc.Digest.String()) - if err != nil { - var berr error - rdesc, berr = src.Blobs().Resolve(ctx, desc.Digest.String()) - if berr != nil { - // also display the first manifest resolve error - err = errors.Join(err, berr) + // + // To workaround, we resolve the correct size + if desc.Size < 1 { + if desc, err = c.resolveDescriptor(ctx, desc, src); err != nil { + return nil, err + } + } - return nil, fmt.Errorf("failed to resolve fetch blob %q: %w", desc.Digest.String(), err) - } + // manifest resolve succeeded return the reader directly + // mediatype of the descriptor should now be set to the correct type. + reader, err := src.Fetch(ctx, desc) + if err != nil { + return nil, fmt.Errorf("failed to fetch blob: %w", err) + } - reader, err := src.Blobs().Fetch(ctx, rdesc) - if err != nil { - return nil, fmt.Errorf("failed to fetch blob: %w", err) - } + return reader, nil +} - return reader, nil +// resolveDescriptor resolves the descriptor by fetching the blob or manifest based on the digest as a reference. +// If the descriptor has a media type, it will be resolved directly. +// If the descriptor has no media type, it will first try to resolve the blob, then the manifest as a fallback. +func (c *OrasFetcher) resolveDescriptor(ctx context.Context, desc ociv1.Descriptor, src *remote.Repository) (ociv1.Descriptor, error) { + if desc.MediaType != "" { + var err error + // if there is a media type, resolve the descriptor directly + if isManifest(src.ManifestMediaTypes, desc) { + desc, err = src.Manifests().Resolve(ctx, desc.Digest.String()) + } else { + desc, err = src.Blobs().Resolve(ctx, desc.Digest.String()) } - - // no error - desc = rdesc + if err != nil { + return ociv1.Descriptor{}, fmt.Errorf("failed to resolve descriptor %q: %w", desc.Digest.String(), err) + } + return desc, nil } - // manifest resolve succeeded return the reader directly - // mediatype of the descriptor should now be set to the correct type. - fetch, err := src.Fetch(ctx, desc) + // if there is no media type, first try the blob, then the manifest + // To reader: DO NOT fetch manifest first, this can result in high latency calls + bdesc, err := src.Blobs().Resolve(ctx, desc.Digest.String()) if err != nil { - return nil, fmt.Errorf("failed to fetch manifest: %w", err) + mdesc, merr := src.Manifests().Resolve(ctx, desc.Digest.String()) + if merr != nil { + // also display the first manifest resolve error + err = errors.Join(err, merr) + + return ociv1.Descriptor{}, fmt.Errorf("failed to resolve manifest %q: %w", desc.Digest.String(), err) + } + desc = mdesc + } else { + desc = bdesc } - return fetch, nil + return desc, err } diff --git a/api/tech/oras/manifest.go b/api/tech/oras/manifest.go new file mode 100644 index 0000000000..cc473d87ff --- /dev/null +++ b/api/tech/oras/manifest.go @@ -0,0 +1,27 @@ +package oras + +import ( + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +// defaultManifestMediaTypes contains the default set of manifests media types. +var defaultManifestMediaTypes = []string{ + "application/vnd.docker.distribution.manifest.v2+json", + "application/vnd.docker.distribution.manifest.list.v2+json", + "application/vnd.oci.artifact.manifest.v1+json", + ocispec.MediaTypeImageManifest, + ocispec.MediaTypeImageIndex, +} + +// isManifest determines if the given descriptor points to a manifest. +func isManifest(manifestMediaTypes []string, desc ocispec.Descriptor) bool { + if len(manifestMediaTypes) == 0 { + manifestMediaTypes = defaultManifestMediaTypes + } + for _, mediaType := range manifestMediaTypes { + if desc.MediaType == mediaType { + return true + } + } + return false +} diff --git a/api/tech/oras/pusher.go b/api/tech/oras/pusher.go index 784b2ae99f..0372c13ebd 100644 --- a/api/tech/oras/pusher.go +++ b/api/tech/oras/pusher.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/containerd/errdefs" + "github.com/moby/locker" ociv1 "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras-go/v2/registry" "oras.land/oras-go/v2/registry/remote/auth" @@ -16,13 +17,20 @@ type OrasPusher struct { client *auth.Client ref string plainHTTP bool + lock *locker.Locker } func (c *OrasPusher) Push(ctx context.Context, d ociv1.Descriptor, src Source) (retErr error) { + c.lock.Lock(c.ref) + defer c.lock.Unlock(c.ref) + reader, err := src.Reader() if err != nil { return err } + defer func() { + reader.Close() + }() repository, err := createRepository(c.ref, c.client, c.plainHTTP) if err != nil { @@ -60,8 +68,6 @@ func (c *OrasPusher) Push(ctx context.Context, d ociv1.Descriptor, src Source) ( return errdefs.ErrAlreadyExists } - // We have a digest, so we use plain push for the digest. - // Push here decides if it's a Manifest or a Blob. if err := repository.Push(ctx, d, reader); err != nil { return fmt.Errorf("failed to push: %w, %s", err, c.ref) } diff --git a/go.mod b/go.mod index 10e02fada5..2eab77b612 100644 --- a/go.mod +++ b/go.mod @@ -49,6 +49,7 @@ require ( github.com/mikefarah/yq/v4 v4.44.6 github.com/mitchellh/copystructure v1.2.0 github.com/mittwald/go-helm-client v0.12.15 + github.com/moby/locker v1.0.1 github.com/modern-go/reflect2 v1.0.2 github.com/onsi/ginkgo/v2 v2.22.0 github.com/onsi/gomega v1.36.1 @@ -258,7 +259,6 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect - github.com/moby/locker v1.0.1 // indirect github.com/moby/spdystream v0.5.0 // indirect github.com/moby/sys/capability v0.3.0 // indirect github.com/moby/sys/mountinfo v0.7.2 // indirect From c2605ccca2122d4eb879ed3cedc5afb727569749 Mon Sep 17 00:00:00 2001 From: "ocmbot[bot]" <125909804+ocmbot[bot]@users.noreply.github.com> Date: Fri, 20 Dec 2024 13:52:09 +0100 Subject: [PATCH 3/4] chore: update 'flake.nix' (#1201) Update OCM CLI vendor hash (see: .github/workflows/flake_vendorhash.yaml) Co-authored-by: ocmbot[bot] <125909804+ocmbot[bot]@users.noreply.github.com> --- flake.nix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flake.nix b/flake.nix index 0f18a3344e..653312c44d 100644 --- a/flake.nix +++ b/flake.nix @@ -35,7 +35,7 @@ state = if (self ? rev) then "clean" else "dirty"; # This vendorHash represents a derivative of all go.mod dependencies and needs to be adjusted with every change - vendorHash = "sha256-3h/k5p/simo4GtVm14mTBD5/4CG7QC8bfo48C6CuCbY="; + vendorHash = "sha256-hDIJjccA6G34X0AAGsPDjZuWfgW0BeiGerv6JGb4Sq8="; src = ./.; From 2cb31876cb74501608f9333bad830ad976848b17 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Fri, 20 Dec 2024 14:03:09 +0100 Subject: [PATCH 4/4] feat: support splitting blobs when stored as OCI layer (#1140) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### What this PR does / why we need it There are OCI repositories with a layer size limitation. Because OCM potentially maps any external artifact to a single blob stored as layer on OCI registries, this could lead to problems. An obvious problematic scenario is the transport of a multi-platform OCI image. Its blob format is an archive containing all images and all layers of those images. This PR introduces the possibility to specify blob limits for OCI registries. The OCM-to-OCI mapping then splits larger blobs into multiple layers. The `localBlob` access method then uses a comma-separated list of blob layer-blob digest to remember the sequence of layers. The access then combines the layerblobs again to a single stream. #### Which issue(s) this PR fixes Fixes: https://github.com/open-component-model/ocm-project/issues/12 A follow-up issue https://github.com/open-component-model/ocm-project/issues/338 describes the provisioning of appropriate defaults for common public registries. --------- Co-authored-by: Jakob Möller --- api/credentials/identity/hostpath/identity.go | 11 ++ .../repositories/ocireg/repository.go | 1 + api/ocm/cpi/repocpi/bridge_r.go | 18 +++ .../genericocireg/accessmethod_localblob.go | 136 +++++++++++++++- .../repositories/genericocireg/bloblimits.go | 53 ++++++ .../genericocireg/componentversion.go | 118 +++++++++++--- .../repositories/genericocireg/config/type.go | 110 +++++++++++++ .../repositories/genericocireg/config_test.go | 62 +++++++ .../repositories/genericocireg/repo_test.go | 151 ++++++++++++++++++ .../repositories/genericocireg/repository.go | 58 ++++++- .../repositories/genericocireg/type.go | 26 ++- docs/reference/ocm_configfile.md | 22 +++ 12 files changed, 732 insertions(+), 34 deletions(-) create mode 100644 api/ocm/extensions/repositories/genericocireg/bloblimits.go create mode 100644 api/ocm/extensions/repositories/genericocireg/config/type.go create mode 100644 api/ocm/extensions/repositories/genericocireg/config_test.go diff --git a/api/credentials/identity/hostpath/identity.go b/api/credentials/identity/hostpath/identity.go index baa2aad8a9..968412dda6 100644 --- a/api/credentials/identity/hostpath/identity.go +++ b/api/credentials/identity/hostpath/identity.go @@ -157,3 +157,14 @@ func PathPrefix(id cpi.ConsumerIdentity) string { } return strings.TrimPrefix(id[ID_PATHPREFIX], "/") } + +func HostPort(id cpi.ConsumerIdentity) string { + if id == nil { + return "" + } + host := id[ID_HOSTNAME] + if port, ok := id[ID_PORT]; ok { + return host + ":" + port + } + return host +} diff --git a/api/oci/extensions/repositories/ocireg/repository.go b/api/oci/extensions/repositories/ocireg/repository.go index 12fe7276de..cc2c845ac0 100644 --- a/api/oci/extensions/repositories/ocireg/repository.go +++ b/api/oci/extensions/repositories/ocireg/repository.go @@ -70,6 +70,7 @@ func NewRepository(ctx cpi.Context, spec *RepositorySpec, info *RepositoryInfo) spec: spec, info: info, } + i.logger.Debug("created repository") return cpi.NewRepository(i), nil } diff --git a/api/ocm/cpi/repocpi/bridge_r.go b/api/ocm/cpi/repocpi/bridge_r.go index f7eea176fe..53ed151f5a 100644 --- a/api/ocm/cpi/repocpi/bridge_r.go +++ b/api/ocm/cpi/repocpi/bridge_r.go @@ -34,6 +34,24 @@ type RepositoryImpl interface { io.Closer } +// Chunked is an optional interface, which +// may be implemented to accept a blob limit for mapping +// local blobs to an external storage system. +type Chunked interface { + // SetBlobLimit sets the blob limit if possible. + // It returns true, if this was successful. + SetBlobLimit(s int64) bool +} + +// SetBlobLimit tries to set a blob limit for a repository +// implementation. It returns true, if this was possible. +func SetBlobLimit(i RepositoryImpl, s int64) bool { + if c, ok := i.(Chunked); ok { + return c.SetBlobLimit(s) + } + return false +} + type _repositoryBridgeBase = resource.ResourceImplBase[cpi.Repository] type repositoryBridge struct { diff --git a/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go b/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go index 94a0a85966..c661c9f294 100644 --- a/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go +++ b/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go @@ -1,10 +1,14 @@ package genericocireg import ( + "bytes" "io" + "os" + "strings" "sync" "github.com/mandelsoft/goutils/errors" + "github.com/mandelsoft/goutils/finalizer" "github.com/opencontainers/go-digest" "ocm.software/ocm/api/oci" @@ -88,9 +92,19 @@ func (m *localBlobAccessMethod) getBlob() (blobaccess.DataAccess, error) { return nil, errors.ErrNotImplemented("artifact blob synthesis") } } - _, data, err := m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference)) - if err != nil { - return nil, err + refs := strings.Split(m.spec.LocalReference, ",") + + var ( + data blobaccess.DataAccess + err error + ) + if len(refs) < 2 { + _, data, err = m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference)) + if err != nil { + return nil, err + } + } else { + data = &composedBlock{m, refs} } m.data = data return m.data, err @@ -111,3 +125,119 @@ func (m *localBlobAccessMethod) Get() ([]byte, error) { func (m *localBlobAccessMethod) MimeType() string { return m.spec.MediaType } + +//////////////////////////////////////////////////////////////////////////////// + +type composedBlock struct { + m *localBlobAccessMethod + refs []string +} + +var _ blobaccess.DataAccess = (*composedBlock)(nil) + +func (c *composedBlock) Get() ([]byte, error) { + buf := bytes.NewBuffer(nil) + for _, ref := range c.refs { + var finalize finalizer.Finalizer + + _, data, err := c.m.namespace.GetBlobData(digest.Digest(ref)) + if err != nil { + return nil, err + } + finalize.Close(data) + r, err := data.Reader() + if err != nil { + return nil, err + } + finalize.Close(r) + _, err = io.Copy(buf, r) + if err != nil { + return nil, err + } + err = finalize.Finalize() + if err != nil { + return nil, err + } + } + return buf.Bytes(), nil +} + +func (c *composedBlock) Reader() (io.ReadCloser, error) { + return &composedReader{ + m: c.m, + refs: c.refs, + }, nil +} + +func (c *composedBlock) Close() error { + return nil +} + +type composedReader struct { + lock sync.Mutex + m *localBlobAccessMethod + refs []string + reader io.ReadCloser + data blobaccess.DataAccess +} + +func (c *composedReader) Read(p []byte) (n int, err error) { + c.lock.Lock() + defer c.lock.Unlock() + + for { + if c.reader != nil { + n, err := c.reader.Read(p) + + if err == io.EOF { + c.reader.Close() + c.data.Close() + c.refs = c.refs[1:] + c.reader = nil + c.data = nil + // start new layer and return partial (>0) read before next layer is started + err = nil + } + // return partial read (even a zero read if layer is not yet finished) or error + if c.reader != nil || err != nil || n > 0 { + return n, err + } + // otherwise, we can use the given buffer for the next layer + + // now, we have to check for a next succeeding layer. + // This means to finish with the actual reader and continue + // with the next one. + } + + // If no more layers are available, report EOF. + if len(c.refs) == 0 { + return 0, io.EOF + } + + ref := strings.TrimSpace(c.refs[0]) + _, c.data, err = c.m.namespace.GetBlobData(digest.Digest(ref)) + if err != nil { + return 0, err + } + c.reader, err = c.data.Reader() + if err != nil { + return 0, err + } + } +} + +func (c *composedReader) Close() error { + c.lock.Lock() + defer c.lock.Unlock() + + if c.reader == nil && c.refs == nil { + return os.ErrClosed + } + if c.reader != nil { + c.reader.Close() + c.data.Close() + c.reader = nil + c.refs = nil + } + return nil +} diff --git a/api/ocm/extensions/repositories/genericocireg/bloblimits.go b/api/ocm/extensions/repositories/genericocireg/bloblimits.go new file mode 100644 index 0000000000..c4ecfc799a --- /dev/null +++ b/api/ocm/extensions/repositories/genericocireg/bloblimits.go @@ -0,0 +1,53 @@ +package genericocireg + +import ( + "sync" + + configctx "ocm.software/ocm/api/config" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config" +) + +var ( + defaultBlobLimits config.BlobLimits + lock sync.Mutex +) + +const ( + KB = int64(1000) + MB = 1000 * KB + GB = 1000 * MB +) + +func init() { + defaultBlobLimits = config.BlobLimits{} + + // Add limits for known OCI repositories, here, + // or provide init functions in specialized packages + // by calling AddDefaultBlobLimit. + AddDefaultBlobLimit("ghcr.io", 10*GB) // https://github.com/orgs/community/discussions/77429 +} + +// AddDefaultBlobLimit can be used to set default blob limits +// for known repositories. +// Those limits will be overwritten, by blob limits +// given by a configuration object and the repository +// specification. +func AddDefaultBlobLimit(name string, limit int64) { + lock.Lock() + defer lock.Unlock() + + defaultBlobLimits[name] = limit +} + +func ConfigureBlobLimits(ctx configctx.ContextProvider, target config.Configurable) { + if target != nil { + lock.Lock() + defer lock.Unlock() + + target.ConfigureBlobLimits(defaultBlobLimits) + + if ctx != nil { + ctx.ConfigContext().ApplyTo(0, target) + } + } +} diff --git a/api/ocm/extensions/repositories/genericocireg/componentversion.go b/api/ocm/extensions/repositories/genericocireg/componentversion.go index f603fbe509..d332ca2527 100644 --- a/api/ocm/extensions/repositories/genericocireg/componentversion.go +++ b/api/ocm/extensions/repositories/genericocireg/componentversion.go @@ -2,13 +2,16 @@ package genericocireg import ( "fmt" + "io" "path" "strings" "github.com/mandelsoft/goutils/errors" "github.com/mandelsoft/goutils/set" + "github.com/mandelsoft/vfs/pkg/vfs" "github.com/opencontainers/go-digest" + "ocm.software/ocm/api/datacontext/attrs/vfsattr" "ocm.software/ocm/api/oci" "ocm.software/ocm/api/oci/artdesc" "ocm.software/ocm/api/oci/extensions/repositories/artifactset" @@ -25,7 +28,9 @@ import ( ocihdlr "ocm.software/ocm/api/ocm/extensions/blobhandler/handlers/oci" "ocm.software/ocm/api/utils/accessio" "ocm.software/ocm/api/utils/accessobj" + "ocm.software/ocm/api/utils/blobaccess" "ocm.software/ocm/api/utils/errkind" + "ocm.software/ocm/api/utils/mime" common "ocm.software/ocm/api/utils/misc" "ocm.software/ocm/api/utils/refmgmt" "ocm.software/ocm/api/utils/runtime" @@ -183,11 +188,11 @@ func (c *ComponentVersionContainer) Update() (bool, error) { layers.Add(i) } for i, r := range desc.Resources { - s, l, err := c.evalLayer(r.Access) + s, list, err := c.evalLayer(r.Access) if err != nil { return false, fmt.Errorf("failed resource layer evaluation: %w", err) } - if l > 0 { + for _, l := range list { layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{ Kind: ARTKIND_RESOURCE, Identity: r.GetIdentity(desc.Resources), @@ -199,11 +204,11 @@ func (c *ComponentVersionContainer) Update() (bool, error) { } } for i, r := range desc.Sources { - s, l, err := c.evalLayer(r.Access) + s, list, err := c.evalLayer(r.Access) if err != nil { return false, fmt.Errorf("failed source layer evaluation: %w", err) } - if l > 0 { + for _, l := range list { layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{ Kind: ARTKIND_SOURCE, Identity: r.GetIdentity(desc.Sources), @@ -259,32 +264,45 @@ func (c *ComponentVersionContainer) Update() (bool, error) { return false, nil } -func (c *ComponentVersionContainer) evalLayer(s compdesc.AccessSpec) (compdesc.AccessSpec, int, error) { - var d *artdesc.Descriptor +func (c *ComponentVersionContainer) evalLayer(s compdesc.AccessSpec) (compdesc.AccessSpec, []int, error) { + var ( + d *artdesc.Descriptor + layernums []int + ) spec, err := c.GetContext().AccessSpecForSpec(s) if err != nil { - return s, 0, err + return s, nil, err } if a, ok := spec.(*localblob.AccessSpec); ok { if ok, _ := artdesc.IsDigest(a.LocalReference); !ok { - return s, 0, errors.ErrInvalid("digest", a.LocalReference) + return s, nil, errors.ErrInvalid("digest", a.LocalReference) } - d = &artdesc.Descriptor{Digest: digest.Digest(a.LocalReference), MediaType: a.GetMimeType()} - } - if d != nil { - // find layer - layers := c.manifest.GetDescriptor().Layers - maxLen := len(layers) - 1 - for i := range layers { - l := layers[len(layers)-1-i] - if i < maxLen && l.Digest == d.Digest && (d.Digest == "" || d.Digest == l.Digest) { - return s, len(layers) - 1 - i, nil + refs := strings.Split(a.LocalReference, ",") + media := a.GetMimeType() + if len(refs) > 1 { + media = mime.MIME_OCTET + } + for _, ref := range refs { + d = &artdesc.Descriptor{Digest: digest.Digest(strings.TrimSpace(ref)), MediaType: media} + // find layer + layers := c.manifest.GetDescriptor().Layers + maxLen := len(layers) - 1 + found := false + for i := maxLen; i > 0; i-- { // layer 0 is the component descriptor + l := layers[i] + if l.Digest == d.Digest { + layernums = append(layernums, i) + found = true + break + } + } + if !found { + return s, nil, fmt.Errorf("resource access %s: no layer found for local blob %s[%s]", spec.Describe(c.GetContext()), d.Digest, d.MediaType) } } - return s, 0, fmt.Errorf("resource access %s: no layer found for local blob %s[%s]", spec.Describe(c.GetContext()), d.Digest, d.MediaType) } - return s, 0, nil + return s, layernums, nil } func (c *ComponentVersionContainer) GetDescriptor() *compdesc.ComponentDescriptor { @@ -299,20 +317,74 @@ func (c *ComponentVersionContainer) GetStorageContext() cpi.StorageContext { return ocihdlr.New(c.comp.GetName(), c.Repository(), c.comp.repo.ocirepo.GetSpecification().GetKind(), c.comp.repo.ocirepo, c.comp.namespace, c.manifest) } +func blobAccessForChunk(blob blobaccess.BlobAccess, fs vfs.FileSystem, r io.Reader, limit int64) (cpi.BlobAccess, bool, error) { + f, err := blobaccess.NewTempFile("", "chunk-*", fs) + if err != nil { + return nil, true, err + } + written, err := io.CopyN(f.Writer(), r, limit) + if err != nil && !errors.Is(err, io.EOF) { + f.Close() + return nil, false, err + } + if written <= 0 { + f.Close() + return nil, false, nil + } + return f.AsBlob(blob.MimeType()), written == limit, nil +} + func (c *ComponentVersionContainer) AddBlob(blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) { if blob == nil { return nil, errors.New("a resource has to be defined") } + fs := vfsattr.Get(c.GetContext()) + size := blob.Size() + limit := c.comp.repo.blobLimit + var refs []string + if limit > 0 && size != blobaccess.BLOB_UNKNOWN_SIZE && size > limit { + reader, err := blob.Reader() + if err != nil { + return nil, err + } + defer reader.Close() + + var b blobaccess.BlobAccess + cont := true + for cont { + b, cont, err = blobAccessForChunk(blob, fs, reader, limit) + if err != nil { + return nil, err + } + if b != nil { + err = c.addLayer(b, &refs) + b.Close() + if err != nil { + return nil, err + } + } + } + } else { + err := c.addLayer(blob, &refs) + if err != nil { + return nil, err + } + } + return localblob.New(strings.Join(refs, ","), refName, blob.MimeType(), global), nil +} + +func (c *ComponentVersionContainer) addLayer(blob cpi.BlobAccess, refs *[]string) error { err := c.manifest.AddBlob(blob) if err != nil { - return nil, err + return err } err = ocihdlr.AssureLayer(c.manifest.GetDescriptor(), blob) if err != nil { - return nil, err + return err } - return localblob.New(blob.Digest().String(), refName, blob.MimeType(), global), nil + *refs = append(*refs, blob.Digest().String()) + return nil } // AssureGlobalRef provides a global manifest for a local OCI Artifact. diff --git a/api/ocm/extensions/repositories/genericocireg/config/type.go b/api/ocm/extensions/repositories/genericocireg/config/type.go new file mode 100644 index 0000000000..137c1d2688 --- /dev/null +++ b/api/ocm/extensions/repositories/genericocireg/config/type.go @@ -0,0 +1,110 @@ +package config + +import ( + "net" + "strings" + + "ocm.software/ocm/api/config" + cfgcpi "ocm.software/ocm/api/config/cpi" + "ocm.software/ocm/api/utils/runtime" +) + +const ( + ConfigType = "blobLimits.ocireg.ocm" + cfgcpi.OCM_CONFIG_TYPE_SUFFIX + ConfigTypeV1 = ConfigType + runtime.VersionSeparator + "v1" +) + +func init() { + cfgcpi.RegisterConfigType(cfgcpi.NewConfigType[*Config](ConfigType, usage)) + cfgcpi.RegisterConfigType(cfgcpi.NewConfigType[*Config](ConfigTypeV1)) +} + +// Config describes a memory based config interface +// for configuring blob limits for underlying OCI manifest layers. +type Config struct { + runtime.ObjectVersionedType `json:",inline"` + // BlobLimits describe the limit setting for host:port + // entries. As a special case (for testing) it is possible + // to configure limits for CTF, also, by using "@"+filepath. + BlobLimits BlobLimits `json:"blobLimits"` +} + +type BlobLimits map[string]int64 + +func (b BlobLimits) GetLimit(hostport string) int64 { + if b == nil { + return -1 + } + l, ok := b[hostport] + if ok { + return l + } + + if !strings.HasPrefix(hostport, "@") { + host, _, err := net.SplitHostPort(hostport) + if err == nil { + l, ok = b[host] + if ok { + return l + } + } + } + return -1 +} + +type Configurable interface { + ConfigureBlobLimits(limits BlobLimits) +} + +// New creates a blob limit ConfigSpec. +func New() *Config { + return &Config{ + ObjectVersionedType: runtime.NewVersionedTypedObject(ConfigType), + } +} + +func (a *Config) GetType() string { + return ConfigType +} + +func (a *Config) AddLimit(hostport string, limit int64) { + if a.BlobLimits == nil { + a.BlobLimits = BlobLimits{} + } + a.BlobLimits[hostport] = limit +} + +func (a *Config) ApplyTo(ctx config.Context, target interface{}) error { + t, ok := target.(Configurable) + if !ok { + return config.ErrNoContext(ConfigType) + } + if a.BlobLimits != nil { + t.ConfigureBlobLimits(a.BlobLimits) + } + return nil +} + +const usage = ` +The config type ` + ConfigType + ` can be used to set some +blob layer limits for particular OCI registries used to host OCM repositories. +The blobLimits field maps a OCI registry address to the blob limit to use: + +
+    type: ` + ConfigType + `
+    blobLimits:
+        dummy.io: 65564
+        dummy.io:8443: 32768 // with :8443 specifying the port and 32768 specifying the byte limit
+
+ +If blob limits apply to a registry, local blobs with a size larger than +the configured limit will be split into several layers with a maximum +size of the given value. + +These settings can be overwritten by explicit settings in an OCM +repository specification for those repositories. + +The most specific entry will be used. If a registry with a dedicated +port is requested, but no explicit configuration is found, the +setting for the sole hostname is used (if configured). +` diff --git a/api/ocm/extensions/repositories/genericocireg/config_test.go b/api/ocm/extensions/repositories/genericocireg/config_test.go new file mode 100644 index 0000000000..2435497a81 --- /dev/null +++ b/api/ocm/extensions/repositories/genericocireg/config_test.go @@ -0,0 +1,62 @@ +package genericocireg_test + +import ( + "reflect" + + . "github.com/mandelsoft/goutils/testutils" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "ocm.software/ocm/api/datacontext" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config" + + "github.com/mandelsoft/goutils/finalizer" + "github.com/mandelsoft/vfs/pkg/osfs" + "github.com/mandelsoft/vfs/pkg/vfs" + "ocm.software/ocm/api/oci" + "ocm.software/ocm/api/oci/extensions/repositories/ctf" + "ocm.software/ocm/api/ocm" + "ocm.software/ocm/api/ocm/cpi/repocpi" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg" + "ocm.software/ocm/api/utils/accessio" + "ocm.software/ocm/api/utils/accessobj" +) + +var _ = Describe("component repository mapping", func() { + var tempfs vfs.FileSystem + + var ocispec oci.RepositorySpec + var spec *genericocireg.RepositorySpec + + BeforeEach(func() { + t, err := osfs.NewTempFileSystem() + Expect(err).To(Succeed()) + tempfs = t + + // ocmlog.Context().AddRule(logging.NewConditionRule(logging.TraceLevel, accessio.ALLOC_REALM)) + + ocispec, err = ctf.NewRepositorySpec(accessobj.ACC_CREATE, "test", accessio.PathFileSystem(tempfs), accessobj.FormatDirectory) + Expect(err).To(Succeed()) + spec = genericocireg.NewRepositorySpec(ocispec, nil) + }) + + AfterEach(func() { + vfs.Cleanup(tempfs) + }) + + It("creates a dummy component with configured chunks", func() { + var finalize finalizer.Finalizer + defer Defer(finalize.Finalize) + + ctx := ocm.New(datacontext.MODE_EXTENDED) + + cfg := config.New() + cfg.AddLimit("@test", 5) + ctx.ConfigContext().ApplyConfig(cfg, "direct") + + repo := finalizer.ClosingWith(&finalize, Must(ctx.RepositoryForSpec(spec))) + impl := Must(repocpi.GetRepositoryImplementation(repo)) + Expect(reflect.TypeOf(impl).String()).To(Equal("*genericocireg.RepositoryImpl")) + + Expect(impl.(*genericocireg.RepositoryImpl).GetBlobLimit()).To(Equal(int64(5))) + }) +}) diff --git a/api/ocm/extensions/repositories/genericocireg/repo_test.go b/api/ocm/extensions/repositories/genericocireg/repo_test.go index 5ea992c69a..a892bf0f9c 100644 --- a/api/ocm/extensions/repositories/genericocireg/repo_test.go +++ b/api/ocm/extensions/repositories/genericocireg/repo_test.go @@ -2,6 +2,7 @@ package genericocireg_test import ( "fmt" + "io" "path" "reflect" @@ -123,6 +124,156 @@ var _ = Describe("component repository mapping", func() { MustBeSuccessful(finalize.Finalize()) }) + const ref4 = "sha256:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08,sha256:3a6eb0790f39ac87c94f3856b2dd2c5d110e6811602261a9a923d3bb23adc8b7" + const ref5 = "sha256:a4853613b2a38568ed4e49196238152469097412d06d5e5fc9be8ab92cfdf2bf,sha256:977817f6f61f4dd501df3036a3e16b31452b36f4aa3edcf9a3f3242a79d7170d" + const ref8 = "sha256:" + ocmtesthelper.D_TESTDATA + + DescribeTable("creates a dummy component with chunks", func(limit int, f func(ocm.ResourceAccess), ref string) { + var finalize finalizer.Finalizer + defer Defer(finalize.Finalize) + + repo := finalizer.ClosingWith(&finalize, Must(DefaultContext.RepositoryForSpec(spec))) + impl := Must(repocpi.GetRepositoryImplementation(repo)) + Expect(reflect.TypeOf(impl).String()).To(Equal("*genericocireg.RepositoryImpl")) + repocpi.SetBlobLimit(impl, int64(limit)) + + comp := finalizer.ClosingWith(&finalize, Must(repo.LookupComponent(COMPONENT))) + vers := finalizer.ClosingWith(&finalize, Must(comp.NewVersion("v1"))) + + m1 := compdesc.NewResourceMeta("rsc1", resourcetypes.PLAIN_TEXT, metav1.LocalRelation) + blob := blobaccess.ForString(mime.MIME_TEXT, ocmtesthelper.S_TESTDATA) + MustBeSuccessful(vers.SetResourceBlob(m1, blob, "", nil)) + + MustBeSuccessful(comp.AddVersion(vers)) + + noref := vers.Repository() + Expect(noref).NotTo(BeNil()) + Expect(noref.IsClosed()).To(BeFalse()) + Expect(noref.Close()).To(Succeed()) + Expect(noref.IsClosed()).To(BeFalse()) + + MustBeSuccessful(finalize.Finalize()) + + Expect(noref.IsClosed()).To(BeTrue()) + Expect(noref.Close()).To(MatchError("closed")) + ExpectError(noref.LookupComponent("dummy")).To(MatchError("closed")) + + // access it again + repo = finalizer.ClosingWith(&finalize, Must(DefaultContext.RepositoryForSpec(spec))) + + ok := Must(repo.ExistsComponentVersion(COMPONENT, "v1")) + Expect(ok).To(BeTrue()) + + comp = finalizer.ClosingWith(&finalize, Must(repo.LookupComponent(COMPONENT))) + vers = finalizer.ClosingWith(&finalize, Must(comp.LookupVersion("v1"))) + + rsc := Must(vers.GetResourceByIndex(0)) + acc := Must(rsc.Access()) + local, ok := acc.(*localblob.AccessSpec) + Expect(ok).To(BeTrue()) + // fmt.Printf("localref: %s\n", local.LocalReference) + Expect(local.LocalReference).To(Equal(ref)) + Expect(rsc.Meta().Digest).NotTo(BeNil()) + Expect(rsc.Meta().Digest.Value).To(Equal(ocmtesthelper.D_TESTDATA)) + + f(rsc) + + MustBeSuccessful(finalize.Finalize()) + }, + Entry("get blob", 5, func(rsc ocm.ResourceAccess) { + data := Must(ocmutils.GetResourceData(rsc)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref5), + Entry("stream blob", 5, func(rsc ocm.ResourceAccess) { + r := Must(ocmutils.GetResourceReader(rsc)) + data := Must(io.ReadAll(r)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref5), + Entry("stream blob with small buffer", 5, func(rsc ocm.ResourceAccess) { + var buf [2]byte + var data []byte + + r := Must(ocmutils.GetResourceReader(rsc)) + + for { + n, err := r.Read(buf[:]) + if n > 0 { + data = append(data, buf[:n]...) + } + if err != nil { + if err == io.EOF { + break + } else { + MustBeSuccessful(err) + } + } + } + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref5), + + Entry("get blob (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) { + data := Must(ocmutils.GetResourceData(rsc)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref8), + Entry("stream blob (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) { + r := Must(ocmutils.GetResourceReader(rsc)) + data := Must(io.ReadAll(r)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref8), + Entry("stream blob with small buffer (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) { + var buf [2]byte + var data []byte + + r := Must(ocmutils.GetResourceReader(rsc)) + + for { + n, err := r.Read(buf[:]) + if n > 0 { + data = append(data, buf[:n]...) + } + if err != nil { + if err == io.EOF { + break + } else { + MustBeSuccessful(err) + } + } + } + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref8), + + Entry("get blob (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) { + data := Must(ocmutils.GetResourceData(rsc)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref4), + Entry("stream blob (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) { + r := Must(ocmutils.GetResourceReader(rsc)) + data := Must(io.ReadAll(r)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref4), + Entry("stream blob with small buffer (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) { + var buf [2]byte + var data []byte + + r := Must(ocmutils.GetResourceReader(rsc)) + + for { + n, err := r.Read(buf[:]) + if n > 0 { + data = append(data, buf[:n]...) + } + if err != nil { + if err == io.EOF { + break + } else { + MustBeSuccessful(err) + } + } + } + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref4), + ) + It("handles legacylocalociblob access method", func() { var finalize finalizer.Finalizer defer Defer(finalize.Finalize) diff --git a/api/ocm/extensions/repositories/genericocireg/repository.go b/api/ocm/extensions/repositories/genericocireg/repository.go index 70661f76f5..8773350ea5 100644 --- a/api/ocm/extensions/repositories/genericocireg/repository.go +++ b/api/ocm/extensions/repositories/genericocireg/repository.go @@ -11,12 +11,16 @@ import ( "github.com/mandelsoft/goutils/general" "ocm.software/ocm/api/credentials" + "ocm.software/ocm/api/credentials/identity/hostpath" "ocm.software/ocm/api/datacontext" "ocm.software/ocm/api/oci" ocicpi "ocm.software/ocm/api/oci/cpi" + "ocm.software/ocm/api/oci/extensions/repositories/ctf" + "ocm.software/ocm/api/oci/extensions/repositories/ocireg" "ocm.software/ocm/api/ocm/cpi" "ocm.software/ocm/api/ocm/cpi/repocpi" "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/componentmapping" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config" ) type OCIBasedRepository interface { @@ -44,23 +48,69 @@ type RepositoryImpl struct { nonref cpi.Repository ocirepo oci.Repository readonly bool + // blobLimit is the size limit for layers maintained for the storage of localBlobs. + // The value -1 means an unconfigured value (a default from the blob limit configuration is used), + // a value == 0 disables the limiting and (a default from the blob limit configuration is ignored), + // a value > 0 enabled the usage of the specified size. + blobLimit int64 } var ( _ repocpi.RepositoryImpl = (*RepositoryImpl)(nil) _ credentials.ConsumerIdentityProvider = (*RepositoryImpl)(nil) + _ config.Configurable = (*RepositoryImpl)(nil) ) -func NewRepository(ctxp cpi.ContextProvider, meta *ComponentRepositoryMeta, ocirepo oci.Repository) cpi.Repository { +// NewRepository creates a new OCM repository based on any OCI abstraction from +// the OCI context type. +// The optional blobLimit is the size limit for layers maintained for the storage of localBlobs. +// The value -1 means an unconfigured value (a default from the blob limit configuration is used), +// a value == 0 disables the limiting and (a default from the blob limit configuration is ignored), +// a value > 0 enabled the usage of the specified size. +func NewRepository(ctxp cpi.ContextProvider, meta *ComponentRepositoryMeta, ocirepo oci.Repository, blobLimit ...int64) cpi.Repository { ctx := datacontext.InternalContextRef(ctxp.OCMContext()) + impl := &RepositoryImpl{ - ctx: ctx, - meta: *DefaultComponentRepositoryMeta(meta), - ocirepo: ocirepo, + ctx: ctx, + meta: *DefaultComponentRepositoryMeta(meta), + ocirepo: ocirepo, + blobLimit: general.OptionalDefaulted(-1, blobLimit...), + } + if impl.blobLimit < 0 { + ConfigureBlobLimits(ctxp.OCMContext(), impl) } return repocpi.NewRepository(impl, "OCM repo[OCI]") } +func (r *RepositoryImpl) ConfigureBlobLimits(limits config.BlobLimits) { + if len(limits) == 0 { + return + } + if spec, ok := r.ocirepo.GetSpecification().(*ocireg.RepositorySpec); ok { + id := spec.GetConsumerId() + hp := hostpath.HostPort(id) + l := limits.GetLimit(hp) + if l >= 0 { + r.blobLimit = l + } + } + if spec, ok := r.ocirepo.GetSpecification().(*ctf.RepositorySpec); ok { + l := limits.GetLimit("@" + spec.FilePath) + if l >= 0 { + r.blobLimit = l + } + } +} + +func (r *RepositoryImpl) SetBlobLimit(s int64) bool { + r.blobLimit = s + return true +} + +func (r *RepositoryImpl) GetBlobLimit() int64 { + return r.blobLimit +} + func (r *RepositoryImpl) Close() error { return r.ocirepo.Close() } diff --git a/api/ocm/extensions/repositories/genericocireg/type.go b/api/ocm/extensions/repositories/genericocireg/type.go index 809d598140..b3c4b460e5 100644 --- a/api/ocm/extensions/repositories/genericocireg/type.go +++ b/api/ocm/extensions/repositories/genericocireg/type.go @@ -91,6 +91,7 @@ func NewComponentRepositoryMeta(subPath string, mapping ComponentNameMapping) *C type RepositorySpec struct { oci.RepositorySpec ComponentRepositoryMeta + BlobLimit *int64 } var ( @@ -127,19 +128,28 @@ func (a *RepositorySpec) AsUniformSpec(cpi.Context) *cpi.UniformRepositorySpec { return &cpi.UniformRepositorySpec{Type: a.GetKind(), Scheme: spec.Scheme, Host: spec.Host, Info: spec.Info, TypeHint: spec.TypeHint, SubPath: a.SubPath} } +type meta struct { + ComponentRepositoryMeta `json:",inline"` + BlobLimit *int64 `json:"blobLimit,omitempty"` +} + func (u *RepositorySpec) UnmarshalJSON(data []byte) error { logrus.Debugf("unmarshal generic ocireg spec %s\n", string(data)) ocispec := &oci.GenericRepositorySpec{} if err := json.Unmarshal(data, ocispec); err != nil { return err } - compmeta := &ComponentRepositoryMeta{} - if err := json.Unmarshal(data, ocispec); err != nil { + + var m meta + if err := json.Unmarshal(data, &m); err != nil { return err } u.RepositorySpec = ocispec - u.ComponentRepositoryMeta = *compmeta + u.ComponentRepositoryMeta = m.ComponentRepositoryMeta + if m.BlobLimit != nil { + u.BlobLimit = m.BlobLimit + } normalizers.Normalize(u) return nil @@ -154,7 +164,12 @@ func (u RepositorySpec) MarshalJSON() ([]byte, error) { if err != nil { return nil, err } - compmeta, err := runtime.ToUnstructuredObject(u.ComponentRepositoryMeta) + + m := meta{ + ComponentRepositoryMeta: u.ComponentRepositoryMeta, + BlobLimit: u.BlobLimit, + } + compmeta, err := runtime.ToUnstructuredObject(&m) if err != nil { return nil, err } @@ -166,6 +181,9 @@ func (s *RepositorySpec) Repository(ctx cpi.Context, creds credentials.Credentia if err != nil { return nil, err } + if s.BlobLimit != nil { + return NewRepository(ctx, &s.ComponentRepositoryMeta, r, *s.BlobLimit), nil + } return NewRepository(ctx, &s.ComponentRepositoryMeta, r), nil } diff --git a/docs/reference/ocm_configfile.md b/docs/reference/ocm_configfile.md index ae9e51920d..4f5282064d 100644 --- a/docs/reference/ocm_configfile.md +++ b/docs/reference/ocm_configfile.md @@ -24,6 +24,28 @@ The following configuration types are supported: <name>: <yaml defining the attribute> ... +- blobLimits.ocireg.ocm.config.ocm.software + The config type blobLimits.ocireg.ocm.config.ocm.software can be used to set some + blob layer limits for particular OCI registries used to host OCM repositories. + The blobLimits field maps a OCI registry address to the blob limit to use: + +
+      type: blobLimits.ocireg.ocm.config.ocm.software
+      blobLimits:
+          dummy.io: 65564
+          dummy.io:8443: 32768 // with :8443 specifying the port and 32768 specifying the byte limit
+  
+ + If blob limits apply to a registry, local blobs with a size larger than + the configured limit will be split into several layers with a maximum + size of the given value. + + These settings can be overwritten by explicit settings in an OCM + repository specification for those repositories. + + The most specific entry will be used. If a registry with a dedicated + port is requested, but no explicit configuration is found, the + setting for the sole hostname is used (if configured). - cli.ocm.config.ocm.software The config type cli.ocm.config.ocm.software is used to handle the main configuration flags of the OCM command line tool.