Skip to content

Commit

Permalink
Merge pull request #903 from boxboatmatt/feature/get-revision
Browse files Browse the repository at this point in the history
Add GetRevision to nats.KeyValue
  • Loading branch information
kozlovic committed Feb 10, 2022
2 parents 52371e4 + 14145b0 commit 3ead809
Showing 1 changed file with 32 additions and 7 deletions.
39 changes: 32 additions & 7 deletions kv.go
Expand Up @@ -42,6 +42,8 @@ type KeyValueManager interface {
type KeyValue interface {
// Get returns the latest value for the key.
Get(key string) (entry KeyValueEntry, err error)
// GetRevision returns a specific revision value for the key.
GetRevision(key string, revision uint64) (entry KeyValueEntry, err error)
// Put will place the new value for the key into the store.
Put(key string, value []byte) (revision uint64, err error)
// PutString will place the string for the key into the store.
Expand Down Expand Up @@ -163,6 +165,7 @@ type KeyValueConfig struct {
const (
KeyValueMaxHistory = 64
AllKeys = ">"
kvLatestRevision = 0
kvop = "KV-Operation"
kvdel = "DEL"
kvpurge = "PURGE"
Expand Down Expand Up @@ -369,18 +372,31 @@ func keyValid(key string) bool {

// Get returns the latest value for the key.
func (kv *kvs) Get(key string) (KeyValueEntry, error) {
e, err := kv.get(key)
if err == ErrKeyDeleted {
return nil, ErrKeyNotFound
e, err := kv.get(key, kvLatestRevision)
if err != nil {
if err == ErrKeyDeleted {
return nil, ErrKeyNotFound
}
return nil, err
}

return e, nil
}

// GetRevision returns a specific revision value for the key.
func (kv *kvs) GetRevision(key string, revision uint64) (KeyValueEntry, error) {
e, err := kv.get(key, revision)
if err != nil {
if err == ErrKeyDeleted {
return nil, ErrKeyNotFound
}
return nil, err
}

return e, nil
}

func (kv *kvs) get(key string) (KeyValueEntry, error) {
func (kv *kvs) get(key string, revision uint64) (KeyValueEntry, error) {
if !keyValid(key) {
return nil, ErrInvalidKey
}
Expand All @@ -389,7 +405,17 @@ func (kv *kvs) get(key string) (KeyValueEntry, error) {
b.WriteString(kv.pre)
b.WriteString(key)

m, err := kv.js.GetLastMsg(kv.stream, b.String())
var m *RawStreamMsg
var err error
if revision == kvLatestRevision {
m, err = kv.js.GetLastMsg(kv.stream, b.String())
} else {
m, err = kv.js.GetMsg(kv.stream, revision)
if err == nil && m.Subject != b.String() {
return nil, ErrKeyNotFound
}
}

if err != nil {
if err == ErrMsgNotFound {
err = ErrKeyNotFound
Expand All @@ -415,7 +441,6 @@ func (kv *kvs) get(key string) (KeyValueEntry, error) {
entry.op = KeyValuePurge
return entry, ErrKeyDeleted
}

}

return entry, nil
Expand Down Expand Up @@ -452,7 +477,7 @@ func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) {

// TODO(dlc) - Since we have tombstones for DEL ops for watchers, this could be from that
// so we need to double check.
if e, err := kv.get(key); err == ErrKeyDeleted {
if e, err := kv.get(key, kvLatestRevision); err == ErrKeyDeleted {
return kv.Update(key, value, e.Revision())
}

Expand Down

0 comments on commit 3ead809

Please sign in to comment.