Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Listing buckets and bucket names #1074

Merged
merged 2 commits into from Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 24 additions & 4 deletions jsm.go
Expand Up @@ -41,8 +41,12 @@ type JetStreamManager interface {
PurgeStream(name string, opts ...JSOpt) error

// StreamsInfo can be used to retrieve a list of StreamInfo objects.
// DEPRECATED: Use Streams() instead.
StreamsInfo(opts ...JSOpt) <-chan *StreamInfo

// Streams can be used to retrieve a list of StreamInfo objects.
Streams(opts ...JSOpt) <-chan *StreamInfo

// StreamNames is used to retrieve a list of Stream names.
StreamNames(opts ...JSOpt) <-chan string

Expand Down Expand Up @@ -78,8 +82,12 @@ type JetStreamManager interface {
ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error)

// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
// DEPRECATED: Use Consumers() instead.
ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo

// Consumers is used to retrieve a list of ConsumerInfo objects.
Consumers(stream string, opts ...JSOpt) <-chan *ConsumerInfo

// ConsumerNames is used to retrieve a list of Consumer names.
ConsumerNames(stream string, opts ...JSOpt) <-chan string

Expand Down Expand Up @@ -502,8 +510,8 @@ func (c *consumerLister) Err() error {
return c.err
}

// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
// Consumers is used to retrieve a list of ConsumerInfo objects.
func (jsc *js) Consumers(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
o, cancel, err := getJSContextOpts(jsc.opts, opts...)
if err != nil {
return nil
Expand All @@ -530,6 +538,12 @@ func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo
return ch
}

// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
// DEPRECATED: Use Consumers() instead.
func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
return jsc.Consumers(stream, opts...)
}

type consumerNamesLister struct {
stream string
js *js
Expand Down Expand Up @@ -1258,8 +1272,8 @@ func (s *streamLister) Err() error {
return s.err
}

// StreamsInfo can be used to retrieve a list of StreamInfo objects.
func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
// Streams can be used to retrieve a list of StreamInfo objects.
func (jsc *js) Streams(opts ...JSOpt) <-chan *StreamInfo {
o, cancel, err := getJSContextOpts(jsc.opts, opts...)
if err != nil {
return nil
Expand All @@ -1286,6 +1300,12 @@ func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
return ch
}

// StreamsInfo can be used to retrieve a list of StreamInfo objects.
// DEPRECATED: Use Streams() instead.
func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
return jsc.Streams(opts...)
}

type streamNamesLister struct {
js *js

Expand Down
84 changes: 84 additions & 0 deletions object.go
Expand Up @@ -44,6 +44,10 @@ type ObjectStoreManager interface {
CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error)
// DeleteObjectStore will delete the underlying stream for the named object.
DeleteObjectStore(bucket string) error
// ObjectStoreNames is used to retrieve a list of bucket names
ObjectStoreNames(opts ...ObjectOpt) <-chan string
// ObjectStores is used to retrieve a list of buckets
ObjectStores(opts ...ObjectOpt) <-chan ObjectStore
}

// ObjectStore is a blob store capable of storing large objects efficiently in
Expand Down Expand Up @@ -1136,3 +1140,83 @@ func (o *objResult) Error() error {
defer o.Unlock()
return o.err
}

// ObjectStoreNames is used to retrieve a list of bucket names
func (js *js) ObjectStoreNames(opts ...ObjectOpt) <-chan string {
var o objOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configureObject(&o); err != nil {
return nil
}
}
}
ch := make(chan string)
var cancel context.CancelFunc
if o.ctx == nil {
o.ctx, cancel = context.WithTimeout(context.Background(), defaultRequestWait)
}
l := &streamLister{js: js}
l.js.opts.streamListSubject = fmt.Sprintf(objAllChunksPreTmpl, "*")
l.js.opts.ctx = o.ctx
go func() {
if cancel != nil {
defer cancel()
}
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
if !strings.HasPrefix(info.Config.Name, "OBJ_") {
continue
}
select {
case ch <- info.Config.Name:
case <-o.ctx.Done():
return
}
}
}
}()

return ch
}

// ObjectStores is used to retrieve a list of buckets
func (js *js) ObjectStores(opts ...ObjectOpt) <-chan ObjectStore {
var o objOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configureObject(&o); err != nil {
return nil
}
}
}
ch := make(chan ObjectStore)
var cancel context.CancelFunc
if o.ctx == nil {
o.ctx, cancel = context.WithTimeout(context.Background(), defaultRequestWait)
}
l := &streamLister{js: js}
l.js.opts.streamListSubject = fmt.Sprintf(objAllChunksPreTmpl, "*")
l.js.opts.ctx = o.ctx
go func() {
if cancel != nil {
defer cancel()
}
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
if !strings.HasPrefix(info.Config.Name, "OBJ_") {
continue
}
select {
case ch <- &obs{name: strings.TrimPrefix(info.Config.Name, "OBJ_"), stream: info.Config.Name, js: js}:
case <-o.ctx.Done():
return
}
}
}
}()

