Skip to content

Commit

Permalink
OCI layer size limit
Browse files Browse the repository at this point in the history
  • Loading branch information
mandelsoft committed Nov 25, 2024
1 parent 2c03823 commit 879378a
Show file tree
Hide file tree
Showing 11 changed files with 493 additions and 132 deletions.
10 changes: 10 additions & 0 deletions api/ocm/cpi/repocpi/bridge_r.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ type RepositoryImpl interface {
io.Closer
}

type Chunked interface {
SetBlobLimit(s int64)
}

func SetBlobLimit(i RepositoryImpl, s int64) {
if c, ok := i.(Chunked); ok {
c.SetBlobLimit(s)
}
}

type _repositoryBridgeBase = resource.ResourceImplBase[cpi.Repository]

type repositoryBridge struct {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package genericocireg

import (
"bytes"
"io"
"strings"
"sync"

"github.com/mandelsoft/goutils/errors"
"github.com/mandelsoft/goutils/finalizer"
"github.com/opencontainers/go-digest"

"ocm.software/ocm/api/oci"
Expand Down Expand Up @@ -88,7 +91,17 @@ func (m *localBlobAccessMethod) getBlob() (blobaccess.DataAccess, error) {
return nil, errors.ErrNotImplemented("artifact blob synthesis")
}
}
_, data, err := m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference))
refs := strings.Split(m.spec.LocalReference, ",")

