diff --git a/api/oci/extensions/repositories/ocireg/blobs.go b/api/oci/extensions/repositories/ocireg/blobs.go index 0ecf1b299d..d8491c266f 100644 --- a/api/oci/extensions/repositories/ocireg/blobs.go +++ b/api/oci/extensions/repositories/ocireg/blobs.go @@ -10,7 +10,7 @@ import ( "ocm.software/ocm/api/oci/cpi" "ocm.software/ocm/api/oci/extensions/attrs/cacheattr" - "ocm.software/ocm/api/tech/docker/resolve" + "ocm.software/ocm/api/tech/oras" "ocm.software/ocm/api/utils/accessio" "ocm.software/ocm/api/utils/blobaccess/blobaccess" ) @@ -23,20 +23,20 @@ type BlobContainer interface { type blobContainer struct { accessio.StaticAllocatable - fetcher resolve.Fetcher - pusher resolve.Pusher + fetcher oras.Fetcher + pusher oras.Pusher mime string } type BlobContainers struct { lock sync.Mutex cache accessio.BlobCache - fetcher resolve.Fetcher - pusher resolve.Pusher + fetcher oras.Fetcher + pusher oras.Pusher mimes map[string]BlobContainer } -func NewBlobContainers(ctx cpi.Context, fetcher remotes.Fetcher, pusher resolve.Pusher) *BlobContainers { +func NewBlobContainers(ctx cpi.Context, fetcher remotes.Fetcher, pusher oras.Pusher) *BlobContainers { return &BlobContainers{ cache: cacheattr.Get(ctx), fetcher: fetcher, @@ -73,7 +73,7 @@ func (c *BlobContainers) Release() error { return list.Result() } -func newBlobContainer(mime string, fetcher resolve.Fetcher, pusher resolve.Pusher) *blobContainer { +func newBlobContainer(mime string, fetcher oras.Fetcher, pusher oras.Pusher) *blobContainer { return &blobContainer{ mime: mime, fetcher: fetcher, @@ -81,7 +81,7 @@ func newBlobContainer(mime string, fetcher resolve.Fetcher, pusher resolve.Pushe } } -func NewBlobContainer(cache accessio.BlobCache, mime string, fetcher resolve.Fetcher, pusher resolve.Pusher) (BlobContainer, error) { +func NewBlobContainer(cache accessio.BlobCache, mime string, fetcher oras.Fetcher, pusher oras.Pusher) (BlobContainer, error) { c := newBlobContainer(mime, fetcher, pusher) if cache == nil { diff --git a/api/oci/extensions/repositories/ocireg/namespace.go b/api/oci/extensions/repositories/ocireg/namespace.go index 9ef8239979..dd3583d7b6 100644 --- a/api/oci/extensions/repositories/ocireg/namespace.go +++ b/api/oci/extensions/repositories/ocireg/namespace.go @@ -12,7 +12,7 @@ import ( "ocm.software/ocm/api/oci/cpi" "ocm.software/ocm/api/oci/cpi/support" "ocm.software/ocm/api/oci/extensions/actions/oci-repository-prepare" - "ocm.software/ocm/api/tech/docker/resolve" + "ocm.software/ocm/api/tech/oras" "ocm.software/ocm/api/utils/accessio" "ocm.software/ocm/api/utils/blobaccess/blobaccess" "ocm.software/ocm/api/utils/logging" @@ -22,10 +22,10 @@ import ( type NamespaceContainer struct { impl support.NamespaceAccessImpl repo *RepositoryImpl - resolver resolve.Resolver - lister resolve.Lister - fetcher resolve.Fetcher - pusher resolve.Pusher + resolver oras.Resolver + lister oras.Lister + fetcher oras.Fetcher + pusher oras.Pusher blobs *BlobContainers checked bool } @@ -69,7 +69,7 @@ func (n *NamespaceContainer) SetImplementation(impl support.NamespaceAccessImpl) n.impl = impl } -func (n *NamespaceContainer) getPusher(vers string) (resolve.Pusher, error) { +func (n *NamespaceContainer) getPusher(vers string) (oras.Pusher, error) { err := n.assureCreated() if err != nil { return nil, err diff --git a/api/oci/extensions/repositories/ocireg/repository.go b/api/oci/extensions/repositories/ocireg/repository.go index bff1e51078..1f1ede981a 100644 --- a/api/oci/extensions/repositories/ocireg/repository.go +++ b/api/oci/extensions/repositories/ocireg/repository.go @@ -8,18 +8,18 @@ import ( "path" "strings" - "github.com/containerd/containerd/remotes/docker/config" "github.com/containerd/errdefs" "github.com/mandelsoft/goutils/errors" "github.com/mandelsoft/logging" + "oras.land/oras-go/v2/registry/remote/auth" + "oras.land/oras-go/v2/registry/remote/retry" "ocm.software/ocm/api/credentials" "ocm.software/ocm/api/datacontext/attrs/rootcertsattr" "ocm.software/ocm/api/oci/artdesc" "ocm.software/ocm/api/oci/cpi" - "ocm.software/ocm/api/tech/docker" - "ocm.software/ocm/api/tech/docker/resolve" "ocm.software/ocm/api/tech/oci/identity" + "ocm.software/ocm/api/tech/oras" "ocm.software/ocm/api/utils" ocmlog "ocm.software/ocm/api/utils/logging" "ocm.software/ocm/api/utils/refmgmt" @@ -114,7 +114,7 @@ func (r *RepositoryImpl) getCreds(comp string) (credentials.Credentials, error) return identity.GetCredentials(r.GetContext(), r.info.Locator, comp) } -func (r *RepositoryImpl) getResolver(comp string) (resolve.Resolver, error) { +func (r *RepositoryImpl) getResolver(comp string) (oras.Resolver, error) { creds, err := r.getCreds(comp) if err != nil { if !errors.IsErrUnknownKind(err, credentials.KIND_CONSUMER) { @@ -126,57 +126,53 @@ func (r *RepositoryImpl) getResolver(comp string) (resolve.Resolver, error) { logger.Trace("no credentials") } - opts := docker.ResolverOptions{ - Hosts: docker.ConvertHosts(config.ConfigureHosts(context.Background(), config.HostOptions{ - UpdateClient: func(client *http.Client) error { - // copy from http.DefaultTransport with a roundtripper injection - client.Transport = ocmlog.NewRoundTripper(client.Transport, logger) - return nil - }, - Credentials: func(host string) (string, string, error) { + authCreds := auth.Credential{} + if creds != nil { + pass := creds.GetProperty(credentials.ATTR_IDENTITY_TOKEN) + if pass == "" { + pass = creds.GetProperty(credentials.ATTR_PASSWORD) + } + authCreds.Username = creds.GetProperty(credentials.ATTR_USERNAME) + authCreds.Password = pass + } + + client := retry.DefaultClient + client.Transport = ocmlog.NewRoundTripper(retry.DefaultClient.Transport, logger) + if r.info.Scheme == "https" { + // set up TLS + //nolint:gosec // used like the default, there are OCI servers (quay.io) not working with min version. + conf := &tls.Config{ + // MinVersion: tls.VersionTLS13, + RootCAs: func() *x509.CertPool { + var rootCAs *x509.CertPool if creds != nil { - p := creds.GetProperty(credentials.ATTR_IDENTITY_TOKEN) - if p == "" { - p = creds.GetProperty(credentials.ATTR_PASSWORD) - } - pw := "" - if p != "" { - pw = "***" + c := creds.GetProperty(credentials.ATTR_CERTIFICATE_AUTHORITY) + if c != "" { + rootCAs = x509.NewCertPool() + rootCAs.AppendCertsFromPEM([]byte(c)) } - logger.Trace("query credentials", ocmlog.ATTR_USER, creds.GetProperty(credentials.ATTR_USERNAME), "pass", pw) - return creds.GetProperty(credentials.ATTR_USERNAME), p, nil } - logger.Trace("no credentials") - return "", "", nil - }, - DefaultScheme: r.info.Scheme, - //nolint:gosec // used like the default, there are OCI servers (quay.io) not working with min version. - DefaultTLS: func() *tls.Config { - if r.info.Scheme == "http" { - return nil - } - return &tls.Config{ - // MinVersion: tls.VersionTLS13, - RootCAs: func() *x509.CertPool { - var rootCAs *x509.CertPool - if creds != nil { - c := creds.GetProperty(credentials.ATTR_CERTIFICATE_AUTHORITY) - if c != "" { - rootCAs = x509.NewCertPool() - rootCAs.AppendCertsFromPEM([]byte(c)) - } - } - if rootCAs == nil { - rootCAs = rootcertsattr.Get(r.GetContext()).GetRootCertPool(true) - } - return rootCAs - }(), + if rootCAs == nil { + rootCAs = rootcertsattr.Get(r.GetContext()).GetRootCertPool(true) } + return rootCAs }(), - })), + } + client.Transport = ocmlog.NewRoundTripper(retry.NewTransport(&http.Transport{ + TLSClientConfig: conf, + }), logger) + } + + authClient := &auth.Client{ + Client: client, + Cache: auth.NewCache(), + Credential: auth.StaticCredential(r.info.HostPort(), authCreds), } - return docker.NewResolver(opts), nil + return oras.New(oras.ClientOptions{ + Client: authClient, + PlainHTTP: r.info.Scheme == "http", + }), nil } func (r *RepositoryImpl) GetRef(comp, vers string) string { diff --git a/api/oci/extensions/repositories/ocireg/utils.go b/api/oci/extensions/repositories/ocireg/utils.go index 17a96f040a..51f5994d17 100644 --- a/api/oci/extensions/repositories/ocireg/utils.go +++ b/api/oci/extensions/repositories/ocireg/utils.go @@ -14,7 +14,7 @@ import ( "ocm.software/ocm/api/oci/artdesc" "ocm.software/ocm/api/oci/cpi" - "ocm.software/ocm/api/tech/docker/resolve" + "ocm.software/ocm/api/tech/oras" "ocm.software/ocm/api/utils/accessio" "ocm.software/ocm/api/utils/blobaccess/blobaccess" "ocm.software/ocm/api/utils/logging" @@ -81,28 +81,29 @@ func readAll(reader io.ReadCloser, err error) ([]byte, error) { return data, nil } -func push(ctx context.Context, p resolve.Pusher, blob blobaccess.BlobAccess) error { +func push(ctx context.Context, p oras.Pusher, blob blobaccess.BlobAccess) error { desc := *artdesc.DefaultBlobDescriptor(blob) return pushData(ctx, p, desc, blob) } -func pushData(ctx context.Context, p resolve.Pusher, desc artdesc.Descriptor, data blobaccess.DataAccess) error { +func pushData(ctx context.Context, p oras.Pusher, desc artdesc.Descriptor, data blobaccess.DataAccess) error { key := remotes.MakeRefKey(ctx, desc) if desc.Size == 0 { desc.Size = -1 } logging.Logger().Debug("*** push blob", "mediatype", desc.MediaType, "digest", desc.Digest, "key", key) - req, err := p.Push(ctx, desc, data) - if err != nil { + if err := p.Push(ctx, desc, data); err != nil { if errdefs.IsAlreadyExists(err) { logging.Logger().Debug("blob already exists", "mediatype", desc.MediaType, "digest", desc.Digest) return nil } + return fmt.Errorf("failed to push: %w", err) } - return req.Commit(ctx, desc.Size, desc.Digest) + + return nil } var dummyContext = nologger() diff --git a/api/tech/docker/README.md b/api/tech/docker/README.md deleted file mode 100644 index 096a9c1e18..0000000000 --- a/api/tech/docker/README.md +++ /dev/null @@ -1,4 +0,0 @@ -# containerd - -Taken from github.com/containerd/containerd remotes/docker to add list endpoints -Fix retry of requests with ResendBuffer diff --git a/api/tech/docker/errors/errors.go b/api/tech/docker/errors/errors.go deleted file mode 100644 index a158f75b5a..0000000000 --- a/api/tech/docker/errors/errors.go +++ /dev/null @@ -1,58 +0,0 @@ -/* - Copyright The containerd Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package errors - -import ( - "fmt" - "io" - "net/http" -) - -var _ error = ErrUnexpectedStatus{} - -// ErrUnexpectedStatus is returned if a registry API request returned with unexpected HTTP status -type ErrUnexpectedStatus struct { - Status string - StatusCode int - Body []byte - RequestURL, RequestMethod string -} - -func (e ErrUnexpectedStatus) Error() string { - if len(e.Body) > 0 { - return fmt.Sprintf("unexpected status from %s request to %s: %s: %s", e.RequestMethod, e.RequestURL, e.Status, string(e.Body)) - } - return fmt.Sprintf("unexpected status from %s request to %s: %s", e.RequestMethod, e.RequestURL, e.Status) -} - -// NewUnexpectedStatusErr creates an ErrUnexpectedStatus from HTTP response -func NewUnexpectedStatusErr(resp *http.Response) error { - var b []byte - if resp.Body != nil { - b, _ = io.ReadAll(io.LimitReader(resp.Body, 64000)) // 64KB - } - err := ErrUnexpectedStatus{ - Body: b, - Status: resp.Status, - StatusCode: resp.StatusCode, - RequestMethod: resp.Request.Method, - } - if resp.Request.URL != nil { - err.RequestURL = resp.Request.URL.String() - } - return err -} diff --git a/api/tech/docker/fetcher.go b/api/tech/docker/fetcher.go deleted file mode 100644 index 4a2eec584e..0000000000 --- a/api/tech/docker/fetcher.go +++ /dev/null @@ -1,202 +0,0 @@ -package docker - -import ( - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" - "strings" - - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/images" - "github.com/containerd/containerd/log" - ocispec "github.com/opencontainers/image-spec/specs-go/v1" - "github.com/pkg/errors" - - "ocm.software/ocm/api/utils/accessio" -) - -type dockerFetcher struct { - *dockerBase -} - -func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) { - ctx = log.WithLogger(ctx, log.G(ctx).WithField("digest", desc.Digest)) - - hosts := r.filterHosts(HostCapabilityPull) - if len(hosts) == 0 { - return nil, errors.Wrap(errdefs.ErrNotFound, "no pull hosts") - } - - ctx, err := ContextWithRepositoryScope(ctx, r.refspec, false) - if err != nil { - return nil, err - } - - return newHTTPReadSeeker(desc.Size, func(offset int64) (io.ReadCloser, error) { - // firstly try fetch via external urls - for _, us := range desc.URLs { - ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", us)) - - u, err := url.Parse(us) - if err != nil { - log.G(ctx).WithError(err).Debug("failed to parse") - continue - } - if u.Scheme != "http" && u.Scheme != "https" { - log.G(ctx).Debug("non-http(s) alternative url is unsupported") - continue - } - log.G(ctx).Debug("trying alternative url") - - // Try this first, parse it - host := RegistryHost{ - Client: http.DefaultClient, - Host: u.Host, - Scheme: u.Scheme, - Path: u.Path, - Capabilities: HostCapabilityPull, - } - req := r.request(host, http.MethodGet) - // Strip namespace from base - req.path = u.Path - if u.RawQuery != "" { - req.path = req.path + "?" + u.RawQuery - } - - rc, err := r.open(ctx, req, desc.MediaType, offset) - if err != nil { - if errdefs.IsNotFound(err) { - continue // try one of the other urls. - } - - return nil, err - } - - return rc, nil - } - - // Try manifests endpoints for manifests types - switch desc.MediaType { - case images.MediaTypeDockerSchema2Manifest, images.MediaTypeDockerSchema2ManifestList, - images.MediaTypeDockerSchema1Manifest, - ocispec.MediaTypeImageManifest, ocispec.MediaTypeImageIndex: - - var firstErr error - for _, host := range r.hosts { - req := r.request(host, http.MethodGet, "manifests", desc.Digest.String()) - if err := req.addNamespace(r.refspec.Hostname()); err != nil { - return nil, err - } - - rc, err := r.open(ctx, req, desc.MediaType, offset) - if err != nil { - // Store the error for referencing later - if firstErr == nil { - firstErr = err - } - continue // try another host - } - - return rc, nil - } - - return nil, firstErr - } - - // Finally use blobs endpoints - var firstErr error - for _, host := range r.hosts { - req := r.request(host, http.MethodGet, "blobs", desc.Digest.String()) - if err := req.addNamespace(r.refspec.Hostname()); err != nil { - return nil, err - } - - rc, err := r.open(ctx, req, desc.MediaType, offset) - if err != nil { - // Store the error for referencing later - if firstErr == nil { - firstErr = err - } - continue // try another host - } - - return rc, nil - } - - if errdefs.IsNotFound(firstErr) { - firstErr = errors.Wrapf(errdefs.ErrNotFound, - "could not fetch content descriptor %v (%v) from remote", - desc.Digest, desc.MediaType) - } - - return nil, firstErr - }) -} - -func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string, offset int64) (_ io.ReadCloser, retErr error) { - mt := "*/*" - if mediatype != "" { - mt = mediatype + ", " + mt - } - req.header.Set("Accept", mt) - - if offset > 0 { - // Note: "Accept-Ranges: bytes" cannot be trusted as some endpoints - // will return the header without supporting the range. The content - // range must always be checked. - req.header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) - } - - resp, err := req.doWithRetries(ctx, nil) - if err != nil { - return nil, accessio.RetriableError(err) - } - defer func() { - if retErr != nil { - resp.Body.Close() - } - }() - - if resp.StatusCode > 299 { - // TODO(stevvooe): When doing a offset specific request, we should - // really distinguish between a 206 and a 200. In the case of 200, we - // can discard the bytes, hiding the seek behavior from the - // implementation. - - if resp.StatusCode == http.StatusNotFound { - return nil, errors.Wrapf(errdefs.ErrNotFound, "content at %v not found", req.String()) - } - var registryErr Errors - if err := json.NewDecoder(resp.Body).Decode(®istryErr); err != nil || registryErr.Len() < 1 { - return nil, errors.Errorf("unexpected status code %v: %v", req.String(), resp.Status) - } - return nil, errors.Errorf("unexpected status code %v: %s - Server message: %s", req.String(), resp.Status, registryErr.Error()) - } - if offset > 0 { - cr := resp.Header.Get("content-range") - if cr != "" { - if !strings.HasPrefix(cr, fmt.Sprintf("bytes %d-", offset)) { - return nil, errors.Errorf("unhandled content range in response: %v", cr) - } - } else { - // TODO: Should any cases where use of content range - // without the proper header be considered? - // 206 responses? - - // Discard up to offset - // Could use buffer pool here but this case should be rare - n, err := io.Copy(io.Discard, io.LimitReader(resp.Body, offset)) - if err != nil { - return nil, errors.Wrap(err, "failed to discard to offset") - } - if n != offset { - return nil, errors.Errorf("unable to discard to offset") - } - } - } - - return resp.Body, nil -} diff --git a/api/tech/docker/handler.go b/api/tech/docker/handler.go deleted file mode 100644 index 0ff9959ad3..0000000000 --- a/api/tech/docker/handler.go +++ /dev/null @@ -1,136 +0,0 @@ -package docker - -import ( - "context" - "fmt" - "net/url" - "strings" - - "github.com/containerd/containerd/content" - "github.com/containerd/containerd/images" - "github.com/containerd/containerd/labels" - "github.com/containerd/containerd/log" - "github.com/containerd/containerd/reference" - ocispec "github.com/opencontainers/image-spec/specs-go/v1" -) - -// labelDistributionSource describes the source blob comes from. -var labelDistributionSource = "containerd.io/distribution.source" - -// AppendDistributionSourceLabel updates the label of blob with distribution source. -func AppendDistributionSourceLabel(manager content.Manager, ref string) (images.HandlerFunc, error) { - refspec, err := reference.Parse(ref) - if err != nil { - return nil, err - } - - u, err := url.Parse("dummy://" + refspec.Locator) - if err != nil { - return nil, err - } - - source, repo := u.Hostname(), strings.TrimPrefix(u.Path, "/") - return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { - info, err := manager.Info(ctx, desc.Digest) - if err != nil { - return nil, err - } - - key := distributionSourceLabelKey(source) - - originLabel := "" - if info.Labels != nil { - originLabel = info.Labels[key] - } - value := appendDistributionSourceLabel(originLabel, repo) - - // The repo name has been limited under 256 and the distribution - // label might hit the limitation of label size, when blob data - // is used as the very, very common layer. - if err := labels.Validate(key, value); err != nil { - log.G(ctx).Warnf("skip to append distribution label: %s", err) - return nil, nil - } - - info = content.Info{ - Digest: desc.Digest, - Labels: map[string]string{ - key: value, - }, - } - _, err = manager.Update(ctx, info, fmt.Sprintf("labels.%s", key)) - return nil, err - }, nil -} - -func appendDistributionSourceLabel(originLabel, repo string) string { - repos := []string{} - if originLabel != "" { - repos = strings.Split(originLabel, ",") - } - repos = append(repos, repo) - - // use empty string to present duplicate items - for i := 1; i < len(repos); i++ { - tmp, j := repos[i], i-1 - for ; j >= 0 && repos[j] >= tmp; j-- { - if repos[j] == tmp { - tmp = "" - } - repos[j+1] = repos[j] - } - repos[j+1] = tmp - } - - i := 0 - for ; i < len(repos) && repos[i] == ""; i++ { - } - - return strings.Join(repos[i:], ",") -} - -func distributionSourceLabelKey(source string) string { - return fmt.Sprintf("%s.%s", labelDistributionSource, source) -} - -// selectRepositoryMountCandidate will select the repo which has longest -// common prefix components as the candidate. -func selectRepositoryMountCandidate(refspec reference.Spec, sources map[string]string) string { - u, err := url.Parse("dummy://" + refspec.Locator) - if err != nil { - // NOTE: basically, it won't be error here - return "" - } - - source, target := u.Hostname(), strings.TrimPrefix(u.Path, "/") - repoLabel, ok := sources[distributionSourceLabelKey(source)] - if !ok || repoLabel == "" { - return "" - } - - n, match := 0, "" - components := strings.Split(target, "/") - for _, repo := range strings.Split(repoLabel, ",") { - // the target repo is not a candidate - if repo == target { - continue - } - - if l := commonPrefixComponents(components, repo); l >= n { - n, match = l, repo - } - } - return match -} - -func commonPrefixComponents(components []string, target string) int { - targetComponents := strings.Split(target, "/") - - i := 0 - for ; i < len(components) && i < len(targetComponents); i++ { - if components[i] != targetComponents[i] { - break - } - } - return i -} diff --git a/api/tech/docker/httpreadseeker.go b/api/tech/docker/httpreadseeker.go deleted file mode 100644 index c6b803810b..0000000000 --- a/api/tech/docker/httpreadseeker.go +++ /dev/null @@ -1,157 +0,0 @@ -package docker - -import ( - "bytes" - "io" - - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/log" - "github.com/pkg/errors" -) - -const maxRetry = 3 - -type httpReadSeeker struct { - size int64 - offset int64 - rc io.ReadCloser - open func(offset int64) (io.ReadCloser, error) - closed bool - - errsWithNoProgress int -} - -func newHTTPReadSeeker(size int64, open func(offset int64) (io.ReadCloser, error)) (io.ReadCloser, error) { - return &httpReadSeeker{ - size: size, - open: open, - }, nil -} - -func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) { - if hrs.closed { - return 0, io.EOF - } - - rd, err := hrs.reader() - if err != nil { - return 0, err - } - - n, err = rd.Read(p) - hrs.offset += int64(n) - if n > 0 || err == nil { - hrs.errsWithNoProgress = 0 - } - - if !errors.Is(err, io.ErrUnexpectedEOF) { - return - } - // connection closed unexpectedly. try reconnecting. - if n == 0 { - hrs.errsWithNoProgress++ - if hrs.errsWithNoProgress > maxRetry { - return // too many retries for this offset with no progress - } - } - - if hrs.rc != nil { - if clsErr := hrs.rc.Close(); clsErr != nil { - log.L.WithError(clsErr).Errorf("httpReadSeeker: failed to close ReadCloser") - } - hrs.rc = nil - } - - if _, err2 := hrs.reader(); err2 == nil { - return n, nil - } - - return n, err -} - -func (hrs *httpReadSeeker) Close() error { - if hrs.closed { - return nil - } - hrs.closed = true - if hrs.rc != nil { - return hrs.rc.Close() - } - - return nil -} - -func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) { - if hrs.closed { - return 0, errors.Wrap(errdefs.ErrUnavailable, "Fetcher.Seek: closed") - } - - abs := hrs.offset - switch whence { - case io.SeekStart: - abs = offset - case io.SeekCurrent: - abs += offset - case io.SeekEnd: - if hrs.size == -1 { - return 0, errors.Wrap(errdefs.ErrUnavailable, "Fetcher.Seek: unknown size, cannot seek from end") - } - abs = hrs.size + offset - default: - return 0, errors.Wrap(errdefs.ErrInvalidArgument, "Fetcher.Seek: invalid whence") - } - - if abs < 0 { - return 0, errors.Wrapf(errdefs.ErrInvalidArgument, "Fetcher.Seek: negative offset") - } - - if abs != hrs.offset { - if hrs.rc != nil { - if err := hrs.rc.Close(); err != nil { - log.L.WithError(err).Errorf("Fetcher.Seek: failed to close ReadCloser") - } - - hrs.rc = nil - } - - hrs.offset = abs - } - - return hrs.offset, nil -} - -func (hrs *httpReadSeeker) reader() (io.Reader, error) { - if hrs.rc != nil { - return hrs.rc, nil - } - - if hrs.size == -1 || hrs.offset < hrs.size { - // only try to reopen the body request if we are seeking to a value - // less than the actual size. - if hrs.open == nil { - return nil, errors.Wrapf(errdefs.ErrNotImplemented, "cannot open") - } - - rc, err := hrs.open(hrs.offset) - if err != nil { - return nil, errors.Wrapf(err, "httpReadSeeker: failed open") - } - - if hrs.rc != nil { - if err := hrs.rc.Close(); err != nil { - log.L.WithError(err).Errorf("httpReadSeeker: failed to close ReadCloser") - } - } - hrs.rc = rc - } else { - // There is an edge case here where offset == size of the content. If - // we seek, we will probably get an error for content that cannot be - // sought (?). In that case, we should err on committing the content, - // as the length is already satisfied but we just return the empty - // reader instead. - - hrs.rc = io.NopCloser(bytes.NewReader([]byte{})) - } - - return hrs.rc, nil -} diff --git a/api/tech/docker/lister.go b/api/tech/docker/lister.go deleted file mode 100644 index efd3b8e1e2..0000000000 --- a/api/tech/docker/lister.go +++ /dev/null @@ -1,130 +0,0 @@ -package docker - -import ( - "context" - "encoding/json" - "io" - "net/http" - - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/log" - "github.com/pkg/errors" - - "ocm.software/ocm/api/tech/docker/resolve" -) - -var ErrObjectNotRequired = errors.New("object not required") - -type TagList struct { - Name string `json:"name"` - Tags []string `json:"tags"` -} - -type dockerLister struct { - dockerBase *dockerBase -} - -func (r *dockerResolver) Lister(ctx context.Context, ref string) (resolve.Lister, error) { - base, err := r.resolveDockerBase(ref) - if err != nil { - return nil, err - } - if base.refspec.Object != "" { - return nil, ErrObjectNotRequired - } - - return &dockerLister{ - dockerBase: base, - }, nil -} - -func (r *dockerLister) List(ctx context.Context) ([]string, error) { - refspec := r.dockerBase.refspec - base := r.dockerBase - var ( - firstErr error - paths [][]string - caps = HostCapabilityPull - ) - - // turns out, we have a valid digest, make a url. - paths = append(paths, []string{"tags/list"}) - caps |= HostCapabilityResolve - - hosts := base.filterHosts(caps) - if len(hosts) == 0 { - return nil, errors.Wrap(errdefs.ErrNotFound, "no list hosts") - } - - ctx, err := ContextWithRepositoryScope(ctx, refspec, false) - if err != nil { - return nil, err - } - - for _, u := range paths { - for _, host := range hosts { - ctxWithLogger := log.WithLogger(ctx, log.G(ctx).WithField("host", host.Host)) - - req := base.request(host, http.MethodGet, u...) - if err := req.addNamespace(base.refspec.Hostname()); err != nil { - return nil, err - } - - req.header["Accept"] = []string{"application/json"} - - log.G(ctxWithLogger).Debug("listing") - resp, err := req.doWithRetries(ctxWithLogger, nil) - if err != nil { - if errors.Is(err, ErrInvalidAuthorization) { - err = errors.Wrapf(err, "pull access denied, repository does not exist or may require authorization") - } - // Store the error for referencing later - if firstErr == nil { - firstErr = err - } - log.G(ctxWithLogger).WithError(err).Info("trying next host") - continue // try another host - } - - if resp.StatusCode > 299 { - resp.Body.Close() - if resp.StatusCode == http.StatusNotFound { - log.G(ctxWithLogger).Info("trying next host - response was http.StatusNotFound") - continue - } - if resp.StatusCode > 399 { - // Set firstErr when encountering the first non-404 status code. - if firstErr == nil { - firstErr = errors.Errorf("pulling from host %s failed with status code %v: %v", host.Host, u, resp.Status) - } - continue // try another host - } - return nil, errors.Errorf("taglist from host %s failed with unexpected status code %v: %v", host.Host, u, resp.Status) - } - - data, err := io.ReadAll(resp.Body) - resp.Body.Close() - if err != nil { - return nil, err - } - - tags := &TagList{} - - err = json.Unmarshal(data, tags) - if err != nil { - return nil, err - } - return tags.Tags, nil - } - } - - // If above loop terminates without return, then there was an error. - // "firstErr" contains the first non-404 error. That is, "firstErr == nil" - // means that either no registries were given or each registry returned 404. - - if firstErr == nil { - firstErr = errors.Wrap(errdefs.ErrNotFound, base.refspec.Locator) - } - - return nil, firstErr -} diff --git a/api/tech/docker/orig.go b/api/tech/docker/orig.go deleted file mode 100644 index c9b2468fba..0000000000 --- a/api/tech/docker/orig.go +++ /dev/null @@ -1,44 +0,0 @@ -package docker - -import ( - "github.com/containerd/containerd/remotes/docker" -) - -var ( - ContextWithRepositoryScope = docker.ContextWithRepositoryScope - ContextWithAppendPullRepositoryScope = docker.ContextWithAppendPullRepositoryScope - NewInMemoryTracker = docker.NewInMemoryTracker - NewDockerAuthorizer = docker.NewDockerAuthorizer - WithAuthClient = docker.WithAuthClient - WithAuthHeader = docker.WithAuthHeader - WithAuthCreds = docker.WithAuthCreds -) - -type ( - Errors = docker.Errors - StatusTracker = docker.StatusTracker - Status = docker.Status - StatusTrackLocker = docker.StatusTrackLocker -) - -func ConvertHosts(hosts docker.RegistryHosts) RegistryHosts { - return func(host string) ([]RegistryHost, error) { - list, err := hosts(host) - if err != nil { - return nil, err - } - result := make([]RegistryHost, len(list)) - for i, v := range list { - result[i] = RegistryHost{ - Client: v.Client, - Authorizer: v.Authorizer, - Host: v.Host, - Scheme: v.Scheme, - Path: v.Path, - Capabilities: HostCapabilities(v.Capabilities), - Header: v.Header, - } - } - return result, nil - } -} diff --git a/api/tech/docker/pusher.go b/api/tech/docker/pusher.go deleted file mode 100644 index 708ad0f349..0000000000 --- a/api/tech/docker/pusher.go +++ /dev/null @@ -1,433 +0,0 @@ -package docker - -import ( - "context" - "io" - "net/http" - "net/url" - "strings" - "time" - - "github.com/containerd/containerd/content" - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/images" - "github.com/containerd/containerd/log" - "github.com/containerd/containerd/remotes" - "github.com/opencontainers/go-digest" - ocispec "github.com/opencontainers/image-spec/specs-go/v1" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - - remoteserrors "ocm.software/ocm/api/tech/docker/errors" - "ocm.software/ocm/api/tech/docker/resolve" - "ocm.software/ocm/api/utils/accessio" -) - -func init() { - l := logrus.New() - l.Level = logrus.WarnLevel - log.L = logrus.NewEntry(l) -} - -type dockerPusher struct { - *dockerBase - object string - - // TODO: namespace tracker - tracker StatusTracker -} - -func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor, src resolve.Source) (resolve.PushRequest, error) { - return p.push(ctx, desc, src, remotes.MakeRefKey(ctx, desc), false) -} - -func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, src resolve.Source, ref string, unavailableOnFail bool) (resolve.PushRequest, error) { - if l, ok := p.tracker.(StatusTrackLocker); ok { - l.Lock(ref) - defer l.Unlock(ref) - } - ctx, err := ContextWithRepositoryScope(ctx, p.refspec, true) - if err != nil { - return nil, err - } - status, err := p.tracker.GetStatus(ref) - if err == nil { - if status.Committed && status.Offset == status.Total { - return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "ref %v", ref) - } - if unavailableOnFail { - // Another push of this ref is happening elsewhere. The rest of function - // will continue only when `errdefs.IsNotFound(err) == true` (i.e. there - // is no actively-tracked ref already). - return nil, errors.Wrap(errdefs.ErrUnavailable, "push is on-going") - } - // TODO: Handle incomplete status - } else if !errdefs.IsNotFound(err) { - return nil, errors.Wrap(err, "failed to get status") - } - - hosts := p.filterHosts(HostCapabilityPush) - if len(hosts) == 0 { - return nil, errors.Wrap(errdefs.ErrNotFound, "no push hosts") - } - - var ( - isManifest bool - existCheck []string - host = hosts[0] - ) - - switch desc.MediaType { - case images.MediaTypeDockerSchema2Manifest, images.MediaTypeDockerSchema2ManifestList, - ocispec.MediaTypeImageManifest, ocispec.MediaTypeImageIndex: - isManifest = true - existCheck = getManifestPath(p.object, desc.Digest) - default: - existCheck = []string{"blobs", desc.Digest.String()} - } - - req := p.request(host, http.MethodHead, existCheck...) - req.header.Set("Accept", strings.Join([]string{desc.MediaType, `*/*`}, ", ")) - - log.G(ctx).WithField("url", req.String()).Debugf("checking and pushing to") - - headResp, err := req.doWithRetries(ctx, nil) - if err != nil { - if !errors.Is(err, ErrInvalidAuthorization) { - return nil, err - } - log.G(ctx).WithError(err).Debugf("Unable to check existence, continuing with push") - } else { - defer headResp.Body.Close() - - if headResp.StatusCode == http.StatusOK { - var exists bool - if isManifest && existCheck[1] != desc.Digest.String() { - dgstHeader := digest.Digest(headResp.Header.Get("Docker-Content-Digest")) - if dgstHeader == desc.Digest { - exists = true - } - } else { - exists = true - } - - if exists { - p.tracker.SetStatus(ref, Status{ - Committed: true, - Status: content.Status{ - Ref: ref, - Total: desc.Size, - Offset: desc.Size, - // TODO: Set updated time? - }, - }) - - return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v on remote", desc.Digest) - } - } else if headResp.StatusCode != http.StatusNotFound { - err := remoteserrors.NewUnexpectedStatusErr(headResp) - - var statusError remoteserrors.ErrUnexpectedStatus - if errors.As(err, &statusError) { - log.G(ctx). - WithField("resp", headResp). - WithField("body", string(statusError.Body)). - Debug("unexpected response") - } - - return nil, accessio.RetriableError(err) - } - } - - if isManifest { - putPath := getManifestPath(p.object, desc.Digest) - req = p.request(host, http.MethodPut, putPath...) - req.header.Add("Content-Type", desc.MediaType) - } else { - // Start upload request - req = p.request(host, http.MethodPost, "blobs", "uploads/") - - var resp *http.Response - if fromRepo := selectRepositoryMountCandidate(p.refspec, desc.Annotations); fromRepo != "" { - preq := requestWithMountFrom(req, desc.Digest.String(), fromRepo) - pctx := ContextWithAppendPullRepositoryScope(ctx, fromRepo) - - // NOTE: the fromRepo might be private repo and - // auth service still can grant token without error. - // but the post request will fail because of 401. - // - // for the private repo, we should remove mount-from - // query and send the request again. - resp, err = preq.doWithRetries(pctx, nil) - if err != nil { - return nil, accessio.RetriableError(err) - } - - if resp.StatusCode == http.StatusUnauthorized { - log.G(ctx).Debugf("failed to mount from repository %s", fromRepo) - - resp.Body.Close() - resp = nil - } - } - - if resp == nil { - resp, err = req.doWithRetries(ctx, nil) - if err != nil { - return nil, accessio.RetriableError(err) - } - } - defer resp.Body.Close() - - switch resp.StatusCode { - case http.StatusOK, http.StatusAccepted, http.StatusNoContent: - case http.StatusCreated: - p.tracker.SetStatus(ref, Status{ - Committed: true, - Status: content.Status{ - Ref: ref, - Total: desc.Size, - Offset: desc.Size, - }, - }) - return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v on remote", desc.Digest) - default: - err := remoteserrors.NewUnexpectedStatusErr(resp) - - var statusError remoteserrors.ErrUnexpectedStatus - if errors.As(err, &statusError) { - log.G(ctx). - WithField("resp", resp). - WithField("body", string(statusError.Body)). - Debug("unexpected response") - } - - return nil, err - } - - var ( - location = resp.Header.Get("Location") - lurl *url.URL - lhost = host - ) - // Support paths without host in location - if strings.HasPrefix(location, "/") { - lurl, err = url.Parse(lhost.Scheme + "://" + lhost.Host + location) - if err != nil { - return nil, errors.Wrapf(err, "unable to parse location %v", location) - } - } else { - if !strings.Contains(location, "://") { - location = lhost.Scheme + "://" + location - } - lurl, err = url.Parse(location) - if err != nil { - return nil, errors.Wrapf(err, "unable to parse location %v", location) - } - - if lurl.Host != lhost.Host || lhost.Scheme != lurl.Scheme { - lhost.Scheme = lurl.Scheme - lhost.Host = lurl.Host - log.G(ctx).WithField("host", lhost.Host).WithField("scheme", lhost.Scheme).Debug("upload changed destination") - - // Strip authorizer if change to host or scheme - lhost.Authorizer = nil - } - } - q := lurl.Query() - q.Add("digest", desc.Digest.String()) - - req = p.request(lhost, http.MethodPut) - req.header.Set("Content-Type", "application/octet-stream") - req.path = lurl.Path + "?" + q.Encode() - } - p.tracker.SetStatus(ref, Status{ - Status: content.Status{ - Ref: ref, - Total: desc.Size, - Expected: desc.Digest, - StartedAt: time.Now(), - }, - }) - - // TODO: Support chunked upload - - respC := make(chan response, 1) - - preq := &pushRequest{ - base: p.dockerBase, - ref: ref, - responseC: respC, - source: src, - isManifest: isManifest, - expected: desc.Digest, - tracker: p.tracker, - } - - req.body = preq.Reader - req.size = desc.Size - - go func() { - defer close(respC) - resp, err := req.doWithRetries(ctx, nil) - if err != nil { - respC <- response{err: err} - return - } - - switch resp.StatusCode { - case http.StatusOK, http.StatusCreated, http.StatusNoContent: - default: - err := remoteserrors.NewUnexpectedStatusErr(resp) - - var statusError remoteserrors.ErrUnexpectedStatus - if errors.As(err, &statusError) { - log.G(ctx). - WithField("resp", resp). - WithField("body", string(statusError.Body)). - Debug("unexpected response") - } - } - respC <- response{Response: resp} - }() - - return preq, nil -} - -func getManifestPath(object string, dgst digest.Digest) []string { - if i := strings.IndexByte(object, '@'); i >= 0 { - if object[i+1:] != dgst.String() { - // use digest, not tag - object = "" - } else { - // strip @ for registry path to make tag - object = object[:i] - } - } - - if object == "" { - return []string{"manifests", dgst.String()} - } - - return []string{"manifests", object} -} - -type response struct { - *http.Response - err error -} - -type pushRequest struct { - base *dockerBase - ref string - - responseC <-chan response - source resolve.Source - isManifest bool - - expected digest.Digest - tracker StatusTracker -} - -func (pw *pushRequest) Status() (content.Status, error) { - status, err := pw.tracker.GetStatus(pw.ref) - if err != nil { - return content.Status{}, err - } - return status.Status, nil -} - -func (pw *pushRequest) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { - // TODO: timeout waiting for response - resp := <-pw.responseC - if resp.err != nil { - return resp.err - } - defer resp.Response.Body.Close() - - // 201 is specified return status, some registries return - // 200, 202 or 204. - switch resp.StatusCode { - case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted: - default: - return remoteserrors.NewUnexpectedStatusErr(resp.Response) - } - - status, err := pw.tracker.GetStatus(pw.ref) - if err != nil { - return errors.Wrap(err, "failed to get status") - } - - if size > 0 && size != status.Offset { - return errors.Errorf("unexpected size %d, expected %d", status.Offset, size) - } - - if expected == "" { - expected = status.Expected - } - - actual, err := digest.Parse(resp.Header.Get("Docker-Content-Digest")) - if err != nil { - return errors.Wrap(err, "invalid content digest in response") - } - - if actual != expected { - return errors.Errorf("got digest %s, expected %s", actual, expected) - } - - status.Committed = true - status.UpdatedAt = time.Now() - pw.tracker.SetStatus(pw.ref, status) - - return nil -} - -func (pw *pushRequest) Reader() (io.ReadCloser, error) { - status, err := pw.tracker.GetStatus(pw.ref) - if err != nil { - return nil, err - } - status.Offset = 0 - status.UpdatedAt = time.Now() - pw.tracker.SetStatus(pw.ref, status) - - r, err := pw.source.Reader() - if err != nil { - return nil, err - } - return &sizeTrackingReader{pw, r}, nil -} - -type sizeTrackingReader struct { - pw *pushRequest - io.ReadCloser -} - -func (t *sizeTrackingReader) Read(in []byte) (int, error) { - // fmt.Printf("reading next...\n") - n, err := t.ReadCloser.Read(in) - if n > 0 { - status, err := t.pw.tracker.GetStatus(t.pw.ref) - // fmt.Printf("read %d[%d] bytes\n", n, status.Offset) - if err != nil { - return n, err - } - status.Offset += int64(n) - status.UpdatedAt = time.Now() - t.pw.tracker.SetStatus(t.pw.ref, status) - } - return n, err -} - -func requestWithMountFrom(req *request, mount, from string) *request { - creq := *req - - sep := "?" - if strings.Contains(creq.path, sep) { - sep = "&" - } - - creq.path = creq.path + sep + "mount=" + mount + "&from=" + from - - return &creq -} diff --git a/api/tech/docker/registry.go b/api/tech/docker/registry.go deleted file mode 100644 index 795dd6e244..0000000000 --- a/api/tech/docker/registry.go +++ /dev/null @@ -1,234 +0,0 @@ -package docker - -import ( - "net" - "net/http" - - "github.com/pkg/errors" -) - -// HostCapabilities represent the capabilities of the registry -// host. This also represents the set of operations for which -// the registry host may be trusted to perform. -// -// For example pushing is a capability which should only be -// performed on an upstream source, not a mirror. -// Resolving (the process of converting a name into a digest) -// must be considered a trusted operation and only done by -// a host which is trusted (or more preferably by secure process -// which can prove the provenance of the mapping). A public -// mirror should never be trusted to do a resolve action. -// -// | Registry Type | Pull | Resolve | Push | -// |------------------|------|---------|------| -// | Public Registry | yes | yes | yes | -// | Private Registry | yes | yes | yes | -// | Public Mirror | yes | no | no | -// | Private Mirror | yes | yes | no |. -type HostCapabilities uint8 - -const ( - // HostCapabilityPull represents the capability to fetch manifests - // and blobs by digest. - HostCapabilityPull HostCapabilities = 1 << iota - - // HostCapabilityResolve represents the capability to fetch manifests - // by name. - HostCapabilityResolve - - // HostCapabilityPush represents the capability to push blobs and - // manifests. - HostCapabilityPush - - // Reserved for future capabilities (i.e. search, catalog, remove). -) - -// Has checks whether the capabilities list has the provide capability. -func (c HostCapabilities) Has(t HostCapabilities) bool { - return c&t == t -} - -// RegistryHost represents a complete configuration for a registry -// host, representing the capabilities, authorizations, connection -// configuration, and location. -type RegistryHost struct { - Client *http.Client - Authorizer Authorizer - Host string - Scheme string - Path string - Capabilities HostCapabilities - Header http.Header -} - -const ( - dockerHostname = "docker.io" - dockerRegistryHostname = "registry-1.docker.io" -) - -func (h RegistryHost) isProxy(refhost string) bool { - if refhost != h.Host { - if refhost != dockerHostname || h.Host != dockerRegistryHostname { - return true - } - } - return false -} - -// RegistryHosts fetches the registry hosts for a given namespace, -// provided by the host component of an distribution image reference. -type RegistryHosts func(string) ([]RegistryHost, error) - -// Registries joins multiple registry configuration functions, using the same -// order as provided within the arguments. When an empty registry configuration -// is returned with a nil error, the next function will be called. -// NOTE: This function will not join configurations, as soon as a non-empty -// configuration is returned from a configuration function, it will be returned -// to the caller. -func Registries(registries ...RegistryHosts) RegistryHosts { - return func(host string) ([]RegistryHost, error) { - for _, registry := range registries { - config, err := registry(host) - if err != nil { - return config, err - } - if len(config) > 0 { - return config, nil - } - } - return nil, nil - } -} - -type registryOpts struct { - authorizer Authorizer - plainHTTP func(string) (bool, error) - host func(string) (string, error) - client *http.Client -} - -// RegistryOpt defines a registry default option. -type RegistryOpt func(*registryOpts) - -// WithPlainHTTP configures registries to use plaintext http scheme -// for the provided host match function. -func WithPlainHTTP(f func(string) (bool, error)) RegistryOpt { - return func(opts *registryOpts) { - opts.plainHTTP = f - } -} - -// WithAuthorizer configures the default authorizer for a registry. -func WithAuthorizer(a Authorizer) RegistryOpt { - return func(opts *registryOpts) { - opts.authorizer = a - } -} - -// WithHostTranslator defines the default translator to use for registry hosts. -func WithHostTranslator(h func(string) (string, error)) RegistryOpt { - return func(opts *registryOpts) { - opts.host = h - } -} - -// WithClient configures the default http client for a registry. -func WithClient(c *http.Client) RegistryOpt { - return func(opts *registryOpts) { - opts.client = c - } -} - -// ConfigureDefaultRegistries is used to create a default configuration for -// registries. For more advanced configurations or per-domain setups, -// the RegistryHosts interface should be used directly. -// NOTE: This function will always return a non-empty value or error. -func ConfigureDefaultRegistries(ropts ...RegistryOpt) RegistryHosts { - var opts registryOpts - for _, opt := range ropts { - opt(&opts) - } - - return func(host string) ([]RegistryHost, error) { - config := RegistryHost{ - Client: opts.client, - Authorizer: opts.authorizer, - Host: host, - Scheme: "https", - Path: "/v2", - Capabilities: HostCapabilityPull | HostCapabilityResolve | HostCapabilityPush, - } - - if config.Client == nil { - config.Client = http.DefaultClient - } - - if opts.plainHTTP != nil { - match, err := opts.plainHTTP(host) - if err != nil { - return nil, err - } - if match { - config.Scheme = "http" - } - } - - if opts.host != nil { - var err error - config.Host, err = opts.host(config.Host) - if err != nil { - return nil, err - } - } else if host == dockerHostname { - config.Host = dockerRegistryHostname - } - - return []RegistryHost{config}, nil - } -} - -// MatchAllHosts is a host match function which is always true. -func MatchAllHosts(string) (bool, error) { - return true, nil -} - -// MatchLocalhost is a host match function which returns true for -// localhost. -// -// Note: this does not handle matching of ip addresses in octal, -// decimal or hex form. -func MatchLocalhost(host string) (bool, error) { - switch { - case host == "::1": - return true, nil - case host == "[::1]": - return true, nil - } - h, p, err := net.SplitHostPort(host) - - // addrError helps distinguish between errors of form - // "no colon in address" and "too many colons in address". - // The former is fine as the host string need not have a - // port. Latter needs to be handled. - addrError := &net.AddrError{ - Err: "missing port in address", - Addr: host, - } - if err != nil { - if err.Error() != addrError.Error() { - return false, err - } - // host string without any port specified - h = host - } else if len(p) == 0 { - return false, errors.New("invalid host name format") - } - - // use ipv4 dotted decimal for further checking - if h == "localhost" { - h = "127.0.0.1" - } - ip := net.ParseIP(h) - - return ip.IsLoopback(), nil -} diff --git a/api/tech/docker/resolver.go b/api/tech/docker/resolver.go deleted file mode 100644 index 292df03ae3..0000000000 --- a/api/tech/docker/resolver.go +++ /dev/null @@ -1,656 +0,0 @@ -package docker - -import ( - "context" - "fmt" - "io" - "net/http" - "net/url" - "path" - "strings" - - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/images" - "github.com/containerd/containerd/log" - "github.com/containerd/containerd/reference" - "github.com/containerd/containerd/remotes/docker/schema1" - "github.com/containerd/containerd/version" - "github.com/opencontainers/go-digest" - ocispec "github.com/opencontainers/image-spec/specs-go/v1" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "golang.org/x/net/context/ctxhttp" - - "ocm.software/ocm/api/tech/docker/resolve" - "ocm.software/ocm/api/utils/accessio" -) - -var ( - // ErrInvalidAuthorization is used when credentials are passed to a server but - // those credentials are rejected. - ErrInvalidAuthorization = errors.New("authorization failed") - - // MaxManifestSize represents the largest size accepted from a registry - // during resolution. Larger manifests may be accepted using a - // resolution method other than the registry. - // - // NOTE: The max supported layers by some runtimes is 128 and individual - // layers will not contribute more than 256 bytes, making a - // reasonable limit for a large image manifests of 32K bytes. - // 4M bytes represents a much larger upper bound for images which may - // contain large annotations or be non-images. A proper manifest - // design puts large metadata in subobjects, as is consistent the - // intent of the manifest design. - MaxManifestSize int64 = 4 * 1048 * 1048 -) - -// Authorizer is used to authorize HTTP requests based on 401 HTTP responses. -// An Authorizer is responsible for caching tokens or credentials used by -// requests. -type Authorizer interface { - // Authorize sets the appropriate `Authorization` header on the given - // request. - // - // If no authorization is found for the request, the request remains - // unmodified. It may also add an `Authorization` header as - // "bearer " - // "basic " - Authorize(context.Context, *http.Request) error - - // AddResponses adds a 401 response for the authorizer to consider when - // authorizing requests. The last response should be unauthorized and - // the previous requests are used to consider redirects and retries - // that may have led to the 401. - // - // If response is not handled, returns `ErrNotImplemented` - AddResponses(context.Context, []*http.Response) error -} - -// ResolverOptions are used to configured a new Docker register resolver. -type ResolverOptions struct { - // Hosts returns registry host configurations for a namespace. - Hosts RegistryHosts - - // Headers are the HTTP request header fields sent by the resolver - Headers http.Header - - // Tracker is used to track uploads to the registry. This is used - // since the registry does not have upload tracking and the existing - // mechanism for getting blob upload status is expensive. - Tracker StatusTracker - - // Authorizer is used to authorize registry requests - // Deprecated: use Hosts - Authorizer Authorizer - - // Credentials provides username and secret given a host. - // If username is empty but a secret is given, that secret - // is interpreted as a long lived token. - // Deprecated: use Hosts - Credentials func(string) (string, string, error) - - // Host provides the hostname given a namespace. - // Deprecated: use Hosts - Host func(string) (string, error) - - // PlainHTTP specifies to use plain http and not https - // Deprecated: use Hosts - PlainHTTP bool - - // Client is the http client to used when making registry requests - // Deprecated: use Hosts - Client *http.Client -} - -// DefaultHost is the default host function. -func DefaultHost(ns string) (string, error) { - if ns == "docker.io" { - return "registry-1.docker.io", nil - } - return ns, nil -} - -type dockerResolver struct { - hosts RegistryHosts - header http.Header - resolveHeader http.Header - tracker StatusTracker -} - -// NewResolver returns a new resolver to a Docker registry. -func NewResolver(options ResolverOptions) resolve.Resolver { - if options.Tracker == nil { - options.Tracker = NewInMemoryTracker() - } - - if options.Headers == nil { - options.Headers = make(http.Header) - } - if _, ok := options.Headers["User-Agent"]; !ok { - options.Headers.Set("User-Agent", "containerd/"+version.Version) - } - - resolveHeader := http.Header{} - if _, ok := options.Headers["Accept"]; !ok { - // set headers for all the types we support for resolution. - resolveHeader.Set("Accept", strings.Join([]string{ - images.MediaTypeDockerSchema2Manifest, - images.MediaTypeDockerSchema2ManifestList, - ocispec.MediaTypeImageManifest, - ocispec.MediaTypeImageIndex, "*/*", - }, ", ")) - } else { - resolveHeader["Accept"] = options.Headers["Accept"] - delete(options.Headers, "Accept") - } - - if options.Hosts == nil { - opts := []RegistryOpt{} - if options.Host != nil { - opts = append(opts, WithHostTranslator(options.Host)) - } - - if options.Authorizer == nil { - options.Authorizer = NewDockerAuthorizer( - WithAuthClient(options.Client), - WithAuthHeader(options.Headers), - WithAuthCreds(options.Credentials)) - } - opts = append(opts, WithAuthorizer(options.Authorizer)) - - if options.Client != nil { - opts = append(opts, WithClient(options.Client)) - } - if options.PlainHTTP { - opts = append(opts, WithPlainHTTP(MatchAllHosts)) - } else { - opts = append(opts, WithPlainHTTP(MatchLocalhost)) - } - options.Hosts = ConfigureDefaultRegistries(opts...) - } - return &dockerResolver{ - hosts: options.Hosts, - header: options.Headers, - resolveHeader: resolveHeader, - tracker: options.Tracker, - } -} - -func getManifestMediaType(resp *http.Response) string { - // Strip encoding data (manifests should always be ascii JSON) - contentType := resp.Header.Get("Content-Type") - if sp := strings.IndexByte(contentType, ';'); sp != -1 { - contentType = contentType[0:sp] - } - - // As of Apr 30 2019 the registry.access.redhat.com registry does not specify - // the content type of any data but uses schema1 manifests. - if contentType == "text/plain" { - contentType = images.MediaTypeDockerSchema1Manifest - } - return contentType -} - -type countingReader struct { - reader io.Reader - bytesRead int64 -} - -func (r *countingReader) Read(p []byte) (int, error) { - n, err := r.reader.Read(p) - r.bytesRead += int64(n) - return n, err -} - -var _ resolve.Resolver = &dockerResolver{} - -func (r *dockerResolver) Resolve(ctx context.Context, ref string) (string, ocispec.Descriptor, error) { - base, err := r.resolveDockerBase(ref) - if err != nil { - return "", ocispec.Descriptor{}, err - } - refspec := base.refspec - if refspec.Object == "" { - return "", ocispec.Descriptor{}, reference.ErrObjectRequired - } - - var ( - firstErr error - paths [][]string - dgst = refspec.Digest() - caps = HostCapabilityPull - ) - - if dgst != "" { - if err := dgst.Validate(); err != nil { - // need to fail here, since we can't actually resolve the invalid - // digest. - return "", ocispec.Descriptor{}, err - } - - // turns out, we have a valid digest, make a url. - paths = append(paths, []string{"manifests", dgst.String()}) - - // fallback to blobs on not found. - paths = append(paths, []string{"blobs", dgst.String()}) - } else { - // Add - paths = append(paths, []string{"manifests", refspec.Object}) - caps |= HostCapabilityResolve - } - - hosts := base.filterHosts(caps) - if len(hosts) == 0 { - return "", ocispec.Descriptor{}, errors.Wrap(errdefs.ErrNotFound, "no resolve hosts") - } - - ctx, err = ContextWithRepositoryScope(ctx, refspec, false) - if err != nil { - return "", ocispec.Descriptor{}, err - } - - for _, u := range paths { - for _, host := range hosts { - ctxWithLogger := log.WithLogger(ctx, log.G(ctx).WithField("host", host.Host)) - - req := base.request(host, http.MethodHead, u...) - if err := req.addNamespace(base.refspec.Hostname()); err != nil { - return "", ocispec.Descriptor{}, err - } - - for key, value := range r.resolveHeader { - req.header[key] = append(req.header[key], value...) - } - - log.G(ctxWithLogger).Debug("resolving") - resp, err := req.doWithRetries(ctxWithLogger, nil) - if err != nil { - if errors.Is(err, ErrInvalidAuthorization) { - err = errors.Wrapf(err, "pull access denied, repository does not exist or may require authorization") - } else { - err = accessio.RetriableError(err) - } - // Store the error for referencing later - if firstErr == nil { - firstErr = err - } - log.G(ctxWithLogger).WithError(err).Info("trying next host") - continue // try another host - } - resp.Body.Close() // don't care about body contents. - - if resp.StatusCode > 299 { - if resp.StatusCode == http.StatusNotFound { - // log.G(ctxWithLogger).Info("trying next host - response was http.StatusNotFound") - continue - } - if resp.StatusCode > 399 { - // Set firstErr when encountering the first non-404 status code. - if firstErr == nil { - firstErr = errors.Errorf("pulling from host %s failed with status code %v: %v", host.Host, u, resp.Status) - } - continue // try another host - } - return "", ocispec.Descriptor{}, errors.Errorf("pulling from host %s failed with unexpected status code %v: %v", host.Host, u, resp.Status) - } - size := resp.ContentLength - contentType := getManifestMediaType(resp) - - // if no digest was provided, then only a resolve - // trusted registry was contacted, in this case use - // the digest header (or content from GET) - if dgst == "" { - // this is the only point at which we trust the registry. we use the - // content headers to assemble a descriptor for the name. when this becomes - // more robust, we mostly get this information from a secure trust store. - dgstHeader := digest.Digest(resp.Header.Get("Docker-Content-Digest")) - - if dgstHeader != "" && size != -1 { - if err := dgstHeader.Validate(); err != nil { - return "", ocispec.Descriptor{}, errors.Wrapf(err, "%q in header not a valid digest", dgstHeader) - } - dgst = dgstHeader - } - } - if dgst == "" || size == -1 { - log.G(ctxWithLogger).Debug("no Docker-Content-Digest header, fetching manifest instead") - - req = base.request(host, http.MethodGet, u...) - if err := req.addNamespace(base.refspec.Hostname()); err != nil { - return "", ocispec.Descriptor{}, err - } - - for key, value := range r.resolveHeader { - req.header[key] = append(req.header[key], value...) - } - - resp, err := req.doWithRetries(ctxWithLogger, nil) - if err != nil { - return "", ocispec.Descriptor{}, accessio.RetriableError(err) - } - defer resp.Body.Close() - - bodyReader := countingReader{reader: resp.Body} - - contentType = getManifestMediaType(resp) - if dgst == "" { - if contentType == images.MediaTypeDockerSchema1Manifest { - b, err := schema1.ReadStripSignature(&bodyReader) - if err != nil { - return "", ocispec.Descriptor{}, accessio.RetriableError(err) - } - - dgst = digest.FromBytes(b) - } else { - dgst, err = digest.FromReader(&bodyReader) - if err != nil { - return "", ocispec.Descriptor{}, accessio.RetriableError(err) - } - } - } else if _, err := io.Copy(io.Discard, &bodyReader); err != nil { - return "", ocispec.Descriptor{}, accessio.RetriableError(err) - } - size = bodyReader.bytesRead - } - // Prevent resolving to excessively large manifests - if size > MaxManifestSize { - if firstErr == nil { - firstErr = errors.Wrapf(errdefs.ErrNotFound, "rejecting %d byte manifest for %s", size, ref) - } - continue - } - - desc := ocispec.Descriptor{ - Digest: dgst, - MediaType: contentType, - Size: size, - } - - log.G(ctxWithLogger).WithField("desc.digest", desc.Digest).Debug("resolved") - return ref, desc, nil - } - } - - // If above loop terminates without return, then there was an error. - // "firstErr" contains the first non-404 error. That is, "firstErr == nil" - // means that either no registries were given or each registry returned 404. - - if firstErr == nil { - firstErr = errors.Wrap(errdefs.ErrNotFound, ref) - } - - return "", ocispec.Descriptor{}, firstErr -} - -func (r *dockerResolver) Fetcher(ctx context.Context, ref string) (resolve.Fetcher, error) { - base, err := r.resolveDockerBase(ref) - if err != nil { - return nil, err - } - - return dockerFetcher{ - dockerBase: base, - }, nil -} - -func (r *dockerResolver) Pusher(ctx context.Context, ref string) (resolve.Pusher, error) { - base, err := r.resolveDockerBase(ref) - if err != nil { - return nil, err - } - - return dockerPusher{ - dockerBase: base, - object: base.refspec.Object, - tracker: r.tracker, - }, nil -} - -func (r *dockerResolver) resolveDockerBase(ref string) (*dockerBase, error) { - refspec, err := reference.Parse(ref) - if err != nil { - return nil, err - } - - return r.base(refspec) -} - -type dockerBase struct { - refspec reference.Spec - repository string - hosts []RegistryHost - header http.Header -} - -func (r *dockerResolver) base(refspec reference.Spec) (*dockerBase, error) { - host := refspec.Hostname() - hosts, err := r.hosts(host) - if err != nil { - return nil, err - } - return &dockerBase{ - refspec: refspec, - repository: strings.TrimPrefix(refspec.Locator, host+"/"), - hosts: hosts, - header: r.header, - }, nil -} - -func (r *dockerBase) filterHosts(caps HostCapabilities) (hosts []RegistryHost) { - for _, host := range r.hosts { - if host.Capabilities.Has(caps) { - hosts = append(hosts, host) - } - } - return -} - -func (r *dockerBase) request(host RegistryHost, method string, ps ...string) *request { - header := r.header.Clone() - if header == nil { - header = http.Header{} - } - - for key, value := range host.Header { - header[key] = append(header[key], value...) - } - parts := append([]string{"/", host.Path, r.repository}, ps...) - p := path.Join(parts...) - // Join strips trailing slash, re-add ending "/" if included - if len(parts) > 0 && strings.HasSuffix(parts[len(parts)-1], "/") { - p += "/" - } - return &request{ - method: method, - path: p, - header: header, - host: host, - } -} - -func (r *request) authorize(ctx context.Context, req *http.Request) error { - // Check if has header for host - if r.host.Authorizer != nil { - if err := r.host.Authorizer.Authorize(ctx, req); err != nil { - return err - } - } - - return nil -} - -func (r *request) addNamespace(ns string) (err error) { - if !r.host.isProxy(ns) { - return nil - } - var q url.Values - // Parse query - if i := strings.IndexByte(r.path, '?'); i > 0 { - r.path = r.path[:i+1] - q, err = url.ParseQuery(r.path[i+1:]) - if err != nil { - return - } - } else { - r.path += "?" - q = url.Values{} - } - q.Add("ns", ns) - - r.path += q.Encode() - - return -} - -type request struct { - method string - path string - header http.Header - host RegistryHost - body func() (io.ReadCloser, error) - size int64 -} - -func (r *request) do(ctx context.Context) (*http.Response, error) { - u := r.host.Scheme + "://" + r.host.Host + r.path - req, err := http.NewRequestWithContext(ctx, r.method, u, nil) - if err != nil { - return nil, err - } - req.Header = http.Header{} // headers need to be copied to avoid concurrent map access - for k, v := range r.header { - req.Header[k] = v - } - if r.body != nil { - body, err := r.body() - if err != nil { - return nil, err - } - req.Body = body - req.GetBody = r.body - if r.size > 0 { - req.ContentLength = r.size - } - defer body.Close() - } - - ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", u)) - log.G(ctx).WithFields(sanitizedRequestFields(req)).Debug("do request") - if err := r.authorize(ctx, req); err != nil { - return nil, errors.Wrap(err, "failed to authorize") - } - - client := &http.Client{} - if r.host.Client != nil { - *client = *r.host.Client - } - if client.CheckRedirect == nil { - client.CheckRedirect = func(req *http.Request, via []*http.Request) error { - if len(via) >= 10 { - return errors.New("stopped after 10 redirects") - } - return errors.Wrap(r.authorize(ctx, req), "failed to authorize redirect") - } - } - - resp, err := ctxhttp.Do(ctx, client, req) - if err != nil { - return nil, errors.Wrap(err, "failed to do request") - } - log.G(ctx).WithFields(responseFields(resp)).Debug("fetch response received") - return resp, nil -} - -func (r *request) doWithRetries(ctx context.Context, responses []*http.Response) (*http.Response, error) { - resp, err := r.do(ctx) - if err != nil { - return nil, err - } - - responses = append(responses, resp) - retry, err := r.retryRequest(ctx, responses) - if err != nil { - resp.Body.Close() - return nil, err - } - if retry { - resp.Body.Close() - return r.doWithRetries(ctx, responses) - } - return resp, err -} - -func (r *request) retryRequest(ctx context.Context, responses []*http.Response) (bool, error) { - if len(responses) > 5 { - return false, nil - } - last := responses[len(responses)-1] - switch last.StatusCode { - case http.StatusUnauthorized: - log.G(ctx).WithField("header", last.Header.Get("WWW-Authenticate")).Debug("Unauthorized") - if r.host.Authorizer != nil { - if err := r.host.Authorizer.AddResponses(ctx, responses); err == nil { - return true, nil - } else if !errdefs.IsNotImplemented(err) { - return false, err - } - } - - return false, nil - case http.StatusMethodNotAllowed: - // Support registries which have not properly implemented the HEAD method for - // manifests endpoint - if r.method == http.MethodHead && strings.Contains(r.path, "/manifests/") { - r.method = http.MethodGet - return true, nil - } - case http.StatusRequestTimeout, http.StatusTooManyRequests: - return true, nil - } - - // TODO: Handle 50x errors accounting for attempt history - return false, nil -} - -func (r *request) String() string { - return r.host.Scheme + "://" + r.host.Host + r.path -} - -func sanitizedRequestFields(req *http.Request) logrus.Fields { - fields := map[string]interface{}{ - "request.method": req.Method, - } - for k, vals := range req.Header { - k = strings.ToLower(k) - if k == "authorization" { - continue - } - for i, v := range vals { - field := "request.header." + k - if i > 0 { - field = fmt.Sprintf("%s.%d", field, i) - } - fields[field] = v - } - } - - return logrus.Fields(fields) -} - -func responseFields(resp *http.Response) logrus.Fields { - fields := map[string]interface{}{ - "response.status": resp.Status, - } - for k, vals := range resp.Header { - k = strings.ToLower(k) - for i, v := range vals { - field := "response.header." + k - if i > 0 { - field = fmt.Sprintf("%s.%d", field, i) - } - fields[field] = v - } - } - - return logrus.Fields(fields) -} diff --git a/api/tech/oras/client.go b/api/tech/oras/client.go new file mode 100644 index 0000000000..a73e0003a9 --- /dev/null +++ b/api/tech/oras/client.go @@ -0,0 +1,201 @@ +package oras + +import ( + "context" + "errors" + "fmt" + "io" + "strings" + "sync" + + "github.com/containerd/containerd/errdefs" + ociv1 "github.com/opencontainers/image-spec/specs-go/v1" + oraserr "oras.land/oras-go/v2/errdef" + "oras.land/oras-go/v2/registry/remote" + "oras.land/oras-go/v2/registry/remote/auth" +) + +type ClientOptions struct { + Client *auth.Client + PlainHTTP bool +} + +type Client struct { + client *auth.Client + plainHTTP bool + ref string + mu sync.RWMutex +} + +var ( + _ Resolver = &Client{} + _ Fetcher = &Client{} + _ Pusher = &Client{} + _ Lister = &Client{} +) + +func New(opts ClientOptions) *Client { + return &Client{client: opts.Client, plainHTTP: opts.PlainHTTP} +} + +func (c *Client) Fetcher(ctx context.Context, ref string) (Fetcher, error) { + c.mu.Lock() + defer c.mu.Unlock() + + c.ref = ref + return c, nil +} + +func (c *Client) Pusher(ctx context.Context, ref string) (Pusher, error) { + c.mu.Lock() + defer c.mu.Unlock() + + c.ref = ref + return c, nil +} + +func (c *Client) Lister(ctx context.Context, ref string) (Lister, error) { + c.mu.Lock() + defer c.mu.Unlock() + + c.ref = ref + return c, nil +} + +func (c *Client) Resolve(ctx context.Context, ref string) (string, ociv1.Descriptor, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + src, err := c.createRepository(ref) + if err != nil { + return "", ociv1.Descriptor{}, err + } + + // We try to first resolve a manifest. + // _Note_: If there is an error like not found, but we know that the digest exists + // we can add src.Blobs().Resolve in here. If we do that, note that + // for Blobs().Resolve `not found` is actually `invalid checksum digest format`. + // Meaning it will throw that error instead of not found. + desc, err := src.Resolve(ctx, ref) + if err != nil { + if errors.Is(err, oraserr.ErrNotFound) { + return "", ociv1.Descriptor{}, errdefs.ErrNotFound + } + + return "", ociv1.Descriptor{}, fmt.Errorf("failed to resolve manifest %q: %w", ref, err) + } + + return "", desc, nil +} + +func (c *Client) Push(ctx context.Context, d ociv1.Descriptor, src Source) error { + c.mu.RLock() + defer c.mu.RUnlock() + + reader, err := src.Reader() + if err != nil { + return err + } + + repository, err := c.createRepository(c.ref) + if err != nil { + return err + } + + if split := strings.Split(c.ref, ":"); len(split) == 2 { + // Once we get a reference that contains a tag, we need to re-push that + // layer with the reference included. PushReference then will tag + // that layer resulting in the created tag pointing to the right + // blob data. + if err := repository.PushReference(ctx, d, reader, c.ref); err != nil { + return fmt.Errorf("failed to push tag: %w", err) + } + + return nil + } + + // We have a digest, so we use plain push for the digest. + // Push here decides if it's a Manifest or a Blob. + if err := repository.Push(ctx, d, reader); err != nil { + return fmt.Errorf("failed to push: %w, %s", err, c.ref) + } + + return nil +} + +func (c *Client) Fetch(ctx context.Context, desc ociv1.Descriptor) (io.ReadCloser, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + src, err := c.createRepository(c.ref) + if err != nil { + return nil, fmt.Errorf("failed to resolve ref %q: %w", c.ref, err) + } + + // oras requires a Resolve to happen before a fetch because + // -1 is an invalid size and results in a content-length mismatch error by design. + // This is a security consideration on ORAS' side. + // manifest is not set in the descriptor + // We explicitly call resolve on manifest first because it might be + // that the mediatype is not set at this point so we don't want ORAS to try to + // select the wrong layer to fetch from. + rdesc, err := src.Manifests().Resolve(ctx, desc.Digest.String()) + if errors.Is(err, oraserr.ErrNotFound) { + rdesc, err = src.Blobs().Resolve(ctx, desc.Digest.String()) + if err != nil { + return nil, fmt.Errorf("failed to resolve fetch blob %q: %w", desc.Digest.String(), err) + } + + delayer := func() (io.ReadCloser, error) { + return src.Blobs().Fetch(ctx, rdesc) + } + + return newDelayedReader(delayer) + } + + if err != nil { + return nil, fmt.Errorf("failed to resolve fetch manifest %q: %w", desc.Digest.String(), err) + } + + // lastly, try a manifest fetch. + fetch, err := src.Fetch(ctx, rdesc) + if err != nil { + return nil, fmt.Errorf("failed to fetch manifest: %w", err) + } + + return fetch, err +} + +func (c *Client) List(ctx context.Context) ([]string, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + src, err := c.createRepository(c.ref) + if err != nil { + return nil, fmt.Errorf("failed to resolve ref %q: %w", c.ref, err) + } + + var result []string + if err := src.Tags(ctx, "", func(tags []string) error { + result = append(result, tags...) + return nil + }); err != nil { + return nil, fmt.Errorf("failed to list tags: %w", err) + } + + return result, nil +} + +// createRepository creates a new repository representation using the passed in ref. +// This is a cheap operation. +func (c *Client) createRepository(ref string) (*remote.Repository, error) { + src, err := remote.NewRepository(ref) + if err != nil { + return nil, fmt.Errorf("failed to create new repository: %w", err) + } + + src.Client = c.client // set up authenticated client. + src.PlainHTTP = c.plainHTTP + + return src, nil +} diff --git a/api/tech/oras/delayed_reader.go b/api/tech/oras/delayed_reader.go new file mode 100644 index 0000000000..d07a6f870b --- /dev/null +++ b/api/tech/oras/delayed_reader.go @@ -0,0 +1,57 @@ +package oras + +import ( + "io" +) + +// delayedReader sets up a reader that only fetches a blob +// upon explicit reading request, otherwise, it stores the +// way of getting the reader. +type delayedReader struct { + open func() (io.ReadCloser, error) + rc io.ReadCloser + closed bool +} + +func newDelayedReader(open func() (io.ReadCloser, error)) (*delayedReader, error) { + return &delayedReader{ + open: open, + }, nil +} + +func (d *delayedReader) Read(p []byte) (n int, err error) { + if d.closed { + return 0, io.EOF + } + + reader, err := d.reader() + if err != nil { + return 0, err + } + + return reader.Read(p) +} + +func (d *delayedReader) reader() (io.ReadCloser, error) { + if d.rc != nil { + return d.rc, nil + } + + rc, err := d.open() + if err != nil { + return nil, err + } + + d.rc = rc + return rc, nil +} + +func (d *delayedReader) Close() error { + if d.closed { + return nil + } + + // we close regardless of an error + d.closed = true + return d.rc.Close() +} diff --git a/api/tech/docker/resolve/interface.go b/api/tech/oras/interface.go similarity index 73% rename from api/tech/docker/resolve/interface.go rename to api/tech/oras/interface.go index 8476000012..5020f68d21 100644 --- a/api/tech/docker/resolve/interface.go +++ b/api/tech/oras/interface.go @@ -1,11 +1,9 @@ -package resolve +package oras import ( "context" "io" - "github.com/containerd/containerd/content" - "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) @@ -54,22 +52,9 @@ type Fetcher interface { type Pusher interface { // Push returns a push request for the given resource identified // by the descriptor and the given data source. - Push(ctx context.Context, d ocispec.Descriptor, src Source) (PushRequest, error) + Push(ctx context.Context, d ocispec.Descriptor, src Source) error } type Lister interface { List(context.Context) ([]string, error) } - -// PushRequest handles the result of a push request -// replaces containerd content.Writer. -type PushRequest interface { - // Commit commits the blob (but no roll-back is guaranteed on an error). - // size and expected can be zero-value when unknown. - // Commit always closes the writer, even on error. - // ErrAlreadyExists aborts the writer. - Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error - - // Status returns the current state of write - Status() (content.Status, error) -} diff --git a/go.mod b/go.mod index 8c3a1bf220..10e02fada5 100644 --- a/go.mod +++ b/go.mod @@ -81,6 +81,7 @@ require ( k8s.io/apimachinery v0.32.0 k8s.io/cli-runtime v0.32.0 k8s.io/client-go v0.32.0 + oras.land/oras-go/v2 v2.5.0 sigs.k8s.io/controller-runtime v0.19.3 sigs.k8s.io/yaml v1.4.0 ) @@ -276,7 +277,6 @@ require ( github.com/opencontainers/runtime-spec v1.2.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pborman/uuid v1.2.1 // indirect - github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect diff --git a/go.sum b/go.sum index cb07a1b822..cfe049224b 100644 --- a/go.sum +++ b/go.sum @@ -1358,6 +1358,8 @@ k8s.io/utils v0.0.0-20241210054802-24370beab758 h1:sdbE21q2nlQtFh65saZY+rRM6x6aJ k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= oras.land/oras-go v1.2.6 h1:z8cmxQXBU8yZ4mkytWqXfo6tZcamPwjsuxYU81xJ8Lk= oras.land/oras-go v1.2.6/go.mod h1:OVPc1PegSEe/K8YiLfosrlqlqTN9PUyFvOw5Y9gwrT8= +oras.land/oras-go/v2 v2.5.0 h1:o8Me9kLY74Vp5uw07QXPiitjsw7qNXi8Twd+19Zf02c= +oras.land/oras-go/v2 v2.5.0/go.mod h1:z4eisnLP530vwIOUOJeBIj0aGI0L1C3d53atvCBqZHg= sigs.k8s.io/controller-runtime v0.19.3 h1:XO2GvC9OPftRst6xWCpTgBZO04S2cbp0Qqkj8bX1sPw= sigs.k8s.io/controller-runtime v0.19.3/go.mod h1:j4j87DqtsThvwTv5/Tc5NFRyyF/RF0ip4+62tbTSIUM= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 h1:gBQPwqORJ8d8/YNZWEjoZs7npUVDpVXUUOFfW6CgAqE= diff --git a/hack/generate.sh b/hack/generate.sh index d574eb5828..d2e9864987 100755 --- a/hack/generate.sh +++ b/hack/generate.sh @@ -4,4 +4,4 @@ set -e echo "> Generate" -GO111MODULE=on go generate -mod=mod $@ \ No newline at end of file +GO111MODULE=on go generate -mod=mod $@