return ch
}
33 changes: 25 additions & 8 deletions test/js_test.go
Expand Up @@ -424,7 +424,7 @@ func TestJetStreamSubscribe(t *testing.T) {
t.Helper()
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
var infos []*nats.ConsumerInfo
for info := range js.ConsumersInfo("TEST") {
for info := range js.Consumers("TEST") {
infos = append(infos, info)
}
if len(infos) != expected {
Expand Down Expand Up @@ -1427,7 +1427,7 @@ func TestJetStreamManagement(t *testing.T) {

t.Run("list streams", func(t *testing.T) {
var infos []*nats.StreamInfo
for info := range js.StreamsInfo() {
for info := range js.Streams() {
infos = append(infos, info)
}
if len(infos) != 1 || infos[0].Config.Name != "foo" {
Expand All @@ -1437,20 +1437,20 @@ func TestJetStreamManagement(t *testing.T) {

t.Run("list consumers", func(t *testing.T) {
var infos []*nats.ConsumerInfo
for info := range js.ConsumersInfo("") {
for info := range js.Consumers("") {
infos = append(infos, info)
}
if len(infos) != 0 {
t.Fatalf("ConsumerInfo is not correct %+v", infos)
}
for info := range js.ConsumersInfo("bad.stream.name") {
for info := range js.Consumers("bad.stream.name") {
infos = append(infos, info)
}
if len(infos) != 0 {
t.Fatalf("ConsumerInfo is not correct %+v", infos)
}
infos = infos[:0]
for info := range js.ConsumersInfo("foo") {
for info := range js.Consumers("foo") {
infos = append(infos, info)
}
if len(infos) != 1 || infos[0].Stream != "foo" || infos[0].Config.Durable != "dlc" {
Expand Down Expand Up @@ -1682,6 +1682,14 @@ func TestStreamLister(t *testing.T) {
t.Fatalf("Invalid number of stream names; want: %d; got: %d", test.streamsNum, len(names))
}
infos := make([]*nats.StreamInfo, 0)
for info := range js.Streams() {
infos = append(infos, info)
}
if len(infos) != test.streamsNum {
t.Fatalf("Invalid number of streams; want: %d; got: %d", test.streamsNum, len(infos))
}
// test the deprecated StreamsInfo()
infos = make([]*nats.StreamInfo, 0)
for info := range js.StreamsInfo() {
infos = append(infos, info)
}
Expand Down Expand Up @@ -1751,7 +1759,7 @@ func TestStreamLister_FilterSubject(t *testing.T) {

// list streams
names = make([]string, 0)
for info := range js.StreamsInfo(nats.StreamListFilter(test.filter)) {
for info := range js.Streams(nats.StreamListFilter(test.filter)) {
names = append(names, info.Config.Name)
}
if !reflect.DeepEqual(names, test.expected) {
Expand Down Expand Up @@ -1797,6 +1805,15 @@ func TestConsumersLister(t *testing.T) {
t.Fatalf("Invalid number of consumer names; want: %d; got: %d", test.consumersNum, len(names))
}
infos := make([]*nats.ConsumerInfo, 0)
for info := range js.Consumers("foo") {
infos = append(infos, info)
}
if len(infos) != test.consumersNum {
t.Fatalf("Invalid number of consumers; want: %d; got: %d", test.consumersNum, len(infos))
}

// test the deprecated ConsumersInfo()
infos = make([]*nats.ConsumerInfo, 0)
for info := range js.ConsumersInfo("foo") {
infos = append(infos, info)
}
Expand Down Expand Up @@ -4133,7 +4150,7 @@ func TestJetStream_Unsubscribe(t *testing.T) {
t.Helper()
checkFor(t, time.Second, 15*time.Millisecond, func() error {
var infos []*nats.ConsumerInfo
for info := range js.ConsumersInfo("foo") {
for info := range js.Consumers("foo") {
infos = append(infos, info)
}
if len(infos) != expected {
Expand Down Expand Up @@ -4270,7 +4287,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) {
fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
t.Helper()
var infos []*nats.ConsumerInfo
for info := range jsm.ConsumersInfo("foo") {
for info := range jsm.Consumers("foo") {
infos = append(infos, info)
}
if len(infos) != expected {
Expand Down
50 changes: 50 additions & 0 deletions test/object_test.go
Expand Up @@ -777,3 +777,53 @@ func TestObjectMaxBytes(t *testing.T) {
t.Fatalf("invalid object stream MaxSize %+v", info.Config.MaxBytes)
}
}

func TestBucketNames(t *testing.T) {
tests := []struct {
name string
bucketsNum int
}{
{
name: "single page",
bucketsNum: 5,
},
{
name: "multi page",
bucketsNum: 1025,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
// create stream without the chunk subject, but with OBJ_ prefix
_, err := js.AddStream(&nats.StreamConfig{Name: "OBJ_FOO", Subjects: []string{"FOO.*"}})
expectOk(t, err)
// create stream with chunk subject, but without "OBJ_" prefix
_, err = js.AddStream(&nats.StreamConfig{Name: "FOO", Subjects: []string{"$O.ABC.C.>"}})
expectOk(t, err)
for i := 0; i < test.bucketsNum; i++ {
_, err = js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: fmt.Sprintf("OBJS_%d", i), MaxBytes: 1024})
expectOk(t, err)
}
names := make([]string, 0)
for name := range js.ObjectStoreNames() {
names = append(names, name)
}
if len(names) != test.bucketsNum {
t.Fatalf("Invalid number of stream names; want: %d; got: %d", test.bucketsNum, len(names))
}
infos := make([]nats.ObjectStore, 0)
for info := range js.ObjectStores() {
infos = append(infos, info)
}
if len(infos) != test.bucketsNum {
t.Fatalf("Invalid number of streams; want: %d; got: %d", test.bucketsNum, len(infos))
}
})
}
}