Skip to content

Commit

Permalink
kv: Conditional delete and purge
Browse files Browse the repository at this point in the history
  • Loading branch information
steveh committed Apr 4, 2022
1 parent e076b0d commit 95a94e1
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 15 deletions.
70 changes: 57 additions & 13 deletions kv.go
Expand Up @@ -54,9 +54,9 @@ type KeyValue interface {
// Update will update the value iff the latest revision matches.
Update(key string, value []byte, last uint64) (revision uint64, err error)
// Delete will place a delete marker and leave all revisions.
Delete(key string) error
Delete(key string, opts ...DeleteOpt) error
// Purge will place a delete marker and remove all previous revisions.
Purge(key string) error
Purge(key string, opts ...DeleteOpt) error
// Watch for any updates to keys that match the keys argument which could include wildcards.
// Watch will send a nil entry when it has received all initial values.
Watch(keys string, opts ...WatchOpt) (KeyWatcher, error)
Expand Down Expand Up @@ -180,6 +180,40 @@ func (ctx ContextOpt) configurePurge(opts *purgeOpts) error {
return nil
}

type DeleteOpt interface {
configureDelete(opts *deleteOpts) error
}

type deleteOpts struct {
// Remove all previous revisions.
purge bool

// Delete only if the latest revision matches.
revision uint64
}

type deleteOptFn func(opts *deleteOpts) error

func (opt deleteOptFn) configureDelete(opts *deleteOpts) error {
return opt(opts)
}

// LastRevision deletes if the latest revision matches.
func LastRevision(revision uint64) DeleteOpt {
return deleteOptFn(func(opts *deleteOpts) error {
opts.revision = revision
return nil
})
}

// purge removes all previous revisions.
func purge() DeleteOpt {
return deleteOptFn(func(opts *deleteOpts) error {
opts.purge = true
return nil
})
}

// KeyValueConfig is for configuring a KeyValue store.
type KeyValueConfig struct {
Bucket string
Expand Down Expand Up @@ -591,16 +625,7 @@ func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error)
}

// Delete will place a delete marker and leave all revisions.
func (kv *kvs) Delete(key string) error {
return kv.delete(key, false)
}

// Purge will remove the key and all revisions.
func (kv *kvs) Purge(key string) error {
return kv.delete(key, true)
}

func (kv *kvs) delete(key string, purge bool) error {
func (kv *kvs) Delete(key string, opts ...DeleteOpt) error {
if !keyValid(key) {
return ErrInvalidKey
}
Expand All @@ -615,16 +640,35 @@ func (kv *kvs) delete(key string, purge bool) error {
// DEL op marker. For watch functionality.
m := NewMsg(b.String())

if purge {
var o deleteOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configureDelete(&o); err != nil {
return err
}
}
}

if o.purge {
m.Header.Set(kvop, kvpurge)
m.Header.Set(MsgRollup, MsgRollupSubject)
} else {
m.Header.Set(kvop, kvdel)
}

if o.revision != 0 {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.revision, 10))
}

_, err := kv.js.PublishMsg(m)
return err
}

// Purge will remove the key and all revisions.
func (kv *kvs) Purge(key string, opts ...DeleteOpt) error {
return kv.Delete(key, append(opts, purge())...)
}

const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute

// PurgeDeletes will remove all current delete markers.
Expand Down
10 changes: 8 additions & 2 deletions test/kv_test.go
Expand Up @@ -67,9 +67,13 @@ func TestKeyValueBasics(t *testing.T) {
if r != 3 {
t.Fatalf("Expected 3 for the revision, got %d", r)
}
err = kv.Delete("name", nats.LastRevision(4))
expectErr(t, err)
err = kv.Delete("name", nats.LastRevision(3))
expectOk(t, err)

// Conditional Updates.
r, err = kv.Update("name", []byte("rip"), 3)
r, err = kv.Update("name", []byte("rip"), 4)
expectOk(t, err)
_, err = kv.Update("name", []byte("ik"), 3)
expectErr(t, err)
Expand Down Expand Up @@ -374,7 +378,9 @@ func TestKeyValueDeleteVsPurge(t *testing.T) {
if len(entries) != 4 {
t.Fatalf("Expected 4 entries for age after delete, got %d", len(entries))
}
err = kv.Purge("name")
err = kv.Purge("name", nats.LastRevision(4))
expectErr(t, err)
err = kv.Purge("name", nats.LastRevision(5))
expectOk(t, err)
// Check marker
e, err := kv.Get("name")
Expand Down

0 comments on commit 95a94e1

Please sign in to comment.