Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add kv and object status functionality #845

Merged
merged 1 commit into from Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion js.go
Expand Up @@ -214,6 +214,8 @@ type jsOpts struct {
aecb MsgErrHandler
// Maximum in flight.
maxap int
// the domain that produced the pre
domain string
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoids trying to parse pre to figure out domain which is not great at all

}

const (
Expand Down Expand Up @@ -258,7 +260,13 @@ func Domain(domain string) JSOpt {
return APIPrefix(_EMPTY_)
}

return APIPrefix(fmt.Sprintf(jsDomainT, domain))
return jsOptFn(func(js *jsOpts) error {
js.domain = domain
js.pre = fmt.Sprintf(jsDomainT, domain)

return nil
})

}

// APIPrefix changes the default prefix used for the JetStream API.
Expand Down
2 changes: 1 addition & 1 deletion jsm.go
Expand Up @@ -1132,7 +1132,7 @@ func getJSContextOpts(defs *jsOpts, opts ...JSOpt) (*jsOpts, context.CancelFunc,
if o.ctx == nil && o.wait > 0 {
o.ctx, cancel = context.WithTimeout(context.Background(), o.wait)
}
if o.pre == "" {
if o.pre == _EMPTY_ {
o.pre = defs.pre
}

Expand Down
69 changes: 68 additions & 1 deletion kv.go
Expand Up @@ -66,13 +66,33 @@ type KeyValue interface {
Bucket() string
// PurgeDeletes will remove all current delete markers.
PurgeDeletes(opts ...WatchOpt) error
// Status retrieves the status and configuration of a bucket
Status() (KeyValueStatus, error)
}

// KeyValueStatus is run-time status about a Key-Value bucket
type KeyValueStatus interface {
// Bucket the name of the bucket
Bucket() string

// Values is how many messages are in the bucket, including historical values
Values() uint64

// History returns the configured history kept per key
History() int64

// TTL is how long the bucket keeps values for
TTL() time.Duration

// BackingStore is information about the backend hosting the data
BackingStore() BackingStore
}

// KeyWatcher is what is returned when doing a watch.
type KeyWatcher interface {
// Updates returns a channel to read any updates to entries.
Updates() <-chan KeyValueEntry
// Stop() will stop this watcher.
// Stop will stop this watcher.
Stop() error
}

Expand Down Expand Up @@ -657,3 +677,50 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
func (kv *kvs) Bucket() string {
return kv.name
}

type kvBackingStore struct {
info map[string]string
}

func (b *kvBackingStore) Kind() string { return "JetStream" }
func (b *kvBackingStore) Info() map[string]string { return b.info }

type kvStatus struct {
nfo *StreamInfo
bucket string
bs *kvBackingStore
}

// Bucket the name of the bucket
func (s *kvStatus) Bucket() string { return s.bucket }

// Values is how many messages are in the bucket, including historical values
func (s *kvStatus) Values() uint64 { return s.nfo.State.Msgs }

// History returns the configured history kept per key
func (s *kvStatus) History() int64 { return s.nfo.Config.MaxMsgsPerSubject }

// TTL is how long the bucket keeps values for
func (s *kvStatus) TTL() time.Duration { return s.nfo.Config.MaxAge }

// BackingStore is information about the backend and storage used for the KV store
func (s *kvStatus) BackingStore() BackingStore { return s.bs }

// Status retrieves the status and configuration of a bucket
func (kv *kvs) Status() (KeyValueStatus, error) {
nfo, err := kv.js.StreamInfo(kv.stream)
if err != nil {
return nil, err
}

bs := &kvBackingStore{info: map[string]string{
"stream": kv.stream,
"domain": kv.js.opts.domain,
}}

if nfo.Cluster != nil {
bs.info["placement_cluster"] = nfo.Cluster.Name
}

return &kvStatus{nfo: nfo, bucket: kv.name, bs: bs}, nil
}
84 changes: 83 additions & 1 deletion object.go
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/nats-io/nuid"
)

// ObjectStoreManager creates, loads and deletes Object Stores
//
// Notice: Experimental Preview
//
// This functionality is EXPERIMENTAL and may be changed in later releases.
Expand All @@ -43,6 +45,9 @@ type ObjectStoreManager interface {
DeleteObjectStore(bucket string) error
}

// ObjectStore is a blob store capable of storing large objects efficiently in
// JetStream streams
//
// Notice: Experimental Preview
//
// This functionality is EXPERIMENTAL and may be changed in later releases.
Expand Down Expand Up @@ -89,6 +94,9 @@ type ObjectStore interface {

// List will list all the objects in this store.
List(opts ...WatchOpt) ([]*ObjectInfo, error)

// Status retrieves run-time status about the backing store of the bucket.
Status() (ObjectStoreStatus, error)
}

type ObjectOpt interface {
Expand All @@ -109,7 +117,7 @@ func (ctx ContextOpt) configureObject(opts *objOpts) error {
type ObjectWatcher interface {
// Updates returns a channel to read any updates to entries.
Updates() <-chan *ObjectInfo
// Stop() will stop this watcher.
// Stop will stop this watcher.
Stop() error
}

Expand All @@ -132,6 +140,31 @@ type ObjectStoreConfig struct {
Replicas int
}

// BackingStore describes the implementation and storage backend of KV or Object stores
type BackingStore interface {
Kind() string
Info() map[string]string
}

type ObjectStoreStatus interface {
// Bucket is the name of the bucket
Bucket() string
// Description is the description supplied when creating the bucket
Description() string
// TTL indicates how long objects are kept in the bucket
TTL() time.Duration
// Storage indicates the underlying JetStream storage technology used to store data
Storage() StorageType
// Replicas indicates how many storage replicas are kept for the data in the bucket
Replicas() int
// Sealed indicates the stream is sealed and cannot be modified in any way
Sealed() bool
// Size is the combined size of all data in the bucket including metadata, in bytes
Size() uint64
// BackingStore provides details about the underlying storage
BackingStore() BackingStore
}

// ObjectMetaOptions
type ObjectMetaOptions struct {
Link *ObjectLink `json:"link,omitempty"`
Expand Down Expand Up @@ -857,6 +890,55 @@ func (obs *obs) List(opts ...WatchOpt) ([]*ObjectInfo, error) {
return objs, nil
}

type objBackingStore struct {
info map[string]string
}

func (b *objBackingStore) Kind() string { return "JetStream" }
func (b *objBackingStore) Info() map[string]string { return b.info }

type objStatus struct {
nfo *StreamInfo
bucket string
bs BackingStore
}

func (s *objStatus) Bucket() string { return s.bucket }
func (s *objStatus) Description() string { return s.nfo.Config.Description }
func (s *objStatus) TTL() time.Duration { return s.nfo.Config.MaxAge }
func (s *objStatus) Storage() StorageType { return s.nfo.Config.Storage }
func (s *objStatus) Replicas() int { return s.nfo.Config.Replicas }
func (s *objStatus) Sealed() bool { return s.nfo.Config.Sealed }
func (s *objStatus) Size() uint64 { return s.nfo.State.Bytes }
func (s *objStatus) BackingStore() BackingStore { return s.bs }

// Status retrieves run-time status about a bucket
func (obs *obs) Status() (ObjectStoreStatus, error) {
nfo, err := obs.js.StreamInfo(obs.stream)
if err != nil {
return nil, err
}

bs := &objBackingStore{
info: map[string]string{
"stream": obs.stream,
"domain": obs.js.opts.domain,
},
}

if nfo.Cluster != nil {
bs.info["placement_cluster"] = nfo.Cluster.Name
}

status := &objStatus{
nfo: nfo,
bucket: obs.name,
bs: bs,
}

return status, nil
}

// Read impl.
func (o *objResult) Read(p []byte) (n int, err error) {
o.Lock()
Expand Down
26 changes: 25 additions & 1 deletion test/kv_test.go
Expand Up @@ -33,7 +33,7 @@ func TestKeyValueBasics(t *testing.T) {
nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"})
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 5, TTL: time.Hour})
expectOk(t, err)

if kv.Bucket() != "TEST" {
Expand Down Expand Up @@ -78,6 +78,30 @@ func TestKeyValueBasics(t *testing.T) {
expectOk(t, err)
_, err = kv.Update("age", []byte("33"), r)
expectOk(t, err)

// Status
status, err := kv.Status()
expectOk(t, err)
if status.History() != 5 {
t.Fatalf("expected history of 5 got %d", status.History())
}
if status.Bucket() != "TEST" {
t.Fatalf("expected bucket TEST got %v", status.Bucket())
}
if status.TTL() != time.Hour {
t.Fatalf("expected 1 hour TTL got %v", status.TTL())
}
if status.Values() != 7 {
t.Fatalf("expected 7 values got %d", status.Values())
}
bs := status.BackingStore()
if bs.Kind() != "JetStream" {
t.Fatalf("invalid backing store kind %s", bs.Kind())
}
info := bs.Info()
if info["stream"] != "KV_TEST" {
t.Fatalf("invalid stream name %+v", info)
}
}

func TestKeyValueHistory(t *testing.T) {
Expand Down
44 changes: 43 additions & 1 deletion test/object_test.go
Expand Up @@ -34,7 +34,7 @@ func TestObjectBasics(t *testing.T) {
nc, js := jsClient(t, s)
defer nc.Close()

obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS"})
obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"})
expectOk(t, err)

// Create ~16MB object.
Expand Down Expand Up @@ -67,6 +67,21 @@ func TestObjectBasics(t *testing.T) {
t.Fatalf("Expected the object stream to be sealed, got %+v", si)
}

status, err := obs.Status()
expectOk(t, err)
if !status.Sealed() {
t.Fatalf("exected sealed status")
}
if status.Size() == 0 {
t.Fatalf("size is 0")
}
if status.Storage() != nats.FileStorage {
t.Fatalf("stauts reports %d storage", status.Storage())
}
if status.Description() != "testing" {
t.Fatalf("invalid description: '%s'", status.Description())
}

// Check simple errors.
_, err = obs.Get("FOO")
expectErr(t, err)
Expand Down Expand Up @@ -97,6 +112,33 @@ func TestObjectBasics(t *testing.T) {
expectErr(t, err, nats.ErrStreamNotFound)
}

func TestDefaultObjectStatus(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdown(s)

nc, js := jsClient(t, s)
defer nc.Close()

obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"})
expectOk(t, err)

blob := make([]byte, 1024*1024+22)
rand.Read(blob)

_, err = obs.PutBytes("BLOB", blob)
expectOk(t, err)

status, err := obs.Status()
expectOk(t, err)
if status.BackingStore().Kind() != "JetStream" {
t.Fatalf("invalid backing store kind: %s", status.BackingStore().Kind())
}
info := status.BackingStore().Info()
if info["stream"] != "OBJ_OBJS" {
t.Fatalf("invalid stream name %+v", info)
}
}

func TestObjectFileBasics(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdown(s)
Expand Down