Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add kv status interface #843

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
57 changes: 56 additions & 1 deletion kv.go
Expand Up @@ -66,13 +66,35 @@ type KeyValue interface {
Bucket() string
// PurgeDeletes will remove all current delete markers.
PurgeDeletes(opts ...WatchOpt) error
// Status retrieves the status and configuration of a bucket
Status() (KeyValueStatus, error)
}

type KeyValueStatus interface {
// Bucket the name of the bucket
Bucket() string

// Values is how many messages are in the bucket, including historical values
Values() uint64

// History returns the configured history kept per key
History() int64

// TTL is how long the bucket keeps values for
TTL() time.Duration

// Replicas is how many storage replicas are kept
Replicas() int
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel this is a bit simplistic - since we should communicate healthy and failed replicas to users who dont know/care for JS underlying specirfics. I had it Replicas() (healthy int, failed int) but you didnt like that @derekcollison thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replicas needs a bit more thought imo, so would not rush to add that here just yet imo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CLI needs to be able to show the health of a bucket

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand but again I think we need to do some more ground work on mirrored replicas etc. Right now we do not show stream health based on mirrors AFAIK. I know we do report it though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cli shows it, healthy/unhealthy/lagged/offline etc, we can minimise that down to ok, bad int or similar, thats enough for KV users to alert their storage/backend admins who understand the jetstream specifics


// StreamName is the name of the stream used to store the data
StreamName() string
Copy link
Contributor Author

@ripienaar ripienaar Oct 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one I would prefer a bit more generic, I had it BackingStore in the ADR. The idea is that once we have other backends that this would be some kind of pointer to the specific kind. So string might not be enough unless its a url, for jetstream at least we need to know stream name, domain, placement tags and placement cluster.

Initially I thought some kind of url structure, but not sure now, thoughts @derekcollison ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be tempted again here to not offer anything until we get more feedback and figure out replicas and how we want this to interop with other impls with other backends etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have the feedback, the CLI needs it hence the PR.

We need to be able to do things like backup the stream, restore etc, there are several quality of life things around buckets that is JS specific and where knowing the underlying stream name and location helps. We can ofcourse double up the implementation details, but thats no good.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming transition from KV to Stream is well known and that is what is needed. The CLI can do that on its own no?

I am ok with making this more clearly aligned with JetStream, but I know you feel strongly about a generalized abstraction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont at this point realistically expect nats.go to get non js behaviors. We'd rather make a new one client library eventually that satisfies the interface (hence all the interface investment).

But for StreamName() we can go back to BackingStore() and have it return something like:

type BackingStore interface {
  Kind string // jetstream, ngs, kine, etc
  Store string // KV_FOO, depends on Kind what you put here
  Options map[string]interface{} // domain:foo for example? obviously this can improve a bit, just showing the idea
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly.. Or we do manager additions that can give you back a stream info for the KV and ObjectStores and leave the instance interfaces void to maximize compatibility. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I considered, but map[string]interface at least makes nice json by default:

type BackingStore interface {
  Kind string // jetstream, ngs, kine, etc
  Store string // KV_FOO, depends on Kind what you put here
  Options interface{} // Kind specific types
}

So the first two support CLI info display and such, 2nd programatic things, if you know your tool supports Kind==JetStream you can cast Options to JS specific things which would be stream info and stuff like cluster info perhaps

Copy link
Contributor Author

@ripienaar ripienaar Oct 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is going to be most useful:

type BackingStore interface {
  Kind() string // "JETSTREAM" etc
  Info() map[string]string // domain:foo for example? kind specific stuff
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am ok with Kind() (and can be string I guess versus enum) and some sort of map[string]string.

Not sure Options is the right thing to call it, but that is a harder problem ;)

}

// KeyWatcher is what is returned when doing a watch.
type KeyWatcher interface {
// Updates returns a channel to read any updates to entries.
Updates() <-chan KeyValueEntry
// Stop() will stop this watcher.
// Stop will stop this watcher.
Stop() error
}

Expand Down Expand Up @@ -642,3 +664,36 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
func (kv *kvs) Bucket() string {
return kv.name
}

type kvStatus struct {
nfo *StreamInfo
bucket string
}

// Bucket the name of the bucket
func (s *kvStatus) Bucket() string { return s.bucket }

// Values is how many messages are in the bucket, including historical values
func (s *kvStatus) Values() uint64 { return s.nfo.State.Msgs }

// History returns the configured history kept per key
func (s *kvStatus) History() int64 { return s.nfo.Config.MaxMsgsPerSubject }

// TTL is how long the bucket keeps values for
func (s *kvStatus) TTL() time.Duration { return s.nfo.Config.MaxAge }

// Replicas is how many storage replicas are kept
func (s *kvStatus) Replicas() int { return s.nfo.Config.Replicas }

// StreamName is the name of the stream used to store the data
func (s *kvStatus) StreamName() string { return s.nfo.Config.Name }

// Status retrieves the status and configuration of a bucket
func (kv *kvs) Status() (KeyValueStatus, error) {
nfo, err := kv.js.StreamInfo(kv.stream)
if err != nil {
return nil, err
}

return &kvStatus{nfo: nfo, bucket: kv.name}, nil
}
24 changes: 23 additions & 1 deletion test/kv_test.go
Expand Up @@ -33,7 +33,7 @@ func TestKeyValueBasics(t *testing.T) {
nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"})
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 5, TTL: time.Hour})
expectOk(t, err)

if kv.Bucket() != "TEST" {
Expand Down Expand Up @@ -78,6 +78,28 @@ func TestKeyValueBasics(t *testing.T) {
expectOk(t, err)
_, err = kv.Update("age", []byte("33"), r)
expectOk(t, err)

// Status
status, err := kv.Status()
expectOk(t, err)
if status.Replicas() != 1 {
t.Fatalf("expected 1 replica got %d", status.Replicas())
}
if status.History() != 5 {
t.Fatalf("expected history of 5 got %d", status.History())
}
if status.Bucket() != "TEST" {
t.Fatalf("expected bucket TEST got %v", status.Bucket())
}
if status.TTL() != time.Hour {
t.Fatalf("expected 1 hour TTL got %v", status.TTL())
}
if status.Values() != 7 {
t.Fatalf("expected 7 values got %d", status.Values())
}
if status.StreamName() != "KV_TEST" {
t.Fatalf("expected KV_TEST stream got %v", status.StreamName())
}
}

func TestKeyValueHistory(t *testing.T) {
Expand Down