diff --git a/db.go b/db.go index e4ae5c7a..edfd0791 100644 --- a/db.go +++ b/db.go @@ -13,6 +13,7 @@ import ( "github.com/bwmarrin/snowflake" "github.com/gofrs/flock" + "github.com/robfig/cron/v3" "github.com/rosedblabs/rosedb/v2/index" "github.com/rosedblabs/rosedb/v2/utils" "github.com/rosedblabs/wal" @@ -52,7 +53,8 @@ type DB struct { encodeHeader []byte watchCh chan *Event // user consume channel for watch events watcher *Watcher - expiredCursorKey []byte // the location to which DeleteExpiredKeys executes. + expiredCursorKey []byte // the location to which DeleteExpiredKeys executes. + cronScheduler *cron.Cron // cron scheduler for auto merge task } // Stat represents the statistics of the database. @@ -127,6 +129,17 @@ func Open(options Options) (*DB, error) { go db.watcher.sendEvent(db.watchCh) } + if len(options.AutoMergeCronExpr) > 0 { + db.cronScheduler = cron.New(cron.WithParser( + cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor))) + db.cronScheduler.AddFunc(options.AutoMergeCronExpr, func() { + // maybe we should deal with different errors with different logic, but a background task can't omit its error. + // after auto merge, we should close and reopen the db. + _ = db.Merge(true) + }) + db.cronScheduler.Start() + } + return db, nil } @@ -179,6 +192,11 @@ func (db *DB) Close() error { close(db.watchCh) } + // close auto merge cron scheduler + if db.cronScheduler != nil { + db.cronScheduler.Stop() + } + db.closed = true return nil } @@ -552,6 +570,14 @@ func checkOptions(options Options) error { if options.SegmentSize <= 0 { return errors.New("database data file size must be greater than 0") } + + if len(options.AutoMergeCronExpr) > 0 { + if _, err := cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor). + Parse(options.AutoMergeCronExpr); err != nil { + return fmt.Errorf("databse auto merge cron expression is invalid, err: %s", err) + } + } + return nil } diff --git a/db_test.go b/db_test.go index fb8fadeb..ac017352 100644 --- a/db_test.go +++ b/db_test.go @@ -1,7 +1,10 @@ package rosedb import ( + "errors" + "io" "math/rand" + "os" "sync" "testing" "time" @@ -704,3 +707,105 @@ func TestDB_Persist(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, val2) } + +func TestDB_Invalid_Cron_Expression(t *testing.T) { + options := DefaultOptions + options.AutoMergeCronExpr = "*/1 * * * * * *" + _, err := Open(options) + assert.NotNil(t, err) +} + +func TestDB_Valid_Cron_Expression(t *testing.T) { + options := DefaultOptions + { + options.AutoMergeCronExpr = "* */1 * * * *" + db, err := Open(options) + assert.Nil(t, err) + destroyDB(db) + } + + { + options.AutoMergeCronExpr = "*/1 * * * *" + db, err := Open(options) + assert.Nil(t, err) + destroyDB(db) + } + + { + options.AutoMergeCronExpr = "5 0 * 8 *" + db, err := Open(options) + assert.Nil(t, err) + destroyDB(db) + } + + { + options.AutoMergeCronExpr = "*/2 14 1 * *" + db, err := Open(options) + assert.Nil(t, err) + destroyDB(db) + } + + { + options.AutoMergeCronExpr = "@hourly" + db, err := Open(options) + assert.Nil(t, err) + destroyDB(db) + } +} + +func TestDB_Auto_Merge(t *testing.T) { + options := DefaultOptions + db, err := Open(options) + assert.Nil(t, err) + defer destroyDB(db) + + for i := 0; i < 2000; i++ { + delKey := utils.GetTestKey(rand.Int()) + err := db.Put(delKey, utils.RandomValue(128)) + assert.Nil(t, err) + err = db.Put(utils.GetTestKey(rand.Int()), utils.RandomValue(2*KB)) + assert.Nil(t, err) + err = db.Delete(delKey) + assert.Nil(t, err) + } + + { + reader := db.dataFiles.NewReader() + var keyCnt int + for { + if _, _, err := reader.Next(); errors.Is(err, io.EOF) { + break + } + keyCnt++ + } + // each record has one data wal and commit at end of batch with wal + // so totally is 2000 * 3 * 2 = 12000 + assert.Equal(t, 12000, keyCnt) + } + + mergeDirPath := mergeDirPath(options.DirPath) + if _, err := os.Stat(mergeDirPath); err != nil { + assert.True(t, os.IsNotExist(err)) + } + assert.NoError(t, db.Close()) + + { + options.AutoMergeCronExpr = "* * * * * *" // every second + db, err := Open(options) + assert.Nil(t, err) + { + <-time.After(time.Second * 2) + reader := db.dataFiles.NewReader() + var keyCnt int + for { + if _, _, err := reader.Next(); errors.Is(err, io.EOF) { + break + } + keyCnt++ + } + // after merge records are only valid data, so totally is 2000 + assert.Equal(t, 2000, keyCnt) + } + destroyDB(db) + } +} diff --git a/go.mod b/go.mod index 4a60da5d..c4b03360 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/robfig/cron/v3 v3.0.0 // indirect golang.org/x/sys v0.11.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 52fddef1..83493971 100644 --- a/go.sum +++ b/go.sum @@ -17,6 +17,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= +github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rosedblabs/wal v1.3.6-0.20230924022528-3202245af020 h1:EA8XGCVg1FDM6Dh4MP4sTsmH3gvjhRtp/N+lbnBwtJE= github.com/rosedblabs/wal v1.3.6-0.20230924022528-3202245af020/go.mod h1:wdq54KJUyVTOv1uddMc6Cdh2d/YCIo8yjcwJAb1RCEM= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= diff --git a/options.go b/options.go index 5109c499..e8be46e3 100644 --- a/options.go +++ b/options.go @@ -31,6 +31,17 @@ type Options struct { // WatchQueueSize the cache length of the watch queue. // if the size greater than 0, which means enable the watch. WatchQueueSize uint64 + + // AutoMergeEnable enable the auto merge. + // auto merge will be triggered when cron expr is satisfied. + // cron expression follows the standard cron expression. + // e.g. "0 0 * * *" means merge at 00:00:00 every day. + // it also supports seconds optionally. + // when enable the second field, the cron expression will be like this: "0/10 * * * * *" (every 10 seconds). + // when auto merge is enabled, the db will be closed and reopened after merge done. + // do not set this shecule too frequently, it will affect the performance. + // refer to https://en.wikipedia.org/wiki/Cron + AutoMergeCronExpr string } // BatchOptions specifies the options for creating a batch. @@ -49,12 +60,13 @@ const ( ) var DefaultOptions = Options{ - DirPath: tempDBDir(), - SegmentSize: 1 * GB, - BlockCache: 0, - Sync: false, - BytesPerSync: 0, - WatchQueueSize: 0, + DirPath: tempDBDir(), + SegmentSize: 1 * GB, + BlockCache: 0, + Sync: false, + BytesPerSync: 0, + WatchQueueSize: 0, + AutoMergeCronExpr: "", } var DefaultBatchOptions = BatchOptions{