From 4afdb30a3bb009628fb480c70fa313f0bf6e7b8e Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 10 Feb 2022 18:06:44 -0700 Subject: [PATCH] [CHANGED] PurgeDeletes() will now keep markers that are less than 30min old There is a breaking change where PurgeDeletes() accepts now a list of PurgeOpt, not WatchOpt. We needed from WatchOpt only the context, and as it standed, it was bad since user could have passed the IncludeHistory() option to PurgeDeletes(), which would likely "break" the functionality. Also, when invoking PurgeDeletes(), the delete markers than are older than a default of 30 minutes will be deleted, however more recent ones will be kept. The data is always removed, even if a marker is not. The user can change the 30 minutes threshold using a new purge option called DeleteMarkersOlderThan(duration). If set to -1, it restores the old behavior of deleting delete/purge markers, regardless of their age. Signed-off-by: Ivan Kozlovic --- kv.go | 69 +++++++++++++++++++++++++++++++++++++++++++++---- test/kv_test.go | 54 +++++++++++++++++++++++++++++++++++++- 2 files changed, 117 insertions(+), 6 deletions(-) diff --git a/kv.go b/kv.go index cdbb4e302..ce8542db1 100644 --- a/kv.go +++ b/kv.go @@ -68,7 +68,7 @@ type KeyValue interface { // Bucket returns the current bucket name. Bucket() string // PurgeDeletes will remove all current delete markers. - PurgeDeletes(opts ...WatchOpt) error + PurgeDeletes(opts ...PurgeOpt) error // Status retrieves the status and configuration of a bucket Status() (KeyValueStatus, error) } @@ -149,6 +149,34 @@ func MetaOnly() WatchOpt { }) } +type PurgeOpt interface { + configurePurge(opts *purgeOpts) error +} + +type purgeOpts struct { + dmthr time.Duration // Delete markers threshold + ctx context.Context +} + +// DeleteMarkersOlderThan indicates that delete or purge markers older than that +// will be deleted as part of PurgeDeletes() operation, otherwise, only the data +// will be removed but markers that are recent will be kept. +// Note that if no option is specified, the default is 30 minutes. You can set +// this option to a negative value to instruct to always remove the markers, +// regardless of their age. +type DeleteMarkersOlderThan time.Duration + +func (ttl DeleteMarkersOlderThan) configurePurge(opts *purgeOpts) error { + opts.dmthr = time.Duration(ttl) + return nil +} + +// For nats.Context() support. +func (ctx ContextOpt) configurePurge(opts *purgeOpts) error { + opts.ctx = ctx + return nil +} + // KeyValueConfig is for configuring a KeyValue store. type KeyValueConfig struct { Bucket string @@ -536,15 +564,43 @@ func (kv *kvs) delete(key string, purge bool) error { return err } +const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute + // PurgeDeletes will remove all current delete markers. // This is a maintenance option if there is a larger buildup of delete markers. -func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error { - watcher, err := kv.WatchAll(opts...) +// See DeleteMarkersOlderThan() option for more information. +func (kv *kvs) PurgeDeletes(opts ...PurgeOpt) error { + var o purgeOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configurePurge(&o); err != nil { + return err + } + } + } + // Transfer possible context purge option to the watcher. This is the + // only option that matters for the PurgeDeletes() feature. + var wopts []WatchOpt + if o.ctx != nil { + wopts = append(wopts, Context(o.ctx)) + } + watcher, err := kv.WatchAll(wopts...) if err != nil { return err } defer watcher.Stop() + var limit time.Time + olderThan := o.dmthr + // Negative value is used to instruct to always remove markers, regardless + // of age. If set to 0 (or not set), use our default value. + if olderThan == 0 { + olderThan = kvDefaultPurgeDeletesMarkerThreshold + } + if olderThan > 0 { + limit = time.Now().Add(-olderThan) + } + var deleteMarkers []KeyValueEntry for entry := range watcher.Updates() { if entry == nil { @@ -564,8 +620,11 @@ func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error { b.WriteString(kv.pre) b.WriteString(entry.Key()) pr.Subject = b.String() - err := kv.js.purgeStream(kv.stream, &pr) - if err != nil { + pr.Keep = 0 + if olderThan > 0 && entry.Created().After(limit) { + pr.Keep = 1 + } + if err := kv.js.purgeStream(kv.stream, &pr); err != nil { return err } b.Reset() diff --git a/test/kv_test.go b/test/kv_test.go index 69c7da10d..5682cc04b 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -420,7 +420,7 @@ func TestKeyValueDeleteTombstones(t *testing.T) { } // Now cleanup. - err = kv.PurgeDeletes() + err = kv.PurgeDeletes(nats.DeleteMarkersOlderThan(-1)) expectOk(t, err) si, err := js.StreamInfo("KV_KVS") @@ -428,6 +428,58 @@ func TestKeyValueDeleteTombstones(t *testing.T) { if si.State.Msgs != 0 { t.Fatalf("Expected no stream msgs to be left, got %d", si.State.Msgs) } + + // Try with context + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + err = kv.PurgeDeletes(nats.Context(ctx)) + expectOk(t, err) +} + +func TestKeyValuePurgeDeletesMarkerThreshold(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 10}) + expectOk(t, err) + + put := func(key, value string) { + t.Helper() + _, err := kv.Put(key, []byte(value)) + expectOk(t, err) + } + + put("foo", "foo1") + put("bar", "bar1") + put("foo", "foo2") + err = kv.Delete("foo") + expectOk(t, err) + + time.Sleep(200 * time.Millisecond) + + err = kv.Delete("bar") + expectOk(t, err) + + err = kv.PurgeDeletes(nats.DeleteMarkersOlderThan(100 * time.Millisecond)) + expectOk(t, err) + + // The key foo should have been completely cleared of the data + // and the delete marker. + fooEntries, err := kv.History("foo") + if err != nats.ErrKeyNotFound { + t.Fatalf("Expected all entries for key foo to be gone, got err=%v entries=%v", err, fooEntries) + } + barEntries, err := kv.History("bar") + expectOk(t, err) + if len(barEntries) != 1 { + t.Fatalf("Expected 1 entry, got %v", barEntries) + } + if e := barEntries[0]; e.Operation() != nats.KeyValueDelete { + t.Fatalf("Unexpected entry: %+v", e) + } } func TestKeyValueKeys(t *testing.T) {