Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GetRevision to nats.KeyValue #903

Merged
merged 3 commits into from Feb 10, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 31 additions & 5 deletions kv.go
Expand Up @@ -42,6 +42,8 @@ type KeyValueManager interface {
type KeyValue interface {
// Get returns the latest value for the key.
Get(key string) (entry KeyValueEntry, err error)
// GetRevision returns a specific revision value for the key.
GetRevision(key string, revision uint64) (entry KeyValueEntry, err error)
// Put will place the new value for the key into the store.
Put(key string, value []byte) (revision uint64, err error)
// PutString will place the string for the key into the store.
Expand Down Expand Up @@ -163,6 +165,7 @@ type KeyValueConfig struct {
const (
KeyValueMaxHistory = 64
AllKeys = ">"
kvLatestRevision = 0
kvop = "KV-Operation"
kvdel = "DEL"
kvpurge = "PURGE"
Expand Down Expand Up @@ -369,7 +372,7 @@ func keyValid(key string) bool {

// Get returns the latest value for the key.
func (kv *kvs) Get(key string) (KeyValueEntry, error) {
e, err := kv.get(key)
e, err := kv.get(key, kvLatestRevision)
if err == ErrKeyDeleted {
return nil, ErrKeyNotFound
}
Expand All @@ -380,7 +383,20 @@ func (kv *kvs) Get(key string) (KeyValueEntry, error) {
return e, nil
}

func (kv *kvs) get(key string) (KeyValueEntry, error) {
// GetRevision returns a specific revision value for the key.
func (kv *kvs) GetRevision(key string, revision uint64) (KeyValueEntry, error) {
e, err := kv.get(key, revision)
if err == ErrKeyDeleted {
return nil, ErrKeyNotFound
}
if err != nil {
return nil, err
}
kozlovic marked this conversation as resolved.
Show resolved Hide resolved

return e, nil
}

func (kv *kvs) get(key string, revision uint64) (KeyValueEntry, error) {
if !keyValid(key) {
return nil, ErrInvalidKey
}
Expand All @@ -389,14 +405,25 @@ func (kv *kvs) get(key string) (KeyValueEntry, error) {
b.WriteString(kv.pre)
b.WriteString(key)

m, err := kv.js.GetLastMsg(kv.stream, b.String())
var m *RawStreamMsg
var err error
if revision == kvLatestRevision {
m, err = kv.js.GetLastMsg(kv.stream, b.String())
} else {
m, err = kv.js.GetMsg(kv.stream, revision)
}

if err != nil {
if err == ErrMsgNotFound {
err = ErrKeyNotFound
}
return nil, err
}

if m.Subject != b.String() {
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
return nil, ErrKeyNotFound
}

entry := &kve{
bucket: kv.name,
key: key,
Expand All @@ -415,7 +442,6 @@ func (kv *kvs) get(key string) (KeyValueEntry, error) {
entry.op = KeyValuePurge
return entry, ErrKeyDeleted
}

}

return entry, nil
Expand Down Expand Up @@ -452,7 +478,7 @@ func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) {

// TODO(dlc) - Since we have tombstones for DEL ops for watchers, this could be from that
// so we need to double check.
if e, err := kv.get(key); err == ErrKeyDeleted {
if e, err := kv.get(key, kvLatestRevision); err == ErrKeyDeleted {
return kv.Update(key, value, e.Revision())
}

Expand Down