Skip to content

Commit

Permalink
[Feature] add auto merge background task (#298)
Browse files Browse the repository at this point in the history
  • Loading branch information
LindaSummer committed Jan 4, 2024
1 parent 431de72 commit f31d45e
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 7 deletions.
28 changes: 27 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/bwmarrin/snowflake"
"github.com/gofrs/flock"
"github.com/robfig/cron/v3"
"github.com/rosedblabs/rosedb/v2/index"
"github.com/rosedblabs/rosedb/v2/utils"
"github.com/rosedblabs/wal"
Expand Down Expand Up @@ -52,7 +53,8 @@ type DB struct {
encodeHeader []byte
watchCh chan *Event // user consume channel for watch events
watcher *Watcher
expiredCursorKey []byte // the location to which DeleteExpiredKeys executes.
expiredCursorKey []byte // the location to which DeleteExpiredKeys executes.
cronScheduler *cron.Cron // cron scheduler for auto merge task
}

// Stat represents the statistics of the database.
Expand Down Expand Up @@ -127,6 +129,17 @@ func Open(options Options) (*DB, error) {
go db.watcher.sendEvent(db.watchCh)
}

if len(options.AutoMergeCronExpr) > 0 {
db.cronScheduler = cron.New(cron.WithParser(
cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)))
db.cronScheduler.AddFunc(options.AutoMergeCronExpr, func() {
// maybe different errors should be handled with different logic, but a background task has no caller to return its error to, so it is ignored here.
// after auto merge, we should close and reopen the db.
_ = db.Merge(true)
})
db.cronScheduler.Start()
}

return db, nil
}

Expand Down Expand Up @@ -179,6 +192,11 @@ func (db *DB) Close() error {
close(db.watchCh)
}

// close auto merge cron scheduler
if db.cronScheduler != nil {
db.cronScheduler.Stop()
}

db.closed = true
return nil
}
Expand Down Expand Up @@ -552,6 +570,14 @@ func checkOptions(options Options) error {
if options.SegmentSize <= 0 {
return errors.New("database data file size must be greater than 0")
}

if len(options.AutoMergeCronExpr) > 0 {
if _, err := cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor).
Parse(options.AutoMergeCronExpr); err != nil {
return fmt.Errorf("databse auto merge cron expression is invalid, err: %s", err)
}
}

return nil
}

Expand Down
105 changes: 105 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package rosedb

import (
"errors"
"io"
"math/rand"
"os"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -704,3 +707,105 @@ func TestDB_Persist(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, val2)
}

// TestDB_Invalid_Cron_Expression verifies that Open reports an error when
// Options.AutoMergeCronExpr holds an expression that cannot be parsed.
func TestDB_Invalid_Cron_Expression(t *testing.T) {
	options := DefaultOptions
	// Seven fields: one more than a cron expression with the optional
	// seconds field may contain.
	options.AutoMergeCronExpr = "*/1 * * * * * *"

	db, err := Open(options)
	assert.NotNil(t, err)

	if db != nil {
		// Open unexpectedly handed back a database. The assertion above has
		// already failed the test; release the handle so that later tests
		// reusing DefaultOptions.DirPath do not run against a database that
		// is still open. The close error adds nothing to the failure.
		_ = db.Close()
	}
}

// TestDB_Valid_Cron_Expression opens a database once per well-formed auto
// merge cron expression and expects every Open call to succeed. The database
// is destroyed after each attempt so the next one starts from scratch.
func TestDB_Valid_Cron_Expression(t *testing.T) {
	validExprs := []string{
		"* */1 * * * *", // six fields
		"*/1 * * * *",   // five fields
		"5 0 * 8 *",
		"*/2 14 1 * *",
		"@hourly", // descriptor form
	}

	options := DefaultOptions
	for _, expr := range validExprs {
		options.AutoMergeCronExpr = expr
		db, err := Open(options)
		assert.Nil(t, err)
		destroyDB(db)
	}
}

