diff --git a/kv.go b/kv.go index b9ada37f1..b5239f7ac 100644 --- a/kv.go +++ b/kv.go @@ -49,18 +49,14 @@ type KeyValue interface { Put(key string, value []byte) (revision uint64, err error) // PutString will place the string for the key into the store. PutString(key string, value string) (revision uint64, err error) - // Create will add the key/value pair if it does not exist. + // Create will add the key/value pair iff it does not exist. Create(key string, value []byte) (revision uint64, err error) - // Update will update the value if the latest revision matches. + // Update will update the value iff the latest revision matches. Update(key string, value []byte, last uint64) (revision uint64, err error) // Delete will place a delete marker and leave all revisions. - Delete(key string) error - // DeleteCond will place a delete marker and leave all revision if the latest revision matches. - DeleteCond(key string, last uint64) error + Delete(key string, opts ...DeleteOpt) error // Purge will place a delete marker and remove all previous revisions. - Purge(key string) error - // PurgeCond will place a delete marker and remove all previous revisions if the latest revision matches. - PurgeCond(key string, last uint64) error + Purge(key string, opts ...DeleteOpt) error // Watch for any updates to keys that match the keys argument which could include wildcards. // Watch will send a nil entry when it has received all initial values. Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) @@ -73,7 +69,7 @@ type KeyValue interface { // Bucket returns the current bucket name. Bucket() string // PurgeDeletes will remove all current delete markers. - PurgeDeletes(opts ...PurgeOpt) error + PurgeDeletes(opts ...DeleteOpt) error // Status retrieves the status and configuration of a bucket Status() (KeyValueStatus, error) } @@ -156,13 +152,21 @@ func MetaOnly() WatchOpt { }) } -type PurgeOpt interface { - configurePurge(opts *purgeOpts) error +type DeleteOpt interface { + configureDelete(opts *deleteOpts) error } -type purgeOpts struct { - dmthr time.Duration // Delete markers threshold - ctx context.Context +type deleteOpts struct { + ctx context.Context + + // Delete markers threshold. + dmthr time.Duration + + // Remove all previous revisions. + purge bool + + // Delete only if the latest revision matches. + revision uint64 } // DeleteMarkersOlderThan indicates that delete or purge markers older than that @@ -173,17 +177,39 @@ type purgeOpts struct { // regardless of their age. type DeleteMarkersOlderThan time.Duration -func (ttl DeleteMarkersOlderThan) configurePurge(opts *purgeOpts) error { +func (ttl DeleteMarkersOlderThan) configureDelete(opts *deleteOpts) error { opts.dmthr = time.Duration(ttl) return nil } // For nats.Context() support. -func (ctx ContextOpt) configurePurge(opts *purgeOpts) error { +func (ctx ContextOpt) configureDelete(opts *deleteOpts) error { opts.ctx = ctx return nil } +type deleteOptFn func(opts *deleteOpts) error + +func (opt deleteOptFn) configureDelete(opts *deleteOpts) error { + return opt(opts) +} + +// LastRevision deletes if the latest revision matches. +func LastRevision(revision uint64) DeleteOpt { + return deleteOptFn(func(opts *deleteOpts) error { + opts.revision = revision + return nil + }) +} + +// Purge removes all previous revisions. +func Purge() DeleteOpt { + return deleteOptFn(func(opts *deleteOpts) error { + opts.purge = true + return nil + }) +} + // KeyValueConfig is for configuring a KeyValue store. type KeyValueConfig struct { Bucket string @@ -555,7 +581,7 @@ func (kv *kvs) PutString(key string, value string) (revision uint64, err error) return kv.Put(key, []byte(value)) } -// Create will add the key/value pair if it does not exist. +// Create will add the key/value pair iff it does not exist. func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) { v, err := kv.Update(key, value, 0) if err == nil { @@ -571,7 +597,7 @@ func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) { return 0, err } -// Update will update the value if the latest revision matches. +// Update will update the value iff the latest revision matches. func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error) { if !keyValid(key) { return 0, ErrInvalidKey @@ -595,76 +621,48 @@ func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error) } // Delete will place a delete marker and leave all revisions. -func (kv *kvs) Delete(key string) error { - m, err := kv.buildDelete(key) - if err != nil { - return err +func (kv *kvs) Delete(key string, opts ...DeleteOpt) error { + if !keyValid(key) { + return ErrInvalidKey } - m.Header.Set(kvop, kvdel) - - _, err = kv.js.PublishMsg(m) - return err -} - -// DeleteCond will place a delete marker and leave all revisions if the latest revision matches. -func (kv *kvs) DeleteCond(key string, revision uint64) error { - m, err := kv.buildDelete(key) - if err != nil { - return err + var b strings.Builder + if kv.useJSPfx { + b.WriteString(kv.js.opts.pre) } + b.WriteString(kv.pre) + b.WriteString(key) - m.Header.Set(kvop, kvdel) - m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(revision, 10)) - - _, err = kv.js.PublishMsg(m) - return err -} + // DEL op marker. For watch functionality. + m := NewMsg(b.String()) -// Purge will remove the key and all revisions. -func (kv *kvs) Purge(key string) error { - m, err := kv.buildDelete(key) - if err != nil { - return err + var o deleteOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureDelete(&o); err != nil { + return err + } + } } - m.Header.Set(kvop, kvpurge) - m.Header.Set(MsgRollup, MsgRollupSubject) - - _, err = kv.js.PublishMsg(m) - return err -} - -// PurgeCond will remove the key and all revisions if the latest revision matches. -func (kv *kvs) PurgeCond(key string, revision uint64) error { - m, err := kv.buildDelete(key) - if err != nil { - return err + if o.purge { + m.Header.Set(kvop, kvpurge) + m.Header.Set(MsgRollup, MsgRollupSubject) + } else { + m.Header.Set(kvop, kvdel) } - m.Header.Set(kvop, kvpurge) - m.Header.Set(MsgRollup, MsgRollupSubject) - m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(revision, 10)) + if o.revision != 0 { + m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.revision, 10)) + } - _, err = kv.js.PublishMsg(m) + _, err := kv.js.PublishMsg(m) return err } -func (kv *kvs) buildDelete(key string) (*Msg, error) { - if !keyValid(key) { - return nil, ErrInvalidKey - } - - var b strings.Builder - if kv.useJSPfx { - b.WriteString(kv.js.opts.pre) - } - b.WriteString(kv.pre) - b.WriteString(key) - - // DEL op marker. For watch functionality. - m := NewMsg(b.String()) - return m, nil +// Purge will remove the key and all revisions. +func (kv *kvs) Purge(key string, opts ...DeleteOpt) error { + return kv.Delete(key, append(opts, Purge())...) } const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute @@ -672,11 +670,11 @@ const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute // PurgeDeletes will remove all current delete markers. // This is a maintenance option if there is a larger buildup of delete markers. // See DeleteMarkersOlderThan() option for more information. -func (kv *kvs) PurgeDeletes(opts ...PurgeOpt) error { - var o purgeOpts +func (kv *kvs) PurgeDeletes(opts ...DeleteOpt) error { + var o deleteOpts for _, opt := range opts { if opt != nil { - if err := opt.configurePurge(&o); err != nil { + if err := opt.configureDelete(&o); err != nil { return err } } diff --git a/test/kv_test.go b/test/kv_test.go index c054192c3..704fb3b9b 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -67,9 +67,9 @@ func TestKeyValueBasics(t *testing.T) { if r != 3 { t.Fatalf("Expected 3 for the revision, got %d", r) } - err = kv.DeleteCond("name", 4) + err = kv.Delete("name", nats.LastRevision(4)) expectErr(t, err) - err = kv.DeleteCond("name", 3) + err = kv.Delete("name", nats.LastRevision(3)) expectOk(t, err) // Conditional Updates.