diff --git a/jsm.go b/jsm.go index 0c3ee2978..94547e944 100644 --- a/jsm.go +++ b/jsm.go @@ -795,7 +795,10 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt return nil, err } if resp.Error != nil { - return nil, errors.New(resp.Error.Description) + if resp.Error.Code == 404 && strings.Contains(resp.Error.Description, "message") { + return nil, ErrMsgNotFound + } + return nil, fmt.Errorf("nats: %s", resp.Error.Description) } msg := resp.Message @@ -861,6 +864,7 @@ func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error { return nil } +// purgeRequest is optional request information to the purge API. type streamPurgeRequest struct { // Purge up to but not including sequence. Sequence uint64 `json:"seq,omitempty"` diff --git a/kv.go b/kv.go index d12e50024..3f399c2b0 100644 --- a/kv.go +++ b/kv.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "regexp" "strconv" "strings" "time" @@ -40,14 +41,16 @@ type KeyValue interface { Create(key string, value []byte) (revision uint64, err error) // Update will update the value iff the latest revision matches. Update(key string, value []byte, last uint64) (revision uint64, err error) - // Delete the key and all revisions. + // Delete will place a delete marker and leave all revisions. Delete(key string) error + // Purge will remove the key and all revisions. + Purge(key string) error // WatchAll will invoke the callback for all updates. WatchAll(cb KeyValueUpdate) (*Subscription, error) // Watch will invoke the callback for any keys that match keyPattern when they update. Watch(keys string, cb KeyValueUpdate) (*Subscription, error) - // List will return all values for the key. - List(key string) ([]KeyValueEntry, error) + // History will return all histroical values for the key. + History(key string) ([]KeyValueEntry, error) // Bucket returns the current bucket name. Bucket() string } @@ -66,9 +69,10 @@ type KeyValueConfig struct { // Used to watch all keys. const ( - AllKeys = ">" - kvop = "KV-Operation" - kvdel = "DEL" + KeyValueMaxHistory = 64 + AllKeys = ">" + kvop = "KV-Operation" + kvdel = "DEL" ) type KeyValueOp uint8 @@ -78,7 +82,7 @@ const ( KeyValueDelete ) -// Retrieved entry for Get or List or Watch. +// KeyValueEntry is a retrieved entry for Get or List or Watch. type KeyValueEntry interface { // Bucket is the bucket the data was loaded from. Bucket() string @@ -96,15 +100,19 @@ type KeyValueEntry interface { Operation() KeyValueOp } -// Callback handler for KeyValue updates. +// KeyValueUpdate is the callback handler for KeyValueEntry updates. type KeyValueUpdate func(v KeyValueEntry) // Errors var ( - ErrBucketNameRequired = errors.New("nats: bucket name is required") - ErrInvalidBucketName = errors.New("nats: invalid bucket name") - ErrBucketNotFound = errors.New("nats: bucket not found") - ErrBadBucket = errors.New("nats: bucket not valid key-value store") + ErrKeyValueConfigRequired = errors.New("nats: config required") + ErrInvalidBucketName = errors.New("nats: invalid bucket name") + ErrInvalidKey = errors.New("nats: invalid key") + ErrBucketNotFound = errors.New("nats: bucket not found") + ErrBadBucket = errors.New("nats: bucket not valid key-value store") + ErrKeyNotFound = errors.New("nats: key not found") + ErrKeyDeleted = errors.New("nats: key was deleted") + ErrHistoryToLarge = errors.New("nats: history limited to a max of 64") ) const ( @@ -114,12 +122,15 @@ const ( kvNoPending = "0" ) +// Regex for valid keys and buckets. +var ( + validBucketRe = regexp.MustCompile(`\A[a-zA-Z0-9_-]+\z`) + validKeyRe = regexp.MustCompile(`\A[-/_=\.a-zA-Z0-9]+\z`) +) + // KeyValue will lookup and bind to an existing KeyValue store. func (js *js) KeyValue(bucket string) (KeyValue, error) { - if bucket == _EMPTY_ { - return nil, ErrBucketNameRequired - } - if strings.Contains(bucket, ".") { + if !validBucketRe.MatchString(bucket) { return nil, ErrInvalidBucketName } stream := fmt.Sprintf(kvBucketNameTmpl, bucket) @@ -147,9 +158,13 @@ func (js *js) KeyValue(bucket string) (KeyValue, error) { // CreateKeyValue will create a KeyValue store with the following configuration. func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { - if cfg == nil || cfg.Bucket == _EMPTY_ { - return nil, ErrBucketNameRequired + if cfg == nil { + return nil, ErrKeyValueConfigRequired } + if !validBucketRe.MatchString(cfg.Bucket) { + return nil, ErrInvalidBucketName + } + if strings.Contains(cfg.Bucket, ".") { return nil, ErrInvalidBucketName } @@ -157,11 +172,15 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { return nil, err } - // Default to 1 for history. + // Default to 1 for history. Max is 64 for now. history := int64(1) if cfg.History > 0 { + if cfg.History > KeyValueMaxHistory { + return nil, ErrHistoryToLarge + } history = int64(cfg.History) } + replicas := cfg.Replicas if replicas == 0 { replicas = 1 @@ -194,6 +213,9 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { // DeleteKeyValue will delete this KeyValue store (JetStream stream). func (js *js) DeleteKeyValue(bucket string) error { + if !validBucketRe.MatchString(bucket) { + return ErrInvalidBucketName + } stream := fmt.Sprintf(kvBucketNameTmpl, bucket) return js.DeleteStream(stream) } @@ -224,14 +246,28 @@ func (e *kve) Created() time.Time { return e.created } func (e *kve) Delta() uint64 { return e.delta } func (e *kve) Operation() KeyValueOp { return e.op } +func keyValid(key string) bool { + if len(key) == 0 || key[0] == '.' || key[len(key)-1] == '.' { + return false + } + return validKeyRe.MatchString(key) +} + // Get returns the latest value for the key. func (kv *kvs) Get(key string) (KeyValueEntry, error) { + if !keyValid(key) { + return nil, ErrInvalidKey + } + var b strings.Builder b.WriteString(kv.pre) b.WriteString(key) m, err := kv.js.GetLastMsg(kv.stream, b.String()) if err != nil { + if err == ErrMsgNotFound { + err = ErrKeyNotFound + } return nil, err } @@ -246,7 +282,7 @@ func (kv *kvs) Get(key string) (KeyValueEntry, error) { // Double check here that this is not a DEL Operation marker. if len(m.Header) > 0 && m.Header.Get(kvop) == kvdel { entry.op = KeyValueDelete - return entry, ErrMSgNotFound + return entry, ErrKeyDeleted } return entry, nil @@ -254,6 +290,10 @@ func (kv *kvs) Get(key string) (KeyValueEntry, error) { // Put will place the new value for the key into the store. func (kv *kvs) Put(key string, value []byte) (revision uint64, err error) { + if !keyValid(key) { + return 0, ErrInvalidKey + } + var b strings.Builder b.WriteString(kv.pre) b.WriteString(key) @@ -273,7 +313,7 @@ func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) { } // TODO(dlc) - Since we have tombstones for DEL ops for watchers, this could be from that // so we need to double check. - if e, err := kv.Get(key); err == ErrMSgNotFound { + if e, err := kv.Get(key); err == ErrKeyDeleted { return kv.Update(key, value, e.Revision()) } return 0, err @@ -281,6 +321,10 @@ func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) { // Update will update the value iff the latest revision matches. func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error) { + if !keyValid(key) { + return 0, ErrInvalidKey + } + var b strings.Builder b.WriteString(kv.pre) b.WriteString(key) @@ -295,35 +339,42 @@ func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error) return pa.Sequence, err } -// Delete the key and all revisions. +// Delete will place a delete marker and leave all revisions. func (kv *kvs) Delete(key string) error { + return kv.delete(key, false) +} + +// Purge will remove the key and all revisions. +func (kv *kvs) Purge(key string) error { + return kv.delete(key, true) +} + +func (kv *kvs) delete(key string, purge bool) error { + if !keyValid(key) { + return ErrInvalidKey + } + var b strings.Builder b.WriteString(kv.pre) b.WriteString(key) // DEL op marker. For watch functionality. - m := Msg{Subject: b.String(), Header: Header{}} + m := NewMsg(b.String()) m.Header.Set(kvop, kvdel) - paf, err := kv.js.PublishMsgAsync(&m) - if err != nil { - return err - } - err = kv.js.purgeStream(kv.stream, &streamPurgeRequest{Subject: b.String(), Keep: 1}) - if err != nil { - return err + _, err := kv.js.PublishMsg(m) + if err == nil && purge { + err = kv.js.purgeStream(kv.stream, &streamPurgeRequest{Subject: b.String()}) } + return err +} - // Double check the pubAck future. - select { - case <-paf.Ok(): - return nil - case err := <-paf.Err(): - return err +// History will return all values for the key. +func (kv *kvs) History(key string) ([]KeyValueEntry, error) { + // Do quick check to make sure this is a legit key with history. + if _, err := kv.Get(key); err != nil && err != ErrKeyDeleted { + return nil, err } -} -// List will return all values for the key. -func (kv *kvs) List(key string) ([]KeyValueEntry, error) { o, cancel, err := getJSContextOpts(kv.js.opts) if err != nil { return nil, err @@ -396,6 +447,7 @@ func (kv *kvs) WatchAll(cb KeyValueUpdate) (*Subscription, error) { // Watch will fire the callback when a key that matches the keys pattern is updated. // keys needs to be a valid NATS subject. func (kv *kvs) Watch(keys string, cb KeyValueUpdate) (*Subscription, error) { + // Could be a pattern so don't check for validity as we normally do. var b strings.Builder b.WriteString(kv.pre) b.WriteString(keys) diff --git a/nats.go b/nats.go index 49a76e8c6..777d540b8 100644 --- a/nats.go +++ b/nats.go @@ -155,7 +155,7 @@ var ( ErrPullSubscribeToPushConsumer = errors.New("nats: cannot pull subscribe to push based consumer") ErrPullSubscribeRequired = errors.New("nats: must use pull subscribe to bind to pull based consumer") ErrConsumerNotActive = errors.New("nats: consumer not active") - ErrMSgNotFound = errors.New("nats: message not found") + ErrMsgNotFound = errors.New("nats: message not found") ) func init() { diff --git a/test/js_test.go b/test/js_test.go index 1c96656b9..e31042046 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1597,7 +1597,7 @@ func testJetStreamManagement_GetMsg(t *testing.T, srvs ...*jsServer) { // Try to fetch the same message which should be gone. _, err = js.GetMsg("foo", originalSeq) - if err == nil || err.Error() != `no message found` { + if err == nil || err != nats.ErrMsgNotFound { t.Errorf("Expected no message found error, got: %v", err) } }) diff --git a/test/kv_test.go b/test/kv_test.go index 8a47e9efe..f5bdc9384 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -58,7 +58,7 @@ func TestKeyValueBasics(t *testing.T) { err = kv.Delete("name") expectOk(t, err) _, err = kv.Get("name") - expectErr(t, err) + expectErr(t, err, nats.ErrKeyDeleted) r, err = kv.Create("name", []byte("derek")) expectOk(t, err) if r != 3 { @@ -78,7 +78,7 @@ func TestKeyValueBasics(t *testing.T) { expectOk(t, err) } -func TestKeyValueList(t *testing.T) { +func TestKeyValueHistory(t *testing.T) { s := RunBasicJetStreamServer() defer shutdown(s) @@ -94,7 +94,7 @@ func TestKeyValueList(t *testing.T) { expectOk(t, err) } - vl, err := kv.List("age") + vl, err := kv.History("age") expectOk(t, err) if len(vl) != 10 { @@ -237,6 +237,41 @@ func TestKeyValueDeleteStore(t *testing.T) { expectErr(t, err) } +func TestKeyValueDeleteVsPurge(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 10}) + expectOk(t, err) + + put := func(key, value string) { + _, err := kv.Put(key, []byte(value)) + expectOk(t, err) + } + + // Put in a few names and ages. + put("name", "derek") + put("age", "22") + put("name", "ivan") + put("age", "33") + put("name", "rip") + put("age", "44") + + kv.Delete("age") + entries, err := kv.History("age") + expectOk(t, err) + // Expect three entries and delete marker. + if len(entries) != 4 { + t.Fatalf("Expected 4 entries for age after delete, got %d", len(entries)) + } + kv.Purge("name") + _, err = kv.History("name") + expectErr(t, err, nats.ErrKeyNotFound) +} + // Helpers func client(t *testing.T, s *server.Server) *nats.Conn { diff --git a/test/object_test.go b/test/object_test.go index 842add2f3..a7613cd93 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -377,9 +377,6 @@ func TestObjectWatch(t *testing.T) { err = obs.Delete("A") expectOk(t, err) - // FIXME(dlc) - I think there is a bug in server on rollup that causes an update on "B" here. - expectUpdate("B") - expectUpdate("A") expectNoMoreUpdates()