diff --git a/api/credentials/identity/hostpath/identity.go b/api/credentials/identity/hostpath/identity.go
index baa2aad8a9..968412dda6 100644
--- a/api/credentials/identity/hostpath/identity.go
+++ b/api/credentials/identity/hostpath/identity.go
@@ -157,3 +157,14 @@ func PathPrefix(id cpi.ConsumerIdentity) string {
}
return strings.TrimPrefix(id[ID_PATHPREFIX], "/")
}
+
+func HostPort(id cpi.ConsumerIdentity) string {
+ if id == nil {
+ return ""
+ }
+ host := id[ID_HOSTNAME]
+ if port, ok := id[ID_PORT]; ok {
+ return host + ":" + port
+ }
+ return host
+}
diff --git a/api/oci/extensions/repositories/ocireg/repository.go b/api/oci/extensions/repositories/ocireg/repository.go
index 1f1ede981a..cc2c845ac0 100644
--- a/api/oci/extensions/repositories/ocireg/repository.go
+++ b/api/oci/extensions/repositories/ocireg/repository.go
@@ -11,6 +11,7 @@ import (
"github.com/containerd/errdefs"
"github.com/mandelsoft/goutils/errors"
"github.com/mandelsoft/logging"
+ "github.com/moby/locker"
"oras.land/oras-go/v2/registry/remote/auth"
"oras.land/oras-go/v2/registry/remote/retry"
@@ -69,6 +70,7 @@ func NewRepository(ctx cpi.Context, spec *RepositorySpec, info *RepositoryInfo)
spec: spec,
info: info,
}
+ i.logger.Debug("created repository")
return cpi.NewRepository(i), nil
}
@@ -164,14 +166,22 @@ func (r *RepositoryImpl) getResolver(comp string) (oras.Resolver, error) {
}
authClient := &auth.Client{
- Client: client,
- Cache: auth.NewCache(),
- Credential: auth.StaticCredential(r.info.HostPort(), authCreds),
+ Client: client,
+ Cache: auth.NewCache(),
+ Credential: auth.CredentialFunc(func(ctx context.Context, hostport string) (auth.Credential, error) {
+ if strings.Contains(hostport, r.info.HostPort()) {
+ return authCreds, nil
+ }
+ logger.Warn("no credentials for host", "host", hostport)
+ return auth.EmptyCredential, nil
+ }),
}
return oras.New(oras.ClientOptions{
Client: authClient,
PlainHTTP: r.info.Scheme == "http",
+ Logger: logger,
+ Lock: locker.New(),
}), nil
}
diff --git a/api/ocm/cpi/repocpi/bridge_r.go b/api/ocm/cpi/repocpi/bridge_r.go
index f7eea176fe..53ed151f5a 100644
--- a/api/ocm/cpi/repocpi/bridge_r.go
+++ b/api/ocm/cpi/repocpi/bridge_r.go
@@ -34,6 +34,24 @@ type RepositoryImpl interface {
io.Closer
}
+// Chunked is an optional interface, which
+// may be implemented to accept a blob limit for mapping
+// local blobs to an external storage system.
+type Chunked interface {
+ // SetBlobLimit sets the blob limit if possible.
+ // It returns true, if this was successful.
+ SetBlobLimit(s int64) bool
+}
+
+// SetBlobLimit tries to set a blob limit for a repository
+// implementation. It returns true, if this was possible.
+func SetBlobLimit(i RepositoryImpl, s int64) bool {
+ if c, ok := i.(Chunked); ok {
+ return c.SetBlobLimit(s)
+ }
+ return false
+}
+
type _repositoryBridgeBase = resource.ResourceImplBase[cpi.Repository]
type repositoryBridge struct {
diff --git a/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go b/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go
index 94a0a85966..c661c9f294 100644
--- a/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go
+++ b/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go
@@ -1,10 +1,14 @@
package genericocireg
import (
+ "bytes"
"io"
+ "os"
+ "strings"
"sync"
"github.com/mandelsoft/goutils/errors"
+ "github.com/mandelsoft/goutils/finalizer"
"github.com/opencontainers/go-digest"
"ocm.software/ocm/api/oci"
@@ -88,9 +92,19 @@ func (m *localBlobAccessMethod) getBlob() (blobaccess.DataAccess, error) {
return nil, errors.ErrNotImplemented("artifact blob synthesis")
}
}
- _, data, err := m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference))
- if err != nil {
- return nil, err
+ refs := strings.Split(m.spec.LocalReference, ",")
+
+ var (
+ data blobaccess.DataAccess
+ err error
+ )
+ if len(refs) < 2 {
+ _, data, err = m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference))
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ data = &composedBlock{m, refs}
}
m.data = data
return m.data, err
@@ -111,3 +125,119 @@ func (m *localBlobAccessMethod) Get() ([]byte, error) {
func (m *localBlobAccessMethod) MimeType() string {
return m.spec.MediaType
}
+
+////////////////////////////////////////////////////////////////////////////////
+
+type composedBlock struct {
+ m *localBlobAccessMethod
+ refs []string
+}
+
+var _ blobaccess.DataAccess = (*composedBlock)(nil)
+
+func (c *composedBlock) Get() ([]byte, error) {
+ buf := bytes.NewBuffer(nil)
+ for _, ref := range c.refs {
+ var finalize finalizer.Finalizer
+
+ _, data, err := c.m.namespace.GetBlobData(digest.Digest(ref))
+ if err != nil {
+ return nil, err
+ }
+ finalize.Close(data)
+ r, err := data.Reader()
+ if err != nil {
+ return nil, err
+ }
+ finalize.Close(r)
+ _, err = io.Copy(buf, r)
+ if err != nil {
+ return nil, err
+ }
+ err = finalize.Finalize()
+ if err != nil {
+ return nil, err
+ }
+ }
+ return buf.Bytes(), nil
+}
+
+func (c *composedBlock) Reader() (io.ReadCloser, error) {
+ return &composedReader{
+ m: c.m,
+ refs: c.refs,
+ }, nil
+}
+
+func (c *composedBlock) Close() error {
+ return nil
+}
+
+type composedReader struct {
+ lock sync.Mutex
+ m *localBlobAccessMethod
+ refs []string
+ reader io.ReadCloser
+ data blobaccess.DataAccess
+}
+
+func (c *composedReader) Read(p []byte) (n int, err error) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ for {
+ if c.reader != nil {
+ n, err := c.reader.Read(p)
+
+ if err == io.EOF {
+ c.reader.Close()
+ c.data.Close()
+ c.refs = c.refs[1:]
+ c.reader = nil
+ c.data = nil
+ // start new layer and return partial (>0) read before next layer is started
+ err = nil
+ }
+ // return partial read (even a zero read if layer is not yet finished) or error
+ if c.reader != nil || err != nil || n > 0 {
+ return n, err
+ }
+ // otherwise, we can use the given buffer for the next layer
+
+ // now, we have to check for a next succeeding layer.
+ // This means to finish with the actual reader and continue
+ // with the next one.
+ }
+
+ // If no more layers are available, report EOF.
+ if len(c.refs) == 0 {
+ return 0, io.EOF
+ }
+
+ ref := strings.TrimSpace(c.refs[0])
+ _, c.data, err = c.m.namespace.GetBlobData(digest.Digest(ref))
+ if err != nil {
+ return 0, err
+ }
+ c.reader, err = c.data.Reader()
+ if err != nil {
+ return 0, err
+ }
+ }
+}
+
+func (c *composedReader) Close() error {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ if c.reader == nil && c.refs == nil {
+ return os.ErrClosed
+ }
+ if c.reader != nil {
+ c.reader.Close()
+ c.data.Close()
+ c.reader = nil
+ c.refs = nil
+ }
+ return nil
+}
diff --git a/api/ocm/extensions/repositories/genericocireg/bloblimits.go b/api/ocm/extensions/repositories/genericocireg/bloblimits.go
new file mode 100644
index 0000000000..c4ecfc799a
--- /dev/null
+++ b/api/ocm/extensions/repositories/genericocireg/bloblimits.go
@@ -0,0 +1,53 @@
+package genericocireg
+
+import (
+ "sync"
+
+ configctx "ocm.software/ocm/api/config"
+ "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config"
+)
+
+var (
+ defaultBlobLimits config.BlobLimits
+ lock sync.Mutex
+)
+
+const (
+ KB = int64(1000)
+ MB = 1000 * KB
+ GB = 1000 * MB
+)
+
+func init() {
+ defaultBlobLimits = config.BlobLimits{}
+
+ // Add limits for known OCI repositories, here,
+ // or provide init functions in specialized packages
+ // by calling AddDefaultBlobLimit.
+ AddDefaultBlobLimit("ghcr.io", 10*GB) // https://github.com/orgs/community/discussions/77429
+}
+
+// AddDefaultBlobLimit can be used to set default blob limits
+// for known repositories.
+// Those limits will be overwritten, by blob limits
+// given by a configuration object and the repository
+// specification.
+func AddDefaultBlobLimit(name string, limit int64) {
+ lock.Lock()
+ defer lock.Unlock()
+
+ defaultBlobLimits[name] = limit
+}
+
+func ConfigureBlobLimits(ctx configctx.ContextProvider, target config.Configurable) {
+ if target != nil {
+ lock.Lock()
+ defer lock.Unlock()
+
+ target.ConfigureBlobLimits(defaultBlobLimits)
+
+ if ctx != nil {
+ ctx.ConfigContext().ApplyTo(0, target)
+ }
+ }
+}
diff --git a/api/ocm/extensions/repositories/genericocireg/componentversion.go b/api/ocm/extensions/repositories/genericocireg/componentversion.go
index d7b2ec9334..e0dd0f5470 100644
--- a/api/ocm/extensions/repositories/genericocireg/componentversion.go
+++ b/api/ocm/extensions/repositories/genericocireg/componentversion.go
@@ -2,13 +2,16 @@ package genericocireg
import (
"fmt"
+ "io"
"path"
"strings"
"github.com/mandelsoft/goutils/errors"
"github.com/mandelsoft/goutils/set"
+ "github.com/mandelsoft/vfs/pkg/vfs"
"github.com/opencontainers/go-digest"
+ "ocm.software/ocm/api/datacontext/attrs/vfsattr"
"ocm.software/ocm/api/oci"
"ocm.software/ocm/api/oci/artdesc"
"ocm.software/ocm/api/oci/extensions/repositories/artifactset"
@@ -25,7 +28,9 @@ import (
ocihdlr "ocm.software/ocm/api/ocm/extensions/blobhandler/handlers/oci"
"ocm.software/ocm/api/utils/accessio"
"ocm.software/ocm/api/utils/accessobj"
+ "ocm.software/ocm/api/utils/blobaccess"
"ocm.software/ocm/api/utils/errkind"
+ "ocm.software/ocm/api/utils/mime"
common "ocm.software/ocm/api/utils/misc"
"ocm.software/ocm/api/utils/refmgmt"
"ocm.software/ocm/api/utils/runtime"
@@ -183,11 +188,11 @@ func (c *ComponentVersionContainer) Update() (bool, error) {
layers.Add(i)
}
for i, r := range desc.Resources {
- s, l, err := c.evalLayer(r.Access)
+ s, list, err := c.evalLayer(r.Access)
if err != nil {
return false, fmt.Errorf("failed resource layer evaluation: %w", err)
}
- if l > 0 {
+ for _, l := range list {
layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{
Kind: ARTKIND_RESOURCE,
Identity: r.GetIdentity(desc.Resources),
@@ -199,11 +204,11 @@ func (c *ComponentVersionContainer) Update() (bool, error) {
}
}
for i, r := range desc.Sources {
- s, l, err := c.evalLayer(r.Access)
+ s, list, err := c.evalLayer(r.Access)
if err != nil {
return false, fmt.Errorf("failed source layer evaluation: %w", err)
}
- if l > 0 {
+ for _, l := range list {
layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{
Kind: ARTKIND_SOURCE,
Identity: r.GetIdentity(desc.Sources),
@@ -259,32 +264,45 @@ func (c *ComponentVersionContainer) Update() (bool, error) {
return false, nil
}
-func (c *ComponentVersionContainer) evalLayer(s compdesc.AccessSpec) (compdesc.AccessSpec, int, error) {
- var d *artdesc.Descriptor
+func (c *ComponentVersionContainer) evalLayer(s compdesc.AccessSpec) (compdesc.AccessSpec, []int, error) {
+ var (
+ d *artdesc.Descriptor
+ layernums []int
+ )
spec, err := c.GetContext().AccessSpecForSpec(s)
if err != nil {
- return s, 0, err
+ return s, nil, err
}
if a, ok := spec.(*localblob.AccessSpec); ok {
if ok, _ := artdesc.IsDigest(a.LocalReference); !ok {
- return s, 0, errors.ErrInvalid("digest", a.LocalReference)
+ return s, nil, errors.ErrInvalid("digest", a.LocalReference)
}
- d = &artdesc.Descriptor{Digest: digest.Digest(a.LocalReference), MediaType: a.GetMimeType()}
- }
- if d != nil {
- // find layer
- layers := c.manifest.GetDescriptor().Layers
- maxLen := len(layers) - 1
- for i := range layers {
- l := layers[len(layers)-1-i]
- if i < maxLen && l.Digest == d.Digest && (d.Digest == "" || d.Digest == l.Digest) {
- return s, len(layers) - 1 - i, nil
+ refs := strings.Split(a.LocalReference, ",")
+ media := a.GetMimeType()
+ if len(refs) > 1 {
+ media = mime.MIME_OCTET
+ }
+ for _, ref := range refs {
+ d = &artdesc.Descriptor{Digest: digest.Digest(strings.TrimSpace(ref)), MediaType: media}
+ // find layer
+ layers := c.manifest.GetDescriptor().Layers
+ maxLen := len(layers) - 1
+ found := false
+ for i := maxLen; i > 0; i-- { // layer 0 is the component descriptor
+ l := layers[i]
+ if l.Digest == d.Digest {
+ layernums = append(layernums, i)
+ found = true
+ break
+ }
+ }
+ if !found {
+ return s, nil, fmt.Errorf("resource access %s: no layer found for local blob %s[%s]", spec.Describe(c.GetContext()), d.Digest, d.MediaType)
}
}
- return s, 0, fmt.Errorf("resource access %s: no layer found for local blob %s[%s]", spec.Describe(c.GetContext()), d.Digest, d.MediaType)
}
- return s, 0, nil
+ return s, layernums, nil
}
func (c *ComponentVersionContainer) GetDescriptor() *compdesc.ComponentDescriptor {
@@ -299,20 +317,74 @@ func (c *ComponentVersionContainer) GetStorageContext() cpi.StorageContext {
return ocihdlr.New(c.comp.GetName(), c.Repository(), c.comp.repo.ocirepo.GetSpecification().GetKind(), c.comp.repo.ocirepo, c.comp.namespace, c.manifest)
}
+func blobAccessForChunk(blob blobaccess.BlobAccess, fs vfs.FileSystem, r io.Reader, limit int64) (cpi.BlobAccess, bool, error) {
+ f, err := blobaccess.NewTempFile("", "chunk-*", fs)
+ if err != nil {
+ return nil, true, err
+ }
+ written, err := io.CopyN(f.Writer(), r, limit)
+ if err != nil && !errors.Is(err, io.EOF) {
+ f.Close()
+ return nil, false, err
+ }
+ if written <= 0 {
+ f.Close()
+ return nil, false, nil
+ }
+ return f.AsBlob(blob.MimeType()), written == limit, nil
+}
+
func (c *ComponentVersionContainer) AddBlob(blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) {
if blob == nil {
return nil, errors.New("a resource has to be defined")
}
+ fs := vfsattr.Get(c.GetContext())
+ size := blob.Size()
+ limit := c.comp.repo.blobLimit
+ var refs []string
+ if limit > 0 && size != blobaccess.BLOB_UNKNOWN_SIZE && size > limit {
+ reader, err := blob.Reader()
+ if err != nil {
+ return nil, err
+ }
+ defer reader.Close()
+
+ var b blobaccess.BlobAccess
+ cont := true
+ for cont {
+ b, cont, err = blobAccessForChunk(blob, fs, reader, limit)
+ if err != nil {
+ return nil, err
+ }
+ if b != nil {
+ err = c.addLayer(b, &refs)
+ b.Close()
+ if err != nil {
+ return nil, err
+ }
+ }
+ }
+ } else {
+ err := c.addLayer(blob, &refs)
+ if err != nil {
+ return nil, err
+ }
+ }
+ return localblob.New(strings.Join(refs, ","), refName, blob.MimeType(), global), nil
+}
+
+func (c *ComponentVersionContainer) addLayer(blob cpi.BlobAccess, refs *[]string) error {
err := c.manifest.AddBlob(blob)
if err != nil {
- return nil, err
+ return err
}
err = ocihdlr.AssureLayer(c.manifest.GetDescriptor(), blob)
if err != nil {
- return nil, err
+ return err
}
- return localblob.New(blob.Digest().String(), refName, blob.MimeType(), global), nil
+ *refs = append(*refs, blob.Digest().String())
+ return nil
}
// AssureGlobalRef provides a global manifest for a local OCI Artifact.
diff --git a/api/ocm/extensions/repositories/genericocireg/config/type.go b/api/ocm/extensions/repositories/genericocireg/config/type.go
new file mode 100644
index 0000000000..137c1d2688
--- /dev/null
+++ b/api/ocm/extensions/repositories/genericocireg/config/type.go
@@ -0,0 +1,110 @@
+package config
+
+import (
+ "net"
+ "strings"
+
+ "ocm.software/ocm/api/config"
+ cfgcpi "ocm.software/ocm/api/config/cpi"
+ "ocm.software/ocm/api/utils/runtime"
+)
+
+const (
+ ConfigType = "blobLimits.ocireg.ocm" + cfgcpi.OCM_CONFIG_TYPE_SUFFIX
+ ConfigTypeV1 = ConfigType + runtime.VersionSeparator + "v1"
+)
+
+func init() {
+ cfgcpi.RegisterConfigType(cfgcpi.NewConfigType[*Config](ConfigType, usage))
+ cfgcpi.RegisterConfigType(cfgcpi.NewConfigType[*Config](ConfigTypeV1))
+}
+
+// Config describes a memory based config interface
+// for configuring blob limits for underlying OCI manifest layers.
+type Config struct {
+ runtime.ObjectVersionedType `json:",inline"`
+ // BlobLimits describe the limit setting for host:port
+ // entries. As a special case (for testing) it is possible
+ // to configure limits for CTF, also, by using "@"+filepath.
+ BlobLimits BlobLimits `json:"blobLimits"`
+}
+
+type BlobLimits map[string]int64
+
+func (b BlobLimits) GetLimit(hostport string) int64 {
+ if b == nil {
+ return -1
+ }
+ l, ok := b[hostport]
+ if ok {
+ return l
+ }
+
+ if !strings.HasPrefix(hostport, "@") {
+ host, _, err := net.SplitHostPort(hostport)
+ if err == nil {
+ l, ok = b[host]
+ if ok {
+ return l
+ }
+ }
+ }
+ return -1
+}
+
+type Configurable interface {
+ ConfigureBlobLimits(limits BlobLimits)
+}
+
+// New creates a blob limit ConfigSpec.
+func New() *Config {
+ return &Config{
+ ObjectVersionedType: runtime.NewVersionedTypedObject(ConfigType),
+ }
+}
+
+func (a *Config) GetType() string {
+ return ConfigType
+}
+
+func (a *Config) AddLimit(hostport string, limit int64) {
+ if a.BlobLimits == nil {
+ a.BlobLimits = BlobLimits{}
+ }
+ a.BlobLimits[hostport] = limit
+}
+
+func (a *Config) ApplyTo(ctx config.Context, target interface{}) error {
+ t, ok := target.(Configurable)
+ if !ok {
+ return config.ErrNoContext(ConfigType)
+ }
+ if a.BlobLimits != nil {
+ t.ConfigureBlobLimits(a.BlobLimits)
+ }
+ return nil
+}
+
+const usage = `
+The config type ` + ConfigType + `
can be used to set some
+blob layer limits for particular OCI registries used to host OCM repositories.
+The blobLimits
field maps a OCI registry address to the blob limit to use:
+
+
+ type: ` + ConfigType + ` + blobLimits: + dummy.io: 65564 + dummy.io:8443: 32768 // with :8443 specifying the port and 32768 specifying the byte limit ++ +If blob limits apply to a registry, local blobs with a size larger than +the configured limit will be split into several layers with a maximum +size of the given value. + +These settings can be overwritten by explicit settings in an OCM +repository specification for those repositories. + +The most specific entry will be used. If a registry with a dedicated +port is requested, but no explicit configuration is found, the +setting for the sole hostname is used (if configured). +` diff --git a/api/ocm/extensions/repositories/genericocireg/config_test.go b/api/ocm/extensions/repositories/genericocireg/config_test.go new file mode 100644 index 0000000000..2435497a81 --- /dev/null +++ b/api/ocm/extensions/repositories/genericocireg/config_test.go @@ -0,0 +1,62 @@ +package genericocireg_test + +import ( + "reflect" + + . "github.com/mandelsoft/goutils/testutils" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "ocm.software/ocm/api/datacontext" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config" + + "github.com/mandelsoft/goutils/finalizer" + "github.com/mandelsoft/vfs/pkg/osfs" + "github.com/mandelsoft/vfs/pkg/vfs" + "ocm.software/ocm/api/oci" + "ocm.software/ocm/api/oci/extensions/repositories/ctf" + "ocm.software/ocm/api/ocm" + "ocm.software/ocm/api/ocm/cpi/repocpi" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg" + "ocm.software/ocm/api/utils/accessio" + "ocm.software/ocm/api/utils/accessobj" +) + +var _ = Describe("component repository mapping", func() { + var tempfs vfs.FileSystem + + var ocispec oci.RepositorySpec + var spec *genericocireg.RepositorySpec + + BeforeEach(func() { + t, err := osfs.NewTempFileSystem() + Expect(err).To(Succeed()) + tempfs = t + + // ocmlog.Context().AddRule(logging.NewConditionRule(logging.TraceLevel, accessio.ALLOC_REALM)) + + ocispec, err = ctf.NewRepositorySpec(accessobj.ACC_CREATE, "test", accessio.PathFileSystem(tempfs), accessobj.FormatDirectory) + Expect(err).To(Succeed()) + spec = genericocireg.NewRepositorySpec(ocispec, nil) + }) + + AfterEach(func() { + vfs.Cleanup(tempfs) + }) + + It("creates a dummy component with configured chunks", func() { + var finalize finalizer.Finalizer + defer Defer(finalize.Finalize) + + ctx := ocm.New(datacontext.MODE_EXTENDED) + + cfg := config.New() + cfg.AddLimit("@test", 5) + ctx.ConfigContext().ApplyConfig(cfg, "direct") + + repo := finalizer.ClosingWith(&finalize, Must(ctx.RepositoryForSpec(spec))) + impl := Must(repocpi.GetRepositoryImplementation(repo)) + Expect(reflect.TypeOf(impl).String()).To(Equal("*genericocireg.RepositoryImpl")) + + Expect(impl.(*genericocireg.RepositoryImpl).GetBlobLimit()).To(Equal(int64(5))) + }) +}) diff --git a/api/ocm/extensions/repositories/genericocireg/repo_test.go b/api/ocm/extensions/repositories/genericocireg/repo_test.go index 5ea992c69a..a892bf0f9c 100644 --- a/api/ocm/extensions/repositories/genericocireg/repo_test.go +++ b/api/ocm/extensions/repositories/genericocireg/repo_test.go @@ -2,6 +2,7 @@ package genericocireg_test import ( "fmt" + "io" "path" "reflect" @@ -123,6 +124,156 @@ var _ = Describe("component repository mapping", func() { MustBeSuccessful(finalize.Finalize()) }) + const ref4 = "sha256:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08,sha256:3a6eb0790f39ac87c94f3856b2dd2c5d110e6811602261a9a923d3bb23adc8b7" + const ref5 = "sha256:a4853613b2a38568ed4e49196238152469097412d06d5e5fc9be8ab92cfdf2bf,sha256:977817f6f61f4dd501df3036a3e16b31452b36f4aa3edcf9a3f3242a79d7170d" + const ref8 = "sha256:" + ocmtesthelper.D_TESTDATA + + DescribeTable("creates a dummy component with chunks", func(limit int, f func(ocm.ResourceAccess), ref string) { + var finalize finalizer.Finalizer + defer Defer(finalize.Finalize) + + repo := finalizer.ClosingWith(&finalize, Must(DefaultContext.RepositoryForSpec(spec))) + impl := Must(repocpi.GetRepositoryImplementation(repo)) + Expect(reflect.TypeOf(impl).String()).To(Equal("*genericocireg.RepositoryImpl")) + repocpi.SetBlobLimit(impl, int64(limit)) + + comp := finalizer.ClosingWith(&finalize, Must(repo.LookupComponent(COMPONENT))) + vers := finalizer.ClosingWith(&finalize, Must(comp.NewVersion("v1"))) + + m1 := compdesc.NewResourceMeta("rsc1", resourcetypes.PLAIN_TEXT, metav1.LocalRelation) + blob := blobaccess.ForString(mime.MIME_TEXT, ocmtesthelper.S_TESTDATA) + MustBeSuccessful(vers.SetResourceBlob(m1, blob, "", nil)) + + MustBeSuccessful(comp.AddVersion(vers)) + + noref := vers.Repository() + Expect(noref).NotTo(BeNil()) + Expect(noref.IsClosed()).To(BeFalse()) + Expect(noref.Close()).To(Succeed()) + Expect(noref.IsClosed()).To(BeFalse()) + + MustBeSuccessful(finalize.Finalize()) + + Expect(noref.IsClosed()).To(BeTrue()) + Expect(noref.Close()).To(MatchError("closed")) + ExpectError(noref.LookupComponent("dummy")).To(MatchError("closed")) + + // access it again + repo = finalizer.ClosingWith(&finalize, Must(DefaultContext.RepositoryForSpec(spec))) + + ok := Must(repo.ExistsComponentVersion(COMPONENT, "v1")) + Expect(ok).To(BeTrue()) + + comp = finalizer.ClosingWith(&finalize, Must(repo.LookupComponent(COMPONENT))) + vers = finalizer.ClosingWith(&finalize, Must(comp.LookupVersion("v1"))) + + rsc := Must(vers.GetResourceByIndex(0)) + acc := Must(rsc.Access()) + local, ok := acc.(*localblob.AccessSpec) + Expect(ok).To(BeTrue()) + // fmt.Printf("localref: %s\n", local.LocalReference) + Expect(local.LocalReference).To(Equal(ref)) + Expect(rsc.Meta().Digest).NotTo(BeNil()) + Expect(rsc.Meta().Digest.Value).To(Equal(ocmtesthelper.D_TESTDATA)) + + f(rsc) + + MustBeSuccessful(finalize.Finalize()) + }, + Entry("get blob", 5, func(rsc ocm.ResourceAccess) { + data := Must(ocmutils.GetResourceData(rsc)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref5), + Entry("stream blob", 5, func(rsc ocm.ResourceAccess) { + r := Must(ocmutils.GetResourceReader(rsc)) + data := Must(io.ReadAll(r)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref5), + Entry("stream blob with small buffer", 5, func(rsc ocm.ResourceAccess) { + var buf [2]byte + var data []byte + + r := Must(ocmutils.GetResourceReader(rsc)) + + for { + n, err := r.Read(buf[:]) + if n > 0 { + data = append(data, buf[:n]...) + } + if err != nil { + if err == io.EOF { + break + } else { + MustBeSuccessful(err) + } + } + } + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref5), + + Entry("get blob (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) { + data := Must(ocmutils.GetResourceData(rsc)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref8), + Entry("stream blob (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) { + r := Must(ocmutils.GetResourceReader(rsc)) + data := Must(io.ReadAll(r)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref8), + Entry("stream blob with small buffer (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) { + var buf [2]byte + var data []byte + + r := Must(ocmutils.GetResourceReader(rsc)) + + for { + n, err := r.Read(buf[:]) + if n > 0 { + data = append(data, buf[:n]...) + } + if err != nil { + if err == io.EOF { + break + } else { + MustBeSuccessful(err) + } + } + } + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref8), + + Entry("get blob (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) { + data := Must(ocmutils.GetResourceData(rsc)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref4), + Entry("stream blob (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) { + r := Must(ocmutils.GetResourceReader(rsc)) + data := Must(io.ReadAll(r)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref4), + Entry("stream blob with small buffer (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) { + var buf [2]byte + var data []byte + + r := Must(ocmutils.GetResourceReader(rsc)) + + for { + n, err := r.Read(buf[:]) + if n > 0 { + data = append(data, buf[:n]...) + } + if err != nil { + if err == io.EOF { + break + } else { + MustBeSuccessful(err) + } + } + } + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref4), + ) + It("handles legacylocalociblob access method", func() { var finalize finalizer.Finalizer defer Defer(finalize.Finalize) diff --git a/api/ocm/extensions/repositories/genericocireg/repository.go b/api/ocm/extensions/repositories/genericocireg/repository.go index 70661f76f5..8773350ea5 100644 --- a/api/ocm/extensions/repositories/genericocireg/repository.go +++ b/api/ocm/extensions/repositories/genericocireg/repository.go @@ -11,12 +11,16 @@ import ( "github.com/mandelsoft/goutils/general" "ocm.software/ocm/api/credentials" + "ocm.software/ocm/api/credentials/identity/hostpath" "ocm.software/ocm/api/datacontext" "ocm.software/ocm/api/oci" ocicpi "ocm.software/ocm/api/oci/cpi" + "ocm.software/ocm/api/oci/extensions/repositories/ctf" + "ocm.software/ocm/api/oci/extensions/repositories/ocireg" "ocm.software/ocm/api/ocm/cpi" "ocm.software/ocm/api/ocm/cpi/repocpi" "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/componentmapping" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config" ) type OCIBasedRepository interface { @@ -44,23 +48,69 @@ type RepositoryImpl struct { nonref cpi.Repository ocirepo oci.Repository readonly bool + // blobLimit is the size limit for layers maintained for the storage of localBlobs. + // The value -1 means an unconfigured value (a default from the blob limit configuration is used), + // a value == 0 disables the limiting and (a default from the blob limit configuration is ignored), + // a value > 0 enabled the usage of the specified size. + blobLimit int64 } var ( _ repocpi.RepositoryImpl = (*RepositoryImpl)(nil) _ credentials.ConsumerIdentityProvider = (*RepositoryImpl)(nil) + _ config.Configurable = (*RepositoryImpl)(nil) ) -func NewRepository(ctxp cpi.ContextProvider, meta *ComponentRepositoryMeta, ocirepo oci.Repository) cpi.Repository { +// NewRepository creates a new OCM repository based on any OCI abstraction from +// the OCI context type. +// The optional blobLimit is the size limit for layers maintained for the storage of localBlobs. +// The value -1 means an unconfigured value (a default from the blob limit configuration is used), +// a value == 0 disables the limiting and (a default from the blob limit configuration is ignored), +// a value > 0 enabled the usage of the specified size. +func NewRepository(ctxp cpi.ContextProvider, meta *ComponentRepositoryMeta, ocirepo oci.Repository, blobLimit ...int64) cpi.Repository { ctx := datacontext.InternalContextRef(ctxp.OCMContext()) + impl := &RepositoryImpl{ - ctx: ctx, - meta: *DefaultComponentRepositoryMeta(meta), - ocirepo: ocirepo, + ctx: ctx, + meta: *DefaultComponentRepositoryMeta(meta), + ocirepo: ocirepo, + blobLimit: general.OptionalDefaulted(-1, blobLimit...), + } + if impl.blobLimit < 0 { + ConfigureBlobLimits(ctxp.OCMContext(), impl) } return repocpi.NewRepository(impl, "OCM repo[OCI]") } +func (r *RepositoryImpl) ConfigureBlobLimits(limits config.BlobLimits) { + if len(limits) == 0 { + return + } + if spec, ok := r.ocirepo.GetSpecification().(*ocireg.RepositorySpec); ok { + id := spec.GetConsumerId() + hp := hostpath.HostPort(id) + l := limits.GetLimit(hp) + if l >= 0 { + r.blobLimit = l + } + } + if spec, ok := r.ocirepo.GetSpecification().(*ctf.RepositorySpec); ok { + l := limits.GetLimit("@" + spec.FilePath) + if l >= 0 { + r.blobLimit = l + } + } +} + +func (r *RepositoryImpl) SetBlobLimit(s int64) bool { + r.blobLimit = s + return true +} + +func (r *RepositoryImpl) GetBlobLimit() int64 { + return r.blobLimit +} + func (r *RepositoryImpl) Close() error { return r.ocirepo.Close() } diff --git a/api/ocm/extensions/repositories/genericocireg/type.go b/api/ocm/extensions/repositories/genericocireg/type.go index 809d598140..b3c4b460e5 100644 --- a/api/ocm/extensions/repositories/genericocireg/type.go +++ b/api/ocm/extensions/repositories/genericocireg/type.go @@ -91,6 +91,7 @@ func NewComponentRepositoryMeta(subPath string, mapping ComponentNameMapping) *C type RepositorySpec struct { oci.RepositorySpec ComponentRepositoryMeta + BlobLimit *int64 } var ( @@ -127,19 +128,28 @@ func (a *RepositorySpec) AsUniformSpec(cpi.Context) *cpi.UniformRepositorySpec { return &cpi.UniformRepositorySpec{Type: a.GetKind(), Scheme: spec.Scheme, Host: spec.Host, Info: spec.Info, TypeHint: spec.TypeHint, SubPath: a.SubPath} } +type meta struct { + ComponentRepositoryMeta `json:",inline"` + BlobLimit *int64 `json:"blobLimit,omitempty"` +} + func (u *RepositorySpec) UnmarshalJSON(data []byte) error { logrus.Debugf("unmarshal generic ocireg spec %s\n", string(data)) ocispec := &oci.GenericRepositorySpec{} if err := json.Unmarshal(data, ocispec); err != nil { return err } - compmeta := &ComponentRepositoryMeta{} - if err := json.Unmarshal(data, ocispec); err != nil { + + var m meta + if err := json.Unmarshal(data, &m); err != nil { return err } u.RepositorySpec = ocispec - u.ComponentRepositoryMeta = *compmeta + u.ComponentRepositoryMeta = m.ComponentRepositoryMeta + if m.BlobLimit != nil { + u.BlobLimit = m.BlobLimit + } normalizers.Normalize(u) return nil @@ -154,7 +164,12 @@ func (u RepositorySpec) MarshalJSON() ([]byte, error) { if err != nil { return nil, err } - compmeta, err := runtime.ToUnstructuredObject(u.ComponentRepositoryMeta) + + m := meta{ + ComponentRepositoryMeta: u.ComponentRepositoryMeta, + BlobLimit: u.BlobLimit, + } + compmeta, err := runtime.ToUnstructuredObject(&m) if err != nil { return nil, err } @@ -166,6 +181,9 @@ func (s *RepositorySpec) Repository(ctx cpi.Context, creds credentials.Credentia if err != nil { return nil, err } + if s.BlobLimit != nil { + return NewRepository(ctx, &s.ComponentRepositoryMeta, r, *s.BlobLimit), nil + } return NewRepository(ctx, &s.ComponentRepositoryMeta, r), nil } diff --git a/api/tech/oras/client.go b/api/tech/oras/client.go index a73e0003a9..3cde633d6a 100644 --- a/api/tech/oras/client.go +++ b/api/tech/oras/client.go @@ -4,11 +4,10 @@ import ( "context" "errors" "fmt" - "io" - "strings" - "sync" "github.com/containerd/containerd/errdefs" + "github.com/mandelsoft/logging" + "github.com/moby/locker" ociv1 "github.com/opencontainers/image-spec/specs-go/v1" oraserr "oras.land/oras-go/v2/errdef" "oras.land/oras-go/v2/registry/remote" @@ -18,55 +17,37 @@ import ( type ClientOptions struct { Client *auth.Client PlainHTTP bool + Logger logging.Logger + Lock *locker.Locker } type Client struct { client *auth.Client plainHTTP bool - ref string - mu sync.RWMutex + logger logging.Logger + lock *locker.Locker } -var ( - _ Resolver = &Client{} - _ Fetcher = &Client{} - _ Pusher = &Client{} - _ Lister = &Client{} -) +var _ Resolver = &Client{} func New(opts ClientOptions) *Client { - return &Client{client: opts.Client, plainHTTP: opts.PlainHTTP} + return &Client{client: opts.Client, plainHTTP: opts.PlainHTTP, logger: opts.Logger, lock: opts.Lock} } func (c *Client) Fetcher(ctx context.Context, ref string) (Fetcher, error) { - c.mu.Lock() - defer c.mu.Unlock() - - c.ref = ref - return c, nil + return &OrasFetcher{client: c.client, ref: ref, plainHTTP: c.plainHTTP}, nil } func (c *Client) Pusher(ctx context.Context, ref string) (Pusher, error) { - c.mu.Lock() - defer c.mu.Unlock() - - c.ref = ref - return c, nil + return &OrasPusher{client: c.client, ref: ref, plainHTTP: c.plainHTTP, lock: c.lock}, nil } func (c *Client) Lister(ctx context.Context, ref string) (Lister, error) { - c.mu.Lock() - defer c.mu.Unlock() - - c.ref = ref - return c, nil + return &OrasLister{client: c.client, ref: ref, plainHTTP: c.plainHTTP}, nil } func (c *Client) Resolve(ctx context.Context, ref string) (string, ociv1.Descriptor, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - src, err := c.createRepository(ref) + src, err := createRepository(ref, c.client, c.plainHTTP) if err != nil { return "", ociv1.Descriptor{}, err } @@ -88,114 +69,16 @@ func (c *Client) Resolve(ctx context.Context, ref string) (string, ociv1.Descrip return "", desc, nil } -func (c *Client) Push(ctx context.Context, d ociv1.Descriptor, src Source) error { - c.mu.RLock() - defer c.mu.RUnlock() - - reader, err := src.Reader() - if err != nil { - return err - } - - repository, err := c.createRepository(c.ref) - if err != nil { - return err - } - - if split := strings.Split(c.ref, ":"); len(split) == 2 { - // Once we get a reference that contains a tag, we need to re-push that - // layer with the reference included. PushReference then will tag - // that layer resulting in the created tag pointing to the right - // blob data. - if err := repository.PushReference(ctx, d, reader, c.ref); err != nil { - return fmt.Errorf("failed to push tag: %w", err) - } - - return nil - } - - // We have a digest, so we use plain push for the digest. - // Push here decides if it's a Manifest or a Blob. - if err := repository.Push(ctx, d, reader); err != nil { - return fmt.Errorf("failed to push: %w, %s", err, c.ref) - } - - return nil -} - -func (c *Client) Fetch(ctx context.Context, desc ociv1.Descriptor) (io.ReadCloser, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - src, err := c.createRepository(c.ref) - if err != nil { - return nil, fmt.Errorf("failed to resolve ref %q: %w", c.ref, err) - } - - // oras requires a Resolve to happen before a fetch because - // -1 is an invalid size and results in a content-length mismatch error by design. - // This is a security consideration on ORAS' side. - // manifest is not set in the descriptor - // We explicitly call resolve on manifest first because it might be - // that the mediatype is not set at this point so we don't want ORAS to try to - // select the wrong layer to fetch from. - rdesc, err := src.Manifests().Resolve(ctx, desc.Digest.String()) - if errors.Is(err, oraserr.ErrNotFound) { - rdesc, err = src.Blobs().Resolve(ctx, desc.Digest.String()) - if err != nil { - return nil, fmt.Errorf("failed to resolve fetch blob %q: %w", desc.Digest.String(), err) - } - - delayer := func() (io.ReadCloser, error) { - return src.Blobs().Fetch(ctx, rdesc) - } - - return newDelayedReader(delayer) - } - - if err != nil { - return nil, fmt.Errorf("failed to resolve fetch manifest %q: %w", desc.Digest.String(), err) - } - - // lastly, try a manifest fetch. - fetch, err := src.Fetch(ctx, rdesc) - if err != nil { - return nil, fmt.Errorf("failed to fetch manifest: %w", err) - } - - return fetch, err -} - -func (c *Client) List(ctx context.Context) ([]string, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - src, err := c.createRepository(c.ref) - if err != nil { - return nil, fmt.Errorf("failed to resolve ref %q: %w", c.ref, err) - } - - var result []string - if err := src.Tags(ctx, "", func(tags []string) error { - result = append(result, tags...) - return nil - }); err != nil { - return nil, fmt.Errorf("failed to list tags: %w", err) - } - - return result, nil -} - // createRepository creates a new repository representation using the passed in ref. // This is a cheap operation. -func (c *Client) createRepository(ref string) (*remote.Repository, error) { +func createRepository(ref string, client *auth.Client, plain bool) (*remote.Repository, error) { src, err := remote.NewRepository(ref) if err != nil { return nil, fmt.Errorf("failed to create new repository: %w", err) } - src.Client = c.client // set up authenticated client. - src.PlainHTTP = c.plainHTTP + src.Client = client // set up authenticated client. + src.PlainHTTP = plain return src, nil } diff --git a/api/tech/oras/delayed_reader.go b/api/tech/oras/delayed_reader.go deleted file mode 100644 index d07a6f870b..0000000000 --- a/api/tech/oras/delayed_reader.go +++ /dev/null @@ -1,57 +0,0 @@ -package oras - -import ( - "io" -) - -// delayedReader sets up a reader that only fetches a blob -// upon explicit reading request, otherwise, it stores the -// way of getting the reader. -type delayedReader struct { - open func() (io.ReadCloser, error) - rc io.ReadCloser - closed bool -} - -func newDelayedReader(open func() (io.ReadCloser, error)) (*delayedReader, error) { - return &delayedReader{ - open: open, - }, nil -} - -func (d *delayedReader) Read(p []byte) (n int, err error) { - if d.closed { - return 0, io.EOF - } - - reader, err := d.reader() - if err != nil { - return 0, err - } - - return reader.Read(p) -} - -func (d *delayedReader) reader() (io.ReadCloser, error) { - if d.rc != nil { - return d.rc, nil - } - - rc, err := d.open() - if err != nil { - return nil, err - } - - d.rc = rc - return rc, nil -} - -func (d *delayedReader) Close() error { - if d.closed { - return nil - } - - // we close regardless of an error - d.closed = true - return d.rc.Close() -} diff --git a/api/tech/oras/fetcher.go b/api/tech/oras/fetcher.go new file mode 100644 index 0000000000..a12df685fc --- /dev/null +++ b/api/tech/oras/fetcher.go @@ -0,0 +1,88 @@ +package oras + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + + ociv1 "github.com/opencontainers/image-spec/specs-go/v1" + "oras.land/oras-go/v2/registry/remote" + "oras.land/oras-go/v2/registry/remote/auth" +) + +type OrasFetcher struct { + client *auth.Client + ref string + plainHTTP bool + mu sync.Mutex +} + +func (c *OrasFetcher) Fetch(ctx context.Context, desc ociv1.Descriptor) (io.ReadCloser, error) { + c.mu.Lock() + defer c.mu.Unlock() + + src, err := createRepository(c.ref, c.client, c.plainHTTP) + if err != nil { + return nil, fmt.Errorf("failed to resolve ref %q: %w", c.ref, err) + } + + // oras requires a Resolve to happen in some cases before a fetch because + // -1 or 0 are invalid sizes and result in a content-length mismatch error by design. + // This is a security consideration on ORAS' side. + // For more information (https://github.com/oras-project/oras-go/issues/822#issuecomment-2325622324) + // + // To workaround, we resolve the correct size + if desc.Size < 1 { + if desc, err = c.resolveDescriptor(ctx, desc, src); err != nil { + return nil, err + } + } + + // manifest resolve succeeded return the reader directly + // mediatype of the descriptor should now be set to the correct type. + reader, err := src.Fetch(ctx, desc) + if err != nil { + return nil, fmt.Errorf("failed to fetch blob: %w", err) + } + + return reader, nil +} + +// resolveDescriptor resolves the descriptor by fetching the blob or manifest based on the digest as a reference. +// If the descriptor has a media type, it will be resolved directly. +// If the descriptor has no media type, it will first try to resolve the blob, then the manifest as a fallback. +func (c *OrasFetcher) resolveDescriptor(ctx context.Context, desc ociv1.Descriptor, src *remote.Repository) (ociv1.Descriptor, error) { + if desc.MediaType != "" { + var err error + // if there is a media type, resolve the descriptor directly + if isManifest(src.ManifestMediaTypes, desc) { + desc, err = src.Manifests().Resolve(ctx, desc.Digest.String()) + } else { + desc, err = src.Blobs().Resolve(ctx, desc.Digest.String()) + } + if err != nil { + return ociv1.Descriptor{}, fmt.Errorf("failed to resolve descriptor %q: %w", desc.Digest.String(), err) + } + return desc, nil + } + + // if there is no media type, first try the blob, then the manifest + // To reader: DO NOT fetch manifest first, this can result in high latency calls + bdesc, err := src.Blobs().Resolve(ctx, desc.Digest.String()) + if err != nil { + mdesc, merr := src.Manifests().Resolve(ctx, desc.Digest.String()) + if merr != nil { + // also display the first manifest resolve error + err = errors.Join(err, merr) + + return ociv1.Descriptor{}, fmt.Errorf("failed to resolve manifest %q: %w", desc.Digest.String(), err) + } + desc = mdesc + } else { + desc = bdesc + } + + return desc, err +} diff --git a/api/tech/oras/lister.go b/api/tech/oras/lister.go new file mode 100644 index 0000000000..ba74568e45 --- /dev/null +++ b/api/tech/oras/lister.go @@ -0,0 +1,31 @@ +package oras + +import ( + "context" + "fmt" + + "oras.land/oras-go/v2/registry/remote/auth" +) + +type OrasLister struct { + client *auth.Client + ref string + plainHTTP bool +} + +func (c *OrasLister) List(ctx context.Context) ([]string, error) { + src, err := createRepository(c.ref, c.client, c.plainHTTP) + if err != nil { + return nil, fmt.Errorf("failed to resolve ref %q: %w", c.ref, err) + } + + var result []string + if err := src.Tags(ctx, "", func(tags []string) error { + result = append(result, tags...) + return nil + }); err != nil { + return nil, fmt.Errorf("failed to list tags: %w", err) + } + + return result, nil +} diff --git a/api/tech/oras/manifest.go b/api/tech/oras/manifest.go new file mode 100644 index 0000000000..cc473d87ff --- /dev/null +++ b/api/tech/oras/manifest.go @@ -0,0 +1,27 @@ +package oras + +import ( + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +// defaultManifestMediaTypes contains the default set of manifests media types. +var defaultManifestMediaTypes = []string{ + "application/vnd.docker.distribution.manifest.v2+json", + "application/vnd.docker.distribution.manifest.list.v2+json", + "application/vnd.oci.artifact.manifest.v1+json", + ocispec.MediaTypeImageManifest, + ocispec.MediaTypeImageIndex, +} + +// isManifest determines if the given descriptor points to a manifest. +func isManifest(manifestMediaTypes []string, desc ocispec.Descriptor) bool { + if len(manifestMediaTypes) == 0 { + manifestMediaTypes = defaultManifestMediaTypes + } + for _, mediaType := range manifestMediaTypes { + if desc.MediaType == mediaType { + return true + } + } + return false +} diff --git a/api/tech/oras/pusher.go b/api/tech/oras/pusher.go new file mode 100644 index 0000000000..0372c13ebd --- /dev/null +++ b/api/tech/oras/pusher.go @@ -0,0 +1,76 @@ +package oras + +import ( + "context" + "fmt" + + "github.com/containerd/errdefs" + "github.com/moby/locker" + ociv1 "github.com/opencontainers/image-spec/specs-go/v1" + "oras.land/oras-go/v2/registry" + "oras.land/oras-go/v2/registry/remote/auth" + + "ocm.software/ocm/api/oci/ociutils" +) + +type OrasPusher struct { + client *auth.Client + ref string + plainHTTP bool + lock *locker.Locker +} + +func (c *OrasPusher) Push(ctx context.Context, d ociv1.Descriptor, src Source) (retErr error) { + c.lock.Lock(c.ref) + defer c.lock.Unlock(c.ref) + + reader, err := src.Reader() + if err != nil { + return err + } + defer func() { + reader.Close() + }() + + repository, err := createRepository(c.ref, c.client, c.plainHTTP) + if err != nil { + return err + } + + ref, err := registry.ParseReference(c.ref) + if err != nil { + return fmt.Errorf("failed to parse reference %q: %w", c.ref, err) + } + + vers, err := ociutils.ParseVersion(ref.Reference) + if err != nil { + return fmt.Errorf("failed to parse version %q: %w", ref.Reference, err) + } + + if vers.IsTagged() { + // Once we get a reference that contains a tag, we need to re-push that + // layer with the reference included. PushReference then will tag + // that layer resulting in the created tag pointing to the right + // blob data. + if err := repository.PushReference(ctx, d, reader, c.ref); err != nil { + return fmt.Errorf("failed to push tag: %w", err) + } + + return nil + } + + ok, err := repository.Exists(ctx, d) + if err != nil { + return fmt.Errorf("failed to check if repository %q exists: %w", ref.Repository, err) + } + + if ok { + return errdefs.ErrAlreadyExists + } + + if err := repository.Push(ctx, d, reader); err != nil { + return fmt.Errorf("failed to push: %w, %s", err, c.ref) + } + + return nil +} diff --git a/docs/reference/ocm_configfile.md b/docs/reference/ocm_configfile.md index ae9e51920d..4f5282064d 100644 --- a/docs/reference/ocm_configfile.md +++ b/docs/reference/ocm_configfile.md @@ -24,6 +24,28 @@ The following configuration types are supported: <name>: <yaml defining the attribute> ... +-
blobLimits.ocireg.ocm.config.ocm.software
+ The config type blobLimits.ocireg.ocm.config.ocm.software
can be used to set some
+ blob layer limits for particular OCI registries used to host OCM repositories.
+ The blobLimits
field maps a OCI registry address to the blob limit to use:
+
+ + type: blobLimits.ocireg.ocm.config.ocm.software + blobLimits: + dummy.io: 65564 + dummy.io:8443: 32768 // with :8443 specifying the port and 32768 specifying the byte limit ++ + If blob limits apply to a registry, local blobs with a size larger than + the configured limit will be split into several layers with a maximum + size of the given value. + + These settings can be overwritten by explicit settings in an OCM + repository specification for those repositories. + + The most specific entry will be used. If a registry with a dedicated + port is requested, but no explicit configuration is found, the + setting for the sole hostname is used (if configured). -
cli.ocm.config.ocm.software
The config type cli.ocm.config.ocm.software
is used to handle the
main configuration flags of the OCM command line tool.
diff --git a/flake.nix b/flake.nix
index 0f18a3344e..653312c44d 100644
--- a/flake.nix
+++ b/flake.nix
@@ -35,7 +35,7 @@
state = if (self ? rev) then "clean" else "dirty";
# This vendorHash represents a derivative of all go.mod dependencies and needs to be adjusted with every change
- vendorHash = "sha256-3h/k5p/simo4GtVm14mTBD5/4CG7QC8bfo48C6CuCbY=";
+ vendorHash = "sha256-hDIJjccA6G34X0AAGsPDjZuWfgW0BeiGerv6JGb4Sq8=";
src = ./.;
diff --git a/go.mod b/go.mod
index 10e02fada5..2eab77b612 100644
--- a/go.mod
+++ b/go.mod
@@ -49,6 +49,7 @@ require (
github.com/mikefarah/yq/v4 v4.44.6
github.com/mitchellh/copystructure v1.2.0
github.com/mittwald/go-helm-client v0.12.15
+ github.com/moby/locker v1.0.1
github.com/modern-go/reflect2 v1.0.2
github.com/onsi/ginkgo/v2 v2.22.0
github.com/onsi/gomega v1.36.1
@@ -258,7 +259,6 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
- github.com/moby/locker v1.0.1 // indirect
github.com/moby/spdystream v0.5.0 // indirect
github.com/moby/sys/capability v0.3.0 // indirect
github.com/moby/sys/mountinfo v0.7.2 // indirect