diff --git a/kv.go b/kv.go index 5ba116027..d4171b55f 100644 --- a/kv.go +++ b/kv.go @@ -54,9 +54,9 @@ type KeyValue interface { // Update will update the value iff the latest revision matches. Update(key string, value []byte, last uint64) (revision uint64, err error) // Delete will place a delete marker and leave all revisions. - Delete(key string) error + Delete(key string, opts ...DeleteOpt) error // Purge will place a delete marker and remove all previous revisions. - Purge(key string) error + Purge(key string, opts ...DeleteOpt) error // Watch for any updates to keys that match the keys argument which could include wildcards. // Watch will send a nil entry when it has received all initial values. Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) @@ -180,6 +180,40 @@ func (ctx ContextOpt) configurePurge(opts *purgeOpts) error { return nil } +type DeleteOpt interface { + configureDelete(opts *deleteOpts) error +} + +type deleteOpts struct { + // Remove all previous revisions. + purge bool + + // Delete only if the latest revision matches. + revision uint64 +} + +type deleteOptFn func(opts *deleteOpts) error + +func (opt deleteOptFn) configureDelete(opts *deleteOpts) error { + return opt(opts) +} + +// LastRevision deletes if the latest revision matches. +func LastRevision(revision uint64) DeleteOpt { + return deleteOptFn(func(opts *deleteOpts) error { + opts.revision = revision + return nil + }) +} + +// purge removes all previous revisions. +func purge() DeleteOpt { + return deleteOptFn(func(opts *deleteOpts) error { + opts.purge = true + return nil + }) +} + // KeyValueConfig is for configuring a KeyValue store. type KeyValueConfig struct { Bucket string @@ -591,16 +625,7 @@ func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error) } // Delete will place a delete marker and leave all revisions. -func (kv *kvs) Delete(key string) error { - return kv.delete(key, false) -} - -// Purge will remove the key and all revisions. -func (kv *kvs) Purge(key string) error { - return kv.delete(key, true) -} - -func (kv *kvs) delete(key string, purge bool) error { +func (kv *kvs) Delete(key string, opts ...DeleteOpt) error { if !keyValid(key) { return ErrInvalidKey } @@ -615,16 +640,35 @@ func (kv *kvs) delete(key string, purge bool) error { // DEL op marker. For watch functionality. m := NewMsg(b.String()) - if purge { + var o deleteOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureDelete(&o); err != nil { + return err + } + } + } + + if o.purge { m.Header.Set(kvop, kvpurge) m.Header.Set(MsgRollup, MsgRollupSubject) } else { m.Header.Set(kvop, kvdel) } + + if o.revision != 0 { + m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.revision, 10)) + } + _, err := kv.js.PublishMsg(m) return err } +// Purge will remove the key and all revisions. +func (kv *kvs) Purge(key string, opts ...DeleteOpt) error { + return kv.Delete(key, append(opts, purge())...) +} + const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute // PurgeDeletes will remove all current delete markers. diff --git a/test/kv_test.go b/test/kv_test.go index e6dd2c6f3..4ece98562 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -67,9 +67,13 @@ func TestKeyValueBasics(t *testing.T) { if r != 3 { t.Fatalf("Expected 3 for the revision, got %d", r) } + err = kv.Delete("name", nats.LastRevision(4)) + expectErr(t, err) + err = kv.Delete("name", nats.LastRevision(3)) + expectOk(t, err) // Conditional Updates. - r, err = kv.Update("name", []byte("rip"), 3) + r, err = kv.Update("name", []byte("rip"), 4) expectOk(t, err) _, err = kv.Update("name", []byte("ik"), 3) expectErr(t, err) @@ -374,7 +378,9 @@ func TestKeyValueDeleteVsPurge(t *testing.T) { if len(entries) != 4 { t.Fatalf("Expected 4 entries for age after delete, got %d", len(entries)) } - err = kv.Purge("name") + err = kv.Purge("name", nats.LastRevision(4)) + expectErr(t, err) + err = kv.Purge("name", nats.LastRevision(5)) expectOk(t, err) // Check marker e, err := kv.Get("name")