Skip to content

Commit

Permalink
add Persist to remove the ttl
Browse files Browse the repository at this point in the history
  • Loading branch information
roseduan committed Sep 16, 2023
1 parent 93ddbb4 commit 3fa26af
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 1 deletion.
51 changes: 51 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,57 @@ func (b *Batch) TTL(key []byte) (time.Duration, error) {
return -1, nil
}

// Persist removes the ttl of the key.
func (b *Batch) Persist(key []byte) error {
if len(key) == 0 {
return ErrKeyIsEmpty
}
if b.db.closed {
return ErrDBClosed
}
if b.options.ReadOnly {
return ErrReadOnlyBatch
}

b.mu.Lock()
defer b.mu.Unlock()

// if the key exists in pendingWrites, update the expiry time directly
pendingRecord := b.pendingWrites[string(key)]
if pendingRecord != nil && pendingRecord.Type != LogRecordDeleted {
pendingRecord.Expire = 0
} else {
// check if the key exists in index
position := b.db.index.Get(key)
if position == nil {
return ErrKeyNotFound
}
chunk, err := b.db.dataFiles.Read(position)
if err != nil {
return err
}

record := decodeLogRecord(chunk)
now := time.Now().UnixNano()
// check if the record is deleted or expired
if record.Type == LogRecordDeleted || record.IsExpired(now) {
b.db.index.Delete(record.Key)
return ErrKeyNotFound
}
// if the expiration time is 0, it means that the key has no expiration time,
// so we can return directly
if record.Expire == 0 {
return nil
}

// set the expiration time to 0, and rewrite the record to wal
record.Expire = 0
b.pendingWrites[string(key)] = record
}

return nil
}

// Commit commits the batch, if the batch is readonly or empty, it will return directly.
//
// It will iterate the pendingWrites and write the data to the database,
Expand Down
21 changes: 20 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (db *DB) Expire(key []byte, ttl time.Duration) error {
batch.reset()
db.batchPool.Put(batch)
}()
// This is a single put operation, we can set Sync to false.
// This is a single expire operation, we can set Sync to false.
// Because the data will be written to the WAL,
// and the WAL file will be synced to disk according to the DB options.
batch.init(false, false, db).withPendingWrites()
Expand All @@ -336,6 +336,25 @@ func (db *DB) TTL(key []byte) (time.Duration, error) {
return batch.TTL(key)
}

// Persist removes the ttl of the key.
// If the key does not exist or expired, it will return ErrKeyNotFound.
func (db *DB) Persist(key []byte) error {
batch := db.batchPool.Get().(*Batch)
defer func() {
batch.reset()
db.batchPool.Put(batch)
}()
// This is a single persist operation, we can set Sync to false.
// Because the data will be written to the WAL,
// and the WAL file will be synced to disk according to the DB options.
batch.init(false, false, db).withPendingWrites()
if err := batch.Persist(key); err != nil {
_ = batch.Rollback()
return err
}
return batch.Commit()
}

func (db *DB) Watch() (chan *Event, error) {
if db.options.WatchQueueSize <= 0 {
return nil, ErrWatchDisabled
Expand Down
43 changes: 43 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,3 +661,46 @@ func TestDB_Multi_DeleteExpiredKeys(t *testing.T) {
assert.Equal(t, 10000, db.Stat().KeysNum)
}
}

func TestDB_Persist(t *testing.T) {
options := DefaultOptions
db, err := Open(options)
assert.Nil(t, err)
defer destroyDB(db)

// not exist
err = db.Persist(utils.GetTestKey(1))
assert.Equal(t, err, ErrKeyNotFound)

err = db.PutWithTTL(utils.GetTestKey(1), utils.RandomValue(10), time.Second*1)
assert.Nil(t, err)

// exist
err = db.Persist(utils.GetTestKey(1))
assert.Nil(t, err)
time.Sleep(time.Second * 2)
// check ttl
ttl, err := db.TTL(utils.GetTestKey(1))
assert.Nil(t, err)
assert.Equal(t, ttl, time.Duration(-1))
val1, err := db.Get(utils.GetTestKey(1))
assert.Nil(t, err)
assert.NotNil(t, val1)

// restart
err = db.Close()
assert.Nil(t, err)

db2, err := Open(options)
assert.Nil(t, err)
defer func() {
_ = db2.Close()
}()

ttl2, err := db2.TTL(utils.GetTestKey(1))
assert.Nil(t, err)
assert.Equal(t, ttl2, time.Duration(-1))
val2, err := db2.Get(utils.GetTestKey(1))
assert.Nil(t, err)
assert.NotNil(t, val2)
}

0 comments on commit 3fa26af

Please sign in to comment.