var (
data blobaccess.DataAccess
err error
)
if len(refs) < 2 {
_, data, err = m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference))
} else {
data = &composedBlock{m, refs}
}
if err != nil {
return nil, err
}
Expand All @@ -111,3 +124,107 @@ func (m *localBlobAccessMethod) Get() ([]byte, error) {
func (m *localBlobAccessMethod) MimeType() string {
return m.spec.MediaType
}

////////////////////////////////////////////////////////////////////////////////

type composedBlock struct {
m *localBlobAccessMethod
refs []string
}

var _ blobaccess.DataAccess = (*composedBlock)(nil)

func (c *composedBlock) Get() ([]byte, error) {
buf := bytes.NewBuffer(nil)
for _, ref := range c.refs {
var finalize finalizer.Finalizer

_, data, err := c.m.namespace.GetBlobData(digest.Digest(ref))
if err != nil {
return nil, err
}
finalize.Close(data)
r, err := data.Reader()
if err != nil {
return nil, err
}
finalize.Close(r)
_, err = io.Copy(buf, r)
if err != nil {
return nil, err
}
err = finalize.Finalize()
if err != nil {
return nil, err
}
}
return buf.Bytes(), nil
}

func (c *composedBlock) Reader() (io.ReadCloser, error) {
return &composedReader{
m: c.m,
refs: c.refs,
}, nil
}

func (c composedBlock) Close() error {
return nil
}

type composedReader struct {
lock sync.Mutex
m *localBlobAccessMethod
refs []string
reader io.ReadCloser
data blobaccess.DataAccess
}

func (c *composedReader) Read(p []byte) (n int, err error) {
c.lock.Lock()
defer c.lock.Unlock()

for {
if c.reader != nil {
n, err := c.reader.Read(p)
if n > 0 {
if err == io.EOF {
err = nil
}
return n, err
}
if err != nil {
return n, err
}
c.reader.Close()
c.data.Close()
c.reader = nil
}
if len(c.refs) == 0 {
return 0, io.EOF
}

ref := strings.TrimSpace(c.refs[0])
_, c.data, err = c.m.namespace.GetBlobData(digest.Digest(ref))
if err != nil {
return 0, err
}
c.reader, err = c.data.Reader()
if err != nil {
return 0, err
}
}
}

func (c *composedReader) Close() error {
c.lock.Lock()
defer c.lock.Unlock()

if c.reader != nil {
c.reader.Close()
c.data.Close()
c.reader = nil
c.refs = nil
}
return nil
}
121 changes: 88 additions & 33 deletions api/ocm/extensions/repositories/genericocireg/componentversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/mandelsoft/goutils/set"
"github.com/opencontainers/go-digest"

"ocm.software/ocm/api/datacontext/attrs/vfsattr"
"ocm.software/ocm/api/oci"
"ocm.software/ocm/api/oci/artdesc"
"ocm.software/ocm/api/oci/extensions/repositories/artifactset"
Expand All @@ -25,7 +26,10 @@ import (
ocihdlr "ocm.software/ocm/api/ocm/extensions/blobhandler/handlers/oci"
"ocm.software/ocm/api/utils/accessio"
"ocm.software/ocm/api/utils/accessobj"
"ocm.software/ocm/api/utils/blobaccess/blobaccess"
"ocm.software/ocm/api/utils/blobaccess/chunked"
"ocm.software/ocm/api/utils/errkind"
"ocm.software/ocm/api/utils/mime"
common "ocm.software/ocm/api/utils/misc"
"ocm.software/ocm/api/utils/refmgmt"
"ocm.software/ocm/api/utils/runtime"
Expand Down Expand Up @@ -183,32 +187,36 @@ func (c *ComponentVersionContainer) Update() (bool, error) {
layers.Add(i)
}
for i, r := range desc.Resources {
s, l, err := c.evalLayer(r.Access)
s, list, err := c.evalLayer(r.Access)
if err != nil {
return false, fmt.Errorf("failed resource layer evaluation: %w", err)
}
if l > 0 {
layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{
Kind: ARTKIND_RESOURCE,
Identity: r.GetIdentity(desc.Resources),
})
layers.Delete(l)
if len(list) > 0 {
for _, l := range list {
layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{
Kind: ARTKIND_RESOURCE,
Identity: r.GetIdentity(desc.Resources),
})
layers.Delete(l)
}
}
if s != r.Access {
desc.Resources[i].Access = s
}
}
for i, r := range desc.Sources {
s, l, err := c.evalLayer(r.Access)
s, list, err := c.evalLayer(r.Access)
if err != nil {
return false, fmt.Errorf("failed source layer evaluation: %w", err)
}
if l > 0 {
layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{
Kind: ARTKIND_SOURCE,
Identity: r.GetIdentity(desc.Sources),
})
layers.Delete(l)
if len(list) > 0 {
for _, l := range list {
layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{
Kind: ARTKIND_SOURCE,
Identity: r.GetIdentity(desc.Sources),
})
layers.Delete(l)
}
}
if s != r.Access {
desc.Sources[i].Access = s
Expand Down Expand Up @@ -259,32 +267,45 @@ func (c *ComponentVersionContainer) Update() (bool, error) {
return false, nil
}

func (c *ComponentVersionContainer) evalLayer(s compdesc.AccessSpec) (compdesc.AccessSpec, int, error) {
var d *artdesc.Descriptor
func (c *ComponentVersionContainer) evalLayer(s compdesc.AccessSpec) (compdesc.AccessSpec, []int, error) {
var (
d *artdesc.Descriptor
layernums []int
)

spec, err := c.GetContext().AccessSpecForSpec(s)
if err != nil {
return s, 0, err
return s, nil, err
}
if a, ok := spec.(*localblob.AccessSpec); ok {
if ok, _ := artdesc.IsDigest(a.LocalReference); !ok {
return s, 0, errors.ErrInvalid("digest", a.LocalReference)
return s, nil, errors.ErrInvalid("digest", a.LocalReference)
}
d = &artdesc.Descriptor{Digest: digest.Digest(a.LocalReference), MediaType: a.GetMimeType()}
}
if d != nil {
// find layer
layers := c.manifest.GetDescriptor().Layers
maxLen := len(layers) - 1
for i := range layers {
l := layers[len(layers)-1-i]
if i < maxLen && l.Digest == d.Digest && (d.Digest == "" || d.Digest == l.Digest) {
return s, len(layers) - 1 - i, nil
refs := strings.Split(a.LocalReference, ",")
media := a.GetMimeType()
if len(refs) > 1 {
media = mime.MIME_OCTET
}
for _, ref := range refs {
d = &artdesc.Descriptor{Digest: digest.Digest(strings.TrimSpace(ref)), MediaType: media}
// find layer
layers := c.manifest.GetDescriptor().Layers
maxLen := len(layers) - 1
found := false
for i := range layers {
l := layers[len(layers)-1-i]
if i < maxLen && l.Digest == d.Digest && (d.Digest == "" || d.Digest == l.Digest) {
layernums = append(layernums, len(layers)-1-i)
found = true
break
}
}
if !found {
return s, nil, fmt.Errorf("resource access %s: no layer found for local blob %s[%s]", spec.Describe(c.GetContext()), d.Digest, d.MediaType)
}
}
return s, 0, fmt.Errorf("resource access %s: no layer found for local blob %s[%s]", spec.Describe(c.GetContext()), d.Digest, d.MediaType)
}
return s, 0, nil
return s, layernums, nil
}

func (c *ComponentVersionContainer) GetDescriptor() *compdesc.ComponentDescriptor {
Expand All @@ -304,15 +325,49 @@ func (c *ComponentVersionContainer) AddBlob(blob cpi.BlobAccess, refName string,
return nil, errors.New("a resource has to be defined")
}

size := blob.Size()
var refs []string
if c.comp.repo.blobLimit > 0 && size != blobaccess.BLOB_UNKNOWN_SIZE && size > c.comp.repo.blobLimit {
reader, err := blob.Reader()
if err != nil {
return nil, err
}
ch := chunked.New(reader, c.comp.repo.blobLimit, vfsattr.Get(c.GetContext()))
for {
b, err := ch.Next()
if err != nil {
return nil, errors.Wrapf(err, "chunked blob")
}
if b == nil {
break
}
err = c.addLayer(b, &refs)
b.Close()

if err != nil {
return nil, errors.Wrapf(err, "chunked blob")
}
}
} else {
err := c.addLayer(blob, &refs)
if err != nil {
return nil, err
}
}
return localblob.New(strings.Join(refs, ","), refName, blob.MimeType(), global), nil
}

func (c *ComponentVersionContainer) addLayer(blob cpi.BlobAccess, refs *[]string) error {
err := c.manifest.AddBlob(blob)
if err != nil {
return nil, err
return err
}
err = ocihdlr.AssureLayer(c.manifest.GetDescriptor(), blob)
if err != nil {
return nil, err
return err
}
return localblob.New(blob.Digest().String(), refName, blob.MimeType(), global), nil
*refs = append(*refs, blob.Digest().String())
return nil
}

// AssureGlobalRef provides a global manifest for a local OCI Artifact.
Expand Down
Loading

0 comments on commit 879378a

Please sign in to comment.