diff --git a/js.go b/js.go index 8e50f83ee..9b0c09c34 100644 --- a/js.go +++ b/js.go @@ -214,6 +214,8 @@ type jsOpts struct { aecb MsgErrHandler // Maximum in flight. maxap int + // the domain that produced the pre + domain string } const ( @@ -258,7 +260,13 @@ func Domain(domain string) JSOpt { return APIPrefix(_EMPTY_) } - return APIPrefix(fmt.Sprintf(jsDomainT, domain)) + return jsOptFn(func(js *jsOpts) error { + js.domain = domain + js.pre = fmt.Sprintf(jsDomainT, domain) + + return nil + }) + } // APIPrefix changes the default prefix used for the JetStream API. diff --git a/jsm.go b/jsm.go index 95f38b593..382879aae 100644 --- a/jsm.go +++ b/jsm.go @@ -1132,7 +1132,7 @@ func getJSContextOpts(defs *jsOpts, opts ...JSOpt) (*jsOpts, context.CancelFunc, if o.ctx == nil && o.wait > 0 { o.ctx, cancel = context.WithTimeout(context.Background(), o.wait) } - if o.pre == "" { + if o.pre == _EMPTY_ { o.pre = defs.pre } diff --git a/kv.go b/kv.go index 30ea14208..4cf420f25 100644 --- a/kv.go +++ b/kv.go @@ -66,13 +66,33 @@ type KeyValue interface { Bucket() string // PurgeDeletes will remove all current delete markers. PurgeDeletes(opts ...WatchOpt) error + // Status retrieves the status and configuration of a bucket + Status() (KeyValueStatus, error) +} + +// KeyValueStatus is run-time status about a Key-Value bucket +type KeyValueStatus interface { + // Bucket the name of the bucket + Bucket() string + + // Values is how many messages are in the bucket, including historical values + Values() uint64 + + // History returns the configured history kept per key + History() int64 + + // TTL is how long the bucket keeps values for + TTL() time.Duration + + // BackingStore is information about the backend hosting the data + BackingStore() BackingStore } // KeyWatcher is what is returned when doing a watch. type KeyWatcher interface { // Updates returns a channel to read any updates to entries. Updates() <-chan KeyValueEntry - // Stop() will stop this watcher. + // Stop will stop this watcher. Stop() error } @@ -657,3 +677,50 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { func (kv *kvs) Bucket() string { return kv.name } + +type kvBackingStore struct { + info map[string]string +} + +func (b *kvBackingStore) Kind() string { return "JetStream" } +func (b *kvBackingStore) Info() map[string]string { return b.info } + +type kvStatus struct { + nfo *StreamInfo + bucket string + bs *kvBackingStore +} + +// Bucket the name of the bucket +func (s *kvStatus) Bucket() string { return s.bucket } + +// Values is how many messages are in the bucket, including historical values +func (s *kvStatus) Values() uint64 { return s.nfo.State.Msgs } + +// History returns the configured history kept per key +func (s *kvStatus) History() int64 { return s.nfo.Config.MaxMsgsPerSubject } + +// TTL is how long the bucket keeps values for +func (s *kvStatus) TTL() time.Duration { return s.nfo.Config.MaxAge } + +// BackingStore is information about the backend and storage used for the KV store +func (s *kvStatus) BackingStore() BackingStore { return s.bs } + +// Status retrieves the status and configuration of a bucket +func (kv *kvs) Status() (KeyValueStatus, error) { + nfo, err := kv.js.StreamInfo(kv.stream) + if err != nil { + return nil, err + } + + bs := &kvBackingStore{info: map[string]string{ + "stream": kv.stream, + "domain": kv.js.opts.domain, + }} + + if nfo.Cluster != nil { + bs.info["placement_cluster"] = nfo.Cluster.Name + } + + return &kvStatus{nfo: nfo, bucket: kv.name, bs: bs}, nil +} diff --git a/object.go b/object.go index 87a6e6c75..143942553 100644 --- a/object.go +++ b/object.go @@ -112,7 +112,7 @@ func (ctx ContextOpt) configureObject(opts *objOpts) error { type ObjectWatcher interface { // Updates returns a channel to read any updates to entries. Updates() <-chan *ObjectInfo - // Stop() will stop this watcher. + // Stop will stop this watcher. Stop() error } @@ -135,11 +135,22 @@ type ObjectStoreConfig struct { Replicas int } +// BackingStore describes the implementation and storage backend of KV or Object stores +type BackingStore interface { + Kind() string + Info() map[string]string +} + // ObjectStoreStatus is the status of the object store type ObjectStoreStatus struct { - ObjectStoreConfig - Sealed bool - Size uint64 + Bucket string + Description string + TTL time.Duration + Storage StorageType + Replicas int + Sealed bool + Size uint64 + BackingStore BackingStore } // ObjectMetaOptions @@ -867,6 +878,13 @@ func (obs *obs) List(opts ...WatchOpt) ([]*ObjectInfo, error) { return objs, nil } +type objBackingStore struct { + info map[string]string +} + +func (b *objBackingStore) Kind() string { return "JetStream" } +func (b *objBackingStore) Info() map[string]string { return b.info } + // Status retrieves run-time status about a bucket func (obs *obs) Status() (*ObjectStoreStatus, error) { nfo, err := obs.js.StreamInfo(obs.stream) @@ -874,18 +892,28 @@ func (obs *obs) Status() (*ObjectStoreStatus, error) { return nil, err } - status := &ObjectStoreStatus{ - Sealed: nfo.Config.Sealed, - Size: nfo.State.Bytes, - ObjectStoreConfig: ObjectStoreConfig{ - Bucket: obs.name, - Description: nfo.Config.Description, - TTL: nfo.Config.MaxAge, - Storage: nfo.Config.Storage, - Replicas: nfo.Config.Replicas, + bs := &objBackingStore{ + info: map[string]string{ + "stream": obs.stream, + "domain": obs.js.opts.domain, }, } + if nfo.Cluster != nil { + bs.info["placement_cluster"] = nfo.Cluster.Name + } + + status := &ObjectStoreStatus{ + Sealed: nfo.Config.Sealed, + Size: nfo.State.Bytes, + Bucket: obs.name, + Description: nfo.Config.Description, + TTL: nfo.Config.MaxAge, + Storage: nfo.Config.Storage, + Replicas: nfo.Config.Replicas, + BackingStore: bs, + } + return status, nil } diff --git a/test/kv_test.go b/test/kv_test.go index f2f75bae5..963716996 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -33,7 +33,7 @@ func TestKeyValueBasics(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() - kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"}) + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 5, TTL: time.Hour}) expectOk(t, err) if kv.Bucket() != "TEST" { @@ -78,6 +78,36 @@ func TestKeyValueBasics(t *testing.T) { expectOk(t, err) _, err = kv.Update("age", []byte("33"), r) expectOk(t, err) + + // Status + status, err := kv.Status() + expectOk(t, err) + if status.Replicas() != 1 { + t.Fatalf("expected 1 replica got %d", status.Replicas()) + } + if status.History() != 5 { + t.Fatalf("expected history of 5 got %d", status.History()) + } + if status.Bucket() != "TEST" { + t.Fatalf("expected bucket TEST got %v", status.Bucket()) + } + if status.TTL() != time.Hour { + t.Fatalf("expected 1 hour TTL got %v", status.TTL()) + } + if status.Values() != 7 { + t.Fatalf("expected 7 values got %d", status.Values()) + } + if status.StreamName() != "KV_TEST" { + t.Fatalf("expected KV_TEST stream got %v", status.StreamName()) + } + bs := status.BackingStore() + if bs.Kind() != "JetStream" { + t.Fatalf("invalid backing store kind %s", bs.Kind()) + } + info := bs.Info() + if info["stream"] != "KV_TEST" { + t.Fatalf("invalid stream name %+v", info) + } } func TestKeyValueHistory(t *testing.T) { diff --git a/test/object_test.go b/test/object_test.go index 7a3984c0d..7ca654f2a 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -112,6 +112,33 @@ func TestObjectBasics(t *testing.T) { expectErr(t, err, nats.ErrStreamNotFound) } +func TestDefaultObjectStatus(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"}) + expectOk(t, err) + + blob := make([]byte, 1024*1024+22) + rand.Read(blob) + + _, err = obs.PutBytes("BLOB", blob) + expectOk(t, err) + + status, err := obs.Status() + expectOk(t, err) + if status.BackingStore.Kind() != "JetStream" { + t.Fatalf("invalid backing store kind: %s", status.BackingStore.Kind()) + } + info := status.BackingStore.Info() + if info["stream"] != "OBJ_OBJS" { + t.Fatalf("invalid stream name %+v", info) + } +} + func TestObjectFileBasics(t *testing.T) { s := RunBasicJetStreamServer() defer shutdown(s)