Skip to content

Commit

Permalink
kv: Conditional delete and purge
Browse files Browse the repository at this point in the history
  • Loading branch information
steveh committed Nov 4, 2021
1 parent 5c4ad16 commit 163ad87
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 18 deletions.
76 changes: 59 additions & 17 deletions kv.go
Expand Up @@ -45,14 +45,18 @@ type KeyValue interface {
Put(key string, value []byte) (revision uint64, err error)
// PutString will place the string for the key into the store.
PutString(key string, value string) (revision uint64, err error)
// Create will add the key/value pair iff it does not exist.
// Create will add the key/value pair if it does not exist.
Create(key string, value []byte) (revision uint64, err error)
// Update will update the value iff the latest revision matches.
// Update will update the value if the latest revision matches.
Update(key string, value []byte, last uint64) (revision uint64, err error)
// Delete will place a delete marker and leave all revisions.
Delete(key string) error
// DeleteCond will place a delete marker and leave all revision if the latest revision matches.
DeleteCond(key string, last uint64) error
// Purge will place a delete marker and remove all previous revisions.
Purge(key string) error
// PurgeCond will place a delete marker and remove all previous revisions if the latest revision matches.
PurgeCond(key string, last uint64) error
// Watch for any updates to keys that match the keys argument which could include wildcards.
// Watch will send a nil entry when it has received all initial values.
Watch(keys string, opts ...WatchOpt) (KeyWatcher, error)
Expand Down Expand Up @@ -437,7 +441,7 @@ func (kv *kvs) PutString(key string, value string) (revision uint64, err error)
return kv.Put(key, []byte(value))
}

// Create will add the key/value pair iff it does not exist.
// Create will add the key/value pair if it does not exist.
func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) {
v, err := kv.Update(key, value, 0)
if err == nil {
Expand All @@ -453,7 +457,7 @@ func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) {
return 0, err
}

// Update will update the value iff the latest revision matches.
// Update will update the value if the latest revision matches.
func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error) {
if !keyValid(key) {
return 0, ErrInvalidKey
Expand All @@ -475,17 +479,63 @@ func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error)

// Delete will place a delete marker and leave all revisions.
func (kv *kvs) Delete(key string) error {
return kv.delete(key, false)
m, err := kv.buildDelete(key)
if err != nil {
return err
}

m.Header.Set(kvop, kvdel)

_, err = kv.js.PublishMsg(m)
return err
}

// DeleteCond will place a delete marker and leave all revisions if the latest revision matches.
func (kv *kvs) DeleteCond(key string, revision uint64) error {
m, err := kv.buildDelete(key)
if err != nil {
return err
}

m.Header.Set(kvop, kvdel)
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(revision, 10))

_, err = kv.js.PublishMsg(m)
return err
}

// Purge will remove the key and all revisions.
func (kv *kvs) Purge(key string) error {
return kv.delete(key, true)
m, err := kv.buildDelete(key)
if err != nil {
return err
}

m.Header.Set(kvop, kvpurge)
m.Header.Set(MsgRollup, MsgRollupSubject)

_, err = kv.js.PublishMsg(m)
return err
}

func (kv *kvs) delete(key string, purge bool) error {
// PurgeCond will remove the key and all revisions if the latest revision matches.
func (kv *kvs) PurgeCond(key string, revision uint64) error {
m, err := kv.buildDelete(key)
if err != nil {
return err
}

m.Header.Set(kvop, kvpurge)
m.Header.Set(MsgRollup, MsgRollupSubject)
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(revision, 10))

_, err = kv.js.PublishMsg(m)
return err
}

func (kv *kvs) buildDelete(key string) (*Msg, error) {
if !keyValid(key) {
return ErrInvalidKey
return nil, ErrInvalidKey
}

var b strings.Builder
Expand All @@ -494,15 +544,7 @@ func (kv *kvs) delete(key string, purge bool) error {

// DEL op marker. For watch functionality.
m := NewMsg(b.String())

if purge {
m.Header.Set(kvop, kvpurge)
m.Header.Set(MsgRollup, MsgRollupSubject)
} else {
m.Header.Set(kvop, kvdel)
}
_, err := kv.js.PublishMsg(m)
return err
return m, nil
}

// PurgeDeletes will remove all current delete markers.
Expand Down
6 changes: 5 additions & 1 deletion test/kv_test.go
Expand Up @@ -66,9 +66,13 @@ func TestKeyValueBasics(t *testing.T) {
if r != 3 {
t.Fatalf("Expected 3 for the revision, got %d", r)
}
err = kv.DeleteCond("name", 4)
expectErr(t, err)
err = kv.DeleteCond("name", 3)
expectOk(t, err)

// Conditional Updates.
r, err = kv.Update("name", []byte("rip"), 3)
r, err = kv.Update("name", []byte("rip"), 4)
expectOk(t, err)
_, err = kv.Update("name", []byte("ik"), 3)
expectErr(t, err)
Expand Down

0 comments on commit 163ad87

Please sign in to comment.