Skip to content

Commit

Permalink
Merge pull request #8796 from dolthub/aaron/gc-read-dependencies-oldg…
Browse files Browse the repository at this point in the history
…en-fixup

[no-release-notes] go/store/nbs: generational_chunk_store.go: In GCMode_Full, also take dependencies on chunks read from OldGen.
  • Loading branch information
reltuk authored Jan 30, 2025
2 parents caaa4bd + 1b59420 commit 3dd50f5
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 60 deletions.
10 changes: 8 additions & 2 deletions go/store/chunks/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@ type ChunkStoreGarbageCollector interface {
//
// This function should not block indefinitely and should return an
// error if a GC is already in progress.
BeginGC(addChunk func(hash.Hash) bool) error
BeginGC(addChunk func(hash.Hash) bool, mode GCMode) error

// EndGC indicates that the GC is over. The previously provided
// addChunk function must not be called after this function.
EndGC()
EndGC(mode GCMode)

// MarkAndSweepChunks returns a handle that can be used to supply
// hashes which should be saved into |dest|. The hashes are
Expand Down Expand Up @@ -257,6 +257,12 @@ type GenerationalCS interface {
NewGen() ChunkStoreGarbageCollector
OldGen() ChunkStoreGarbageCollector
GhostGen() ChunkStore

// Has the same return values as OldGen().HasMany, but should be used by a
// generational GC process as the filter function instead of
// OldGen().HasMany. This function never takes read dependencies on the
// chunks that it queries.
OldGenGCFilter() HasManyFunc
}

var ErrUnsupportedOperation = errors.New("operation not supported")
Expand Down
4 changes: 2 additions & 2 deletions go/store/chunks/memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,11 @@ func (ms *MemoryStoreView) Commit(ctx context.Context, current, last hash.Hash)
return success, nil
}

