From b6acaa05ddb8ce5c4b70106397264ec842bfe376 Mon Sep 17 00:00:00 2001 From: edward Date: Tue, 26 Dec 2023 22:21:14 +0800 Subject: [PATCH 1/3] add auto merge background task --- db.go | 27 +++++++++++++++++++- db_test.go | 74 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 2 ++ options.go | 27 +++++++++++++++----- 5 files changed, 124 insertions(+), 7 deletions(-) diff --git a/db.go b/db.go index e4ae5c7a..b4f53f54 100644 --- a/db.go +++ b/db.go @@ -13,6 +13,7 @@ import ( "github.com/bwmarrin/snowflake" "github.com/gofrs/flock" + "github.com/robfig/cron/v3" "github.com/rosedblabs/rosedb/v2/index" "github.com/rosedblabs/rosedb/v2/utils" "github.com/rosedblabs/wal" @@ -52,7 +53,8 @@ type DB struct { encodeHeader []byte watchCh chan *Event // user consume channel for watch events watcher *Watcher - expiredCursorKey []byte // the location to which DeleteExpiredKeys executes. + expiredCursorKey []byte // the location to which DeleteExpiredKeys executes. + cronScheduler *cron.Cron // cron scheduler for auto merge task } // Stat represents the statistics of the database. 
@@ -127,6 +129,16 @@ func Open(options Options) (*DB, error) { go db.watcher.sendEvent(db.watchCh) } + if len(options.AutoMergeCronExpr) > 0 { + db.cronScheduler = cron.New(cron.WithParser( + cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor))) + db.cronScheduler.AddFunc(options.AutoMergeCronExpr, func() { + // maybe we should deal with different errors with different logic, but a background task can't omit its error + _ = db.Merge(options.AutoMergeReopenAfterDone) + }) + db.cronScheduler.Start() + } + return db, nil } @@ -179,6 +191,11 @@ func (db *DB) Close() error { close(db.watchCh) } + // close auto merge cron scheduler + if db.cronScheduler != nil { + db.cronScheduler.Stop() + } + db.closed = true return nil } @@ -552,6 +569,14 @@ func checkOptions(options Options) error { if options.SegmentSize <= 0 { return errors.New("database data file size must be greater than 0") } + + if len(options.AutoMergeCronExpr) > 0 { + if _, err := cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor). 
+ Parse(options.AutoMergeCronExpr); err != nil { + return fmt.Errorf("databse auto merge cron expression is invalid, err: %s", err) + } + } + return nil } diff --git a/db_test.go b/db_test.go index fb8fadeb..33f6c1c5 100644 --- a/db_test.go +++ b/db_test.go @@ -704,3 +704,77 @@ func TestDB_Persist(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, val2) } + +func TestDB_invalid_cron_expression(t *testing.T) { + options := DefaultOptions + options.AutoMergeCronExpr = "*/1 * * * * * *" + _, err := Open(options) + assert.NotNil(t, err) +} + +func TestDB_valid_cron_expression(t *testing.T) { + options := DefaultOptions + options.AutoMergeCronExpr = "* */1 * * * *" + db, err := Open(options) + assert.Nil(t, err) + destroyDB(db) + + options.AutoMergeCronExpr = "*/1 * * * *" + db, err = Open(options) + assert.Nil(t, err) + destroyDB(db) + + options.AutoMergeCronExpr = "5 0 * 8 *" + db, err = Open(options) + assert.Nil(t, err) + destroyDB(db) + + options.AutoMergeCronExpr = "*/2 14 1 * *" + db, err = Open(options) + assert.Nil(t, err) + destroyDB(db) + + options.AutoMergeCronExpr = "@hourly" + db, err = Open(options) + assert.Nil(t, err) + destroyDB(db) +} + +func TestDB_autoMerge(t *testing.T) { + options := DefaultOptions + options.AutoMergeCronExpr = "* * * * * *" + options.AutoMergeReopenAfterDone = true + db, err := Open(options) + assert.Nil(t, err) + defer destroyDB(db) + + var recordSize int + timeAfter := time.After(time.Second * 2) +WriteLoop: + for { + select { + case <-timeAfter: + break WriteLoop + + default: + err := db.Put(utils.GetTestKey(rand.Int()), utils.RandomValue(128)) + assert.Nil(t, err) + err = db.Put(utils.GetTestKey(rand.Int()), utils.RandomValue(KB)) + assert.Nil(t, err) + err = db.Put(utils.GetTestKey(rand.Int()), utils.RandomValue(5*KB)) + assert.Nil(t, err) + recordSize += 3 + } + } + + // reopen + err = db.Close() + assert.Nil(t, err) + db2, err := Open(options) + assert.Nil(t, err) + defer func() { + _ = db2.Close() + }() + stat := 
db2.Stat() + assert.Equal(t, recordSize, stat.KeysNum) +} diff --git a/go.mod b/go.mod index 4a60da5d..c4b03360 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/robfig/cron/v3 v3.0.0 // indirect golang.org/x/sys v0.11.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 52fddef1..83493971 100644 --- a/go.sum +++ b/go.sum @@ -17,6 +17,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= +github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rosedblabs/wal v1.3.6-0.20230924022528-3202245af020 h1:EA8XGCVg1FDM6Dh4MP4sTsmH3gvjhRtp/N+lbnBwtJE= github.com/rosedblabs/wal v1.3.6-0.20230924022528-3202245af020/go.mod h1:wdq54KJUyVTOv1uddMc6Cdh2d/YCIo8yjcwJAb1RCEM= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= diff --git a/options.go b/options.go index 5109c499..4533b971 100644 --- a/options.go +++ b/options.go @@ -31,6 +31,19 @@ type Options struct { // WatchQueueSize the cache length of the watch queue. // if the size greater than 0, which means enable the watch. WatchQueueSize uint64 + + // AutoMergeCronExpr enables the auto merge. + // auto merge will be triggered when cron expr is satisfied. + // cron expression follows the standard cron expression. + // e.g. "0 0 * * *" means merge at 00:00:00 every day. + // it also supports seconds optionally. 
+ // when enable the second field, the cron expression will be like this: "0/10 * * * * *" (every 10 seconds). + // refer to https://en.wikipedia.org/wiki/Cron + AutoMergeCronExpr string + + // AutoMergeReopenAfterDone reopen the db after auto merge done. + // refer to function mergeDB() in db.go + AutoMergeReopenAfterDone bool } // BatchOptions specifies the options for creating a batch. @@ -49,12 +62,14 @@ const ( ) var DefaultOptions = Options{ - DirPath: tempDBDir(), - SegmentSize: 1 * GB, - BlockCache: 0, - Sync: false, - BytesPerSync: 0, - WatchQueueSize: 0, + DirPath: tempDBDir(), + SegmentSize: 1 * GB, + BlockCache: 0, + Sync: false, + BytesPerSync: 0, + WatchQueueSize: 0, + AutoMergeCronExpr: "", + AutoMergeReopenAfterDone: false, } var DefaultBatchOptions = BatchOptions{ From 75e0fbc7cbe83827b6acdd3a4772acfc5bf675e8 Mon Sep 17 00:00:00 2001 From: edward Date: Tue, 2 Jan 2024 20:14:46 +0800 Subject: [PATCH 2/3] add unit test for auto merge and make reopen after merge as default action --- db.go | 5 ++-- db_test.go | 76 +++++++++++++++++++++++++++++++++--------------------- options.go | 21 +++++++-------- 3 files changed, 59 insertions(+), 43 deletions(-) diff --git a/db.go b/db.go index b4f53f54..edfd0791 100644 --- a/db.go +++ b/db.go @@ -133,8 +133,9 @@ func Open(options Options) (*DB, error) { db.cronScheduler = cron.New(cron.WithParser( cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor))) db.cronScheduler.AddFunc(options.AutoMergeCronExpr, func() { - // maybe we should deal with different errors with different logic, but a background task can't omit its error - _ = db.Merge(options.AutoMergeReopenAfterDone) + // maybe we should deal with different errors with different logic, but a background task can't omit its error. + // after auto merge, we should close and reopen the db. 
+ _ = db.Merge(true) }) db.cronScheduler.Start() } diff --git a/db_test.go b/db_test.go index 33f6c1c5..8cf75541 100644 --- a/db_test.go +++ b/db_test.go @@ -1,7 +1,10 @@ package rosedb import ( + "errors" + "io" "math/rand" + "os" "sync" "testing" "time" @@ -705,14 +708,14 @@ func TestDB_Persist(t *testing.T) { assert.NotNil(t, val2) } -func TestDB_invalid_cron_expression(t *testing.T) { +func TestDB_Invalid_Cron_Expression(t *testing.T) { options := DefaultOptions options.AutoMergeCronExpr = "*/1 * * * * * *" _, err := Open(options) assert.NotNil(t, err) } -func TestDB_valid_cron_expression(t *testing.T) { +func TestDB_Valid_Cron_Expression(t *testing.T) { options := DefaultOptions options.AutoMergeCronExpr = "* */1 * * * *" db, err := Open(options) @@ -740,41 +743,56 @@ func TestDB_valid_cron_expression(t *testing.T) { destroyDB(db) } -func TestDB_autoMerge(t *testing.T) { +func TestDB_Auto_Merge(t *testing.T) { options := DefaultOptions - options.AutoMergeCronExpr = "* * * * * *" - options.AutoMergeReopenAfterDone = true db, err := Open(options) assert.Nil(t, err) defer destroyDB(db) - var recordSize int - timeAfter := time.After(time.Second * 2) -WriteLoop: - for { - select { - case <-timeAfter: - break WriteLoop + for i := 0; i < 2000; i++ { + delKey := utils.GetTestKey(rand.Int()) + err := db.Put(delKey, utils.RandomValue(128)) + assert.Nil(t, err) + err = db.Put(utils.GetTestKey(rand.Int()), utils.RandomValue(2*KB)) + assert.Nil(t, err) + err = db.Delete(delKey) + assert.Nil(t, err) + } - default: - err := db.Put(utils.GetTestKey(rand.Int()), utils.RandomValue(128)) - assert.Nil(t, err) - err = db.Put(utils.GetTestKey(rand.Int()), utils.RandomValue(KB)) - assert.Nil(t, err) - err = db.Put(utils.GetTestKey(rand.Int()), utils.RandomValue(5*KB)) - assert.Nil(t, err) - recordSize += 3 + { + reader := db.dataFiles.NewReader() + var keyCnt int + for { + if _, _, err := reader.Next(); errors.Is(err, io.EOF) { + break + } + keyCnt++ } + // each record has one data 
wal and commit at end of batch with wal + // so totally is 2000 * 3 * 2 = 12000 + assert.Equal(t, 12000, keyCnt) } - // reopen - err = db.Close() - assert.Nil(t, err) - db2, err := Open(options) + mergeDirPath := mergeDirPath(options.DirPath) + if _, err := os.Stat(mergeDirPath); err != nil { + assert.True(t, os.IsNotExist(err)) + } + assert.NoError(t, db.Close()) + + options.AutoMergeCronExpr = "* * * * * *" // every second + db, err = Open(options) assert.Nil(t, err) - defer func() { - _ = db2.Close() - }() - stat := db2.Stat() - assert.Equal(t, recordSize, stat.KeysNum) + { + <-time.After(time.Second * 2) + reader := db.dataFiles.NewReader() + var keyCnt int + for { + if _, _, err := reader.Next(); errors.Is(err, io.EOF) { + break + } + keyCnt++ + } + // after merge records are only valid data, so totally is 2000 + assert.Equal(t, 2000, keyCnt) + } } diff --git a/options.go b/options.go index 4533b971..e8be46e3 100644 --- a/options.go +++ b/options.go @@ -38,12 +38,10 @@ type Options struct { // e.g. "0 0 * * *" means merge at 00:00:00 every day. // it also supports seconds optionally. // when enable the second field, the cron expression will be like this: "0/10 * * * * *" (every 10 seconds). + // when auto merge is enabled, the db will be closed and reopened after merge done. + // do not set this schedule too frequently, it will affect the performance. // refer to https://en.wikipedia.org/wiki/Cron AutoMergeCronExpr string - - // AutoMergeReopenAfterDone reopen the db after auto merge done. - // refer to function mergeDB() in db.go - AutoMergeReopenAfterDone bool } // BatchOptions specifies the options for creating a batch. 
@@ -62,14 +60,13 @@ const ( ) var DefaultOptions = Options{ - DirPath: tempDBDir(), - SegmentSize: 1 * GB, - BlockCache: 0, - Sync: false, - BytesPerSync: 0, - WatchQueueSize: 0, - AutoMergeCronExpr: "", - AutoMergeReopenAfterDone: false, + DirPath: tempDBDir(), + SegmentSize: 1 * GB, + BlockCache: 0, + Sync: false, + BytesPerSync: 0, + WatchQueueSize: 0, + AutoMergeCronExpr: "", } var DefaultBatchOptions = BatchOptions{ From becd1548de9527a8a86b2fd15ad92bb9d7505627 Mon Sep 17 00:00:00 2001 From: edward Date: Wed, 3 Jan 2024 18:30:06 +0800 Subject: [PATCH 3/3] fix unit test clean up problem --- db_test.go | 77 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 45 insertions(+), 32 deletions(-) diff --git a/db_test.go b/db_test.go index 8cf75541..ac017352 100644 --- a/db_test.go +++ b/db_test.go @@ -717,30 +717,40 @@ func TestDB_Invalid_Cron_Expression(t *testing.T) { func TestDB_Valid_Cron_Expression(t *testing.T) { options := DefaultOptions - options.AutoMergeCronExpr = "* */1 * * * *" - db, err := Open(options) - assert.Nil(t, err) - destroyDB(db) + { + options.AutoMergeCronExpr = "* */1 * * * *" + db, err := Open(options) + assert.Nil(t, err) + destroyDB(db) + } - options.AutoMergeCronExpr = "*/1 * * * *" - db, err = Open(options) - assert.Nil(t, err) - destroyDB(db) + { + options.AutoMergeCronExpr = "*/1 * * * *" + db, err := Open(options) + assert.Nil(t, err) + destroyDB(db) + } - options.AutoMergeCronExpr = "5 0 * 8 *" - db, err = Open(options) - assert.Nil(t, err) - destroyDB(db) + { + options.AutoMergeCronExpr = "5 0 * 8 *" + db, err := Open(options) + assert.Nil(t, err) + destroyDB(db) + } - options.AutoMergeCronExpr = "*/2 14 1 * *" - db, err = Open(options) - assert.Nil(t, err) - destroyDB(db) + { + options.AutoMergeCronExpr = "*/2 14 1 * *" + db, err := Open(options) + assert.Nil(t, err) + destroyDB(db) + } - options.AutoMergeCronExpr = "@hourly" - db, err = Open(options) - assert.Nil(t, err) - destroyDB(db) + { + 
options.AutoMergeCronExpr = "@hourly" + db, err := Open(options) + assert.Nil(t, err) + destroyDB(db) + } } func TestDB_Auto_Merge(t *testing.T) { @@ -779,20 +789,23 @@ func TestDB_Auto_Merge(t *testing.T) { } assert.NoError(t, db.Close()) - options.AutoMergeCronExpr = "* * * * * *" // every second - db, err = Open(options) - assert.Nil(t, err) { - <-time.After(time.Second * 2) - reader := db.dataFiles.NewReader() - var keyCnt int - for { - if _, _, err := reader.Next(); errors.Is(err, io.EOF) { - break + options.AutoMergeCronExpr = "* * * * * *" // every second + db, err := Open(options) + assert.Nil(t, err) + { + <-time.After(time.Second * 2) + reader := db.dataFiles.NewReader() + var keyCnt int + for { + if _, _, err := reader.Next(); errors.Is(err, io.EOF) { + break + } + keyCnt++ } - keyCnt++ + // after merge records are only valid data, so totally is 2000 + assert.Equal(t, 2000, keyCnt) } - // after merge records are only valid data, so totally is 2000 - assert.Equal(t, 2000, keyCnt) + destroyDB(db) } }