diff --git a/kv.go b/kv.go index cf5468bce..3b0ab12d5 100644 --- a/kv.go +++ b/kv.go @@ -66,13 +66,35 @@ type KeyValue interface { Bucket() string // PurgeDeletes will remove all current delete markers. PurgeDeletes(opts ...WatchOpt) error + // Status retrieves the status and configuration of a bucket + Status() (KeyValueStatus, error) +} + +type KeyValueStatus interface { + // Bucket the name of the bucket + Bucket() string + + // Values is how many messages are in the bucket, including historical values + Values() uint64 + + // History returns the configured history kept per key + History() int64 + + // TTL is how long the bucket keeps values for + TTL() time.Duration + + // Replicas is how many storage replicas are kept + Replicas() int + + // StreamName is the name of the stream used to store the data + StreamName() string } // KeyWatcher is what is returned when doing a watch. type KeyWatcher interface { // Updates returns a channel to read any updates to entries. Updates() <-chan KeyValueEntry - // Stop() will stop this watcher. + // Stop will stop this watcher. Stop() error } @@ -642,3 +664,36 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { func (kv *kvs) Bucket() string { return kv.name } + +type kvStatus struct { + nfo *StreamInfo + bucket string +} + +// Bucket the name of the bucket +func (s *kvStatus) Bucket() string { return s.bucket } + +// Values is how many messages are in the bucket, including historical values +func (s *kvStatus) Values() uint64 { return s.nfo.State.Msgs } + +// History returns the configured history kept per key +func (s *kvStatus) History() int64 { return s.nfo.Config.MaxMsgsPerSubject } + +// TTL is how long the bucket keeps values for +func (s *kvStatus) TTL() time.Duration { return s.nfo.Config.MaxAge } + +// Replicas is how many storage replicas are kept +func (s *kvStatus) Replicas() int { return s.nfo.Config.Replicas } + +// StreamName is the name of the stream used to store the data +func (s *kvStatus) StreamName() string { return s.nfo.Config.Name } + +// Status retrieves the status and configuration of a bucket +func (kv *kvs) Status() (KeyValueStatus, error) { + nfo, err := kv.js.StreamInfo(kv.stream) + if err != nil { + return nil, err + } + + return &kvStatus{nfo: nfo, bucket: kv.name}, nil +} diff --git a/test/kv_test.go b/test/kv_test.go index 01934ba48..f74fa88bc 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -33,7 +33,7 @@ func TestKeyValueBasics(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() - kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"}) + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 5, TTL: time.Hour}) expectOk(t, err) if kv.Bucket() != "TEST" { @@ -78,6 +78,28 @@ func TestKeyValueBasics(t *testing.T) { expectOk(t, err) _, err = kv.Update("age", []byte("33"), r) expectOk(t, err) + + // Status + status, err := kv.Status() + expectOk(t, err) + if status.Replicas() != 1 { + t.Fatalf("expected 1 replica got %d", status.Replicas()) + } + if status.History() != 5 { + t.Fatalf("expected history of 5 got %d", status.History()) + } + if status.Bucket() != "TEST" { + t.Fatalf("expected bucket TEST got %v", status.Bucket()) + } + if status.TTL() != time.Hour { + t.Fatalf("expected 1 hour TTL got %v", status.TTL()) + } + if status.Values() != 7 { + t.Fatalf("expected 7 values got %d", status.Values()) + } + if status.StreamName() != "KV_TEST" { + t.Fatalf("expected KV_TEST stream got %v", status.StreamName()) + } } func TestKeyValueHistory(t *testing.T) {