Skip to content

Commit

Permalink
[ADDED] Listing buckets and bucket names (#1074)
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Sep 15, 2022
1 parent 25b6392 commit 225c557
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 12 deletions.
28 changes: 24 additions & 4 deletions jsm.go
Expand Up @@ -41,8 +41,12 @@ type JetStreamManager interface {
PurgeStream(name string, opts ...JSOpt) error

// StreamsInfo can be used to retrieve a list of StreamInfo objects.
// DEPRECATED: Use Streams() instead.
StreamsInfo(opts ...JSOpt) <-chan *StreamInfo

// Streams can be used to retrieve a list of StreamInfo objects.
Streams(opts ...JSOpt) <-chan *StreamInfo

// StreamNames is used to retrieve a list of Stream names.
StreamNames(opts ...JSOpt) <-chan string

Expand Down Expand Up @@ -78,8 +82,12 @@ type JetStreamManager interface {
ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error)

// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
// DEPRECATED: Use Consumers() instead.
ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo

// Consumers is used to retrieve a list of ConsumerInfo objects.
Consumers(stream string, opts ...JSOpt) <-chan *ConsumerInfo

// ConsumerNames is used to retrieve a list of Consumer names.
ConsumerNames(stream string, opts ...JSOpt) <-chan string

Expand Down Expand Up @@ -502,8 +510,8 @@ func (c *consumerLister) Err() error {
return c.err
}

// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
// Consumers is used to retrieve a list of ConsumerInfo objects.
func (jsc *js) Consumers(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
o, cancel, err := getJSContextOpts(jsc.opts, opts...)
if err != nil {
return nil
Expand All @@ -530,6 +538,12 @@ func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo
return ch
}

// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
// DEPRECATED: Use Consumers() instead.
func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
return jsc.Consumers(stream, opts...)
}

type consumerNamesLister struct {
stream string
js *js
Expand Down Expand Up @@ -1258,8 +1272,8 @@ func (s *streamLister) Err() error {
return s.err
}

// StreamsInfo can be used to retrieve a list of StreamInfo objects.
func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
// Streams can be used to retrieve a list of StreamInfo objects.
func (jsc *js) Streams(opts ...JSOpt) <-chan *StreamInfo {
o, cancel, err := getJSContextOpts(jsc.opts, opts...)
if err != nil {
return nil
Expand All @@ -1286,6 +1300,12 @@ func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
return ch
}

// StreamsInfo can be used to retrieve a list of StreamInfo objects.
// DEPRECATED: Use Streams() instead.
func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
return jsc.Streams(opts...)
}

type streamNamesLister struct {
js *js

Expand Down
84 changes: 84 additions & 0 deletions object.go
Expand Up @@ -44,6 +44,10 @@ type ObjectStoreManager interface {
CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error)
// DeleteObjectStore will delete the underlying stream for the named object.
DeleteObjectStore(bucket string) error
// ObjectStoreNames is used to retrieve a list of bucket names
ObjectStoreNames(opts ...ObjectOpt) <-chan string
// ObjectStores is used to retrieve a list of buckets
ObjectStores(opts ...ObjectOpt) <-chan ObjectStore
}

// ObjectStore is a blob store capable of storing large objects efficiently in
Expand Down Expand Up @@ -1136,3 +1140,83 @@ func (o *objResult) Error() error {
defer o.Unlock()
return o.err
}

// ObjectStoreNames is used to retrieve a list of bucket names
func (js *js) ObjectStoreNames(opts ...ObjectOpt) <-chan string {
var o objOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configureObject(&o); err != nil {
return nil
}
}
}
ch := make(chan string)
var cancel context.CancelFunc
if o.ctx == nil {
o.ctx, cancel = context.WithTimeout(context.Background(), defaultRequestWait)
}
l := &streamLister{js: js}
l.js.opts.streamListSubject = fmt.Sprintf(objAllChunksPreTmpl, "*")
l.js.opts.ctx = o.ctx
go func() {
if cancel != nil {
defer cancel()
}
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
if !strings.HasPrefix(info.Config.Name, "OBJ_") {
continue
}
select {
case ch <- info.Config.Name:
case <-o.ctx.Done():
return
}
}
}
}()

return ch
}

// ObjectStores is used to retrieve a list of buckets
func (js *js) ObjectStores(opts ...ObjectOpt) <-chan ObjectStore {
var o objOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configureObject(&o); err != nil {
return nil
}
}
}
ch := make(chan ObjectStore)
var cancel context.CancelFunc
if o.ctx == nil {
o.ctx, cancel = context.WithTimeout(context.Background(), defaultRequestWait)
}
l := &streamLister{js: js}
l.js.opts.streamListSubject = fmt.Sprintf(objAllChunksPreTmpl, "*")
l.js.opts.ctx = o.ctx
go func() {
if cancel != nil {
defer cancel()
}
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
if !strings.HasPrefix(info.Config.Name, "OBJ_") {
continue
}
select {
case ch <- &obs{name: strings.TrimPrefix(info.Config.Name, "OBJ_"), stream: info.Config.Name, js: js}:
case <-o.ctx.Done():
return
}
}
}
}()