// TestDB_Auto_Merge checks that a database opened with an every-second
// AutoMergeCronExpr compacts its data files in the background.
//
// It first fills a database (auto merge disabled) with records, most of which
// are overwritten by deletes, and counts the raw records in the data files.
// It then reopens the same directory with auto merge enabled, waits for the
// scheduler to run, and counts again.
func TestDB_Auto_Merge(t *testing.T) {
	options := DefaultOptions
	db, err := Open(options)
	assert.Nil(t, err)
	// The deferred call captures the first handle; db is reassigned below.
	defer destroyDB(db)

	// countRecords returns the number of records stored in the data files of
	// the database db currently refers to. A read error other than io.EOF is
	// reported as a test failure and ends the scan, instead of being counted
	// as a record and retried forever.
	countRecords := func() int {
		reader := db.dataFiles.NewReader()
		total := 0
		for {
			_, _, readErr := reader.Next()
			if errors.Is(readErr, io.EOF) {
				return total
			}
			if !assert.NoError(t, readErr) {
				return total
			}
			total++
		}
	}

	for i := 0; i < 2000; i++ {
		delKey := utils.GetTestKey(rand.Int())
		err := db.Put(delKey, utils.RandomValue(128))
		assert.Nil(t, err)
		err = db.Put(utils.GetTestKey(rand.Int()), utils.RandomValue(2*KB))
		assert.Nil(t, err)
		err = db.Delete(delKey)
		assert.Nil(t, err)
	}

	// Per the original author's note, every operation is written as a data
	// record plus a batch commit record: 2000 rounds * 3 operations * 2 = 12000.
	assert.Equal(t, 12000, countRecords())

	// If stat fails, the only acceptable reason is that the merge directory
	// does not exist.
	mergeDir := mergeDirPath(options.DirPath)
	if _, statErr := os.Stat(mergeDir); statErr != nil {
		assert.True(t, os.IsNotExist(statErr))
	}
	assert.NoError(t, db.Close())

	options.AutoMergeCronExpr = "* * * * * *" // every second
	db, err = Open(options)
	if !assert.Nil(t, err) {
		// Without a database there is nothing left to inspect or destroy.
		return
	}

	// Give the scheduler time to trigger at least one merge.
	time.Sleep(2 * time.Second)
	// Stop scheduling further runs and wait for any merge that is still in
	// flight, so the data files are not inspected while a background merge
	// may be replacing them.
	<-db.cronScheduler.Stop().Done()

	// After a merge only the live records remain: one surviving Put per round.
	assert.Equal(t, 2000, countRecords())
	destroyDB(db)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron/v3 v3.0.0 // indirect
golang.org/x/sys v0.11.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E=
github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rosedblabs/wal v1.3.6-0.20230924022528-3202245af020 h1:EA8XGCVg1FDM6Dh4MP4sTsmH3gvjhRtp/N+lbnBwtJE=
github.com/rosedblabs/wal v1.3.6-0.20230924022528-3202245af020/go.mod h1:wdq54KJUyVTOv1uddMc6Cdh2d/YCIo8yjcwJAb1RCEM=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
Expand Down
24 changes: 18 additions & 6 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ type Options struct {
// WatchQueueSize the cache length of the watch queue.
// if the size greater than 0, which means enable the watch.
WatchQueueSize uint64

// AutoMergeCronExpr enables auto merge when it is set to a non-empty cron expression.
// auto merge will be triggered when cron expr is satisfied.
// cron expression follows the standard cron expression.
// e.g. "0 0 * * *" means merge at 00:00:00 every day.
// it also supports seconds optionally.
// when enable the second field, the cron expression will be like this: "0/10 * * * * *" (every 10 seconds).
// when auto merge is enabled, the db will be closed and reopened after merge done.
// do not set this schedule too frequently, it will affect the performance.
// refer to https://en.wikipedia.org/wiki/Cron
AutoMergeCronExpr string
}

// BatchOptions specifies the options for creating a batch.
Expand All @@ -49,12 +60,13 @@ const (
)

var DefaultOptions = Options{
DirPath: tempDBDir(),
SegmentSize: 1 * GB,
BlockCache: 0,
Sync: false,
BytesPerSync: 0,
WatchQueueSize: 0,
DirPath: tempDBDir(),
SegmentSize: 1 * GB,
BlockCache: 0,
Sync: false,
BytesPerSync: 0,
WatchQueueSize: 0,
AutoMergeCronExpr: "",
}

var DefaultBatchOptions = BatchOptions{
Expand Down

0 comments on commit f31d45e

Please sign in to comment.