Skip to content

Commit

Permalink
use wal write batch
Browse files Browse the repository at this point in the history
  • Loading branch information
roseduan committed Sep 19, 2023
1 parent 3c078d9 commit 9d9ec86
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 13 deletions.
34 changes: 25 additions & 9 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/bwmarrin/snowflake"
"github.com/rosedblabs/wal"
)

// Batch is a batch operations of the database.
Expand Down Expand Up @@ -437,29 +436,41 @@ func (b *Batch) Commit() error {
return ErrBatchRollbacked
}

batchId := b.batchId.Generate()
positions := make(map[string]*wal.ChunkPosition)
var (
batchId = b.batchId.Generate()
posIndex = make(map[string]int)
idx = 0
)

now := time.Now().UnixNano()
// write to wal
// write to wal buffer
for _, record := range b.pendingWrites {
record.BatchId = uint64(batchId)
encRecord := encodeLogRecord(record)
pos, err := b.db.dataFiles.Write(encRecord)
if err != nil {
if err := b.db.dataFiles.PendingWrites(encRecord); err != nil {
return err
}
positions[string(record.Key)] = pos
posIndex[string(record.Key)] = idx
idx++
}

// write a record to indicate the end of the batch
endRecord := encodeLogRecord(&LogRecord{
Key: batchId.Bytes(),
Type: LogRecordBatchFinished,
})
if _, err := b.db.dataFiles.Write(endRecord); err != nil {
if err := b.db.dataFiles.PendingWrites(endRecord); err != nil {
return err
}
// write to wal file
chunkPositions, err := b.db.dataFiles.WriteAll()
if err != nil {
b.db.dataFiles.ClearPendingWrites()
return err
}
if len(chunkPositions) != len(b.pendingWrites)+1 {
panic("chunk positions length is not equal to pending writes length")
}

// flush wal if necessary
if b.options.Sync && !b.db.options.Sync {
Expand All @@ -470,10 +481,15 @@ func (b *Batch) Commit() error {

// write to index
for key, record := range b.pendingWrites {
i, ok := posIndex[key]
if !ok || chunkPositions[i] == nil {
panic("position index not found")
}

if record.Type == LogRecordDeleted || record.IsExpired(now) {
b.db.index.Delete(record.Key)
} else {
b.db.index.Put(record.Key, positions[key])
b.db.index.Put(record.Key, chunkPositions[i])
}

if b.db.options.WatchQueueSize > 0 {
Expand Down
18 changes: 17 additions & 1 deletion batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,23 @@ func TestBatch_Put_Normal(t *testing.T) {
}

func TestBatch_Put_IncrSegmentFile(t *testing.T) {
batchPutAndIterate(t, 64*MB, 5000, 32*KB)
batchPutAndIterate(t, 64*MB, 2000, 32*KB)
options := DefaultOptions
options.SegmentSize = 64 * MB
db, err := Open(options)
assert.Nil(t, err)
defer destroyDB(db)

generateData(t, db, 1, 2000, 32*KB)

// write more data to rotate new segment file
batch := db.NewBatch(DefaultBatchOptions)
for i := 0; i < 1000; i++ {
err := batch.Put(utils.GetTestKey(i*100), utils.RandomValue(32*KB))
assert.Nil(t, err)
}
err = batch.Commit()
assert.Nil(t, err)
}

func TestBatch_Get_Normal(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.19

require (
github.com/google/btree v1.1.2
github.com/rosedblabs/wal v1.3.3
github.com/rosedblabs/wal v1.3.5
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rosedblabs/wal v1.3.3 h1:HBZdmvSpgsuw90IQLY80W0Ht+fNmtwJ83hrSAIxV0d4=
github.com/rosedblabs/wal v1.3.3/go.mod h1:wdq54KJUyVTOv1uddMc6Cdh2d/YCIo8yjcwJAb1RCEM=
github.com/rosedblabs/wal v1.3.5 h1:ISjHEqe2ukTNAOSfd6bW2Aj9Y4gDOQFxwbh18Jpd/S0=
github.com/rosedblabs/wal v1.3.5/go.mod h1:wdq54KJUyVTOv1uddMc6Cdh2d/YCIo8yjcwJAb1RCEM=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
Expand Down

0 comments on commit 9d9ec86

Please sign in to comment.