return ch
}
33 changes: 25 additions & 8 deletions test/js_test.go
Expand Up @@ -424,7 +424,7 @@ func TestJetStreamSubscribe(t *testing.T) {
t.Helper()
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
var infos []*nats.ConsumerInfo
for info := range js.ConsumersInfo("TEST") {
for info := range js.Consumers("TEST") {
infos = append(infos, info)
}
if len(infos) != expected {
Expand Down Expand Up @@ -1427,7 +1427,7 @@ func TestJetStreamManagement(t *testing.T) {

t.Run("list streams", func(t *testing.T) {
var infos []*nats.StreamInfo
for info := range js.StreamsInfo() {
for info := range js.Streams() {
infos = append(infos, info)
}
if len(infos) != 1 || infos[0].Config.Name != "foo" {
Expand All @@ -1437,20 +1437,20 @@ func TestJetStreamManagement(t *testing.T) {

t.Run("list consumers", func(t *testing.T) {
var infos []*nats.ConsumerInfo
for info := range js.ConsumersInfo("") {
for info := range js.Consumers("") {
infos = append(infos, info)
}
if len(infos) != 0 {
t.Fatalf("ConsumerInfo is not correct %+v", infos)
}
for info := range js.ConsumersInfo("bad.stream.name") {
for info := range js.Consumers("bad.stream.name") {
infos = append(infos, info)
}
if len(infos) != 0 {
t.Fatalf("ConsumerInfo is not correct %+v", infos)
}
infos = infos[:0]
for info := range js.ConsumersInfo("foo") {
for info := range js.Consumers("foo") {
infos = append(infos, info)
}
if len(infos) != 1 || infos[0].Stream != "foo" || infos[0].Config.Durable != "dlc" {
Expand Down Expand Up @@ -1682,6 +1682,14 @@ func TestStreamLister(t *testing.T) {
t.Fatalf("Invalid number of stream names; want: %d; got: %d", test.streamsNum, len(names))
}
infos := make([]*nats.StreamInfo, 0)
for info := range js.Streams() {
infos = append(infos, info)
}
if len(infos) != test.streamsNum {
t.Fatalf("Invalid number of streams; want: %d; got: %d", test.streamsNum, len(infos))
}
// test the deprecated StreamsInfo()
infos = make([]*nats.StreamInfo, 0)
for info := range js.StreamsInfo() {
infos = append(infos, info)
}
Expand Down Expand Up @@ -1751,7 +1759,7 @@ func TestStreamLister_FilterSubject(t *testing.T) {

// list streams
names = make([]string, 0)
for info := range js.StreamsInfo(nats.StreamListFilter(test.filter)) {
for info := range js.Streams(nats.StreamListFilter(test.filter)) {
names = append(names, info.Config.Name)
}
if !reflect.DeepEqual(names, test.expected) {
Expand Down Expand Up @@ -1797,6 +1805,15 @@ func TestConsumersLister(t *testing.T) {
t.Fatalf("Invalid number of consumer names; want: %d; got: %d", test.consumersNum, len(names))
}
infos := make([]*nats.ConsumerInfo, 0)
for info := range js.Consumers("foo") {
infos = append(infos, info)
}
if len(infos) != test.consumersNum {
t.Fatalf("Invalid number of consumers; want: %d; got: %d", test.consumersNum, len(infos))
}

// test the deprecated ConsumersInfo()
infos = make([]*nats.ConsumerInfo, 0)
for info := range js.ConsumersInfo("foo") {
infos = append(infos, info)
}
Expand Down Expand Up @@ -4133,7 +4150,7 @@ func TestJetStream_Unsubscribe(t *testing.T) {
t.Helper()
checkFor(t, time.Second, 15*time.Millisecond, func() error {
var infos []*nats.ConsumerInfo
for info := range js.ConsumersInfo("foo") {
for info := range js.Consumers("foo") {
infos = append(infos, info)
}
if len(infos) != expected {
Expand Down Expand Up @@ -4270,7 +4287,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) {
fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
t.Helper()
var infos []*nats.ConsumerInfo
for info := range jsm.ConsumersInfo("foo") {
for info := range jsm.Consumers("foo") {
infos = append(infos, info)
}
if len(infos) != expected {
Expand Down
50 changes: 50 additions & 0 deletions test/object_test.go
Expand Up @@ -777,3 +777,53 @@ func TestObjectMaxBytes(t *testing.T) {
t.Fatalf("invalid object stream MaxSize %+v", info.Config.MaxBytes)
}
}

func TestBucketNames(t *testing.T) {
tests := []struct {
name string
bucketsNum int
}{
{
name: "single page",
bucketsNum: 5,
},
{
name: "multi page",
bucketsNum: 1025,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
// create stream without the chunk subject, but with OBJ_ prefix
_, err := js.AddStream(&nats.StreamConfig{Name: "OBJ_FOO", Subjects: []string{"FOO.*"}})
expectOk(t, err)
// create stream with chunk subject, but without "OBJ_" prefix
_, err = js.AddStream(&nats.StreamConfig{Name: "FOO", Subjects: []string{"$O.ABC.C.>"}})
expectOk(t, err)
for i := 0; i < test.bucketsNum; i++ {
_, err = js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: fmt.Sprintf("OBJS_%d", i), MaxBytes: 1024})
expectOk(t, err)
}
names := make([]string, 0)
for name := range js.ObjectStoreNames() {
names = append(names, name)
}
if len(names) != test.bucketsNum {
t.Fatalf("Invalid number of stream names; want: %d; got: %d", test.bucketsNum, len(names))
}
infos := make([]nats.ObjectStore, 0)
for info := range js.ObjectStores() {
infos = append(infos, info)
}
if len(infos) != test.bucketsNum {
t.Fatalf("Invalid number of streams; want: %d; got: %d", test.bucketsNum, len(infos))
}
})
}
}

0 comments on commit 225c557

Please sign in to comment.