diff --git a/js.go b/js.go index 8e50f83ee..9b0c09c34 100644 --- a/js.go +++ b/js.go @@ -214,6 +214,8 @@ type jsOpts struct { aecb MsgErrHandler // Maximum in flight. maxap int + // the domain that produced the pre + domain string } const ( @@ -258,7 +260,13 @@ func Domain(domain string) JSOpt { return APIPrefix(_EMPTY_) } - return APIPrefix(fmt.Sprintf(jsDomainT, domain)) + return jsOptFn(func(js *jsOpts) error { + js.domain = domain + js.pre = fmt.Sprintf(jsDomainT, domain) + + return nil + }) + } // APIPrefix changes the default prefix used for the JetStream API. diff --git a/jsm.go b/jsm.go index 95f38b593..382879aae 100644 --- a/jsm.go +++ b/jsm.go @@ -1132,7 +1132,7 @@ func getJSContextOpts(defs *jsOpts, opts ...JSOpt) (*jsOpts, context.CancelFunc, if o.ctx == nil && o.wait > 0 { o.ctx, cancel = context.WithTimeout(context.Background(), o.wait) } - if o.pre == "" { + if o.pre == _EMPTY_ { o.pre = defs.pre } diff --git a/kv.go b/kv.go index 30ea14208..4cf420f25 100644 --- a/kv.go +++ b/kv.go @@ -66,13 +66,33 @@ type KeyValue interface { Bucket() string // PurgeDeletes will remove all current delete markers. PurgeDeletes(opts ...WatchOpt) error + // Status retrieves the status and configuration of a bucket + Status() (KeyValueStatus, error) +} + +// KeyValueStatus is run-time status about a Key-Value bucket +type KeyValueStatus interface { + // Bucket the name of the bucket + Bucket() string + + // Values is how many messages are in the bucket, including historical values + Values() uint64 + + // History returns the configured history kept per key + History() int64 + + // TTL is how long the bucket keeps values for + TTL() time.Duration + + // BackingStore is information about the backend hosting the data + BackingStore() BackingStore } // KeyWatcher is what is returned when doing a watch. type KeyWatcher interface { // Updates returns a channel to read any updates to entries. Updates() <-chan KeyValueEntry - // Stop() will stop this watcher. + // Stop will stop this watcher. Stop() error } @@ -657,3 +677,50 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { func (kv *kvs) Bucket() string { return kv.name } + +type kvBackingStore struct { + info map[string]string +} + +func (b *kvBackingStore) Kind() string { return "JetStream" } +func (b *kvBackingStore) Info() map[string]string { return b.info } + +type kvStatus struct { + nfo *StreamInfo + bucket string + bs *kvBackingStore +} + +// Bucket the name of the bucket +func (s *kvStatus) Bucket() string { return s.bucket } + +// Values is how many messages are in the bucket, including historical values +func (s *kvStatus) Values() uint64 { return s.nfo.State.Msgs } + +// History returns the configured history kept per key +func (s *kvStatus) History() int64 { return s.nfo.Config.MaxMsgsPerSubject } + +// TTL is how long the bucket keeps values for +func (s *kvStatus) TTL() time.Duration { return s.nfo.Config.MaxAge } + +// BackingStore is information about the backend and storage used for the KV store +func (s *kvStatus) BackingStore() BackingStore { return s.bs } + +// Status retrieves the status and configuration of a bucket +func (kv *kvs) Status() (KeyValueStatus, error) { + nfo, err := kv.js.StreamInfo(kv.stream) + if err != nil { + return nil, err + } + + bs := &kvBackingStore{info: map[string]string{ + "stream": kv.stream, + "domain": kv.js.opts.domain, + }} + + if nfo.Cluster != nil { + bs.info["placement_cluster"] = nfo.Cluster.Name + } + + return &kvStatus{nfo: nfo, bucket: kv.name, bs: bs}, nil +} diff --git a/object.go b/object.go index 13dd7b280..a9263fe8f 100644 --- a/object.go +++ b/object.go @@ -31,6 +31,8 @@ import ( "github.com/nats-io/nuid" ) +// ObjectStoreManager creates, loads and deletes Object Stores +// // Notice: Experimental Preview // // This functionality is EXPERIMENTAL and may be changed in later releases. @@ -43,6 +45,9 @@ type ObjectStoreManager interface { DeleteObjectStore(bucket string) error } +// ObjectStore is a blob store capable of storing large objects efficiently in +// JetStream streams +// // Notice: Experimental Preview // // This functionality is EXPERIMENTAL and may be changed in later releases. @@ -89,6 +94,9 @@ type ObjectStore interface { // List will list all the objects in this store. List(opts ...WatchOpt) ([]*ObjectInfo, error) + + // Status retrieves run-time status about the backing store of the bucket. + Status() (ObjectStoreStatus, error) } type ObjectOpt interface { @@ -109,7 +117,7 @@ func (ctx ContextOpt) configureObject(opts *objOpts) error { type ObjectWatcher interface { // Updates returns a channel to read any updates to entries. Updates() <-chan *ObjectInfo - // Stop() will stop this watcher. + // Stop will stop this watcher. Stop() error } @@ -132,6 +140,31 @@ type ObjectStoreConfig struct { Replicas int } +// BackingStore describes the implementation and storage backend of KV or Object stores +type BackingStore interface { + Kind() string + Info() map[string]string +} + +type ObjectStoreStatus interface { + // Bucket is the name of the bucket + Bucket() string + // Description is the description supplied when creating the bucket + Description() string + // TTL indicates how long objects are kept in the bucket + TTL() time.Duration + // Storage indicates the underlying JetStream storage technology used to store data + Storage() StorageType + // Replicas indicates how many storage replicas are kept for the data in the bucket + Replicas() int + // Sealed indicates the stream is sealed and cannot be modified in any way + Sealed() bool + // Size is the combined size of all data in the bucket including metadata, in bytes + Size() uint64 + // BackingStore provides details about the underlying storage + BackingStore() BackingStore +} + // ObjectMetaOptions type ObjectMetaOptions struct { Link *ObjectLink `json:"link,omitempty"` @@ -857,6 +890,55 @@ func (obs *obs) List(opts ...WatchOpt) ([]*ObjectInfo, error) { return objs, nil } +type objBackingStore struct { + info map[string]string +} + +func (b *objBackingStore) Kind() string { return "JetStream" } +func (b *objBackingStore) Info() map[string]string { return b.info } + +type objStatus struct { + nfo *StreamInfo + bucket string + bs BackingStore +} + +func (s *objStatus) Bucket() string { return s.bucket } +func (s *objStatus) Description() string { return s.nfo.Config.Description } +func (s *objStatus) TTL() time.Duration { return s.nfo.Config.MaxAge } +func (s *objStatus) Storage() StorageType { return s.nfo.Config.Storage } +func (s *objStatus) Replicas() int { return s.nfo.Config.Replicas } +func (s *objStatus) Sealed() bool { return s.nfo.Config.Sealed } +func (s *objStatus) Size() uint64 { return s.nfo.State.Bytes } +func (s *objStatus) BackingStore() BackingStore { return s.bs } + +// Status retrieves run-time status about a bucket +func (obs *obs) Status() (ObjectStoreStatus, error) { + nfo, err := obs.js.StreamInfo(obs.stream) + if err != nil { + return nil, err + } + + bs := &objBackingStore{ + info: map[string]string{ + "stream": obs.stream, + "domain": obs.js.opts.domain, + }, + } + + if nfo.Cluster != nil { + bs.info["placement_cluster"] = nfo.Cluster.Name + } + + status := &objStatus{ + nfo: nfo, + bucket: obs.name, + bs: bs, + } + + return status, nil +} + // Read impl. func (o *objResult) Read(p []byte) (n int, err error) { o.Lock() diff --git a/test/kv_test.go b/test/kv_test.go index f2f75bae5..70bc53353 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -33,7 +33,7 @@ func TestKeyValueBasics(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() - kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"}) + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 5, TTL: time.Hour}) expectOk(t, err) if kv.Bucket() != "TEST" { @@ -78,6 +78,30 @@ func TestKeyValueBasics(t *testing.T) { expectOk(t, err) _, err = kv.Update("age", []byte("33"), r) expectOk(t, err) + + // Status + status, err := kv.Status() + expectOk(t, err) + if status.History() != 5 { + t.Fatalf("expected history of 5 got %d", status.History()) + } + if status.Bucket() != "TEST" { + t.Fatalf("expected bucket TEST got %v", status.Bucket()) + } + if status.TTL() != time.Hour { + t.Fatalf("expected 1 hour TTL got %v", status.TTL()) + } + if status.Values() != 7 { + t.Fatalf("expected 7 values got %d", status.Values()) + } + bs := status.BackingStore() + if bs.Kind() != "JetStream" { + t.Fatalf("invalid backing store kind %s", bs.Kind()) + } + info := bs.Info() + if info["stream"] != "KV_TEST" { + t.Fatalf("invalid stream name %+v", info) + } } func TestKeyValueHistory(t *testing.T) { diff --git a/test/object_test.go b/test/object_test.go index c1c70684e..31f004baa 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -34,7 +34,7 @@ func TestObjectBasics(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() - obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS"}) + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"}) expectOk(t, err) // Create ~16MB object. @@ -67,6 +67,21 @@ func TestObjectBasics(t *testing.T) { t.Fatalf("Expected the object stream to be sealed, got %+v", si) } + status, err := obs.Status() + expectOk(t, err) + if !status.Sealed() { + t.Fatalf("exected sealed status") + } + if status.Size() == 0 { + t.Fatalf("size is 0") + } + if status.Storage() != nats.FileStorage { + t.Fatalf("stauts reports %d storage", status.Storage()) + } + if status.Description() != "testing" { + t.Fatalf("invalid description: '%s'", status.Description()) + } + // Check simple errors. _, err = obs.Get("FOO") expectErr(t, err) @@ -97,6 +112,33 @@ func TestObjectBasics(t *testing.T) { expectErr(t, err, nats.ErrStreamNotFound) } +func TestDefaultObjectStatus(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"}) + expectOk(t, err) + + blob := make([]byte, 1024*1024+22) + rand.Read(blob) + + _, err = obs.PutBytes("BLOB", blob) + expectOk(t, err) + + status, err := obs.Status() + expectOk(t, err) + if status.BackingStore.Kind() != "JetStream" { + t.Fatalf("invalid backing store kind: %s", status.BackingStore.Kind()) + } + info := status.BackingStore.Info() + if info["stream"] != "OBJ_OBJS" { + t.Fatalf("invalid stream name %+v", info) + } +} + func TestObjectFileBasics(t *testing.T) { s := RunBasicJetStreamServer() defer shutdown(s)