Skip to content

Commit

Permalink
Updates based on PR feedback around key and bucket validation, Delete…
Browse files Browse the repository at this point in the history
… and Purge keys

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Oct 4, 2021
1 parent 8e8d7ff commit 4c3e19d
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 48 deletions.
6 changes: 5 additions & 1 deletion jsm.go
Expand Up @@ -795,7 +795,10 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
return nil, err
}
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
if resp.Error.Code == 404 && strings.Contains(resp.Error.Description, "message") {
return nil, ErrMsgNotFound
}
return nil, fmt.Errorf("nats: %s", resp.Error.Description)
}

msg := resp.Message
Expand Down Expand Up @@ -861,6 +864,7 @@ func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error {
return nil
}

// purgeRequest is optional request information to the purge API.
type streamPurgeRequest struct {
// Purge up to but not including sequence.
Sequence uint64 `json:"seq,omitempty"`
Expand Down
130 changes: 91 additions & 39 deletions kv.go
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"fmt"
"regexp"
"strconv"
"strings"
"time"
Expand All @@ -40,14 +41,16 @@ type KeyValue interface {
Create(key string, value []byte) (revision uint64, err error)
// Update will update the value iff the latest revision matches.
Update(key string, value []byte, last uint64) (revision uint64, err error)
// Delete the key and all revisions.
// Delete will place a delete marker and leave all revisions.
Delete(key string) error
// Purge will remove the key and all revisions.
Purge(key string) error
// WatchAll will invoke the callback for all updates.
WatchAll(cb KeyValueUpdate) (*Subscription, error)
// Watch will invoke the callback for any keys that match keyPattern when they update.
Watch(keys string, cb KeyValueUpdate) (*Subscription, error)
// List will return all values for the key.
List(key string) ([]KeyValueEntry, error)
// History will return all histroical values for the key.
History(key string) ([]KeyValueEntry, error)
// Bucket returns the current bucket name.
Bucket() string
}
Expand All @@ -66,9 +69,10 @@ type KeyValueConfig struct {

// Used to watch all keys.
const (
AllKeys = ">"
kvop = "KV-Operation"
kvdel = "DEL"
KeyValueMaxHistory = 64
AllKeys = ">"
kvop = "KV-Operation"
kvdel = "DEL"
)

type KeyValueOp uint8
Expand All @@ -78,7 +82,7 @@ const (
KeyValueDelete
)

// Retrieved entry for Get or List or Watch.
// KeyValueEntry is a retrieved entry for Get or List or Watch.
type KeyValueEntry interface {
// Bucket is the bucket the data was loaded from.
Bucket() string
Expand All @@ -96,15 +100,19 @@ type KeyValueEntry interface {
Operation() KeyValueOp
}

// Callback handler for KeyValue updates.
// KeyValueUpdate is the callback handler for KeyValueEntry updates.
type KeyValueUpdate func(v KeyValueEntry)

// Errors
var (
ErrBucketNameRequired = errors.New("nats: bucket name is required")
ErrInvalidBucketName = errors.New("nats: invalid bucket name")
ErrBucketNotFound = errors.New("nats: bucket not found")
ErrBadBucket = errors.New("nats: bucket not valid key-value store")
ErrKeyValueConfigRequired = errors.New("nats: config required")
ErrInvalidBucketName = errors.New("nats: invalid bucket name")
ErrInvalidKey = errors.New("nats: invalid key")
ErrBucketNotFound = errors.New("nats: bucket not found")
ErrBadBucket = errors.New("nats: bucket not valid key-value store")
ErrKeyNotFound = errors.New("nats: key not found")
ErrKeyDeleted = errors.New("nats: key was deleted")
ErrHistoryToLarge = errors.New("nats: history limited to a max of 64")
)

const (
Expand All @@ -114,12 +122,15 @@ const (
kvNoPending = "0"
)

// Regex for valid keys and buckets.
var (
validBucketRe = regexp.MustCompile(`\A[a-zA-Z0-9_-]+\z`)
validKeyRe = regexp.MustCompile(`\A[-/_=\.a-zA-Z0-9]+\z`)
)

// KeyValue will lookup and bind to an existing KeyValue store.
func (js *js) KeyValue(bucket string) (KeyValue, error) {
if bucket == _EMPTY_ {
return nil, ErrBucketNameRequired
}
if strings.Contains(bucket, ".") {
if !validBucketRe.MatchString(bucket) {
return nil, ErrInvalidBucketName
}
stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
Expand Down Expand Up @@ -147,21 +158,29 @@ func (js *js) KeyValue(bucket string) (KeyValue, error) {

// CreateKeyValue will create a KeyValue store with the following configuration.
func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
if cfg == nil || cfg.Bucket == _EMPTY_ {
return nil, ErrBucketNameRequired
if cfg == nil {
return nil, ErrKeyValueConfigRequired
}
if !validBucketRe.MatchString(cfg.Bucket) {
return nil, ErrInvalidBucketName
}

if strings.Contains(cfg.Bucket, ".") {
return nil, ErrInvalidBucketName
}
if _, err := js.AccountInfo(); err != nil {
return nil, err
}

// Default to 1 for history.
// Default to 1 for history. Max is 64 for now.
history := int64(1)
if cfg.History > 0 {
if cfg.History > KeyValueMaxHistory {
return nil, ErrHistoryToLarge
}
history = int64(cfg.History)
}

replicas := cfg.Replicas
if replicas == 0 {
replicas = 1
Expand Down Expand Up @@ -194,6 +213,9 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {

// DeleteKeyValue will delete this KeyValue store (JetStream stream).
func (js *js) DeleteKeyValue(bucket string) error {
if !validBucketRe.MatchString(bucket) {
return ErrInvalidBucketName
}
stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
return js.DeleteStream(stream)
}
Expand Down Expand Up @@ -224,14 +246,28 @@ func (e *kve) Created() time.Time { return e.created }
func (e *kve) Delta() uint64 { return e.delta }
func (e *kve) Operation() KeyValueOp { return e.op }

func keyValid(key string) bool {
if len(key) == 0 || key[0] == '.' || key[len(key)-1] == '.' {
return false
}
return validKeyRe.MatchString(key)
}

// Get returns the latest value for the key.
func (kv *kvs) Get(key string) (KeyValueEntry, error) {
if !keyValid(key) {
return nil, ErrInvalidKey
}

var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(key)

m, err := kv.js.GetLastMsg(kv.stream, b.String())
if err != nil {
if err == ErrMsgNotFound {
err = ErrKeyNotFound
}
return nil, err
}

Expand All @@ -246,14 +282,18 @@ func (kv *kvs) Get(key string) (KeyValueEntry, error) {
// Double check here that this is not a DEL Operation marker.
if len(m.Header) > 0 && m.Header.Get(kvop) == kvdel {
entry.op = KeyValueDelete
return entry, ErrMSgNotFound
return entry, ErrKeyDeleted
}

return entry, nil
}

// Put will place the new value for the key into the store.
func (kv *kvs) Put(key string, value []byte) (revision uint64, err error) {
if !keyValid(key) {
return 0, ErrInvalidKey
}

var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(key)
Expand All @@ -273,14 +313,18 @@ func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) {
}
// TODO(dlc) - Since we have tombstones for DEL ops for watchers, this could be from that
// so we need to double check.
if e, err := kv.Get(key); err == ErrMSgNotFound {
if e, err := kv.Get(key); err == ErrKeyDeleted {
return kv.Update(key, value, e.Revision())
}
return 0, err
}

// Update will update the value iff the latest revision matches.
func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error) {
if !keyValid(key) {
return 0, ErrInvalidKey
}

var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(key)
Expand All @@ -295,35 +339,42 @@ func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error)
return pa.Sequence, err
}

// Delete the key and all revisions.
// Delete will place a delete marker and leave all revisions.
func (kv *kvs) Delete(key string) error {
return kv.delete(key, false)
}

// Purge will remove the key and all revisions.
func (kv *kvs) Purge(key string) error {
return kv.delete(key, true)
}

func (kv *kvs) delete(key string, purge bool) error {
if !keyValid(key) {
return ErrInvalidKey
}

var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(key)

// DEL op marker. For watch functionality.
m := Msg{Subject: b.String(), Header: Header{}}
m := NewMsg(b.String())
m.Header.Set(kvop, kvdel)
paf, err := kv.js.PublishMsgAsync(&m)
if err != nil {
return err
}
err = kv.js.purgeStream(kv.stream, &streamPurgeRequest{Subject: b.String(), Keep: 1})
if err != nil {
return err
_, err := kv.js.PublishMsg(m)
if err == nil && purge {
err = kv.js.purgeStream(kv.stream, &streamPurgeRequest{Subject: b.String()})
}
return err
}

// Double check the pubAck future.
select {
case <-paf.Ok():
return nil
case err := <-paf.Err():
return err
// History will return all values for the key.
func (kv *kvs) History(key string) ([]KeyValueEntry, error) {
// Do quick check to make sure this is a legit key with history.
if _, err := kv.Get(key); err != nil && err != ErrKeyDeleted {
return nil, err
}
}

// List will return all values for the key.
func (kv *kvs) List(key string) ([]KeyValueEntry, error) {
o, cancel, err := getJSContextOpts(kv.js.opts)
if err != nil {
return nil, err
Expand Down Expand Up @@ -396,6 +447,7 @@ func (kv *kvs) WatchAll(cb KeyValueUpdate) (*Subscription, error) {
// Watch will fire the callback when a key that matches the keys pattern is updated.
// keys needs to be a valid NATS subject.
func (kv *kvs) Watch(keys string, cb KeyValueUpdate) (*Subscription, error) {
// Could be a pattern so don't check for validity as we normally do.
var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(keys)
Expand Down
2 changes: 1 addition & 1 deletion nats.go
Expand Up @@ -155,7 +155,7 @@ var (
ErrPullSubscribeToPushConsumer = errors.New("nats: cannot pull subscribe to push based consumer")
ErrPullSubscribeRequired = errors.New("nats: must use pull subscribe to bind to pull based consumer")
ErrConsumerNotActive = errors.New("nats: consumer not active")
ErrMSgNotFound = errors.New("nats: message not found")
ErrMsgNotFound = errors.New("nats: message not found")
)

func init() {
Expand Down
2 changes: 1 addition & 1 deletion test/js_test.go
Expand Up @@ -1597,7 +1597,7 @@ func testJetStreamManagement_GetMsg(t *testing.T, srvs ...*jsServer) {

// Try to fetch the same message which should be gone.
_, err = js.GetMsg("foo", originalSeq)
if err == nil || err.Error() != `no message found` {
if err == nil || err != nats.ErrMsgNotFound {
t.Errorf("Expected no message found error, got: %v", err)
}
})
Expand Down
41 changes: 38 additions & 3 deletions test/kv_test.go
Expand Up @@ -58,7 +58,7 @@ func TestKeyValueBasics(t *testing.T) {
err = kv.Delete("name")
expectOk(t, err)
_, err = kv.Get("name")
expectErr(t, err)
expectErr(t, err, nats.ErrKeyDeleted)
r, err = kv.Create("name", []byte("derek"))
expectOk(t, err)
if r != 3 {
Expand All @@ -78,7 +78,7 @@ func TestKeyValueBasics(t *testing.T) {
expectOk(t, err)
}

func TestKeyValueList(t *testing.T) {
func TestKeyValueHistory(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdown(s)

Expand All @@ -94,7 +94,7 @@ func TestKeyValueList(t *testing.T) {
expectOk(t, err)
}

vl, err := kv.List("age")
vl, err := kv.History("age")
expectOk(t, err)

if len(vl) != 10 {
Expand Down Expand Up @@ -237,6 +237,41 @@ func TestKeyValueDeleteStore(t *testing.T) {
expectErr(t, err)
}

func TestKeyValueDeleteVsPurge(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdown(s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 10})
expectOk(t, err)

put := func(key, value string) {
_, err := kv.Put(key, []byte(value))
expectOk(t, err)
}

// Put in a few names and ages.
put("name", "derek")
put("age", "22")
put("name", "ivan")
put("age", "33")
put("name", "rip")
put("age", "44")

kv.Delete("age")
entries, err := kv.History("age")
expectOk(t, err)
// Expect three entries and delete marker.
if len(entries) != 4 {
t.Fatalf("Expected 4 entries for age after delete, got %d", len(entries))
}
kv.Purge("name")
_, err = kv.History("name")
expectErr(t, err, nats.ErrKeyNotFound)
}

// Helpers

func client(t *testing.T, s *server.Server) *nats.Conn {
Expand Down
3 changes: 0 additions & 3 deletions test/object_test.go
Expand Up @@ -377,9 +377,6 @@ func TestObjectWatch(t *testing.T) {
err = obs.Delete("A")
expectOk(t, err)

// FIXME(dlc) - I think there is a bug in server on rollup that causes an update on "B" here.
expectUpdate("B")

expectUpdate("A")
expectNoMoreUpdates()

Expand Down

0 comments on commit 4c3e19d

Please sign in to comment.