Skip to content

Commit

Permalink
add kv and object status functionality
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <rip@devco.net>
  • Loading branch information
ripienaar committed Oct 12, 2021
1 parent 1655009 commit 929fcd4
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 6 deletions.
10 changes: 9 additions & 1 deletion js.go
Expand Up @@ -214,6 +214,8 @@ type jsOpts struct {
aecb MsgErrHandler
// Maximum in flight.
maxap int
// the domain that produced the pre
domain string
}

const (
Expand Down Expand Up @@ -258,7 +260,13 @@ func Domain(domain string) JSOpt {
return APIPrefix(_EMPTY_)
}

return APIPrefix(fmt.Sprintf(jsDomainT, domain))
return jsOptFn(func(js *jsOpts) error {
js.domain = domain
js.pre = fmt.Sprintf(jsDomainT, domain)

return nil
})

}

// APIPrefix changes the default prefix used for the JetStream API.
Expand Down
2 changes: 1 addition & 1 deletion jsm.go
Expand Up @@ -1132,7 +1132,7 @@ func getJSContextOpts(defs *jsOpts, opts ...JSOpt) (*jsOpts, context.CancelFunc,
if o.ctx == nil && o.wait > 0 {
o.ctx, cancel = context.WithTimeout(context.Background(), o.wait)
}
if o.pre == "" {
if o.pre == _EMPTY_ {
o.pre = defs.pre
}

Expand Down
69 changes: 68 additions & 1 deletion kv.go
Expand Up @@ -66,13 +66,33 @@ type KeyValue interface {
Bucket() string
// PurgeDeletes will remove all current delete markers.
PurgeDeletes(opts ...WatchOpt) error
// Status retrieves the status and configuration of a bucket
Status() (KeyValueStatus, error)
}

// KeyValueStatus is run-time status about a Key-Value bucket
type KeyValueStatus interface {
// Bucket the name of the bucket
Bucket() string

// Values is how many messages are in the bucket, including historical values
Values() uint64

// History returns the configured history kept per key
History() int64

// TTL is how long the bucket keeps values for
TTL() time.Duration

// BackingStore is information about the backend hosting the data
BackingStore() BackingStore
}

// KeyWatcher is what is returned when doing a watch.
type KeyWatcher interface {
// Updates returns a channel to read any updates to entries.
Updates() <-chan KeyValueEntry
// Stop() will stop this watcher.
// Stop will stop this watcher.
Stop() error
}

Expand Down Expand Up @@ -657,3 +677,50 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
func (kv *kvs) Bucket() string {
return kv.name
}

type kvBackingStore struct {
info map[string]string
}

func (b *kvBackingStore) Kind() string { return "JetStream" }
func (b *kvBackingStore) Info() map[string]string { return b.info }

type kvStatus struct {
nfo *StreamInfo
bucket string
bs *kvBackingStore
}

// Bucket the name of the bucket
func (s *kvStatus) Bucket() string { return s.bucket }

// Values is how many messages are in the bucket, including historical values
func (s *kvStatus) Values() uint64 { return s.nfo.State.Msgs }

// History returns the configured history kept per key
func (s *kvStatus) History() int64 { return s.nfo.Config.MaxMsgsPerSubject }

// TTL is how long the bucket keeps values for
func (s *kvStatus) TTL() time.Duration { return s.nfo.Config.MaxAge }

// BackingStore is information about the backend and storage used for the KV store
func (s *kvStatus) BackingStore() BackingStore { return s.bs }

// Status retrieves the status and configuration of a bucket
func (kv *kvs) Status() (KeyValueStatus, error) {
nfo, err := kv.js.StreamInfo(kv.stream)
if err != nil {
return nil, err
}

bs := &kvBackingStore{info: map[string]string{
"stream": kv.stream,
"domain": kv.js.opts.domain,
}}

if nfo.Cluster != nil {
bs.info["placement_cluster"] = nfo.Cluster.Name
}

return &kvStatus{nfo: nfo, bucket: kv.name, bs: bs}, nil
}
62 changes: 61 additions & 1 deletion object.go
Expand Up @@ -89,6 +89,9 @@ type ObjectStore interface {

// List will list all the objects in this store.
List(opts ...WatchOpt) ([]*ObjectInfo, error)

// Status retrieves run-time status about the backing store of the bucket.
Status() (*ObjectStoreStatus, error)
}

type ObjectOpt interface {
Expand All @@ -109,7 +112,7 @@ func (ctx ContextOpt) configureObject(opts *objOpts) error {
type ObjectWatcher interface {
// Updates returns a channel to read any updates to entries.
Updates() <-chan *ObjectInfo
// Stop() will stop this watcher.
// Stop will stop this watcher.
Stop() error
}

Expand All @@ -132,6 +135,24 @@ type ObjectStoreConfig struct {
Replicas int
}

// BackingStore describes the implementation and storage backend of KV or Object stores
type BackingStore interface {
Kind() string
Info() map[string]string
}

// ObjectStoreStatus is the status of the object store
type ObjectStoreStatus struct {
Bucket string
Description string
TTL time.Duration
Storage StorageType
Replicas int
Sealed bool
Size uint64
BackingStore BackingStore
}

// ObjectMetaOptions
type ObjectMetaOptions struct {
Link *ObjectLink `json:"link,omitempty"`
Expand Down Expand Up @@ -857,6 +878,45 @@ func (obs *obs) List(opts ...WatchOpt) ([]*ObjectInfo, error) {
return objs, nil
}

type objBackingStore struct {
info map[string]string
}

func (b *objBackingStore) Kind() string { return "JetStream" }
func (b *objBackingStore) Info() map[string]string { return b.info }

// Status retrieves run-time status about a bucket
func (obs *obs) Status() (*ObjectStoreStatus, error) {
nfo, err := obs.js.StreamInfo(obs.stream)
if err != nil {
return nil, err
}

bs := &objBackingStore{
info: map[string]string{
"stream": obs.stream,
"domain": obs.js.opts.domain,
},
}

if nfo.Cluster != nil {
bs.info["placement_cluster"] = nfo.Cluster.Name
}

status := &ObjectStoreStatus{
Sealed: nfo.Config.Sealed,
Size: nfo.State.Bytes,
Bucket: obs.name,
Description: nfo.Config.Description,
TTL: nfo.Config.MaxAge,
Storage: nfo.Config.Storage,
Replicas: nfo.Config.Replicas,
BackingStore: bs,
}

return status, nil
}

// Read impl.
func (o *objResult) Read(p []byte) (n int, err error) {
o.Lock()
Expand Down
32 changes: 31 additions & 1 deletion test/kv_test.go
Expand Up @@ -33,7 +33,7 @@ func TestKeyValueBasics(t *testing.T) {
nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"})
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 5, TTL: time.Hour})
expectOk(t, err)

if kv.Bucket() != "TEST" {
Expand Down Expand Up @@ -78,6 +78,36 @@ func TestKeyValueBasics(t *testing.T) {
expectOk(t, err)
_, err = kv.Update("age", []byte("33"), r)
expectOk(t, err)

// Status
status, err := kv.Status()
expectOk(t, err)
if status.Replicas() != 1 {
t.Fatalf("expected 1 replica got %d", status.Replicas())
}
if status.History() != 5 {
t.Fatalf("expected history of 5 got %d", status.History())
}
if status.Bucket() != "TEST" {
t.Fatalf("expected bucket TEST got %v", status.Bucket())
}
if status.TTL() != time.Hour {
t.Fatalf("expected 1 hour TTL got %v", status.TTL())
}
if status.Values() != 7 {
t.Fatalf("expected 7 values got %d", status.Values())
}
if status.StreamName() != "KV_TEST" {
t.Fatalf("expected KV_TEST stream got %v", status.StreamName())
}
bs := status.BackingStore()
if bs.Kind() != "JetStream" {
t.Fatalf("invalid backing store kind %s", bs.Kind())
}
info := bs.Info()
if info["stream"] != "KV_TEST" {
t.Fatalf("invalid stream name %+v", info)
}
}

func TestKeyValueHistory(t *testing.T) {
Expand Down
44 changes: 43 additions & 1 deletion test/object_test.go
Expand Up @@ -34,7 +34,7 @@ func TestObjectBasics(t *testing.T) {
nc, js := jsClient(t, s)
defer nc.Close()

obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS"})
obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"})
expectOk(t, err)

// Create ~16MB object.
Expand Down Expand Up @@ -67,6 +67,21 @@ func TestObjectBasics(t *testing.T) {
t.Fatalf("Expected the object stream to be sealed, got %+v", si)
}

status, err := obs.Status()
expectOk(t, err)
if !status.Sealed {
t.Fatalf("exected sealed status")
}
if status.Size == 0 {
t.Fatalf("size is 0")
}
if status.Storage != nats.FileStorage {
t.Fatalf("stauts reports %d storage", status.Storage)
}
if status.Description != "testing" {
t.Fatalf("invalid description: '%s'", status.Description)
}

// Check simple errors.
_, err = obs.Get("FOO")
expectErr(t, err)
Expand Down Expand Up @@ -97,6 +112,33 @@ func TestObjectBasics(t *testing.T) {
expectErr(t, err, nats.ErrStreamNotFound)
}

func TestDefaultObjectStatus(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdown(s)

nc, js := jsClient(t, s)
defer nc.Close()

obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"})
expectOk(t, err)

blob := make([]byte, 1024*1024+22)
rand.Read(blob)

_, err = obs.PutBytes("BLOB", blob)
expectOk(t, err)

status, err := obs.Status()
expectOk(t, err)
if status.BackingStore.Kind() != "JetStream" {
t.Fatalf("invalid backing store kind: %s", status.BackingStore.Kind())
}
info := status.BackingStore.Info()
if info["stream"] != "OBJ_OBJS" {
t.Fatalf("invalid stream name %+v", info)
}
}

func TestObjectFileBasics(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdown(s)
Expand Down

0 comments on commit 929fcd4

Please sign in to comment.