Skip to content

Commit

Permalink
added delayed reader and blob head check
Browse files Browse the repository at this point in the history
  • Loading branch information
Skarlso committed Dec 12, 2024
1 parent 8ad52af commit 7cadd7b
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 29 deletions.
2 changes: 0 additions & 2 deletions api/oci/artdesc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package artdesc

import (
"encoding/json"
"fmt"

"github.com/mandelsoft/goutils/errors"
"github.com/opencontainers/go-digest"
Expand Down Expand Up @@ -105,7 +104,6 @@ func (i *Index) AddManifest(d *Descriptor) {
////////////////////////////////////////////////////////////////////////////////

func DecodeIndex(data []byte) (*Index, error) {
fmt.Println("index handler")
var d Index

if err := json.Unmarshal(data, &d); err != nil {
Expand Down
2 changes: 0 additions & 2 deletions api/oci/cpi/state.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cpi

import (
"fmt"
"reflect"

"ocm.software/ocm/api/oci/artdesc"
Expand Down Expand Up @@ -77,7 +76,6 @@ func (i ArtifactStateHandler) Encode(d interface{}) ([]byte, error) {
}

func (i ArtifactStateHandler) Decode(data []byte) (interface{}, error) {
fmt.Println("artifact state handler")
return artdesc.Decode(data)
}

Expand Down
3 changes: 0 additions & 3 deletions api/tech/docker/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ type dockerFetcher struct {
func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithField("digest", desc.Digest))

fmt.Println("Fetching: ", r.refspec, desc)

hosts := r.filterHosts(HostCapabilityPull)
if len(hosts) == 0 {
return nil, errors.Wrap(errdefs.ErrNotFound, "no pull hosts")
Expand All @@ -41,7 +39,6 @@ func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.R
// firstly try fetch via external urls
for _, us := range desc.URLs {
ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", us))
fmt.Println("us: ", r.refspec, desc, us)
u, err := url.Parse(us)
if err != nil {
log.G(ctx).WithError(err).Debug("failed to parse")
Expand Down
31 changes: 19 additions & 12 deletions api/tech/regclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
)

type ClientOptions struct {
// TODO: add multiple hosts parsing
Host []config.Host
Version string
}
Expand All @@ -40,8 +39,9 @@ type pushRequest struct {
ref regref.Ref
}

// Commit and Status are actually not really used. Commit is a second stage operation and Status is never called in
// the library.
// Commit and Status are actually not really used in the library. Commit is a second stage operation and Status is never called in
// the library. The Status was a thing mostly in docker being used to track chunk reads. But that's taken care of
// by regclient internally.
func (p *pushRequest) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
return p.rc.Close(ctx, p.ref)
}
Expand Down Expand Up @@ -133,7 +133,6 @@ func (c *Client) convertDescriptorToRegClient(desc ociv1.Descriptor) descriptor.
}

func (c *Client) Resolve(ctx context.Context, ref string) (string, ociv1.Descriptor, error) {
// TODO: figure out what to do about closing c.rc.
r, err := regref.New(ref)
if err != nil {
return "", ociv1.Descriptor{}, err
Expand Down Expand Up @@ -257,12 +256,13 @@ func (c *Client) Fetch(ctx context.Context, desc ociv1.Descriptor) (_ io.ReadClo
desc.Size = 0
}

fmt.Println("in fetch: ", desc, c.ref)

if c.isManifest(desc) {
fmt.Println("in manifest: ", desc, c.ref)
manifestContent, err := c.rc.ManifestGet(ctx, c.ref, regclient.WithManifestDesc(c.convertDescriptorToRegClient(desc)))
if err != nil {
if errors.Is(err, regerr.ErrNotFound) {
return nil, errdefs.ErrNotFound
}

return nil, err
}

Expand All @@ -274,14 +274,21 @@ func (c *Client) Fetch(ctx context.Context, desc ociv1.Descriptor) (_ io.ReadClo
return io.NopCloser(bytes.NewReader(body)), nil
}

fmt.Println("in blob get: ", desc, c.ref)

reader, err := c.rc.BlobGet(ctx, c.ref, c.convertDescriptorToRegClient(desc))
// check if the blob exists so we can bail early
_, err = c.rc.BlobHead(ctx, c.ref, c.convertDescriptorToRegClient(desc))
if err != nil {
return nil, fmt.Errorf("failed to get the blob reader: %w", err)
if errors.Is(err, regerr.ErrNotFound) {
return nil, errdefs.ErrNotFound
}

return nil, err
}

delayer := func() (io.ReadCloser, error) {
return c.rc.BlobGet(ctx, c.ref, c.convertDescriptorToRegClient(desc))
}

return reader, nil
return newDelayedReader(delayer)
}

func (c *Client) List(ctx context.Context) (_ []string, err error) {
Expand Down
56 changes: 56 additions & 0 deletions api/tech/regclient/delayed_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package regclient

import "io"

// delayedReader sets up a reader that only fetches a blob
// upon explicit reading request, otherwise, it stores the
// way of getting the reader.
type delayedReader struct {
open func() (io.ReadCloser, error)
rc io.ReadCloser
closed bool
}

func newDelayedReader(open func() (io.ReadCloser, error)) (io.ReadCloser, error) {
return &delayedReader{
open: open,
}, nil
}

func (d *delayedReader) Read(p []byte) (n int, err error) {
if d.closed {
return 0, io.EOF
}

reader, err := d.reader()
if err != nil {
return 0, err

}

Check failure on line 29 in api/tech/regclient/delayed_reader.go

View workflow job for this annotation

GitHub Actions / Lint Golang

unnecessary trailing newline (whitespace)

return reader.Read(p)
}

func (d *delayedReader) reader() (io.ReadCloser, error) {
if d.rc != nil {
return d.rc, nil
}

rc, err := d.open()
if err != nil {
return nil, err
}

d.rc = rc
return rc, nil
}

func (d *delayedReader) Close() error {
if d.closed {
return nil
}

// we close regardless of an error
d.closed = true
return d.rc.Close()
}
1 change: 0 additions & 1 deletion api/utils/accessobj/accessstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ func newState(mode AccessMode, a StateAccess, p StateHandler) (*state, error) {
return nil, fmt.Errorf("failed to get blob data: %w", err)
}

fmt.Println("data: ", blob.MimeType())
blob = blobaccess.ForData(blob.MimeType(), data) // cache original data
current, err = p.Decode(data)
if err != nil {
Expand Down
9 changes: 0 additions & 9 deletions examples/lib/tour/07-resource-management/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ func HandleResources(resources []Resource) error {
// --- end handle ---

func GatherResources(ctx ocm.Context, factory ResourceFactory) ([]Resource, error) {
fmt.Println("BEGIN GATHERING")
var resources []Resource

spec := ocireg.NewRepositorySpec("ghcr.io/open-component-model/ocm")
Expand All @@ -173,7 +172,6 @@ func GatherResources(ctx ocm.Context, factory ResourceFactory) ([]Resource, erro
if err != nil {
return nil, errors.Wrapf(err, "cannot setup repository")
}
fmt.Println("GOT REPO:", spec)

// to release potentially allocated temporary resources,
// many objects must be closed, if they should not be used
Expand All @@ -187,15 +185,12 @@ func GatherResources(ctx ocm.Context, factory ResourceFactory) ([]Resource, erro
// All kinds of repositories, regardless of their type
// feature the same interface to work with OCM content.
// --- begin lookup component ---
fmt.Println("Looking up component...")
c, err := repo.LookupComponent("ocm.software/ocmcli")
if err != nil {
return nil, errors.Wrapf(err, "cannot lookup component")
}
defer c.Close()

fmt.Println("Lookued up Component: ", c)

// --- end lookup component ---

// Now we look for the versions of the component
Expand All @@ -205,8 +200,6 @@ func GatherResources(ctx ocm.Context, factory ResourceFactory) ([]Resource, erro
return nil, errors.Wrapf(err, "cannot query version names")
}

fmt.Println("Versions:", versions)

// OCM version names must follow the SemVer rules.
// Therefore, we can simply order the versions and print them.
err = semverutils.SortVersions(versions)
Expand All @@ -222,7 +215,6 @@ func GatherResources(ctx ocm.Context, factory ResourceFactory) ([]Resource, erro
// to retrieve the latest version use
// cv, err := c.LookupVersion(versions[len(versions)-1])

fmt.Println("looking up component version: v0.17.0")
cv, err := c.LookupVersion("0.17.0")
if err != nil {
return nil, errors.Wrapf(err, "cannot get latest version")
Expand All @@ -242,7 +234,6 @@ func GatherResources(ctx ocm.Context, factory ResourceFactory) ([]Resource, erro
// like the repository specification.
// --- begin resources ---
for _, r := range cv.GetResources() {
fmt.Println("trying to fetch resource: ", r.Meta())
res := factory.Create(
r.Meta().GetIdentity(cv.GetDescriptor().Resources),
r.Meta().GetType(),
Expand Down

0 comments on commit 7cadd7b

Please sign in to comment.