Skip to content

Commit

Permalink
[CHANGED] PurgeDeletes() will now keep markers that are less than 30m…
Browse files Browse the repository at this point in the history
…in old

There is a breaking change where PurgeDeletes() accepts now a list
of PurgeOpt, not WatchOpt.

We needed from WatchOpt only the context, and as it standed, it was
bad since user could have passed the IncludeHistory() option to
PurgeDeletes(), which would likely "break" the functionality.

Also, when invoking PurgeDeletes(), the delete markers than are older than
a default of 30 minutes will be deleted, however more recent ones will
be kept. The data is always removed, even if a marker is not.

The user can change the 30 minutes threshold using a new purge
option called DeleteMarkersOlderThan(duration). If set to -1,
it restores the old behavior of deleting delete/purge markers,
regardless of their age.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Feb 15, 2022
1 parent 3ead809 commit 4afdb30
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 6 deletions.
69 changes: 64 additions & 5 deletions kv.go
Expand Up @@ -68,7 +68,7 @@ type KeyValue interface {
// Bucket returns the current bucket name.
Bucket() string
// PurgeDeletes will remove all current delete markers.
PurgeDeletes(opts ...WatchOpt) error
PurgeDeletes(opts ...PurgeOpt) error
// Status retrieves the status and configuration of a bucket
Status() (KeyValueStatus, error)
}
Expand Down Expand Up @@ -149,6 +149,34 @@ func MetaOnly() WatchOpt {
})
}

type PurgeOpt interface {
configurePurge(opts *purgeOpts) error
}

type purgeOpts struct {
dmthr time.Duration // Delete markers threshold
ctx context.Context
}

// DeleteMarkersOlderThan indicates that delete or purge markers older than that
// will be deleted as part of PurgeDeletes() operation, otherwise, only the data
// will be removed but markers that are recent will be kept.
// Note that if no option is specified, the default is 30 minutes. You can set
// this option to a negative value to instruct to always remove the markers,
// regardless of their age.
type DeleteMarkersOlderThan time.Duration

func (ttl DeleteMarkersOlderThan) configurePurge(opts *purgeOpts) error {
opts.dmthr = time.Duration(ttl)
return nil
}

// For nats.Context() support.
func (ctx ContextOpt) configurePurge(opts *purgeOpts) error {
opts.ctx = ctx
return nil
}

// KeyValueConfig is for configuring a KeyValue store.
type KeyValueConfig struct {
Bucket string
Expand Down Expand Up @@ -536,15 +564,43 @@ func (kv *kvs) delete(key string, purge bool) error {
return err
}

const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute

// PurgeDeletes will remove all current delete markers.
// This is a maintenance option if there is a larger buildup of delete markers.
func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error {
watcher, err := kv.WatchAll(opts...)
// See DeleteMarkersOlderThan() option for more information.
func (kv *kvs) PurgeDeletes(opts ...PurgeOpt) error {
var o purgeOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configurePurge(&o); err != nil {
return err
}
}
}
// Transfer possible context purge option to the watcher. This is the
// only option that matters for the PurgeDeletes() feature.
var wopts []WatchOpt
if o.ctx != nil {
wopts = append(wopts, Context(o.ctx))
}
watcher, err := kv.WatchAll(wopts...)
if err != nil {
return err
}
defer watcher.Stop()

var limit time.Time
olderThan := o.dmthr
// Negative value is used to instruct to always remove markers, regardless
// of age. If set to 0 (or not set), use our default value.
if olderThan == 0 {
olderThan = kvDefaultPurgeDeletesMarkerThreshold
}
if olderThan > 0 {
limit = time.Now().Add(-olderThan)
}

var deleteMarkers []KeyValueEntry
for entry := range watcher.Updates() {
if entry == nil {
Expand All @@ -564,8 +620,11 @@ func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error {
b.WriteString(kv.pre)
b.WriteString(entry.Key())
pr.Subject = b.String()
err := kv.js.purgeStream(kv.stream, &pr)
if err != nil {
pr.Keep = 0
if olderThan > 0 && entry.Created().After(limit) {
pr.Keep = 1
}
if err := kv.js.purgeStream(kv.stream, &pr); err != nil {
return err
}
b.Reset()
Expand Down
54 changes: 53 additions & 1 deletion test/kv_test.go
Expand Up @@ -420,14 +420,66 @@ func TestKeyValueDeleteTombstones(t *testing.T) {
}

// Now cleanup.
err = kv.PurgeDeletes()
err = kv.PurgeDeletes(nats.DeleteMarkersOlderThan(-1))
expectOk(t, err)

si, err := js.StreamInfo("KV_KVS")
expectOk(t, err)
if si.State.Msgs != 0 {
t.Fatalf("Expected no stream msgs to be left, got %d", si.State.Msgs)
}

// Try with context
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err = kv.PurgeDeletes(nats.Context(ctx))
expectOk(t, err)
}

func TestKeyValuePurgeDeletesMarkerThreshold(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 10})
expectOk(t, err)

put := func(key, value string) {
t.Helper()
_, err := kv.Put(key, []byte(value))
expectOk(t, err)
}

put("foo", "foo1")
put("bar", "bar1")
put("foo", "foo2")
err = kv.Delete("foo")
expectOk(t, err)

time.Sleep(200 * time.Millisecond)

err = kv.Delete("bar")
expectOk(t, err)

err = kv.PurgeDeletes(nats.DeleteMarkersOlderThan(100 * time.Millisecond))
expectOk(t, err)

// The key foo should have been completely cleared of the data
// and the delete marker.
fooEntries, err := kv.History("foo")
if err != nats.ErrKeyNotFound {
t.Fatalf("Expected all entries for key foo to be gone, got err=%v entries=%v", err, fooEntries)
}
barEntries, err := kv.History("bar")
expectOk(t, err)
if len(barEntries) != 1 {
t.Fatalf("Expected 1 entry, got %v", barEntries)
}
if e := barEntries[0]; e.Operation() != nats.KeyValueDelete {
t.Fatalf("Unexpected entry: %+v", e)
}
}

func TestKeyValueKeys(t *testing.T) {
Expand Down

0 comments on commit 4afdb30

Please sign in to comment.