Skip to content

Commit

Permalink
Merge pull request #845 from ripienaar/obj_status
Browse files Browse the repository at this point in the history
add kv and object status functionality
  • Loading branch information
ripienaar committed Oct 12, 2021
2 parents 1655009 + 4b5b7cf commit 15a7702
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 6 deletions.
10 changes: 9 additions & 1 deletion js.go
Expand Up @@ -214,6 +214,8 @@ type jsOpts struct {
aecb MsgErrHandler
// Maximum in flight.
maxap int
// the domain that produced the pre
domain string
}

const (
Expand Down Expand Up @@ -258,7 +260,13 @@ func Domain(domain string) JSOpt {
return APIPrefix(_EMPTY_)
}

return APIPrefix(fmt.Sprintf(jsDomainT, domain))
return jsOptFn(func(js *jsOpts) error {
js.domain = domain
js.pre = fmt.Sprintf(jsDomainT, domain)

return nil
})

}

// APIPrefix changes the default prefix used for the JetStream API.
Expand Down
2 changes: 1 addition & 1 deletion jsm.go
Expand Up @@ -1132,7 +1132,7 @@ func getJSContextOpts(defs *jsOpts, opts ...JSOpt) (*jsOpts, context.CancelFunc,
if o.ctx == nil && o.wait > 0 {
o.ctx, cancel = context.WithTimeout(context.Background(), o.wait)
}
if o.pre == "" {
if o.pre == _EMPTY_ {
o.pre = defs.pre
}

Expand Down
69 changes: 68 additions & 1 deletion kv.go
Expand Up @@ -66,13 +66,33 @@ type KeyValue interface {
Bucket() string
// PurgeDeletes will remove all current delete markers.
PurgeDeletes(opts ...WatchOpt) error
// Status retrieves the status and configuration of a bucket
Status() (KeyValueStatus, error)
}

// KeyValueStatus is run-time status about a Key-Value bucket
type KeyValueStatus interface {
// Bucket the name of the bucket
Bucket() string

// Values is how many messages are in the bucket, including historical values
Values() uint64

// History returns the configured history kept per key
History() int64

// TTL is how long the bucket keeps values for
TTL() time.Duration

// BackingStore is information about the backend hosting the data
BackingStore() BackingStore
}

// KeyWatcher is what is returned when doing a watch.
type KeyWatcher interface {
// Updates returns a channel to read any updates to entries.
Updates() <-chan KeyValueEntry
// Stop() will stop this watcher.
// Stop will stop this watcher.
Stop() error
}

Expand Down Expand Up @@ -657,3 +677,50 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
func (kv *kvs) Bucket() string {
return kv.name
}

type kvBackingStore struct {
info map[string]string
}

func (b *kvBackingStore) Kind() string { return "JetStream" }
func (b *kvBackingStore) Info() map[string]string { return b.info }

type kvStatus struct {
nfo *StreamInfo
bucket string
bs *kvBackingStore
}

// Bucket the name of the bucket
func (s *kvStatus) Bucket() string { return s.bucket }

// Values is how many messages are in the bucket, including historical values
func (s *kvStatus) Values() uint64 { return s.nfo.State.Msgs }

// History returns the configured history kept per key
func (s *kvStatus) History() int64 { return s.nfo.Config.MaxMsgsPerSubject }

// TTL is how long the bucket keeps values for
func (s *kvStatus) TTL() time.Duration { return s.nfo.Config.MaxAge }

// BackingStore is information about the backend and storage used for the KV store
func (s *kvStatus) BackingStore() BackingStore { return s.bs }

// Status retrieves the status and configuration of a bucket
func (kv *kvs) Status() (KeyValueStatus, error) {
nfo, err := kv.js.StreamInfo(kv.stream)
if err != nil {
return nil, err
}

bs := &kvBackingStore{info: map[string]string{
"stream": kv.stream,
"domain": kv.js.opts.domain,
}}

if nfo.Cluster != nil {
bs.info["placement_cluster"] = nfo.Cluster.Name
}

return &kvStatus{nfo: nfo, bucket: kv.name, bs: bs}, nil
}
84 changes: 83 additions & 1 deletion object.go
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/nats-io/nuid"
)

// ObjectStoreManager creates, loads and deletes Object Stores
//
// Notice: Experimental Preview
//
// This functionality is EXPERIMENTAL and may be changed in later releases.
Expand All @@ -43,6 +45,9 @@ type ObjectStoreManager interface {
DeleteObjectStore(bucket string) error
}

// ObjectStore is a blob store capable of storing large objects efficiently in
// JetStream streams
//
// Notice: Experimental Preview
//
// This functionality is EXPERIMENTAL and may be changed in later releases.
Expand Down Expand Up @@ -89,6 +94,9 @@ type ObjectStore interface {

// List will list all the objects in this store.
List(opts ...WatchOpt) ([]*ObjectInfo, error)

// Status retrieves run-time status about the backing store of the bucket.
Status() (ObjectStoreStatus, error)
}

type ObjectOpt interface {
Expand All @@ -109,7 +117,7 @@ func (ctx ContextOpt) configureObject(opts *objOpts) error {
type ObjectWatcher interface {
// Updates returns a channel to read any updates to entries.
Updates() <-chan *ObjectInfo
// Stop() will stop this watcher.
// Stop will stop this watcher.
Stop() error
}

Expand All @@ -132,6 +140,31 @@ type ObjectStoreConfig struct {
Replicas int
}

// BackingStore describes the implementation and storage backend of KV or Object stores
type BackingStore interface {
Kind() string
Info() map[string]string
}

type ObjectStoreStatus interface {
// Bucket is the name of the bucket
Bucket() string
// Description is the description supplied when creating the bucket
Description() string
// TTL indicates how long objects are kept in the bucket
TTL() time.Duration
// Storage indicates the underlying JetStream storage technology used to store data
Storage() StorageType
// Replicas indicates how many storage replicas are kept for the data in the bucket
Replicas() int
// Sealed indicates the stream is sealed and cannot be modified in any way
Sealed() bool
// Size is the combined size of all data in the bucket including metadata, in bytes
Size() uint64
// BackingStore provides details about the underlying storage
BackingStore() BackingStore
}

// ObjectMetaOptions
type ObjectMetaOptions struct {
Link *ObjectLink `json:"link,omitempty"`
Expand Down Expand Up @@ -857,6 +890,55 @@ func (obs *obs) List(opts ...WatchOpt) ([]*ObjectInfo, error) {
return objs, nil
}

type objBackingStore struct {
info map[string]string
}

func (b *objBackingStore) Kind() string { return "JetStream" }
func (b *objBackingStore) Info() map[string]string { return b.info }

type objStatus struct {
nfo *StreamInfo
bucket string
bs BackingStore
}

func (s *objStatus) Bucket() string { return s.bucket }
func (s *objStatus) Description() string { return s.nfo.Config.Description }
func (s *objStatus) TTL() time.Duration { return s.nfo.Config.MaxAge }
func (s *objStatus) Storage() StorageType { return s.nfo.Config.Storage }
func (s *objStatus) Replicas() int { return s.nfo.Config.Replicas }
func (s *objStatus) Sealed() bool { return s.nfo.Config.Sealed }
func (s *objStatus) Size() uint64 { return s.nfo.State.Bytes }
func (s *objStatus) BackingStore() BackingStore { return s.bs }

// Status retrieves run-time status about a bucket
func (obs *obs) Status() (ObjectStoreStatus, error) {
nfo, err := obs.js.StreamInfo(obs.stream)
if err != nil {
return nil, err
}

bs := &objBackingStore{
info: map[string]string{
"stream": obs.stream,
"domain": obs.js.opts.domain,
},
}

if nfo.Cluster != nil {
bs.info["placement_cluster"] = nfo.Cluster.Name
}

status := &objStatus{
nfo: nfo,
bucket: obs.name,
bs: bs,
}

return status, nil
}

// Read impl.
func (o *objResult) Read(p []byte) (n int, err error) {
o.Lock()
Expand Down
26 changes: 25 additions & 1 deletion test/kv_test.go
Expand Up @@ -33,7 +33,7 @@ func TestKeyValueBasics(t *testing.T) {
nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"})
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 5, TTL: time.Hour})
expectOk(t, err)

if kv.Bucket() != "TEST" {
Expand Down Expand Up @@ -78,6 +78,30 @@ func TestKeyValueBasics(t *testing.T) {
expectOk(t, err)
_, err = kv.Update("age", []byte("33"), r)
expectOk(t, err)

// Status
status, err := kv.Status()
expectOk(t, err)
if status.History() != 5 {
t.Fatalf("expected history of 5 got %d", status.History())
}
if status.Bucket() != "TEST" {
t.Fatalf("expected bucket TEST got %v", status.Bucket())
}
if status.TTL() != time.Hour {
t.Fatalf("expected 1 hour TTL got %v", status.TTL())
}
if status.Values() != 7 {
t.Fatalf("expected 7 values got %d", status.Values())
}
bs := status.BackingStore()
if bs.Kind() != "JetStream" {
t.Fatalf("invalid backing store kind %s", bs.Kind())
}
info := bs.Info()
if info["stream"] != "KV_TEST" {
t.Fatalf("invalid stream name %+v", info)
}
}

func TestKeyValueHistory(t *testing.T) {
Expand Down
44 changes: 43 additions & 1 deletion test/object_test.go
Expand Up @@ -34,7 +34,7 @@ func TestObjectBasics(t *testing.T) {
nc, js := jsClient(t, s)
defer nc.Close()

obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS"})
obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"})
expectOk(t, err)

// Create ~16MB object.
Expand Down Expand Up @@ -67,6 +67,21 @@ func TestObjectBasics(t *testing.T) {
t.Fatalf("Expected the object stream to be sealed, got %+v", si)
}

status, err := obs.Status()
expectOk(t, err)
if !status.Sealed() {
t.Fatalf("exected sealed status")
}
if status.Size() == 0 {
t.Fatalf("size is 0")
}
if status.Storage() != nats.FileStorage {
t.Fatalf("stauts reports %d storage", status.Storage())
}
if status.Description() != "testing" {
t.Fatalf("invalid description: '%s'", status.Description())
}

// Check simple errors.
_, err = obs.Get("FOO")
expectErr(t, err)
Expand Down Expand Up @@ -97,6 +112,33 @@ func TestObjectBasics(t *testing.T) {
expectErr(t, err, nats.ErrStreamNotFound)
}

func TestDefaultObjectStatus(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdown(s)

nc, js := jsClient(t, s)
defer nc.Close()

obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"})
expectOk(t, err)

blob := make([]byte, 1024*1024+22)
rand.Read(blob)

_, err = obs.PutBytes("BLOB", blob)
expectOk(t, err)

status, err := obs.Status()
expectOk(t, err)
if status.BackingStore().Kind() != "JetStream" {
t.Fatalf("invalid backing store kind: %s", status.BackingStore().Kind())
}
info := status.BackingStore().Info()
if info["stream"] != "OBJ_OBJS" {
t.Fatalf("invalid stream name %+v", info)
}
}

func TestObjectFileBasics(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdown(s)
Expand Down

0 comments on commit 15a7702

Please sign in to comment.