func (ms *MemoryStoreView) BeginGC(keeper func(hash.Hash) bool) error {
func (ms *MemoryStoreView) BeginGC(keeper func(hash.Hash) bool, _ GCMode) error {
return ms.transitionToGC(keeper)
}

func (ms *MemoryStoreView) EndGC() {
func (ms *MemoryStoreView) EndGC(_ GCMode) {
ms.transitionToNoGC()
}

Expand Down
8 changes: 4 additions & 4 deletions go/store/chunks/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,20 @@ func (s *TestStoreView) Put(ctx context.Context, c Chunk, getAddrs GetAddrsCurry
return s.ChunkStore.Put(ctx, c, getAddrs)
}

func (s *TestStoreView) BeginGC(keeper func(hash.Hash) bool) error {
func (s *TestStoreView) BeginGC(keeper func(hash.Hash) bool, mode GCMode) error {
collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
if !ok {
return ErrUnsupportedOperation
}
return collector.BeginGC(keeper)
return collector.BeginGC(keeper, mode)
}

func (s *TestStoreView) EndGC() {
func (s *TestStoreView) EndGC(mode GCMode) {
collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
if !ok {
panic(ErrUnsupportedOperation)
}
collector.EndGC()
collector.EndGC(mode)
}

func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error) {
Expand Down
33 changes: 29 additions & 4 deletions go/store/nbs/generational_chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,12 +494,37 @@ func (gcs *GenerationalNBS) UpdateManifest(ctx context.Context, updates map[hash
return gcs.newGen.UpdateManifest(ctx, updates)
}

func (gcs *GenerationalNBS) BeginGC(keeper func(hash.Hash) bool) error {
return gcs.newGen.BeginGC(keeper)
// OldGenGCFilter returns a chunks.HasManyFunc that queries the old
// generation through hasManyDep using gcDependencyMode_NoDependency, so
// the lookups it performs do not take GC read dependencies on the queried
// chunks.
func (gcs *GenerationalNBS) OldGenGCFilter() chunks.HasManyFunc {
	filter := func(ctx context.Context, query hash.HashSet) (hash.HashSet, error) {
		// gcs.oldGen is resolved on every invocation, not when the
		// filter is created.
		return gcs.oldGen.hasManyDep(ctx, query, gcDependencyMode_NoDependency)
	}
	return filter
}

func (gcs *GenerationalNBS) EndGC() {
gcs.newGen.EndGC()
// BeginGC begins a GC on the new generation and, when |mode| is
// chunks.GCMode_Full, on the old generation as well. |keeper| and |mode|
// are handed unchanged to every store on which a GC is begun.
//
// Outside of Full mode the old generation is not being collected: it is
// still growing monotonically and nothing in it is at risk of going away.
// In Full mode the old generation is collected too, so read dependencies
// need to be taken on chunks read from it as well.
//
// If the old generation fails to begin, the GC already begun on the new
// generation is ended before the error is returned, so no store is left
// in a half-started state.
func (gcs *GenerationalNBS) BeginGC(keeper func(hash.Hash) bool, mode chunks.GCMode) error {
	if err := gcs.newGen.BeginGC(keeper, mode); err != nil {
		return err
	}
	if mode != chunks.GCMode_Full {
		// Only the new generation participates in a non-Full GC.
		return nil
	}
	if err := gcs.oldGen.BeginGC(keeper, mode); err != nil {
		// Roll back the new generation so the caller sees an
		// all-or-nothing result.
		gcs.newGen.EndGC(mode)
		return err
	}
	return nil
}

// EndGC ends the GC on the old generation first, but only when |mode| is
// chunks.GCMode_Full, and then always ends it on the new generation.
func (gcs *GenerationalNBS) EndGC(mode chunks.GCMode) {
	oldGenParticipates := mode == chunks.GCMode_Full
	if oldGenParticipates {
		gcs.oldGen.EndGC(mode)
	}
	gcs.newGen.EndGC(mode)
}

func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
Expand Down
8 changes: 4 additions & 4 deletions go/store/nbs/nbs_metrics_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ func (nbsMW *NBSMetricWrapper) SupportedOperations() chunks.TableFileStoreOps {
return nbsMW.nbs.SupportedOperations()
}

func (nbsMW *NBSMetricWrapper) BeginGC(keeper func(hash.Hash) bool) error {
return nbsMW.nbs.BeginGC(keeper)
func (nbsMW *NBSMetricWrapper) BeginGC(keeper func(hash.Hash) bool, mode chunks.GCMode) error {
return nbsMW.nbs.BeginGC(keeper, mode)
}

func (nbsMW *NBSMetricWrapper) EndGC() {
nbsMW.nbs.EndGC()
func (nbsMW *NBSMetricWrapper) EndGC(mode chunks.GCMode) {
nbsMW.nbs.EndGC(mode)
}

func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
Expand Down
32 changes: 14 additions & 18 deletions go/store/nbs/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,6 @@ func (nbs *NomsBlockStore) conjoinIfRequired(ctx context.Context) (bool, error)
func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (mi ManifestInfo, err error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()
err = nbs.waitForGC(ctx)
if err != nil {
return
}

err = nbs.checkAllManifestUpdatesExist(ctx, updates)
if err != nil {
return
Expand Down Expand Up @@ -361,11 +356,6 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updates map[hash.Hash]uint32, option ManifestAppendixOption) (mi ManifestInfo, err error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()
err = nbs.waitForGC(ctx)
if err != nil {
return
}

err = nbs.checkAllManifestUpdatesExist(ctx, updates)
if err != nil {
return
Expand Down Expand Up @@ -517,11 +507,6 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp
func OverwriteStoreManifest(ctx context.Context, store *NomsBlockStore, root hash.Hash, tableFiles map[hash.Hash]uint32, appendixTableFiles map[hash.Hash]uint32) (err error) {
store.mu.Lock()
defer store.mu.Unlock()
err = store.waitForGC(ctx)
if err != nil {
return
}

contents := manifestContents{
root: root,
nbfVers: store.upstream.nbfVers,
Expand Down Expand Up @@ -1128,6 +1113,10 @@ func (nbs *NomsBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) {
}

// HasMany forwards |hashes| to hasManyDep with
// gcDependencyMode_TakeDependency and returns its results unchanged.
func (nbs *NomsBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) {
	depMode := gcDependencyMode_TakeDependency
	return nbs.hasManyDep(ctx, hashes, depMode)
}

func (nbs *NomsBlockStore) hasManyDep(ctx context.Context, hashes hash.HashSet, gcDepMode gcDependencyMode) (hash.HashSet, error) {
if hashes.Size() == 0 {
return nil, nil
}
Expand All @@ -1143,7 +1132,11 @@ func (nbs *NomsBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha

nbs.mu.Lock()
if nbs.mt != nil {
remaining, gcb, err := nbs.mt.hasMany(reqs, nbs.keeperFunc)
keeper := nbs.keeperFunc
if gcDepMode == gcDependencyMode_NoDependency {
keeper = nil
}
remaining, gcb, err := nbs.mt.hasMany(reqs, keeper)
if err != nil {
nbs.mu.Unlock()
return nil, err
Expand All @@ -1162,6 +1155,9 @@ func (nbs *NomsBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
}
}
tables, keeper, endRead := nbs.tables, nbs.keeperFunc, nbs.beginRead()
if gcDepMode == gcDependencyMode_NoDependency {
keeper = nil
}
nbs.mu.Unlock()

remaining, gcb, err := tables.hasMany(reqs, keeper)
Expand Down Expand Up @@ -1730,7 +1726,7 @@ func (nbs *NomsBlockStore) pruneTableFiles(ctx context.Context) (err error) {
}, mtime)
}

func (nbs *NomsBlockStore) BeginGC(keeper func(hash.Hash) bool) error {
func (nbs *NomsBlockStore) BeginGC(keeper func(hash.Hash) bool, _ chunks.GCMode) error {
nbs.cond.L.Lock()
defer nbs.cond.L.Unlock()
if nbs.gcInProgress {
Expand All @@ -1742,7 +1738,7 @@ func (nbs *NomsBlockStore) BeginGC(keeper func(hash.Hash) bool) error {
return nil
}

func (nbs *NomsBlockStore) EndGC() {
func (nbs *NomsBlockStore) EndGC(_ chunks.GCMode) {
nbs.cond.L.Lock()
defer nbs.cond.L.Unlock()
if !nbs.gcInProgress {
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func TestNBSCopyGC(t *testing.T) {
require.NoError(t, err)
require.True(t, ok)

require.NoError(t, st.BeginGC(nil))
require.NoError(t, st.BeginGC(nil, chunks.GCMode_Full))
noopFilter := func(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) {
return hashes, nil
}
Expand All @@ -349,7 +349,7 @@ func TestNBSCopyGC(t *testing.T) {
require.NoError(t, err)
require.NoError(t, sweeper.Close(ctx))
require.NoError(t, finalizer.SwapChunksInStore(ctx))
st.EndGC()
st.EndGC(chunks.GCMode_Full)

for h, c := range keepers {
out, err := st.Get(ctx, h)
Expand Down
12 changes: 6 additions & 6 deletions go/store/types/value_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
var oldGenHasMany chunks.HasManyFunc
switch mode {
case GCModeDefault:
oldGenHasMany = oldGen.HasMany
oldGenHasMany = gcs.OldGenGCFilter()
chksMode = chunks.GCMode_Default
case GCModeFull:
oldGenHasMany = unfilteredHashFunc
Expand All @@ -601,11 +601,11 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
}

err := func() error {
err := collector.BeginGC(lvs.gcAddChunk)
err := collector.BeginGC(lvs.gcAddChunk, chksMode)
if err != nil {
return err
}
defer collector.EndGC()
defer collector.EndGC(chksMode)

var callCancelSafepoint bool
if safepoint != nil {
Expand Down Expand Up @@ -650,7 +650,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
}

if mode == GCModeDefault {
oldGenHasMany = oldGen.HasMany
oldGenHasMany = gcs.OldGenGCFilter()
} else {
oldGenHasMany = newFileHasMany
}
Expand Down Expand Up @@ -685,11 +685,11 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
newGenRefs.InsertAll(oldGenRefs)

err := func() error {
err := collector.BeginGC(lvs.gcAddChunk)
err := collector.BeginGC(lvs.gcAddChunk, chunks.GCMode_Full)
if err != nil {
return err
}
defer collector.EndGC()
defer collector.EndGC(chunks.GCMode_Full)

var callCancelSafepoint bool
if safepoint != nil {
Expand Down
56 changes: 38 additions & 18 deletions integration-tests/go-sql-server-driver/concurrent_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,49 @@ import (
)

func TestConcurrentGC(t *testing.T) {
var gct gcTest
gct.numThreads = 8
gct.duration = 10 * time.Second
t.Run("NoCommits", func(t *testing.T) {
gct.run(t)
t.Run("Normal", func(t *testing.T) {
var gct = gcTest{
numThreads: 8,
duration: 10 * time.Second,
}
gct.run(t)
})
t.Run("Full", func(t *testing.T) {
var gct = gcTest{
numThreads: 8,
duration: 10 * time.Second,
full: true,
}
gct.run(t)
})
})
gct.commit = true
t.Run("WithCommits", func(t *testing.T) {
gct.run(t)
t.Run("Normal", func(t *testing.T) {
var gct = gcTest{
numThreads: 8,
duration: 10 * time.Second,
commit: true,
}
gct.run(t)
})
t.Run("Full", func(t *testing.T) {
var gct = gcTest{
numThreads: 8,
duration: 10 * time.Second,
commit: true,
full: true,
}
gct.run(t)
})
})
}

// gcTest holds the parameters for one concurrent GC test run.
type gcTest struct {
	// numThreads is presumably the number of concurrent workers driving
	// load during the run — confirm against run.
	numThreads int
	// duration is presumably how long the run lasts — confirm against run.
	duration time.Duration
	// commit is set by the "WithCommits" subtests; full makes doGC issue
	// `call dolt_gc("--full")` instead of `call dolt_gc()`.
	commit, full bool
}

func (gct gcTest) createDB(t *testing.T, ctx context.Context, db *sql.DB) {
Expand Down Expand Up @@ -118,19 +145,12 @@ func (gct gcTest) doGC(t *testing.T, ctx context.Context, db *sql.DB) error {
})
}()
b := time.Now()
_, err = conn.ExecContext(ctx, "call dolt_gc()")
if err != nil {
if !assert.NotContains(t, err.Error(), "dangling ref") {
return err
}
if !assert.NotContains(t, err.Error(), "is unexpected noms value") {
return err
}
if !assert.NotContains(t, err.Error(), "interface conversion: types.Value is nil") {
return err
}
t.Logf("err in Exec dolt_gc: %v", err)
if !gct.full {
_, err = conn.ExecContext(ctx, "call dolt_gc()")
} else {
_, err = conn.ExecContext(ctx, `call dolt_gc("--full")`)
}
if assert.NoError(t, err) {
t.Logf("successful dolt_gc took %v", time.Since(b))
}
return nil
Expand Down

0 comments on commit 3dd50f5

Please sign in to comment.