Skip to content

Commit

Permalink
simplified blob splitting
Browse files Browse the repository at this point in the history
  • Loading branch information
mandelsoft committed Dec 11, 2024
1 parent 1153b2e commit d2e335c
Show file tree
Hide file tree
Showing 13 changed files with 119 additions and 720 deletions.
2 changes: 1 addition & 1 deletion api/ocm/cpi/repocpi/bridge_r.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Chunked interface {
SetBlobLimit(s int64) bool
}

// SetBlobLimit tries to set a blob limt for a repository
// SetBlobLimit tries to set a blob limit for a repository
// implementation. It returns true, if this was possible.
func SetBlobLimit(i RepositoryImpl, s int64) bool {
if c, ok := i.(Chunked); ok {
Expand Down
4 changes: 2 additions & 2 deletions api/ocm/extensions/repositories/genericocireg/bloblimits.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func init() {
// AddDefaultBlobLimit can be used to set default blob limits
// for known repositories.
// Those limits will be overwritten by blob limits
// given by a configuration ovject and the repository
// specification
// given by a configuration object and the repository
// specification.
func AddDefaultBlobLimit(name string, limit int64) {
lock.Lock()
defer lock.Unlock()
Expand Down
49 changes: 34 additions & 15 deletions api/ocm/extensions/repositories/genericocireg/componentversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package genericocireg

import (
"fmt"
"io"
"path"
"strings"

"github.com/mandelsoft/goutils/errors"
"github.com/mandelsoft/goutils/set"
"github.com/mandelsoft/vfs/pkg/vfs"
"github.com/opencontainers/go-digest"

"ocm.software/ocm/api/datacontext/attrs/vfsattr"

"ocm.software/ocm/api/oci"
"ocm.software/ocm/api/oci/artdesc"
"ocm.software/ocm/api/oci/extensions/repositories/artifactset"
Expand All @@ -26,8 +28,7 @@ import (
ocihdlr "ocm.software/ocm/api/ocm/extensions/blobhandler/handlers/oci"
"ocm.software/ocm/api/utils/accessio"
"ocm.software/ocm/api/utils/accessobj"
"ocm.software/ocm/api/utils/blobaccess/blobaccess"
"ocm.software/ocm/api/utils/blobaccess/chunked"
"ocm.software/ocm/api/utils/blobaccess"
"ocm.software/ocm/api/utils/errkind"
"ocm.software/ocm/api/utils/mime"
common "ocm.software/ocm/api/utils/misc"
Expand Down Expand Up @@ -316,11 +317,29 @@ func (c *ComponentVersionContainer) GetStorageContext() cpi.StorageContext {
return ocihdlr.New(c.comp.GetName(), c.Repository(), c.comp.repo.ocirepo.GetSpecification().GetKind(), c.comp.repo.ocirepo, c.comp.namespace, c.manifest)
}

// blobAccessForChunk reads at most limit bytes from r into a temporary file
// created on fs and exposes that file as a blob carrying the MIME type of
// blob.
//
// The boolean result tells the caller whether another chunk should be
// requested: it is true only if exactly limit bytes were copied, meaning the
// reader may still hold more data. If nothing could be copied, a nil blob and
// false are returned without an error. An io.EOF reported by the copy
// operation is treated as a regular end of input; any other copy error is
// returned after closing the temporary file.
func blobAccessForChunk(blob blobaccess.BlobAccess, fs vfs.FileSystem, r io.Reader, limit int64) (cpi.BlobAccess, bool, error) {
	tmp, err := blobaccess.NewTempFile("", "chunk-*", fs)
	if err != nil {
		return nil, true, err
	}

	n, err := io.CopyN(tmp.Writer(), r, limit)
	switch {
	case err != nil && !errors.Is(err, io.EOF):
		// A real read/write failure: drop the partially filled chunk file.
		tmp.Close()
		return nil, false, err
	case n <= 0:
		// The reader was already exhausted, so there is no chunk to hand out.
		tmp.Close()
		return nil, false, nil
	}

	// A completely filled chunk means further data may follow.
	chunk := tmp.AsBlob(blob.MimeType())
	more := n == limit
	return chunk, more, nil
}

func (c *ComponentVersionContainer) AddBlob(blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) {
if blob == nil {
return nil, errors.New("a resource has to be defined")
}

fs := vfsattr.Get(c.GetContext())
size := blob.Size()
limit := c.comp.repo.blobLimit
var refs []string
Expand All @@ -330,20 +349,20 @@ func (c *ComponentVersionContainer) AddBlob(blob cpi.BlobAccess, refName string,
return nil, err
}
defer reader.Close()
ch := chunked.New(reader, limit, vfsattr.Get(c.GetContext()))
for {
b, err := ch.Next()
if err != nil {
return nil, errors.Wrapf(err, "chunked blob")
}
if b == nil {
break
}
err = c.addLayer(b, &refs)
b.Close()

var b blobaccess.BlobAccess
cont := true
for cont {
b, cont, err = blobAccessForChunk(blob, fs, reader, limit)
if err != nil {
return nil, errors.Wrapf(err, "chunked blob")
return nil, err
}
if b != nil {
err = c.addLayer(b, &refs)
b.Close()
if err != nil {
return nil, err
}
}
}
} else {
Expand Down
7 changes: 4 additions & 3 deletions api/ocm/extensions/repositories/genericocireg/config/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,14 @@ func (a *Config) ApplyTo(ctx config.Context, target interface{}) error {

const usage = `
The config type <code>` + ConfigType + `</code> can be used to set some
blob layer limits for particular OCI registries used to host OCM repositories;
blob layer limits for particular OCI registries used to host OCM repositories.
The <code>blobLimits</code> field maps a OCI registry address to the blob limit to use:
<pre>
type: ` + ConfigType + `
blobLimits:
dummy.io: 65564
dummy.io:8443: 32768
dummy.io:8443: 32768 // with :8443 specifying the port and 32768 specifying the byte limit
</pre>
If blob limits apply to a registry, local blobs with a size larger than
Expand All @@ -104,6 +105,6 @@ These settings can be overwritten by explicit settings in an OCM
repository specification for those repositories.
The most specific entry will be used. If a registry with a dedicated
port is requested, but no explicit such configuration is found, the
port is requested, but no explicit configuration is found, the
setting for the sole hostname is used (if configured).
`
85 changes: 76 additions & 9 deletions api/ocm/extensions/repositories/genericocireg/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,18 @@ var _ = Describe("component repository mapping", func() {
MustBeSuccessful(finalize.Finalize())
})

DescribeTable("creates a dummy component with chunks", func(f func(ocm.ResourceAccess)) {
const ref4 = "sha256:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08,sha256:3a6eb0790f39ac87c94f3856b2dd2c5d110e6811602261a9a923d3bb23adc8b7"
const ref5 = "sha256:a4853613b2a38568ed4e49196238152469097412d06d5e5fc9be8ab92cfdf2bf,sha256:977817f6f61f4dd501df3036a3e16b31452b36f4aa3edcf9a3f3242a79d7170d"
const ref8 = "sha256:" + ocmtesthelper.D_TESTDATA

DescribeTable("creates a dummy component with chunks", func(limit int, f func(ocm.ResourceAccess), ref string) {
var finalize finalizer.Finalizer
defer Defer(finalize.Finalize)

repo := finalizer.ClosingWith(&finalize, Must(DefaultContext.RepositoryForSpec(spec)))
impl := Must(repocpi.GetRepositoryImplementation(repo))
Expect(reflect.TypeOf(impl).String()).To(Equal("*genericocireg.RepositoryImpl"))
repocpi.SetBlobLimit(impl, 5)
repocpi.SetBlobLimit(impl, int64(limit))

comp := finalizer.ClosingWith(&finalize, Must(repo.LookupComponent(COMPONENT)))
vers := finalizer.ClosingWith(&finalize, Must(comp.NewVersion("v1")))
Expand Down Expand Up @@ -167,24 +171,87 @@ var _ = Describe("component repository mapping", func() {
acc := Must(rsc.Access())
local, ok := acc.(*localblob.AccessSpec)
Expect(ok).To(BeTrue())
Expect(local.LocalReference).To(Equal("sha256:a4853613b2a38568ed4e49196238152469097412d06d5e5fc9be8ab92cfdf2bf,sha256:977817f6f61f4dd501df3036a3e16b31452b36f4aa3edcf9a3f3242a79d7170d"))
// fmt.Printf("localref: %s\n", local.LocalReference)
Expect(local.LocalReference).To(Equal(ref))
Expect(rsc.Meta().Digest).NotTo(BeNil())
Expect(rsc.Meta().Digest.Value).To(Equal(ocmtesthelper.D_TESTDATA))

f(rsc)

MustBeSuccessful(finalize.Finalize())
},
Entry("get blob", func(rsc ocm.ResourceAccess) {
Entry("get blob", 5, func(rsc ocm.ResourceAccess) {
data := Must(ocmutils.GetResourceData(rsc))
Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA))
}, ref5),
Entry("stream blob", 5, func(rsc ocm.ResourceAccess) {
r := Must(ocmutils.GetResourceReader(rsc))
data := Must(io.ReadAll(r))
Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA))
}, ref5),
Entry("stream blob with small buffer", 5, func(rsc ocm.ResourceAccess) {
var buf [2]byte
var data []byte

r := Must(ocmutils.GetResourceReader(rsc))

for {
n, err := r.Read(buf[:])
if n > 0 {
data = append(data, buf[:n]...)
}
if err != nil {
if err == io.EOF {
break
} else {
MustBeSuccessful(err)
}
}
}
Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA))
}, ref5),

Entry("get blob (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) {
data := Must(ocmutils.GetResourceData(rsc))
Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA))
}, ref8),
Entry("stream blob (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) {
r := Must(ocmutils.GetResourceReader(rsc))
data := Must(io.ReadAll(r))
Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA))
}, ref8),
Entry("stream blob with small buffer (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) {
var buf [2]byte
var data []byte

r := Must(ocmutils.GetResourceReader(rsc))

for {
n, err := r.Read(buf[:])
if n > 0 {
data = append(data, buf[:n]...)
}
if err != nil {
if err == io.EOF {
break
} else {
MustBeSuccessful(err)
}
}
}
Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA))
}, ref8),

Entry("get blob (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) {
data := Must(ocmutils.GetResourceData(rsc))
Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA))
}),
Entry("stream blob", func(rsc ocm.ResourceAccess) {
}, ref4),
Entry("stream blob (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) {
r := Must(ocmutils.GetResourceReader(rsc))
data := Must(io.ReadAll(r))
Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA))
}),
Entry("stream blob with small buffer", func(rsc ocm.ResourceAccess) {
}, ref4),
Entry("stream blob with small buffer (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) {
var buf [2]byte
var data []byte

Expand All @@ -204,7 +271,7 @@ var _ = Describe("component repository mapping", func() {
}
}
Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA))
}),
}, ref4),
)

It("handles legacylocalociblob access method", func() {
Expand Down
4 changes: 2 additions & 2 deletions api/ocm/extensions/repositories/genericocireg/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ type RepositoryImpl struct {
ocirepo oci.Repository
readonly bool
// blobLimit is the size limit for layers maintained for the storage of localBlobs.
// The value -1 means an unconfigured value,
// a value == 0 disables the limiting and
// The value -1 means an unconfigured value (a default from the blob limit configuration is used),
// a value == 0 disables the limiting (a default from the blob limit configuration is ignored),
// a value > 0 enables the usage of the specified size.
blobLimit int64
}
Expand Down
93 changes: 0 additions & 93 deletions api/utils/accessio/chunkedreader.go

This file was deleted.

Loading

0 comments on commit d2e335c

Please sign in to comment.