From 879378a5f9d23a4bb42249b7003eb986f23e0369 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Mon, 25 Nov 2024 17:17:15 +0100 Subject: [PATCH] OCI layer size limit --- api/ocm/cpi/repocpi/bridge_r.go | 10 ++ .../genericocireg/accessmethod_localblob.go | 119 ++++++++++++++++- .../genericocireg/componentversion.go | 121 +++++++++++++----- .../repositories/genericocireg/repo_test.go | 53 ++++++++ .../repositories/genericocireg/repository.go | 26 ++-- .../repositories/genericocireg/type.go | 23 +++- api/utils/accessio/chunkedreader.go | 33 +++-- api/utils/accessio/chunkedreader_test.go | 4 +- api/utils/blobaccess/chunked/access.go | 111 +++++++--------- api/utils/blobaccess/chunked/chunked_test.go | 112 ++++++++++++++++ api/utils/blobaccess/chunked/suite_test.go | 13 ++ 11 files changed, 493 insertions(+), 132 deletions(-) create mode 100644 api/utils/blobaccess/chunked/chunked_test.go create mode 100644 api/utils/blobaccess/chunked/suite_test.go diff --git a/api/ocm/cpi/repocpi/bridge_r.go b/api/ocm/cpi/repocpi/bridge_r.go index f7eea176fe..bce9642245 100644 --- a/api/ocm/cpi/repocpi/bridge_r.go +++ b/api/ocm/cpi/repocpi/bridge_r.go @@ -34,6 +34,16 @@ type RepositoryImpl interface { io.Closer } +type Chunked interface { + SetBlobLimit(s int64) +} + +func SetBlobLimit(i RepositoryImpl, s int64) { + if c, ok := i.(Chunked); ok { + c.SetBlobLimit(s) + } +} + type _repositoryBridgeBase = resource.ResourceImplBase[cpi.Repository] type repositoryBridge struct { diff --git a/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go b/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go index 94a0a85966..2bb3feaad3 100644 --- a/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go +++ b/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go @@ -1,10 +1,13 @@ package genericocireg import ( + "bytes" "io" + "strings" "sync" "github.com/mandelsoft/goutils/errors" + "github.com/mandelsoft/goutils/finalizer" "github.com/opencontainers/go-digest" "ocm.software/ocm/api/oci" @@ -88,7 +91,17 @@ func (m *localBlobAccessMethod) getBlob() (blobaccess.DataAccess, error) { return nil, errors.ErrNotImplemented("artifact blob synthesis") } } - _, data, err := m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference)) + refs := strings.Split(m.spec.LocalReference, ",") + + var ( + data blobaccess.DataAccess + err error + ) + if len(refs) < 2 { + _, data, err = m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference)) + } else { + data = &composedBlock{m, refs} + } if err != nil { return nil, err } @@ -111,3 +124,107 @@ func (m *localBlobAccessMethod) Get() ([]byte, error) { func (m *localBlobAccessMethod) MimeType() string { return m.spec.MediaType } + +//////////////////////////////////////////////////////////////////////////////// + +type composedBlock struct { + m *localBlobAccessMethod + refs []string +} + +var _ blobaccess.DataAccess = (*composedBlock)(nil) + +func (c *composedBlock) Get() ([]byte, error) { + buf := bytes.NewBuffer(nil) + for _, ref := range c.refs { + var finalize finalizer.Finalizer + + _, data, err := c.m.namespace.GetBlobData(digest.Digest(ref)) + if err != nil { + return nil, err + } + finalize.Close(data) + r, err := data.Reader() + if err != nil { + return nil, err + } + finalize.Close(r) + _, err = io.Copy(buf, r) + if err != nil { + return nil, err + } + err = finalize.Finalize() + if err != nil { + return nil, err + } + } + return buf.Bytes(), nil +} + +func (c *composedBlock) Reader() (io.ReadCloser, error) { + return &composedReader{ + m: c.m, + refs: c.refs, + }, nil +} + +func (c composedBlock) Close() error { + return nil +} + +type composedReader struct { + lock sync.Mutex + m *localBlobAccessMethod + refs []string + reader io.ReadCloser + data blobaccess.DataAccess +} + +func (c *composedReader) Read(p []byte) (n int, err error) { + c.lock.Lock() + defer c.lock.Unlock() + + for { + if c.reader != nil { + n, err := c.reader.Read(p) + if n > 0 { + if err == io.EOF { + err = nil + } + return n, err + } + if err != nil { + return n, err + } + c.reader.Close() + c.data.Close() + c.reader = nil + } + if len(c.refs) == 0 { + return 0, io.EOF + } + + ref := strings.TrimSpace(c.refs[0]) + _, c.data, err = c.m.namespace.GetBlobData(digest.Digest(ref)) + if err != nil { + return 0, err + } + c.reader, err = c.data.Reader() + if err != nil { + return 0, err + } + } +} + +func (c *composedReader) Close() error { + c.lock.Lock() + defer c.lock.Unlock() + + if c.reader != nil { + c.reader.Close() + c.data.Close() + c.reader = nil + c.refs = nil + } + return nil +} diff --git a/api/ocm/extensions/repositories/genericocireg/componentversion.go b/api/ocm/extensions/repositories/genericocireg/componentversion.go index f603fbe509..c9b6338993 100644 --- a/api/ocm/extensions/repositories/genericocireg/componentversion.go +++ b/api/ocm/extensions/repositories/genericocireg/componentversion.go @@ -9,6 +9,7 @@ import ( "github.com/mandelsoft/goutils/set" "github.com/opencontainers/go-digest" + "ocm.software/ocm/api/datacontext/attrs/vfsattr" "ocm.software/ocm/api/oci" "ocm.software/ocm/api/oci/artdesc" "ocm.software/ocm/api/oci/extensions/repositories/artifactset" @@ -25,7 +26,10 @@ import ( ocihdlr "ocm.software/ocm/api/ocm/extensions/blobhandler/handlers/oci" "ocm.software/ocm/api/utils/accessio" "ocm.software/ocm/api/utils/accessobj" + "ocm.software/ocm/api/utils/blobaccess/blobaccess" + "ocm.software/ocm/api/utils/blobaccess/chunked" "ocm.software/ocm/api/utils/errkind" + "ocm.software/ocm/api/utils/mime" common "ocm.software/ocm/api/utils/misc" "ocm.software/ocm/api/utils/refmgmt" "ocm.software/ocm/api/utils/runtime" @@ -183,32 +187,36 @@ func (c *ComponentVersionContainer) Update() (bool, error) { layers.Add(i) } for i, r := range desc.Resources { - s, l, err := c.evalLayer(r.Access) + s, list, err := c.evalLayer(r.Access) if err != nil { return false, fmt.Errorf("failed resource layer evaluation: %w", err) } - if l > 0 { - layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{ - Kind: ARTKIND_RESOURCE, - Identity: r.GetIdentity(desc.Resources), - }) - layers.Delete(l) + if len(list) > 0 { + for _, l := range list { + layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{ + Kind: ARTKIND_RESOURCE, + Identity: r.GetIdentity(desc.Resources), + }) + layers.Delete(l) + } } if s != r.Access { desc.Resources[i].Access = s } } for i, r := range desc.Sources { - s, l, err := c.evalLayer(r.Access) + s, list, err := c.evalLayer(r.Access) if err != nil { return false, fmt.Errorf("failed source layer evaluation: %w", err) } - if l > 0 { - layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{ - Kind: ARTKIND_SOURCE, - Identity: r.GetIdentity(desc.Sources), - }) - layers.Delete(l) + if len(list) > 0 { + for _, l := range list { + layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{ + Kind: ARTKIND_SOURCE, + Identity: r.GetIdentity(desc.Sources), + }) + layers.Delete(l) + } } if s != r.Access { desc.Sources[i].Access = s @@ -259,32 +267,45 @@ func (c *ComponentVersionContainer) Update() (bool, error) { return false, nil } -func (c *ComponentVersionContainer) evalLayer(s compdesc.AccessSpec) (compdesc.AccessSpec, int, error) { - var d *artdesc.Descriptor +func (c *ComponentVersionContainer) evalLayer(s compdesc.AccessSpec) (compdesc.AccessSpec, []int, error) { + var ( + d *artdesc.Descriptor + layernums []int + ) spec, err := c.GetContext().AccessSpecForSpec(s) if err != nil { - return s, 0, err + return s, nil, err } if a, ok := spec.(*localblob.AccessSpec); ok { if ok, _ := artdesc.IsDigest(a.LocalReference); !ok { - return s, 0, errors.ErrInvalid("digest", a.LocalReference) + return s, nil, errors.ErrInvalid("digest", a.LocalReference) } - d = &artdesc.Descriptor{Digest: digest.Digest(a.LocalReference), MediaType: a.GetMimeType()} - } - if d != nil { - // find layer - layers := c.manifest.GetDescriptor().Layers - maxLen := len(layers) - 1 - for i := range layers { - l := layers[len(layers)-1-i] - if i < maxLen && l.Digest == d.Digest && (d.Digest == "" || d.Digest == l.Digest) { - return s, len(layers) - 1 - i, nil + refs := strings.Split(a.LocalReference, ",") + media := a.GetMimeType() + if len(refs) > 1 { + media = mime.MIME_OCTET + } + for _, ref := range refs { + d = &artdesc.Descriptor{Digest: digest.Digest(strings.TrimSpace(ref)), MediaType: media} + // find layer + layers := c.manifest.GetDescriptor().Layers + maxLen := len(layers) - 1 + found := false + for i := range layers { + l := layers[len(layers)-1-i] + if i < maxLen && l.Digest == d.Digest && (d.Digest == "" || d.Digest == l.Digest) { + layernums = append(layernums, len(layers)-1-i) + found = true + break + } + } + if !found { + return s, nil, fmt.Errorf("resource access %s: no layer found for local blob %s[%s]", spec.Describe(c.GetContext()), d.Digest, d.MediaType) } } - return s, 0, fmt.Errorf("resource access %s: no layer found for local blob %s[%s]", spec.Describe(c.GetContext()), d.Digest, d.MediaType) } - return s, 0, nil + return s, layernums, nil } func (c *ComponentVersionContainer) GetDescriptor() *compdesc.ComponentDescriptor { @@ -304,15 +325,49 @@ func (c *ComponentVersionContainer) AddBlob(blob cpi.BlobAccess, refName string, return nil, errors.New("a resource has to be defined") } + size := blob.Size() + var refs []string + if c.comp.repo.blobLimit > 0 && size != blobaccess.BLOB_UNKNOWN_SIZE && size > c.comp.repo.blobLimit { + reader, err := blob.Reader() + if err != nil { + return nil, err + } + ch := chunked.New(reader, c.comp.repo.blobLimit, vfsattr.Get(c.GetContext())) + for { + b, err := ch.Next() + if err != nil { + return nil, errors.Wrapf(err, "chunked blob") + } + if b == nil { + break + } + err = c.addLayer(b, &refs) + b.Close() + + if err != nil { + return nil, errors.Wrapf(err, "chunked blob") + } + } + } else { + err := c.addLayer(blob, &refs) + if err != nil { + return nil, err + } + } + return localblob.New(strings.Join(refs, ","), refName, blob.MimeType(), global), nil +} + +func (c *ComponentVersionContainer) addLayer(blob cpi.BlobAccess, refs *[]string) error { err := c.manifest.AddBlob(blob) if err != nil { - return nil, err + return err } err = ocihdlr.AssureLayer(c.manifest.GetDescriptor(), blob) if err != nil { - return nil, err + return err } - return localblob.New(blob.Digest().String(), refName, blob.MimeType(), global), nil + *refs = append(*refs, blob.Digest().String()) + return nil } // AssureGlobalRef provides a global manifest for a local OCI Artifact. diff --git a/api/ocm/extensions/repositories/genericocireg/repo_test.go b/api/ocm/extensions/repositories/genericocireg/repo_test.go index 5ea992c69a..f550ef0a6f 100644 --- a/api/ocm/extensions/repositories/genericocireg/repo_test.go +++ b/api/ocm/extensions/repositories/genericocireg/repo_test.go @@ -123,6 +123,59 @@ var _ = Describe("component repository mapping", func() { MustBeSuccessful(finalize.Finalize()) }) + FIt("creates a dummy component with chunks", func() { + var finalize finalizer.Finalizer + defer Defer(finalize.Finalize) + + repo := finalizer.ClosingWith(&finalize, Must(DefaultContext.RepositoryForSpec(spec))) + impl := Must(repocpi.GetRepositoryImplementation(repo)) + Expect(reflect.TypeOf(impl).String()).To(Equal("*genericocireg.RepositoryImpl")) + repocpi.SetBlobLimit(impl, 5) + + comp := finalizer.ClosingWith(&finalize, Must(repo.LookupComponent(COMPONENT))) + vers := finalizer.ClosingWith(&finalize, Must(comp.NewVersion("v1"))) + + m1 := compdesc.NewResourceMeta("rsc1", resourcetypes.PLAIN_TEXT, metav1.LocalRelation) + blob := blobaccess.ForString(mime.MIME_TEXT, ocmtesthelper.S_TESTDATA) + MustBeSuccessful(vers.SetResourceBlob(m1, blob, "", nil)) + + MustBeSuccessful(comp.AddVersion(vers)) + + noref := vers.Repository() + Expect(noref).NotTo(BeNil()) + Expect(noref.IsClosed()).To(BeFalse()) + Expect(noref.Close()).To(Succeed()) + Expect(noref.IsClosed()).To(BeFalse()) + + MustBeSuccessful(finalize.Finalize()) + + Expect(noref.IsClosed()).To(BeTrue()) + Expect(noref.Close()).To(MatchError("closed")) + ExpectError(noref.LookupComponent("dummy")).To(MatchError("closed")) + + // access it again + repo = finalizer.ClosingWith(&finalize, Must(DefaultContext.RepositoryForSpec(spec))) + + ok := Must(repo.ExistsComponentVersion(COMPONENT, "v1")) + Expect(ok).To(BeTrue()) + + comp = finalizer.ClosingWith(&finalize, Must(repo.LookupComponent(COMPONENT))) + vers = finalizer.ClosingWith(&finalize, Must(comp.LookupVersion("v1"))) + + rsc := Must(vers.GetResourceByIndex(0)) + acc := Must(rsc.Access()) + local, ok := acc.(*localblob.AccessSpec) + Expect(ok).To(BeTrue()) + Expect(local.LocalReference).To(Equal("sha256:a4853613b2a38568ed4e49196238152469097412d06d5e5fc9be8ab92cfdf2bf,sha256:977817f6f61f4dd501df3036a3e16b31452b36f4aa3edcf9a3f3242a79d7170d")) + Expect(rsc.Meta().Digest).NotTo(BeNil()) + Expect(rsc.Meta().Digest.Value).To(Equal(ocmtesthelper.D_TESTDATA)) + + data := Must(ocmutils.GetResourceData(rsc)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + + MustBeSuccessful(finalize.Finalize()) + }) + It("handles legacylocalociblob access method", func() { var finalize finalizer.Finalizer defer Defer(finalize.Finalize) diff --git a/api/ocm/extensions/repositories/genericocireg/repository.go b/api/ocm/extensions/repositories/genericocireg/repository.go index 70661f76f5..5bde9477b4 100644 --- a/api/ocm/extensions/repositories/genericocireg/repository.go +++ b/api/ocm/extensions/repositories/genericocireg/repository.go @@ -38,12 +38,13 @@ func GetOCIRepository(r cpi.Repository) ocicpi.Repository { } type RepositoryImpl struct { - bridge repocpi.RepositoryBridge - ctx cpi.Context - meta ComponentRepositoryMeta - nonref cpi.Repository - ocirepo oci.Repository - readonly bool + bridge repocpi.RepositoryBridge + ctx cpi.Context + meta ComponentRepositoryMeta + nonref cpi.Repository + ocirepo oci.Repository + readonly bool + blobLimit int64 } var ( @@ -51,16 +52,21 @@ var ( _ credentials.ConsumerIdentityProvider = (*RepositoryImpl)(nil) ) -func NewRepository(ctxp cpi.ContextProvider, meta *ComponentRepositoryMeta, ocirepo oci.Repository) cpi.Repository { +func NewRepository(ctxp cpi.ContextProvider, meta *ComponentRepositoryMeta, ocirepo oci.Repository, blobLimit ...int64) cpi.Repository { ctx := datacontext.InternalContextRef(ctxp.OCMContext()) impl := &RepositoryImpl{ - ctx: ctx, - meta: *DefaultComponentRepositoryMeta(meta), - ocirepo: ocirepo, + ctx: ctx, + meta: *DefaultComponentRepositoryMeta(meta), + ocirepo: ocirepo, + blobLimit: general.OptionalDefaulted(-1, blobLimit...), } return repocpi.NewRepository(impl, "OCM repo[OCI]") } +func (r *RepositoryImpl) SetBlobLimit(s int64) { + r.blobLimit = s +} + func (r *RepositoryImpl) Close() error { return r.ocirepo.Close() } diff --git a/api/ocm/extensions/repositories/genericocireg/type.go b/api/ocm/extensions/repositories/genericocireg/type.go index 809d598140..d9563748be 100644 --- a/api/ocm/extensions/repositories/genericocireg/type.go +++ b/api/ocm/extensions/repositories/genericocireg/type.go @@ -91,6 +91,7 @@ func NewComponentRepositoryMeta(subPath string, mapping ComponentNameMapping) *C type RepositorySpec struct { oci.RepositorySpec ComponentRepositoryMeta + BlobLimit int64 } var ( @@ -127,19 +128,26 @@ func (a *RepositorySpec) AsUniformSpec(cpi.Context) *cpi.UniformRepositorySpec { return &cpi.UniformRepositorySpec{Type: a.GetKind(), Scheme: spec.Scheme, Host: spec.Host, Info: spec.Info, TypeHint: spec.TypeHint, SubPath: a.SubPath} } +type meta struct { + ComponentRepositoryMeta `json:",inline"` + BlobLimit int64 `json:"blobLimit"` +} + func (u *RepositorySpec) UnmarshalJSON(data []byte) error { logrus.Debugf("unmarshal generic ocireg spec %s\n", string(data)) ocispec := &oci.GenericRepositorySpec{} if err := json.Unmarshal(data, ocispec); err != nil { return err } - compmeta := &ComponentRepositoryMeta{} - if err := json.Unmarshal(data, ocispec); err != nil { + + var m meta + if err := json.Unmarshal(data, &m); err != nil { return err } u.RepositorySpec = ocispec - u.ComponentRepositoryMeta = *compmeta + u.ComponentRepositoryMeta = m.ComponentRepositoryMeta + u.BlobLimit = m.BlobLimit normalizers.Normalize(u) return nil @@ -154,7 +162,12 @@ func (u RepositorySpec) MarshalJSON() ([]byte, error) { if err != nil { return nil, err } - compmeta, err := runtime.ToUnstructuredObject(u.ComponentRepositoryMeta) + + m := meta{ + ComponentRepositoryMeta: u.ComponentRepositoryMeta, + BlobLimit: u.BlobLimit, + } + compmeta, err := runtime.ToUnstructuredObject(&m) if err != nil { return nil, err } @@ -166,7 +179,7 @@ func (s *RepositorySpec) Repository(ctx cpi.Context, creds credentials.Credentia if err != nil { return nil, err } - return NewRepository(ctx, &s.ComponentRepositoryMeta, r), nil + return NewRepository(ctx, &s.ComponentRepositoryMeta, r, s.BlobLimit), nil } func (s *RepositorySpec) GetConsumerId(uctx ...credentials.UsageContext) credentials.ConsumerIdentity { diff --git a/api/utils/accessio/chunkedreader.go b/api/utils/accessio/chunkedreader.go index 40ef814e75..6dc728f4d0 100644 --- a/api/utils/accessio/chunkedreader.go +++ b/api/utils/accessio/chunkedreader.go @@ -12,9 +12,9 @@ type ChunkedReader struct { lock sync.Mutex reader io.Reader buffer *bytes.Buffer - size uint64 - chunk uint64 - read uint64 + size int64 + chunk int + read int64 err error preread uint @@ -22,7 +22,7 @@ type ChunkedReader struct { var _ io.Reader = (*ChunkedReader)(nil) -func NewChunkedReader(r io.Reader, chunk uint64, preread ...uint) *ChunkedReader { +func NewChunkedReader(r io.Reader, chunk int64, preread ...uint) *ChunkedReader { return &ChunkedReader{ reader: r, size: chunk, @@ -37,13 +37,13 @@ func (c *ChunkedReader) Read(p []byte) (n int, err error) { if c.read == c.size { return 0, io.EOF } - if c.read+uint64(len(p)) > c.size { + if c.read+int64(len(p)) > c.size { p = p[:c.size-c.read] // read at most rest of chunk size } if c.buffer != nil && c.buffer.Len() > 0 { // first, consume from buffer n, _ := c.buffer.Read(p) - c.read += uint64(n) + c.read += int64(n) if c.buffer.Len() == 0 { c.buffer = nil } @@ -56,7 +56,8 @@ func (c *ChunkedReader) Read(p []byte) (n int, err error) { return 0, c.err } n, err = c.reader.Read(p) - c.read += uint64(n) + c.read += int64(n) + c.err = err return c.report(n, err) } @@ -67,11 +68,18 @@ func (c *ChunkedReader) report(n int, err error) (int, error) { return n, err } +func (c *ChunkedReader) ChunkNo() int { + c.lock.Lock() + defer c.lock.Unlock() + + return c.chunk +} + func (c *ChunkedReader) ChunkDone() bool { c.lock.Lock() defer c.lock.Unlock() - return c.read >= c.size + return c.read >= c.size || c.err != nil } func (c *ChunkedReader) Next() bool { @@ -82,12 +90,11 @@ func (c *ChunkedReader) Next() bool { return false } + // cannot assume that read with size 0 returns EOF as proposed + // by io.Reader.Read (see bytes.Buffer.Read). + // Therefore, we really have to read something. if c.buffer == nil { - // cannot assume that read with size 0 returns EOF as proposed - // by io.Reader.Read (see bytes.Buffer.Read). - // Therefore, we really have to read something. - - var buf = make([]byte, c.preread, c.preread) + buf := make([]byte, c.preread) n, err := c.reader.Read(buf) c.err = err if n > 0 { diff --git a/api/utils/accessio/chunkedreader_test.go b/api/utils/accessio/chunkedreader_test.go index 68a3f46eae..6dad018658 100644 --- a/api/utils/accessio/chunkedreader_test.go +++ b/api/utils/accessio/chunkedreader_test.go @@ -35,7 +35,7 @@ var _ = FDescribe("Test Environment", func() { n, err = chunked.Read(buf[:]) Expect(n).To(Equal(0)) Expect(err).To(Equal(io.EOF)) - Expect(chunked.ChunkDone()).To(Equal(false)) + Expect(chunked.ChunkDone()).To(Equal(true)) Expect(chunked.Next()).To(Equal(false)) n, err = chunked.Read(buf[:]) @@ -55,7 +55,7 @@ var _ = FDescribe("Test Environment", func() { n, err = chunked.Read(buf[:]) Expect(n).To(Equal(0)) Expect(err).To(Equal(io.EOF)) - Expect(chunked.ChunkDone()).To(Equal(false)) + Expect(chunked.ChunkDone()).To(Equal(true)) Expect(chunked.Next()).To(Equal(false)) n, err = chunked.Read(buf[:]) diff --git a/api/utils/blobaccess/chunked/access.go b/api/utils/blobaccess/chunked/access.go index 3b67cadffa..2643681cd9 100644 --- a/api/utils/blobaccess/chunked/access.go +++ b/api/utils/blobaccess/chunked/access.go @@ -1,92 +1,67 @@ package chunked import ( + "fmt" "io" "sync" - "github.com/opencontainers/go-digest" + "github.com/mandelsoft/vfs/pkg/vfs" + "ocm.software/ocm/api/utils" + "ocm.software/ocm/api/utils/accessio" + "ocm.software/ocm/api/utils/blobaccess" "ocm.software/ocm/api/utils/blobaccess/bpi" + "ocm.software/ocm/api/utils/mime" ) -type Chunked interface { - bpi.BlobAccess - - Next() bool -} - -type chunked struct { - lock sync.Mutex - base bpi.BlobAccess - blobsize uint64 - chunksize uint64 - preread uint - - reader io.Reader -} - -var _ bpi.BlobAccessBase = (*chunked)(nil) - -func New(acc bpi.BlobAccess, chunk uint64, preread...uint) (Chunked, error) { - b, err := acc.Dup() +func newChunck(r io.Reader, fss ...vfs.FileSystem) (bpi.BlobAccess, error) { + t, err := blobaccess.NewTempFile("", "chunk-*", fss...) if err != nil { return nil, err } - s := acc.Size() - - return bpi.NewBlobAccessForBase(&chunked{base: b, blobsize: size, chunksize: chunk, preread: utils.OptionalDefaulted(8096, preread...)}), nil -} - -type view struct { - bpi.BlobAccess -} - -func (v *view) Dup() bpi.BlobAccess { - -} -func (c *chunked) Close() error { - c.lock.Lock() - defer c.lock.Unlock() - - if c.base == nil { - return bpi.ErrClosed + _, err = io.Copy(t.Writer(), r) + if err != nil { + t.Close() + return nil, err } - err := c.base.Close() - c.base = nil - return err -} - -func (c *chunked) Get() ([]byte, error) { - c.lock.Lock() - defer c.lock.Unlock() - - if - // TODO implement me - panic("implement me") -} - -func (c *chunked) Reader() (io.ReadCloser, error) { - // TODO implement me - panic("implement me") + return t.AsBlob(mime.MIME_OCTET), nil } -func (c *chunked) Digest() digest.Digest { - // TODO implement me - panic("implement me") +type ChunkedBlobSource interface { + Next() (bpi.BlobAccess, error) } -func (c *chunked) MimeType() string { - // TODO implement me - panic("implement me") +type chunkedAccess struct { + lock sync.Mutex + chunksize int64 + reader *accessio.ChunkedReader + fs vfs.FileSystem + cont bool } -func (c *chunked) DigestKnown() bool { - // TODO implement me - panic("implement me") +func New(r io.Reader, chunksize int64, fss ...vfs.FileSystem) ChunkedBlobSource { + reader := accessio.NewChunkedReader(r, chunksize) + return &chunkedAccess{ + chunksize: chunksize, + reader: reader, + fs: utils.FileSystem(fss...), + cont: false, + } } -func (c *chunked) Size() int64 { - // TODO implement me - panic("implement me") +func (r *chunkedAccess) Next() (bpi.BlobAccess, error) { + r.lock.Lock() + defer r.lock.Unlock() + + if r.cont { + if !r.reader.ChunkDone() { + return nil, fmt.Errorf("unexpected incomplete read") + } + if !r.reader.Next() { + return nil, nil + } + } + r.cont = true + return newChunck(r.reader, r.fs) } diff --git a/api/utils/blobaccess/chunked/chunked_test.go b/api/utils/blobaccess/chunked/chunked_test.go new file mode 100644 index 0000000000..958c94fd3c --- /dev/null +++ b/api/utils/blobaccess/chunked/chunked_test.go @@ -0,0 +1,112 @@ +package chunked_test + +import ( + "bytes" + "io" + + "github.com/mandelsoft/goutils/finalizer" + . "github.com/mandelsoft/goutils/testutils" + "github.com/mandelsoft/vfs/pkg/memoryfs" + "github.com/mandelsoft/vfs/pkg/vfs" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "ocm.software/ocm/api/utils/blobaccess/chunked" +) + +var _ = Describe("Chunked Blobs", func() { + var blobData = []byte("a1a2a3a4a5a6a7a8a9a0b1b2b3b4b5b6b7b8b9b0") + + var src chunked.ChunkedBlobSource + var fs vfs.FileSystem + + Context("blobs", func() { + BeforeEach(func() { + fs = memoryfs.New() + }) + + AfterEach(func() { + vfs.Cleanup(fs) + }) + + It("small blobs", func() { + src = chunked.New(bytes.NewBuffer(blobData), 50, fs) + n := 0 + buf := bytes.NewBuffer(nil) + + var finalize finalizer.Finalizer + defer finalize.Finalize() + for { + b := Must(src.Next()) + if b == nil { + break + } + n++ + finalize.Close(b) + r := Must(b.Reader()) + _, err := io.Copy(buf, r) + r.Close() + MustBeSuccessful(err) + } + Expect(n).To(Equal(1)) + Expect(buf.String()).To(Equal(string(blobData))) + + MustBeSuccessful(finalize.Finalize()) + list := Must(vfs.ReadDir(fs, "/")) + Expect(len(list)).To(Equal(0)) + }) + + It("matching blobs", func() { + src = chunked.New(bytes.NewBuffer(blobData), 40, fs) + n := 0 + buf := bytes.NewBuffer(nil) + + var finalize finalizer.Finalizer + defer finalize.Finalize() + for { + b := Must(src.Next()) + if b == nil { + break + } + n++ + finalize.Close(b) + r := Must(b.Reader()) + _, err := io.Copy(buf, r) + r.Close() + MustBeSuccessful(err) + } + Expect(n).To(Equal(1)) + Expect(buf.String()).To(Equal(string(blobData))) + + MustBeSuccessful(finalize.Finalize()) + list := Must(vfs.ReadDir(fs, "/")) + Expect(len(list)).To(Equal(0)) + }) + + It("large blobs", func() { + src = chunked.New(bytes.NewBuffer(blobData), 18, fs) + n := 0 + buf := bytes.NewBuffer(nil) + + var finalize finalizer.Finalizer + defer finalize.Finalize() + for { + b := Must(src.Next()) + if b == nil { + break + } + n++ + finalize.Close(b) + r := Must(b.Reader()) + _, err := io.Copy(buf, r) + r.Close() + MustBeSuccessful(err) + } + Expect(n).To(Equal(3)) + Expect(buf.String()).To(Equal(string(blobData))) + + MustBeSuccessful(finalize.Finalize()) + list := Must(vfs.ReadDir(fs, "/")) + Expect(len(list)).To(Equal(0)) + }) + }) +}) diff --git a/api/utils/blobaccess/chunked/suite_test.go b/api/utils/blobaccess/chunked/suite_test.go new file mode 100644 index 0000000000..55e1b7ca35 --- /dev/null +++ b/api/utils/blobaccess/chunked/suite_test.go @@ -0,0 +1,13 @@ +package chunked_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestConfig(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "chunked blobs Test Suite") +}