From 92b850dca5a1807932fbf02267cc27405c5092ef Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Mon, 11 Oct 2021 17:50:27 +0200 Subject: [PATCH] add kv and object status functionality Signed-off-by: R.I.Pienaar --- js.go | 10 ++++++- jsm.go | 2 +- kv.go | 69 ++++++++++++++++++++++++++++++++++++++++++++- object.go | 62 +++++++++++++++++++++++++++++++++++++++- test/kv_test.go | 26 ++++++++++++++++- test/object_test.go | 44 ++++++++++++++++++++++++++++- 6 files changed, 207 insertions(+), 6 deletions(-) diff --git a/js.go b/js.go index 8e50f83ee..9b0c09c34 100644 --- a/js.go +++ b/js.go @@ -214,6 +214,8 @@ type jsOpts struct { aecb MsgErrHandler // Maximum in flight. maxap int + // the domain that produced the pre + domain string } const ( @@ -258,7 +260,13 @@ func Domain(domain string) JSOpt { return APIPrefix(_EMPTY_) } - return APIPrefix(fmt.Sprintf(jsDomainT, domain)) + return jsOptFn(func(js *jsOpts) error { + js.domain = domain + js.pre = fmt.Sprintf(jsDomainT, domain) + + return nil + }) + } // APIPrefix changes the default prefix used for the JetStream API. diff --git a/jsm.go b/jsm.go index 95f38b593..382879aae 100644 --- a/jsm.go +++ b/jsm.go @@ -1132,7 +1132,7 @@ func getJSContextOpts(defs *jsOpts, opts ...JSOpt) (*jsOpts, context.CancelFunc, if o.ctx == nil && o.wait > 0 { o.ctx, cancel = context.WithTimeout(context.Background(), o.wait) } - if o.pre == "" { + if o.pre == _EMPTY_ { o.pre = defs.pre } diff --git a/kv.go b/kv.go index 30ea14208..4cf420f25 100644 --- a/kv.go +++ b/kv.go @@ -66,13 +66,33 @@ type KeyValue interface { Bucket() string // PurgeDeletes will remove all current delete markers. PurgeDeletes(opts ...WatchOpt) error + // Status retrieves the status and configuration of a bucket + Status() (KeyValueStatus, error) +} + +// KeyValueStatus is run-time status about a Key-Value bucket +type KeyValueStatus interface { + // Bucket the name of the bucket + Bucket() string + + // Values is how many messages are in the bucket, including historical values + Values() uint64 + + // History returns the configured history kept per key + History() int64 + + // TTL is how long the bucket keeps values for + TTL() time.Duration + + // BackingStore is information about the backend hosting the data + BackingStore() BackingStore } // KeyWatcher is what is returned when doing a watch. type KeyWatcher interface { // Updates returns a channel to read any updates to entries. Updates() <-chan KeyValueEntry - // Stop() will stop this watcher. + // Stop will stop this watcher. Stop() error } @@ -657,3 +677,50 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { func (kv *kvs) Bucket() string { return kv.name } + +type kvBackingStore struct { + info map[string]string +} + +func (b *kvBackingStore) Kind() string { return "JetStream" } +func (b *kvBackingStore) Info() map[string]string { return b.info } + +type kvStatus struct { + nfo *StreamInfo + bucket string + bs *kvBackingStore +} + +// Bucket the name of the bucket +func (s *kvStatus) Bucket() string { return s.bucket } + +// Values is how many messages are in the bucket, including historical values +func (s *kvStatus) Values() uint64 { return s.nfo.State.Msgs } + +// History returns the configured history kept per key +func (s *kvStatus) History() int64 { return s.nfo.Config.MaxMsgsPerSubject } + +// TTL is how long the bucket keeps values for +func (s *kvStatus) TTL() time.Duration { return s.nfo.Config.MaxAge } + +// BackingStore is information about the backend and storage used for the KV store +func (s *kvStatus) BackingStore() BackingStore { return s.bs } + +// Status retrieves the status and configuration of a bucket +func (kv *kvs) Status() (KeyValueStatus, error) { + nfo, err := kv.js.StreamInfo(kv.stream) + if err != nil { + return nil, err + } + + bs := &kvBackingStore{info: map[string]string{ + "stream": kv.stream, + "domain": kv.js.opts.domain, + }} + + if nfo.Cluster != nil { + bs.info["placement_cluster"] = nfo.Cluster.Name + } + + return &kvStatus{nfo: nfo, bucket: kv.name, bs: bs}, nil +} diff --git a/object.go b/object.go index 13dd7b280..143942553 100644 --- a/object.go +++ b/object.go @@ -89,6 +89,9 @@ type ObjectStore interface { // List will list all the objects in this store. List(opts ...WatchOpt) ([]*ObjectInfo, error) + + // Status retrieves run-time status about the backing store of the bucket. + Status() (*ObjectStoreStatus, error) } type ObjectOpt interface { @@ -109,7 +112,7 @@ func (ctx ContextOpt) configureObject(opts *objOpts) error { type ObjectWatcher interface { // Updates returns a channel to read any updates to entries. Updates() <-chan *ObjectInfo - // Stop() will stop this watcher. + // Stop will stop this watcher. Stop() error } @@ -132,6 +135,24 @@ type ObjectStoreConfig struct { Replicas int } +// BackingStore describes the implementation and storage backend of KV or Object stores +type BackingStore interface { + Kind() string + Info() map[string]string +} + +// ObjectStoreStatus is the status of the object store +type ObjectStoreStatus struct { + Bucket string + Description string + TTL time.Duration + Storage StorageType + Replicas int + Sealed bool + Size uint64 + BackingStore BackingStore +} + // ObjectMetaOptions type ObjectMetaOptions struct { Link *ObjectLink `json:"link,omitempty"` @@ -857,6 +878,45 @@ func (obs *obs) List(opts ...WatchOpt) ([]*ObjectInfo, error) { return objs, nil } +type objBackingStore struct { + info map[string]string +} + +func (b *objBackingStore) Kind() string { return "JetStream" } +func (b *objBackingStore) Info() map[string]string { return b.info } + +// Status retrieves run-time status about a bucket +func (obs *obs) Status() (*ObjectStoreStatus, error) { + nfo, err := obs.js.StreamInfo(obs.stream) + if err != nil { + return nil, err + } + + bs := &objBackingStore{ + info: map[string]string{ + "stream": obs.stream, + "domain": obs.js.opts.domain, + }, + } + + if nfo.Cluster != nil { + bs.info["placement_cluster"] = nfo.Cluster.Name + } + + status := &ObjectStoreStatus{ + Sealed: nfo.Config.Sealed, + Size: nfo.State.Bytes, + Bucket: obs.name, + Description: nfo.Config.Description, + TTL: nfo.Config.MaxAge, + Storage: nfo.Config.Storage, + Replicas: nfo.Config.Replicas, + BackingStore: bs, + } + + return status, nil +} + // Read impl. func (o *objResult) Read(p []byte) (n int, err error) { o.Lock() diff --git a/test/kv_test.go b/test/kv_test.go index f2f75bae5..70bc53353 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -33,7 +33,7 @@ func TestKeyValueBasics(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() - kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"}) + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 5, TTL: time.Hour}) expectOk(t, err) if kv.Bucket() != "TEST" { @@ -78,6 +78,30 @@ func TestKeyValueBasics(t *testing.T) { expectOk(t, err) _, err = kv.Update("age", []byte("33"), r) expectOk(t, err) + + // Status + status, err := kv.Status() + expectOk(t, err) + if status.History() != 5 { + t.Fatalf("expected history of 5 got %d", status.History()) + } + if status.Bucket() != "TEST" { + t.Fatalf("expected bucket TEST got %v", status.Bucket()) + } + if status.TTL() != time.Hour { + t.Fatalf("expected 1 hour TTL got %v", status.TTL()) + } + if status.Values() != 7 { + t.Fatalf("expected 7 values got %d", status.Values()) + } + bs := status.BackingStore() + if bs.Kind() != "JetStream" { + t.Fatalf("invalid backing store kind %s", bs.Kind()) + } + info := bs.Info() + if info["stream"] != "KV_TEST" { + t.Fatalf("invalid stream name %+v", info) + } } func TestKeyValueHistory(t *testing.T) { diff --git a/test/object_test.go b/test/object_test.go index c1c70684e..7ca654f2a 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -34,7 +34,7 @@ func TestObjectBasics(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() - obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS"}) + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"}) expectOk(t, err) // Create ~16MB object. @@ -67,6 +67,21 @@ func TestObjectBasics(t *testing.T) { t.Fatalf("Expected the object stream to be sealed, got %+v", si) } + status, err := obs.Status() + expectOk(t, err) + if !status.Sealed { + t.Fatalf("exected sealed status") + } + if status.Size == 0 { + t.Fatalf("size is 0") + } + if status.Storage != nats.FileStorage { + t.Fatalf("stauts reports %d storage", status.Storage) + } + if status.Description != "testing" { + t.Fatalf("invalid description: '%s'", status.Description) + } + // Check simple errors. _, err = obs.Get("FOO") expectErr(t, err) @@ -97,6 +112,33 @@ func TestObjectBasics(t *testing.T) { expectErr(t, err, nats.ErrStreamNotFound) } +func TestDefaultObjectStatus(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"}) + expectOk(t, err) + + blob := make([]byte, 1024*1024+22) + rand.Read(blob) + + _, err = obs.PutBytes("BLOB", blob) + expectOk(t, err) + + status, err := obs.Status() + expectOk(t, err) + if status.BackingStore.Kind() != "JetStream" { + t.Fatalf("invalid backing store kind: %s", status.BackingStore.Kind()) + } + info := status.BackingStore.Info() + if info["stream"] != "OBJ_OBJS" { + t.Fatalf("invalid stream name %+v", info) + } +} + func TestObjectFileBasics(t *testing.T) { s := RunBasicJetStreamServer() defer shutdown(s)