Skip to content

Commit

Permalink
[ADDED] KV: ability to delete purges older than a certain time
Browse files Browse the repository at this point in the history
Did not feel like this would be an option of WatchOpt so I had
to add a new function, but could be convinced otherwise.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Feb 11, 2022
1 parent 3ead809 commit 8accd34
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 1 deletion.
26 changes: 25 additions & 1 deletion kv.go
Expand Up @@ -69,6 +69,9 @@ type KeyValue interface {
Bucket() string
// PurgeDeletes will remove all current delete markers.
PurgeDeletes(opts ...WatchOpt) error
// PurgeDeletesOlderThan will remove delete markers that have been present
// for longer than the `olderThan` duration.
PurgeDeletesOlderThan(olderThan time.Duration, opts ...WatchOpt) error
// Status retrieves the status and configuration of a bucket
Status() (KeyValueStatus, error)
}
Expand Down Expand Up @@ -539,19 +542,40 @@ func (kv *kvs) delete(key string, purge bool) error {
// PurgeDeletes will remove all current delete markers.
// This is a maintenance option if there is a larger buildup of delete markers.
func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error {
return kv.purgeDeletes(0, opts...)
}

// PurgeDeletes will remove all delete markers that have been in the key
// more more than the given `olderThan`.
// If `olderThan` is negative or 0, all delete markers are removed, similar
// to `PurgeDeletes()`.
// This is a maintenance option if there is a larger buildup of delete markers.
func (kv *kvs) PurgeDeletesOlderThan(olderThan time.Duration, opts ...WatchOpt) error {
return kv.purgeDeletes(olderThan, opts...)
}

func (kv *kvs) purgeDeletes(olderThan time.Duration, opts ...WatchOpt) error {
watcher, err := kv.WatchAll(opts...)
if err != nil {
return err
}
defer watcher.Stop()

var limit time.Time
if olderThan > 0 {
limit = time.Now().Add(-olderThan)
}

var deleteMarkers []KeyValueEntry
for entry := range watcher.Updates() {
if entry == nil {
break
}
if op := entry.Operation(); op == KeyValueDelete || op == KeyValuePurge {
deleteMarkers = append(deleteMarkers, entry)
// Add entries if there is no time limit or if the entry is older than the limit.
if limit.IsZero() || entry.Created().Before(limit) {
deleteMarkers = append(deleteMarkers, entry)
}
}
}

Expand Down
51 changes: 51 additions & 0 deletions test/kv_test.go
Expand Up @@ -430,6 +430,57 @@ func TestKeyValueDeleteTombstones(t *testing.T) {
}
}

func TestKeyValueDeleteTombstonesOlderThan(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 10})
expectOk(t, err)

put := func(key, value string) {
t.Helper()
_, err := kv.Put(key, []byte(value))
expectOk(t, err)
}

put("foo", "foo1")
put("foo", "foo2")
err = kv.Delete("foo")
expectOk(t, err)

time.Sleep(100 * time.Millisecond)

put("bar", "bar1")
err = kv.Delete("bar")
expectOk(t, err)

err = kv.PurgeDeletesOlderThan(50 * time.Millisecond)
expectOk(t, err)

si, err := js.StreamInfo("KV_KVS")
expectOk(t, err)
// There should be the bar key left, with its delete marker,
// so that's 2 messages.
if si.State.Msgs != 2 {
t.Fatalf("Expected 2 stream msgs to be left, got %d", si.State.Msgs)
}
barEntries, err := kv.History("bar")
expectOk(t, err)
if len(barEntries) != 2 {
t.Fatalf("Expected 2 entries, got %v", barEntries)
}
for i, e := range barEntries {
if i == 0 && e.Operation() != nats.KeyValuePut && string(e.Value()) != "bar1" {
t.Fatalf("Unexpected first entry: %+v", e)
} else if i == 1 && e.Operation() != nats.KeyValueDelete {
t.Fatalf("Unexpected second entry: %+v", e)
}
}
}

func TestKeyValueKeys(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down

0 comments on commit 8accd34

Please sign in to comment.