From 163ad879f94a137fb566f7bef11c77885bbf4615 Mon Sep 17 00:00:00 2001 From: Steve Hoeksema Date: Fri, 5 Nov 2021 10:02:27 +1300 Subject: [PATCH] kv: Conditional delete and purge --- kv.go | 76 ++++++++++++++++++++++++++++++++++++++----------- test/kv_test.go | 6 +++- 2 files changed, 64 insertions(+), 18 deletions(-) diff --git a/kv.go b/kv.go index deaefde2a..765ab41a2 100644 --- a/kv.go +++ b/kv.go @@ -45,14 +45,18 @@ type KeyValue interface { Put(key string, value []byte) (revision uint64, err error) // PutString will place the string for the key into the store. PutString(key string, value string) (revision uint64, err error) - // Create will add the key/value pair iff it does not exist. + // Create will add the key/value pair if it does not exist. Create(key string, value []byte) (revision uint64, err error) - // Update will update the value iff the latest revision matches. + // Update will update the value if the latest revision matches. Update(key string, value []byte, last uint64) (revision uint64, err error) // Delete will place a delete marker and leave all revisions. Delete(key string) error + // DeleteCond will place a delete marker and leave all revision if the latest revision matches. + DeleteCond(key string, last uint64) error // Purge will place a delete marker and remove all previous revisions. Purge(key string) error + // PurgeCond will place a delete marker and remove all previous revisions if the latest revision matches. + PurgeCond(key string, last uint64) error // Watch for any updates to keys that match the keys argument which could include wildcards. // Watch will send a nil entry when it has received all initial values. Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) @@ -437,7 +441,7 @@ func (kv *kvs) PutString(key string, value string) (revision uint64, err error) return kv.Put(key, []byte(value)) } -// Create will add the key/value pair iff it does not exist. +// Create will add the key/value pair if it does not exist. func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) { v, err := kv.Update(key, value, 0) if err == nil { @@ -453,7 +457,7 @@ func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) { return 0, err } -// Update will update the value iff the latest revision matches. +// Update will update the value if the latest revision matches. func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error) { if !keyValid(key) { return 0, ErrInvalidKey @@ -475,17 +479,63 @@ func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error) // Delete will place a delete marker and leave all revisions. func (kv *kvs) Delete(key string) error { - return kv.delete(key, false) + m, err := kv.buildDelete(key) + if err != nil { + return err + } + + m.Header.Set(kvop, kvdel) + + _, err = kv.js.PublishMsg(m) + return err +} + +// DeleteCond will place a delete marker and leave all revisions if the latest revision matches. +func (kv *kvs) DeleteCond(key string, revision uint64) error { + m, err := kv.buildDelete(key) + if err != nil { + return err + } + + m.Header.Set(kvop, kvdel) + m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(revision, 10)) + + _, err = kv.js.PublishMsg(m) + return err } // Purge will remove the key and all revisions. func (kv *kvs) Purge(key string) error { - return kv.delete(key, true) + m, err := kv.buildDelete(key) + if err != nil { + return err + } + + m.Header.Set(kvop, kvpurge) + m.Header.Set(MsgRollup, MsgRollupSubject) + + _, err = kv.js.PublishMsg(m) + return err } -func (kv *kvs) delete(key string, purge bool) error { +// PurgeCond will remove the key and all revisions if the latest revision matches. +func (kv *kvs) PurgeCond(key string, revision uint64) error { + m, err := kv.buildDelete(key) + if err != nil { + return err + } + + m.Header.Set(kvop, kvpurge) + m.Header.Set(MsgRollup, MsgRollupSubject) + m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(revision, 10)) + + _, err = kv.js.PublishMsg(m) + return err +} + +func (kv *kvs) buildDelete(key string) (*Msg, error) { if !keyValid(key) { - return ErrInvalidKey + return nil, ErrInvalidKey } var b strings.Builder @@ -494,15 +544,7 @@ func (kv *kvs) delete(key string, purge bool) error { // DEL op marker. For watch functionality. m := NewMsg(b.String()) - - if purge { - m.Header.Set(kvop, kvpurge) - m.Header.Set(MsgRollup, MsgRollupSubject) - } else { - m.Header.Set(kvop, kvdel) - } - _, err := kv.js.PublishMsg(m) - return err + return m, nil } // PurgeDeletes will remove all current delete markers. diff --git a/test/kv_test.go b/test/kv_test.go index 6c8602603..02c62ced7 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -66,9 +66,13 @@ func TestKeyValueBasics(t *testing.T) { if r != 3 { t.Fatalf("Expected 3 for the revision, got %d", r) } + err = kv.DeleteCond("name", 4) + expectErr(t, err) + err = kv.DeleteCond("name", 3) + expectOk(t, err) // Conditional Updates. - r, err = kv.Update("name", []byte("rip"), 3) + r, err = kv.Update("name", []byte("rip"), 4) expectOk(t, err) _, err = kv.Update("name", []byte("ik"), 3) expectErr(t, err)