Skip to content

Commit

Permalink
fix concurrent read
Browse files Browse the repository at this point in the history
  • Loading branch information
roseduan committed Jul 6, 2024
1 parent 1788891 commit 38f06dc
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 6 deletions.
2 changes: 2 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ func (db *DB) loadIndexFromWAL() error {
now := time.Now().UnixNano()
// get a reader for WAL
reader := db.dataFiles.NewReader()
db.dataFiles.SetIsStartupTraversal(true)
for {
// if the current segment id is less than the mergeFinSegmentId,
// we can skip this segment because it has been merged,
Expand Down Expand Up @@ -656,6 +657,7 @@ func (db *DB) loadIndexFromWAL() error {
})
}
}
db.dataFiles.SetIsStartupTraversal(false)
return nil
}

Expand Down
36 changes: 33 additions & 3 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,36 @@ func TestDB_Concurrent_Put(t *testing.T) {
assert.Equal(t, count, db.index.Size())
}

func TestDB_Concurrent_Get(t *testing.T) {
options := DefaultOptions
db, err := Open(options)
assert.Nil(t, err)
defer destroyDB(db)

for i := 0; i < 10000; i++ {
err = db.Put(utils.GetTestKey(i), utils.RandomValue(128))
assert.Nil(t, err)
}
for i := 10000; i < 20000; i++ {
err = db.Put(utils.GetTestKey(i), utils.RandomValue(4096))
assert.Nil(t, err)
}

var wg sync.WaitGroup
wg.Add(50)
for i := 0; i < 50; i++ {
go func() {
defer wg.Done()
db.Ascend(func(key []byte, value []byte) (bool, error) {
assert.NotNil(t, key)
assert.NotNil(t, value)
return true, nil
})
}()
}
wg.Wait()
}

func TestDB_Ascend(t *testing.T) {
// Create a test database instance
options := DefaultOptions
Expand Down Expand Up @@ -791,11 +821,11 @@ func TestDB_Auto_Merge(t *testing.T) {

{
options.AutoMergeCronExpr = "* * * * * *" // every second
db, err := Open(options)
db2, err := Open(options)
assert.Nil(t, err)
{
<-time.After(time.Second * 2)
reader := db.dataFiles.NewReader()
reader := db2.dataFiles.NewReader()
var keyCnt int
for {
if _, _, err := reader.Next(); errors.Is(err, io.EOF) {
Expand All @@ -806,6 +836,6 @@ func TestDB_Auto_Merge(t *testing.T) {
// after merge records are only valid data, so totally is 2000
assert.Equal(t, 2000, keyCnt)
}
destroyDB(db)
_ = db2.Close()
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21

require (
github.com/google/btree v1.1.2
github.com/rosedblabs/wal v1.3.7
github.com/rosedblabs/wal v1.3.8
github.com/valyala/bytebufferpool v1.0.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E=
github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rosedblabs/wal v1.3.7 h1:ZB/xczf+/fEwbjbPnC/A6DLZRx0rxKgtQsWw2+SxKDg=
github.com/rosedblabs/wal v1.3.7/go.mod h1:DFvhrmTTeiXvn2btXXT2MW9Nvu99PU0g/pKGgh0+T+o=
github.com/rosedblabs/wal v1.3.8 h1:tErpD9JT/ICiyV3mv5l7qUH6lybn5XF1TbI0e8kvH8M=
github.com/rosedblabs/wal v1.3.8/go.mod h1:DFvhrmTTeiXvn2btXXT2MW9Nvu99PU0g/pKGgh0+T+o=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
Expand Down
2 changes: 2 additions & 0 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ func (db *DB) loadIndexFromHintFile() error {

// read all the hint records from the hint file
reader := hintFile.NewReader()
hintFile.SetIsStartupTraversal(true)
for {
chunk, _, err := reader.Next()
if err != nil {
Expand All @@ -349,5 +350,6 @@ func (db *DB) loadIndexFromHintFile() error {
// So just put them into the index without checking.
db.index.Put(key, position)
}
hintFile.SetIsStartupTraversal(false)
return nil
}

0 comments on commit 38f06dc

Please sign in to comment.