feat(storage): enable parallel writes by using per-repo locking
Signed-off-by: Andrei Aaron <[email protected]>
andaaron committed Aug 15, 2024
1 parent 2dea22f commit 32544c3
Showing 9 changed files with 192 additions and 45 deletions.
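To make the effect of the change concrete, here is a self-contained toy (not part of the commit; every name in it is made up) contrasting the old behaviour, where every write takes the one storage-wide lock, with the per-repo scheme added below in pkg/storage/imagestore/lock.go: writes to unrelated repositories overlap instead of serializing.

package main

import (
	"fmt"
	"sync"
	"time"
)

// timedWrites simulates four concurrent blob writes to different repos and
// reports how long they take under the locking strategy returned by lockFor.
func timedWrites(lockFor func(repo string) sync.Locker) time.Duration {
	start := time.Now()

	var wg sync.WaitGroup

	for _, repo := range []string{"alpine", "ubuntu", "debian", "busybox"} {
		wg.Add(1)

		go func(repo string) {
			defer wg.Done()

			lk := lockFor(repo)
			lk.Lock()
			defer lk.Unlock()

			time.Sleep(50 * time.Millisecond) // stand-in for writing a blob
		}(repo)
	}

	wg.Wait()

	return time.Since(start)
}

func main() {
	// Old behaviour: a single storage-wide lock serializes all writers (~200ms total).
	global := &sync.RWMutex{}
	fmt.Println("single global lock:", timedWrites(func(string) sync.Locker { return global }))

	// New behaviour: one lock per repository, so the writers overlap (~50ms total).
	var repoLocks sync.Map

	fmt.Println("per-repo locks:   ", timedWrites(func(repo string) sync.Locker {
		lk, _ := repoLocks.LoadOrStore(repo, &sync.RWMutex{})

		return lk.(*sync.RWMutex)
	}))
}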
4 changes: 2 additions & 2 deletions pkg/api/routes.go
@@ -957,8 +957,8 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re
} else {
var lockLatency time.Time

imgStore.RLock(&lockLatency)
defer imgStore.RUnlock(&lockLatency)
imgStore.RLockRepo(name, &lockLatency)
defer imgStore.RUnlockRepo(name, &lockLatency)

ok, blen, _, err = imgStore.StatBlob(name, digest)
}
4 changes: 2 additions & 2 deletions pkg/extensions/sync/destination.go
@@ -136,9 +136,9 @@ func (registry *DestinationRegistry) CommitImage(imageReference types.ImageRefer
}

for _, manifest := range indexManifest.Manifests {
tempImageStore.RLock(&lockLatency)
tempImageStore.RLockRepo(repo, &lockLatency)
manifestBuf, err := tempImageStore.GetBlobContent(repo, manifest.Digest)
tempImageStore.RUnlock(&lockLatency)
tempImageStore.RUnlockRepo(repo, &lockLatency)

if err != nil {
registry.log.Error().Str("errorType", common.TypeOf(err)).
12 changes: 6 additions & 6 deletions pkg/meta/parse.go
@@ -109,8 +109,8 @@ func ParseRepo(repo string, metaDB mTypes.MetaDB, storeController stypes.StoreCo

var lockLatency time.Time

imageStore.RLock(&lockLatency)
defer imageStore.RUnlock(&lockLatency)
imageStore.RLockRepo(repo, &lockLatency)
defer imageStore.RUnlockRepo(repo, &lockLatency)

indexBlob, err := imageStore.GetIndexContent(repo)
if err != nil {
@@ -223,8 +223,8 @@ func getCosignSignatureLayersInfo(

var lockLatency time.Time

imageStore.RLock(&lockLatency)
defer imageStore.RUnlock(&lockLatency)
imageStore.RLockRepo(repo, &lockLatency)
defer imageStore.RUnlockRepo(repo, &lockLatency)

for _, layer := range manifestContent.Layers {
layerContent, err := imageStore.GetBlobContent(repo, layer.Digest)
@@ -280,8 +280,8 @@ func getNotationSignatureLayersInfo(

var lockLatency time.Time

imageStore.RLock(&lockLatency)
defer imageStore.RUnlock(&lockLatency)
imageStore.RLockRepo(repo, &lockLatency)
defer imageStore.RUnlockRepo(repo, &lockLatency)

layerContent, err := imageStore.GetBlobContent(repo, layer)
if err != nil {
4 changes: 2 additions & 2 deletions pkg/storage/gc/gc.go
@@ -106,8 +106,8 @@ func (gc GarbageCollect) cleanRepo(ctx context.Context, repo string) error {
return zerr.ErrRepoNotFound
}

gc.imgStore.Lock(&lockLatency)
defer gc.imgStore.Unlock(&lockLatency)
gc.imgStore.LockRepo(repo, &lockLatency)
defer gc.imgStore.UnlockRepo(repo, &lockLatency)

/* this index (which represents the index.json of this repo) is the root point from which we
search for dangling manifests/blobs
91 changes: 62 additions & 29 deletions pkg/storage/imagestore/imagestore.go
@@ -10,7 +10,6 @@ import (
"path"
"path/filepath"
"strings"
"sync"
"time"
"unicode/utf8"

@@ -42,7 +41,7 @@ const (
type ImageStore struct {
rootDir string
storeDriver storageTypes.Driver
lock *sync.RWMutex
lock *ImageStoreLock
log zlog.Logger
metrics monitoring.MetricServer
cache cache.Cache
@@ -78,7 +77,7 @@ func NewImageStore(rootDir string, cacheDir string, dedupe, commit bool, log zlo
imgStore := &ImageStore{
rootDir: rootDir,
storeDriver: storeDriver,
lock: &sync.RWMutex{},
lock: NewImageStoreLock(),
log: log,
metrics: metrics,
dedupe: dedupe,
@@ -124,6 +123,40 @@ func (is *ImageStore) Unlock(lockStart *time.Time) {
monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RWLOCK) // histogram
}

// RLockRepo read-lock for a specific repo.
func (is *ImageStore) RLockRepo(repo string, lockStart *time.Time) {
*lockStart = time.Now()

is.lock.RLockRepo(repo)
}

// RUnlockRepo read-unlock for a specific repo.
func (is *ImageStore) RUnlockRepo(repo string, lockStart *time.Time) {
is.lock.RUnlockRepo(repo)

lockEnd := time.Now()
// includes time spent in acquiring and holding a lock
latency := lockEnd.Sub(*lockStart)
monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RLOCK) // histogram
}

// LockRepo write-lock for a specific repo.
func (is *ImageStore) LockRepo(repo string, lockStart *time.Time) {
*lockStart = time.Now()

is.lock.LockRepo(repo)
}

// UnlockRepo write-unlock for a specific repo.
func (is *ImageStore) UnlockRepo(repo string, lockStart *time.Time) {
is.lock.UnlockRepo(repo)

lockEnd := time.Now()
// includes time spent in acquiring and holding a lock
latency := lockEnd.Sub(*lockStart)
monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RWLOCK) // histogram
}

func (is *ImageStore) initRepo(name string) error {
repoDir := path.Join(is.rootDir, name)

@@ -200,8 +233,8 @@ func (is *ImageStore) initRepo(name string) error {
func (is *ImageStore) InitRepo(name string) error {
var lockLatency time.Time

is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
is.LockRepo(name, &lockLatency)
defer is.UnlockRepo(name, &lockLatency)

return is.initRepo(name)
}
@@ -392,8 +425,8 @@ func (is *ImageStore) GetImageTags(repo string) ([]string, error) {
return nil, zerr.ErrRepoNotFound
}

is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
is.RLockRepo(repo, &lockLatency)
defer is.RUnlockRepo(repo, &lockLatency)

index, err := common.GetIndex(is, repo, is.log)
if err != nil {
@@ -414,9 +447,9 @@ func (is *ImageStore) GetImageManifest(repo, reference string) ([]byte, godigest

var err error

is.RLock(&lockLatency)
is.RLockRepo(repo, &lockLatency)
defer func() {

is.RUnlock(&lockLatency)
is.RUnlockRepo(repo, &lockLatency)

if err == nil {
monitoring.IncDownloadCounter(is.metrics, repo)
@@ -466,9 +499,9 @@ func (is *ImageStore) PutImageManifest(repo, reference, mediaType string, //noli

var err error

is.Lock(&lockLatency)
is.LockRepo(repo, &lockLatency)
defer func() {

is.Unlock(&lockLatency)
is.UnlockRepo(repo, &lockLatency)

if err == nil {
if is.storeDriver.Name() == storageConstants.LocalStorageDriverName {
@@ -596,8 +629,8 @@ func (is *ImageStore) DeleteImageManifest(repo, reference string, detectCollisio

var lockLatency time.Time

is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
is.LockRepo(repo, &lockLatency)
defer is.UnlockRepo(repo, &lockLatency)

err := is.deleteImageManifest(repo, reference, detectCollisions)
if err != nil {
@@ -885,8 +918,8 @@ func (is *ImageStore) FinishBlobUpload(repo, uuid string, body io.Reader, dstDig

var lockLatency time.Time

is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
is.LockRepo(repo, &lockLatency)
defer is.UnlockRepo(repo, &lockLatency)

if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) {
err = is.DedupeBlob(src, dstDigest, repo, dst)
@@ -965,8 +998,8 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, dstDigest godi

var lockLatency time.Time

is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
is.LockRepo(repo, &lockLatency)
defer is.UnlockRepo(repo, &lockLatency)

dst := is.BlobPath(repo, dstDigest)

@@ -1168,11 +1201,11 @@ func (is *ImageStore) CheckBlob(repo string, digest godigest.Digest) (bool, int6
blobPath := is.BlobPath(repo, digest)

if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) {
is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
is.LockRepo(repo, &lockLatency)
defer is.UnlockRepo(repo, &lockLatency)
} else {
is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
is.RLockRepo(repo, &lockLatency)
defer is.RUnlockRepo(repo, &lockLatency)
}

binfo, err := is.storeDriver.Stat(blobPath)
@@ -1304,8 +1337,8 @@ func (is *ImageStore) GetBlobPartial(repo string, digest godigest.Digest, mediaT
return nil, -1, -1, err
}

is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
is.RLockRepo(repo, &lockLatency)
defer is.RUnlockRepo(repo, &lockLatency)

binfo, err := is.originalBlobInfo(repo, digest)
if err != nil {
@@ -1381,8 +1414,8 @@ func (is *ImageStore) GetBlob(repo string, digest godigest.Digest, mediaType str
return nil, -1, err
}

is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
is.LockRepo(repo, &lockLatency)
defer is.UnlockRepo(repo, &lockLatency)

binfo, err := is.originalBlobInfo(repo, digest)
if err != nil {
@@ -1458,8 +1491,8 @@ func (is *ImageStore) GetReferrers(repo string, gdigest godigest.Digest, artifac
) (ispec.Index, error) {
var lockLatency time.Time

is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
is.RLockRepo(repo, &lockLatency)
defer is.RUnlockRepo(repo, &lockLatency)

return common.GetReferrers(is, repo, gdigest, artifactTypes, is.log)
}
@@ -1532,8 +1565,8 @@ func (is *ImageStore) DeleteBlob(repo string, digest godigest.Digest) error {
return err
}

is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
is.LockRepo(repo, &lockLatency)
defer is.UnlockRepo(repo, &lockLatency)

return is.deleteBlob(repo, digest)
}
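For reference, a sketch of the caller idiom the hunks above apply throughout the image store (illustration only, not part of the diff; the import paths are assumptions based on the repository layout). lockStart is stamped when the lock is requested and observed at unlock, so the reported histogram covers both acquiring and holding the per-repo lock.

package example

import (
	"time"

	godigest "github.com/opencontainers/go-digest"

	"zotregistry.dev/zot/pkg/storage/imagestore"
)

// readBlobUnderRepoLock reads a blob while holding only the per-repo read lock,
// so readers of one repository no longer contend with writers of other repositories.
func readBlobUnderRepoLock(is *imagestore.ImageStore, repo string, digest godigest.Digest) ([]byte, error) {
	var lockLatency time.Time

	is.RLockRepo(repo, &lockLatency)         // stamps lockLatency and takes the repo read lock
	defer is.RUnlockRepo(repo, &lockLatency) // releases the lock and records the latency histogram

	return is.GetBlobContent(repo, digest)
}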
98 changes: 98 additions & 0 deletions pkg/storage/imagestore/lock.go
@@ -0,0 +1,98 @@
package imagestore

import (
"sync"
)

type ImageStoreLock struct {
// one lock per repository path
repoLocks sync.Map
// lock for the entire storage, needed in case all repos need to be processed,
// including blocking the creation of new repos
globalLock *sync.RWMutex
}

func NewImageStoreLock() *ImageStoreLock {
return &ImageStoreLock{
repoLocks: sync.Map{},
globalLock: &sync.RWMutex{},
}
}

func (sl *ImageStoreLock) RLock() {
// take a read lock on the entire storage; blocks storage-wide write-locking
sl.globalLock.RLock()
}

func (sl *ImageStoreLock) RUnlock() {
// release the storage-wide read lock
sl.globalLock.RUnlock()
}

func (sl *ImageStoreLock) Lock() {
// take a write lock on the entire storage; blocks all reads and writes, including the creation of new repos
sl.globalLock.Lock()
}

func (sl *ImageStoreLock) Unlock() {
// release the storage-wide write lock
sl.globalLock.Unlock()
}

func (sl *ImageStoreLock) RLockRepo(repo string) {
// besides locking the individual repo, also increment the read counter of the
// global lock; this makes sure the storage cannot be write-locked at the
// global level while individual repos are being accessed
sl.globalLock.RLock()

val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{})

// read-lock the individual repo
repoLock := val.(*sync.RWMutex)

repoLock.RLock()
}

func (sl *ImageStoreLock) RUnlockRepo(repo string) {
// decrement the global read counter
sl.globalLock.RUnlock()

val, ok := sl.repoLocks.Load(repo)
if !ok {
// somehow the unlock is called for a repo that was never locked
return

}

// read-unlock individual repo
repoLock := val.(*sync.RWMutex)

repoLock.RUnlock()
}

func (sl *ImageStoreLock) LockRepo(repo string) {
// besides locking the individual repo, also increment the read counter of the
// global lock; this makes sure the storage cannot be write-locked at the
// global level while individual repos are being accessed
// we are not taking the global write lock here, as that would make all repos
// wait for one another
sl.globalLock.RLock()

val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{})

// write-lock individual repo
repoLock := val.(*sync.RWMutex)

repoLock.Lock()
}

func (sl *ImageStoreLock) UnlockRepo(repo string) {
// decrement the global read counter
sl.globalLock.RUnlock()

val, ok := sl.repoLocks.Load(repo)
if !ok {
// somehow the unlock is called for a repo that was never locked
return

}

// write-unlock individual repo
repoLock := val.(*sync.RWMutex)
repoLock.Unlock()
}
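A hypothetical sketch (not part of the commit; it assumes the module path zotregistry.dev/zot) of the semantics this type provides: write locks on different repos are held concurrently, while a storage-wide Lock() has to wait for every per-repo lock, because LockRepo and RLockRepo hold the global lock in read mode.

package main

import (
	"fmt"
	"time"

	"zotregistry.dev/zot/pkg/storage/imagestore"
)

func main() {
	sl := imagestore.NewImageStoreLock()

	// Two different repos can be write-locked at the same time.
	sl.LockRepo("repo-a")
	sl.LockRepo("repo-b")

	done := make(chan struct{})

	go func() {
		sl.Lock() // storage-wide write lock waits until both repos are released
		defer sl.Unlock()
		close(done)
	}()

	time.Sleep(100 * time.Millisecond)
	fmt.Println("storage-wide lock still waiting:", !isClosed(done))

	sl.UnlockRepo("repo-a")
	sl.UnlockRepo("repo-b")

	<-done
	fmt.Println("storage-wide lock acquired after the repos were released")
}

// isClosed reports whether ch has been closed, without blocking.
func isClosed(ch chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

This is also why UnlockRepo and RUnlockRepo decrement the global read counter: every per-repo lock, read or write, counts as one reader of the global lock.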
8 changes: 4 additions & 4 deletions pkg/storage/scrub.go
@@ -134,8 +134,8 @@ func checkImage(
) ([]ispec.Descriptor, error) {
var lockLatency time.Time

imgStore.RLock(&lockLatency)
defer imgStore.RUnlock(&lockLatency)
imgStore.RLockRepo(imageName, &lockLatency)
defer imgStore.RUnlockRepo(imageName, &lockLatency)

manifestContent, err := imgStore.GetBlobContent(imageName, manifest.Digest)
if err != nil {
@@ -149,8 +149,8 @@ func getIndex(imageName string, imgStore storageTypes.ImageStore) ([]byte, error) {
func getIndex(imageName string, imgStore storageTypes.ImageStore) ([]byte, error) {
var lockLatency time.Time

imgStore.RLock(&lockLatency)
defer imgStore.RUnlock(&lockLatency)
imgStore.RLockRepo(imageName, &lockLatency)
defer imgStore.RUnlockRepo(imageName, &lockLatency)

// check image structure / layout
ok, err := imgStore.ValidateRepo(imageName)