Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposal: Conditional delete and purge for KV #856

Merged
merged 1 commit into from Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
70 changes: 57 additions & 13 deletions kv.go
Expand Up @@ -54,9 +54,9 @@ type KeyValue interface {
// Update will update the value iff the latest revision matches.
Update(key string, value []byte, last uint64) (revision uint64, err error)
// Delete will place a delete marker and leave all revisions.
Delete(key string) error
Delete(key string, opts ...DeleteOpt) error
// Purge will place a delete marker and remove all previous revisions.
Purge(key string) error
Purge(key string, opts ...DeleteOpt) error
// Watch for any updates to keys that match the keys argument which could include wildcards.
// Watch will send a nil entry when it has received all initial values.
Watch(keys string, opts ...WatchOpt) (KeyWatcher, error)
Expand Down Expand Up @@ -180,6 +180,40 @@ func (ctx ContextOpt) configurePurge(opts *purgeOpts) error {
return nil
}

type DeleteOpt interface {
configureDelete(opts *deleteOpts) error
}

type deleteOpts struct {
// Remove all previous revisions.
purge bool

// Delete only if the latest revision matches.
revision uint64
}

type deleteOptFn func(opts *deleteOpts) error

func (opt deleteOptFn) configureDelete(opts *deleteOpts) error {
return opt(opts)
}

// LastRevision deletes if the latest revision matches.
func LastRevision(revision uint64) DeleteOpt {
return deleteOptFn(func(opts *deleteOpts) error {
opts.revision = revision
return nil
})
}

// purge removes all previous revisions.
func purge() DeleteOpt {
return deleteOptFn(func(opts *deleteOpts) error {
opts.purge = true
return nil
})
}

// KeyValueConfig is for configuring a KeyValue store.
type KeyValueConfig struct {
Bucket string
Expand Down Expand Up @@ -591,16 +625,7 @@ func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error)
}

// Delete will place a delete marker and leave all revisions.
func (kv *kvs) Delete(key string) error {
return kv.delete(key, false)
}

// Purge will remove the key and all revisions.
func (kv *kvs) Purge(key string) error {
return kv.delete(key, true)
}

func (kv *kvs) delete(key string, purge bool) error {
func (kv *kvs) Delete(key string, opts ...DeleteOpt) error {
if !keyValid(key) {
return ErrInvalidKey
}
Expand All @@ -615,16 +640,35 @@ func (kv *kvs) delete(key string, purge bool) error {
// DEL op marker. For watch functionality.
m := NewMsg(b.String())

if purge {
var o deleteOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configureDelete(&o); err != nil {
return err
}
}
}

if o.purge {
m.Header.Set(kvop, kvpurge)
m.Header.Set(MsgRollup, MsgRollupSubject)
} else {
m.Header.Set(kvop, kvdel)
}

if o.revision != 0 {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.revision, 10))
}

_, err := kv.js.PublishMsg(m)
return err
}

// Purge will remove the key and all revisions.
func (kv *kvs) Purge(key string, opts ...DeleteOpt) error {
return kv.Delete(key, append(opts, purge())...)
}

const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute

// PurgeDeletes will remove all current delete markers.
Expand Down
10 changes: 8 additions & 2 deletions test/kv_test.go
Expand Up @@ -67,9 +67,13 @@ func TestKeyValueBasics(t *testing.T) {
if r != 3 {
t.Fatalf("Expected 3 for the revision, got %d", r)
}
err = kv.Delete("name", nats.LastRevision(4))
expectErr(t, err)
err = kv.Delete("name", nats.LastRevision(3))
expectOk(t, err)

// Conditional Updates.
r, err = kv.Update("name", []byte("rip"), 3)
r, err = kv.Update("name", []byte("rip"), 4)
expectOk(t, err)
_, err = kv.Update("name", []byte("ik"), 3)
expectErr(t, err)
Expand Down Expand Up @@ -374,7 +378,9 @@ func TestKeyValueDeleteVsPurge(t *testing.T) {
if len(entries) != 4 {
t.Fatalf("Expected 4 entries for age after delete, got %d", len(entries))
}
err = kv.Purge("name")
err = kv.Purge("name", nats.LastRevision(4))
expectErr(t, err)
err = kv.Purge("name", nats.LastRevision(5))
expectOk(t, err)
// Check marker
e, err := kv.Get("name")
Expand Down