Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] add auto merge background task #298

Merged
merged 3 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/bwmarrin/snowflake"
"github.com/gofrs/flock"
"github.com/robfig/cron/v3"
"github.com/rosedblabs/rosedb/v2/index"
"github.com/rosedblabs/rosedb/v2/utils"
"github.com/rosedblabs/wal"
Expand Down Expand Up @@ -52,7 +53,8 @@ type DB struct {
encodeHeader []byte
watchCh chan *Event // user consume channel for watch events
watcher *Watcher
expiredCursorKey []byte // the location to which DeleteExpiredKeys executes.
expiredCursorKey []byte // the location to which DeleteExpiredKeys executes.
cronScheduler *cron.Cron // cron scheduler for auto merge task
}

// Stat represents the statistics of the database.
Expand Down Expand Up @@ -127,6 +129,17 @@ func Open(options Options) (*DB, error) {
go db.watcher.sendEvent(db.watchCh)
}

if len(options.AutoMergeCronExpr) > 0 {
db.cronScheduler = cron.New(cron.WithParser(
cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)))
db.cronScheduler.AddFunc(options.AutoMergeCronExpr, func() {
// maybe we should deal with different errors with different logic, but a background task can't omit its error.
// after auto merge, we should close and reopen the db.
_ = db.Merge(true)
})
db.cronScheduler.Start()
}

return db, nil
}

Expand Down Expand Up @@ -179,6 +192,11 @@ func (db *DB) Close() error {
close(db.watchCh)
}

// close auto merge cron scheduler
if db.cronScheduler != nil {
db.cronScheduler.Stop()
}

db.closed = true
return nil
}
Expand Down Expand Up @@ -552,6 +570,14 @@ func checkOptions(options Options) error {
if options.SegmentSize <= 0 {
return errors.New("database data file size must be greater than 0")
}

if len(options.AutoMergeCronExpr) > 0 {
if _, err := cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor).
Parse(options.AutoMergeCronExpr); err != nil {
return fmt.Errorf("databse auto merge cron expression is invalid, err: %s", err)
}
}

return nil
}

Expand Down
105 changes: 105 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package rosedb

import (
"errors"
"io"
"math/rand"
"os"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -704,3 +707,105 @@ func TestDB_Persist(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, val2)
}

func TestDB_Invalid_Cron_Expression(t *testing.T) {
options := DefaultOptions
options.AutoMergeCronExpr = "*/1 * * * * * *"
_, err := Open(options)
assert.NotNil(t, err)
}

func TestDB_Valid_Cron_Expression(t *testing.T) {
options := DefaultOptions
{
options.AutoMergeCronExpr = "* */1 * * * *"
db, err := Open(options)
assert.Nil(t, err)
destroyDB(db)
}

{
options.AutoMergeCronExpr = "*/1 * * * *"
db, err := Open(options)
assert.Nil(t, err)
destroyDB(db)
}

{
options.AutoMergeCronExpr = "5 0 * 8 *"
db, err := Open(options)
assert.Nil(t, err)
destroyDB(db)
}

{
options.AutoMergeCronExpr = "*/2 14 1 * *"
db, err := Open(options)
assert.Nil(t, err)
destroyDB(db)
}

{
options.AutoMergeCronExpr = "@hourly"
db, err := Open(options)
assert.Nil(t, err)
destroyDB(db)
}
}

func TestDB_Auto_Merge(t *testing.T) {
options := DefaultOptions
db, err := Open(options)
assert.Nil(t, err)
defer destroyDB(db)

for i := 0; i < 2000; i++ {
delKey := utils.GetTestKey(rand.Int())
err := db.Put(delKey, utils.RandomValue(128))
assert.Nil(t, err)
err = db.Put(utils.GetTestKey(rand.Int()), utils.RandomValue(2*KB))
assert.Nil(t, err)
err = db.Delete(delKey)
assert.Nil(t, err)
}

{
reader := db.dataFiles.NewReader()
var keyCnt int
for {
if _, _, err := reader.Next(); errors.Is(err, io.EOF) {
break
}
keyCnt++
}
// each record has one data wal and commit at end of batch with wal
// so totally is 2000 * 3 * 2 = 12000
assert.Equal(t, 12000, keyCnt)
}

mergeDirPath := mergeDirPath(options.DirPath)
if _, err := os.Stat(mergeDirPath); err != nil {
assert.True(t, os.IsNotExist(err))
}
assert.NoError(t, db.Close())

{
options.AutoMergeCronExpr = "* * * * * *" // every second
db, err := Open(options)
assert.Nil(t, err)
{
<-time.After(time.Second * 2)
reader := db.dataFiles.NewReader()
var keyCnt int
for {
if _, _, err := reader.Next(); errors.Is(err, io.EOF) {
break
}
keyCnt++
}
// after merge records are only valid data, so totally is 2000
assert.Equal(t, 2000, keyCnt)
}
destroyDB(db)
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron/v3 v3.0.0 // indirect
golang.org/x/sys v0.11.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E=
github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rosedblabs/wal v1.3.6-0.20230924022528-3202245af020 h1:EA8XGCVg1FDM6Dh4MP4sTsmH3gvjhRtp/N+lbnBwtJE=
github.com/rosedblabs/wal v1.3.6-0.20230924022528-3202245af020/go.mod h1:wdq54KJUyVTOv1uddMc6Cdh2d/YCIo8yjcwJAb1RCEM=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
Expand Down
24 changes: 18 additions & 6 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ type Options struct {
// WatchQueueSize the cache length of the watch queue.
// if the size greater than 0, which means enable the watch.
WatchQueueSize uint64

// AutoMergeEnable enable the auto merge.
// auto merge will be triggered when cron expr is satisfied.
// cron expression follows the standard cron expression.
// e.g. "0 0 * * *" means merge at 00:00:00 every day.
// it also supports seconds optionally.
// when enable the second field, the cron expression will be like this: "0/10 * * * * *" (every 10 seconds).
// when auto merge is enabled, the db will be closed and reopened after merge done.
// do not set this shecule too frequently, it will affect the performance.
// refer to https://en.wikipedia.org/wiki/Cron
AutoMergeCronExpr string
}

// BatchOptions specifies the options for creating a batch.
Expand All @@ -49,12 +60,13 @@ const (
)

var DefaultOptions = Options{
DirPath: tempDBDir(),
SegmentSize: 1 * GB,
BlockCache: 0,
Sync: false,
BytesPerSync: 0,
WatchQueueSize: 0,
DirPath: tempDBDir(),
SegmentSize: 1 * GB,
BlockCache: 0,
Sync: false,
BytesPerSync: 0,
WatchQueueSize: 0,
AutoMergeCronExpr: "",
}

var DefaultBatchOptions = BatchOptions{
Expand Down
Loading