Skip to content

Commit

Permalink
db: fix TestCrashOpenCrashAfterWALCreation flake
Browse files Browse the repository at this point in the history
The errorfs-based instrumentation had a race between the wal creation
actually happening on the underlying FS and a dir sync (and/or between
the dir sync actually happening on the underlying FS and a subsequent
operation).

We use a custom instrumented FS that acts *after* the operation
happens on the MemFS.

Fixes #4306
  • Loading branch information
RaduBerinde committed Jan 30, 2025
1 parent 63439f1 commit 906e431
Showing 1 changed file with 61 additions and 20 deletions.
81 changes: 61 additions & 20 deletions open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,33 +994,18 @@ func TestCrashOpenCrashAfterWALCreation(t *testing.T) {
// file is created and after a subsequent directory sync occurs. This
// simulates a crash after the new log file is created and synced.
crashFS = nil
var crashFSAtomic atomic.Pointer[vfs.MemFS]
instrumentedFS := &crashAfterLogCreationFS{MemFS: fs}
{
var walCreated, dirSynced atomic.Bool
d, err := Open("", &Options{
FS: errorfs.Wrap(fs, errorfs.InjectorFunc(func(op errorfs.Op) error {
if dirSynced.Load() {
crashFSAtomic.CompareAndSwap(nil, fs.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 0}))
}
if op.Kind == errorfs.OpCreate && filepath.Ext(op.Path) == ".log" {
walCreated.Store(true)
}
// Record when there's a sync of the data directory after the
// WAL was created. The data directory will have an empty
// path because that's what we passed into Open.
if op.Kind == errorfs.OpFileSync && op.Path == "" && walCreated.Load() {
dirSynced.Store(true)
}
return nil
})),
FS: instrumentedFS,
Logger: testLogger{t},
})
require.NoError(t, err)
require.NoError(t, d.Close())
}

require.NotNil(t, crashFSAtomic.Load())
fs = crashFSAtomic.Load()
fs = instrumentedFS.saved
require.NotNil(t, fs)

newLogs := getLogs()
if n := len(newLogs); n > 2 || n < 1 {
Expand All @@ -1033,7 +1018,10 @@ func TestCrashOpenCrashAfterWALCreation(t *testing.T) {
require.NoError(t, err)
curLogNum, err := strconv.Atoi(strings.Split(newLogs[0], ".")[0])
require.NoError(t, err)
require.Greater(t, curLogNum, origLogNum)
if curLogNum == origLogNum {
t.Logf("logs: %v newLogs: %v", logs, newLogs)
}
require.Greater(t, curLogNum, origLogNum, "%v %v", logs, newLogs)
}

// Finally, open the database with syncs enabled.
Expand All @@ -1042,6 +1030,59 @@ func TestCrashOpenCrashAfterWALCreation(t *testing.T) {
require.NoError(t, d.Close())
}

// crashAfterLogCreationFS wraps a MemFS and extracts a crash clone after a WAL
// is created and the directory is subsequently synced.
// TODO(radu): replace this with errorfs predicates; see
// https://github.com/cockroachdb/pebble/pull/4308#pullrequestreview-2584817912
type crashAfterLogCreationFS struct {
*vfs.MemFS
// state is 0 before the first log is created, 1 after the first log is
// created, 2 after the dir is synced.
state atomic.Int32
saved *vfs.MemFS
}

// crashAfterLogCreationDir wraps the File for the store directory to instrument
// its Sync method.
type crashAfterLogCreationDir struct {
vfs.File

fs *crashAfterLogCreationFS
}

func (fs *crashAfterLogCreationFS) Create(
name string, category vfs.DiskWriteCategory,
) (vfs.File, error) {
f, err := fs.MemFS.Create(name, category)
if err == nil && filepath.Ext(name) == ".log" {
fs.state.CompareAndSwap(0, 1)
}
return f, err
}

func (fs *crashAfterLogCreationFS) OpenDir(fullname string) (vfs.File, error) {
f, err := fs.MemFS.OpenDir(fullname)
// The store directory will have an empty name (this is what we pass to Open).
if err != nil || fullname != "" {
return f, err
}
// Wrap the store directory File.
return &crashAfterLogCreationDir{File: f, fs: fs}, nil
}

func (d *crashAfterLogCreationDir) Sync() error {
if err := d.File.Sync(); err != nil {
return err
}
// Record the first time there's a sync of the data directory after the WAL
// was created.
if d.fs.state.CompareAndSwap(1, 2) {
// The atomic 1->2 transition can only happen once, so there is no race.
d.fs.saved = d.fs.MemFS.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 0})
}
return nil
}

// TestOpenWALReplayReadOnlySeqNums tests opening a database:
// - in read-only mode
// - with multiple unflushed log files that must replayed
Expand Down

0 comments on commit 906e431

Please sign in to comment.