diff --git a/pkg/api/routes.go b/pkg/api/routes.go index b8cf0a3627..2c2507df42 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -957,8 +957,8 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re } else { var lockLatency time.Time - imgStore.RLock(&lockLatency) - defer imgStore.RUnlock(&lockLatency) + imgStore.RLockRepo(name, &lockLatency) + defer imgStore.RUnlockRepo(name, &lockLatency) ok, blen, _, err = imgStore.StatBlob(name, digest) } diff --git a/pkg/extensions/sync/destination.go b/pkg/extensions/sync/destination.go index 3384e62702..c7234bedd0 100644 --- a/pkg/extensions/sync/destination.go +++ b/pkg/extensions/sync/destination.go @@ -136,9 +136,9 @@ func (registry *DestinationRegistry) CommitImage(imageReference types.ImageRefer } for _, manifest := range indexManifest.Manifests { - tempImageStore.RLock(&lockLatency) + tempImageStore.RLockRepo(repo, &lockLatency) manifestBuf, err := tempImageStore.GetBlobContent(repo, manifest.Digest) - tempImageStore.RUnlock(&lockLatency) + tempImageStore.RUnlockRepo(repo, &lockLatency) if err != nil { registry.log.Error().Str("errorType", common.TypeOf(err)). diff --git a/pkg/meta/parse.go b/pkg/meta/parse.go index ab671c0084..5045caab2b 100644 --- a/pkg/meta/parse.go +++ b/pkg/meta/parse.go @@ -109,8 +109,8 @@ func ParseRepo(repo string, metaDB mTypes.MetaDB, storeController stypes.StoreCo var lockLatency time.Time - imageStore.RLock(&lockLatency) - defer imageStore.RUnlock(&lockLatency) + imageStore.RLockRepo(repo, &lockLatency) + defer imageStore.RUnlockRepo(repo, &lockLatency) indexBlob, err := imageStore.GetIndexContent(repo) if err != nil { @@ -223,8 +223,8 @@ func getCosignSignatureLayersInfo( var lockLatency time.Time - imageStore.RLock(&lockLatency) - defer imageStore.RUnlock(&lockLatency) + imageStore.RLockRepo(repo, &lockLatency) + defer imageStore.RUnlockRepo(repo, &lockLatency) for _, layer := range manifestContent.Layers { layerContent, err := imageStore.GetBlobContent(repo, layer.Digest) @@ -280,8 +280,8 @@ func getNotationSignatureLayersInfo( var lockLatency time.Time - imageStore.RLock(&lockLatency) - defer imageStore.RUnlock(&lockLatency) + imageStore.RLockRepo(repo, &lockLatency) + defer imageStore.RUnlockRepo(repo, &lockLatency) layerContent, err := imageStore.GetBlobContent(repo, layer) if err != nil { diff --git a/pkg/storage/gc/gc.go b/pkg/storage/gc/gc.go index e372888576..2bda9b4a21 100644 --- a/pkg/storage/gc/gc.go +++ b/pkg/storage/gc/gc.go @@ -106,8 +106,8 @@ func (gc GarbageCollect) cleanRepo(ctx context.Context, repo string) error { return zerr.ErrRepoNotFound } - gc.imgStore.Lock(&lockLatency) - defer gc.imgStore.Unlock(&lockLatency) + gc.imgStore.LockRepo(repo, &lockLatency) + defer gc.imgStore.UnlockRepo(repo, &lockLatency) /* this index (which represents the index.json of this repo) is the root point from which we search for dangling manifests/blobs diff --git a/pkg/storage/imagestore/imagestore.go b/pkg/storage/imagestore/imagestore.go index be4b79d9e7..485e88581f 100644 --- a/pkg/storage/imagestore/imagestore.go +++ b/pkg/storage/imagestore/imagestore.go @@ -10,7 +10,6 @@ import ( "path" "path/filepath" "strings" - "sync" "time" "unicode/utf8" @@ -42,7 +41,7 @@ const ( type ImageStore struct { rootDir string storeDriver storageTypes.Driver - lock *sync.RWMutex + lock *ImageStoreLock log zlog.Logger metrics monitoring.MetricServer cache cache.Cache @@ -78,7 +77,7 @@ func NewImageStore(rootDir string, cacheDir string, dedupe, commit bool, log zlo imgStore := &ImageStore{ rootDir: rootDir, storeDriver: storeDriver, - lock: &sync.RWMutex{}, + lock: NewImageStoreLock(), log: log, metrics: metrics, dedupe: dedupe, @@ -124,6 +123,40 @@ func (is *ImageStore) Unlock(lockStart *time.Time) { monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RWLOCK) // histogram } +// RLock read-lock for specific repo +func (is *ImageStore) RLockRepo(repo string, lockStart *time.Time) { + *lockStart = time.Now() + + is.lock.RLockRepo(repo) +} + +// RUnlock read-unlock for specific repo. +func (is *ImageStore) RUnlockRepo(repo string, lockStart *time.Time) { + is.lock.RUnlockRepo(repo) + + lockEnd := time.Now() + // includes time spent in acquiring and holding a lock + latency := lockEnd.Sub(*lockStart) + monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RLOCK) // histogram +} + +// Lock write-lock for specific repo.. +func (is *ImageStore) LockRepo(repo string, lockStart *time.Time) { + *lockStart = time.Now() + + is.lock.LockRepo(repo) +} + +// Unlock write-unlock for specific repo.. +func (is *ImageStore) UnlockRepo(repo string, lockStart *time.Time) { + is.lock.UnlockRepo(repo) + + lockEnd := time.Now() + // includes time spent in acquiring and holding a lock + latency := lockEnd.Sub(*lockStart) + monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RWLOCK) // histogram +} + func (is *ImageStore) initRepo(name string) error { repoDir := path.Join(is.rootDir, name) @@ -200,8 +233,8 @@ func (is *ImageStore) initRepo(name string) error { func (is *ImageStore) InitRepo(name string) error { var lockLatency time.Time - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(name, &lockLatency) + defer is.UnlockRepo(name, &lockLatency) return is.initRepo(name) } @@ -392,8 +425,8 @@ func (is *ImageStore) GetImageTags(repo string) ([]string, error) { return nil, zerr.ErrRepoNotFound } - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + is.RLockRepo(repo, &lockLatency) + defer is.RUnlockRepo(repo, &lockLatency) index, err := common.GetIndex(is, repo, is.log) if err != nil { @@ -414,9 +447,9 @@ func (is *ImageStore) GetImageManifest(repo, reference string) ([]byte, godigest var err error - is.RLock(&lockLatency) + is.RLockRepo(repo, &lockLatency) defer func() { - is.RUnlock(&lockLatency) + is.RUnlockRepo(repo, &lockLatency) if err == nil { monitoring.IncDownloadCounter(is.metrics, repo) @@ -466,9 +499,9 @@ func (is *ImageStore) PutImageManifest(repo, reference, mediaType string, //noli var err error - is.Lock(&lockLatency) + is.LockRepo(repo, &lockLatency) defer func() { - is.Unlock(&lockLatency) + is.UnlockRepo(repo, &lockLatency) if err == nil { if is.storeDriver.Name() == storageConstants.LocalStorageDriverName { @@ -596,8 +629,8 @@ func (is *ImageStore) DeleteImageManifest(repo, reference string, detectCollisio var lockLatency time.Time - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) err := is.deleteImageManifest(repo, reference, detectCollisions) if err != nil { @@ -885,8 +918,8 @@ func (is *ImageStore) FinishBlobUpload(repo, uuid string, body io.Reader, dstDig var lockLatency time.Time - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) { err = is.DedupeBlob(src, dstDigest, repo, dst) @@ -965,8 +998,8 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, dstDigest godi var lockLatency time.Time - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) dst := is.BlobPath(repo, dstDigest) @@ -1168,11 +1201,11 @@ func (is *ImageStore) CheckBlob(repo string, digest godigest.Digest) (bool, int6 blobPath := is.BlobPath(repo, digest) if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) { - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) } else { - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + is.RLockRepo(repo, &lockLatency) + defer is.RUnlockRepo(repo, &lockLatency) } binfo, err := is.storeDriver.Stat(blobPath) @@ -1304,8 +1337,8 @@ func (is *ImageStore) GetBlobPartial(repo string, digest godigest.Digest, mediaT return nil, -1, -1, err } - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + is.RLockRepo(repo, &lockLatency) + defer is.RUnlockRepo(repo, &lockLatency) binfo, err := is.originalBlobInfo(repo, digest) if err != nil { @@ -1381,8 +1414,8 @@ func (is *ImageStore) GetBlob(repo string, digest godigest.Digest, mediaType str return nil, -1, err } - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) binfo, err := is.originalBlobInfo(repo, digest) if err != nil { @@ -1458,8 +1491,8 @@ func (is *ImageStore) GetReferrers(repo string, gdigest godigest.Digest, artifac ) (ispec.Index, error) { var lockLatency time.Time - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + is.RLockRepo(repo, &lockLatency) + defer is.RUnlockRepo(repo, &lockLatency) return common.GetReferrers(is, repo, gdigest, artifactTypes, is.log) } @@ -1532,8 +1565,8 @@ func (is *ImageStore) DeleteBlob(repo string, digest godigest.Digest) error { return err } - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) return is.deleteBlob(repo, digest) } diff --git a/pkg/storage/imagestore/lock.go b/pkg/storage/imagestore/lock.go new file mode 100644 index 0000000000..f97df24ee1 --- /dev/null +++ b/pkg/storage/imagestore/lock.go @@ -0,0 +1,98 @@ +package imagestore + +import ( + "sync" +) + +type ImageStoreLock struct { + // locks per repository paths + repoLocks sync.Map + // lock for the entire storage, needed in case all repos need to be processed + // including blocking creating new repos + globalLock *sync.RWMutex +} + +func NewImageStoreLock() *ImageStoreLock { + return &ImageStoreLock{ + repoLocks: sync.Map{}, + globalLock: &sync.RWMutex{}, + } +} + +func (sl *ImageStoreLock) RLock() { + // block reads and writes to the entire storage, including new repos + sl.globalLock.RLock() +} + +func (sl *ImageStoreLock) RUnlock() { + // unlock to the storage in general + sl.globalLock.RUnlock() +} + +func (sl *ImageStoreLock) Lock() { + // block reads and writes to the entire storage, including new repos + sl.globalLock.Lock() +} + +func (sl *ImageStoreLock) Unlock() { + // unlock to the storage in general + sl.globalLock.Unlock() +} + +func (sl *ImageStoreLock) RLockRepo(repo string) { + // besides the individual repo increment the read counter for the + // global lock, this will make sure the storage cannot be + // write-locked at global level while individual repos are accessed + sl.globalLock.RLock() + + val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{}) + + // lock individual repo + repoLock := val.(*sync.RWMutex) + repoLock.RLock() +} + +func (sl *ImageStoreLock) RUnlockRepo(repo string) { + // decrement the global read counter + sl.globalLock.RUnlock() + + val, ok := sl.repoLocks.Load(repo) + if !ok { + // somehow the unlock is called for repo that was never locked + return + } + + // read-unlock individual repo + repoLock := val.(*sync.RWMutex) + repoLock.RUnlock() +} + +func (sl *ImageStoreLock) LockRepo(repo string) { + // besides the individual repo increment the read counter for the + // global lock, this will make sure the storage cannot be + // write-locked at global level while individual repos are accessed + // we are not using the write lock here, as that would make all repos + // wait for one another + sl.globalLock.RLock() + + val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{}) + + // write-lock individual repo + repoLock := val.(*sync.RWMutex) + repoLock.Lock() +} + +func (sl *ImageStoreLock) UnlockRepo(repo string) { + // decrement the global read counter + sl.globalLock.RUnlock() + + val, ok := sl.repoLocks.Load(repo) + if !ok { + // somehow the unlock is called for a repo that was never locked + return + } + + // write-unlock individual repo + repoLock := val.(*sync.RWMutex) + repoLock.Unlock() +} diff --git a/pkg/storage/scrub.go b/pkg/storage/scrub.go index 859252cbe7..af633a5af1 100644 --- a/pkg/storage/scrub.go +++ b/pkg/storage/scrub.go @@ -134,8 +134,8 @@ func checkImage( ) ([]ispec.Descriptor, error) { var lockLatency time.Time - imgStore.RLock(&lockLatency) - defer imgStore.RUnlock(&lockLatency) + imgStore.RLockRepo(imageName, &lockLatency) + defer imgStore.RUnlockRepo(imageName, &lockLatency) manifestContent, err := imgStore.GetBlobContent(imageName, manifest.Digest) if err != nil { @@ -149,8 +149,8 @@ func checkImage( func getIndex(imageName string, imgStore storageTypes.ImageStore) ([]byte, error) { var lockLatency time.Time - imgStore.RLock(&lockLatency) - defer imgStore.RUnlock(&lockLatency) + imgStore.RLockRepo(imageName, &lockLatency) + defer imgStore.RUnlockRepo(imageName, &lockLatency) // check image structure / layout ok, err := imgStore.ValidateRepo(imageName) diff --git a/pkg/storage/types/types.go b/pkg/storage/types/types.go index 8d5b38bfae..9e2967e447 100644 --- a/pkg/storage/types/types.go +++ b/pkg/storage/types/types.go @@ -26,6 +26,10 @@ type ImageStore interface { //nolint:interfacebloat RUnlock(*time.Time) Lock(*time.Time) Unlock(*time.Time) + RLockRepo(repo string, lockStart *time.Time) + RUnlockRepo(repo string, lockStart *time.Time) + LockRepo(repo string, lockStart *time.Time) + UnlockRepo(repo string, lockStart *time.Time) InitRepo(name string) error ValidateRepo(name string) (bool, error) GetRepositories() ([]string, error) diff --git a/pkg/test/mocks/image_store_mock.go b/pkg/test/mocks/image_store_mock.go index ff18afab32..b57bae4985 100644 --- a/pkg/test/mocks/image_store_mock.go +++ b/pkg/test/mocks/image_store_mock.go @@ -80,6 +80,18 @@ func (is MockedImageStore) RUnlock(t *time.Time) { func (is MockedImageStore) RLock(t *time.Time) { } +func (is MockedImageStore) LockRepo(repo string, t *time.Time) { +} + +func (is MockedImageStore) UnlockRepo(repo string, t *time.Time) { +} + +func (is MockedImageStore) RUnlockRepo(repo string, t *time.Time) { +} + +func (is MockedImageStore) RLockRepo(repo string, t *time.Time) { +} + func (is MockedImageStore) Name() string { if is.NameFn != nil { return is.NameFn()