Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Listing buckets and bucket names #1074

Merged
merged 2 commits into from Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
84 changes: 84 additions & 0 deletions object.go
Expand Up @@ -44,6 +44,10 @@ type ObjectStoreManager interface {
CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error)
// DeleteObjectStore will delete the underlying stream for the named object.
DeleteObjectStore(bucket string) error
// BucketNames is used to retrieve a list of bucket names
BucketNames(opts ...ObjectOpt) <-chan string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe StoreNames and Stores?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, makes sense as we mostly use ObjectStore instead of Bucket elsewhere. What about ObjectStoreNames and ObjectStores? Just to make it more obvious when using JetStreamContext interface.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to keep StreamsInfo but let's make alias for Streams() and that will be consistent imo.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// BucketsInfo is used to retrieve a list of buckets
BucketsInfo(opts ...ObjectOpt) <-chan ObjectStore
}

// ObjectStore is a blob store capable of storing large objects efficiently in
Expand Down Expand Up @@ -1136,3 +1140,83 @@ func (o *objResult) Error() error {
defer o.Unlock()
return o.err
}

// BucketNames is used to retrieve a list of bucket names
func (js *js) BucketNames(opts ...ObjectOpt) <-chan string {
var o objOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configureObject(&o); err != nil {
return nil
}
}
}
ch := make(chan string)
var cancel context.CancelFunc
if o.ctx == nil {
o.ctx, cancel = context.WithTimeout(context.Background(), defaultRequestWait)
}
l := &streamLister{js: js}
l.js.opts.streamListSubject = fmt.Sprintf(objAllChunksPreTmpl, "*")
l.js.opts.ctx = o.ctx
go func() {
if cancel != nil {
defer cancel()
}
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
if !strings.HasPrefix(info.Config.Name, "OBJ_") {
continue
}
select {
case ch <- info.Config.Name:
case <-o.ctx.Done():
return
}
}
}
}()

return ch
}

// BucketsInfo is used to retrieve a list of buckets
func (js *js) BucketsInfo(opts ...ObjectOpt) <-chan ObjectStore {
var o objOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configureObject(&o); err != nil {
return nil
}
}
}
ch := make(chan ObjectStore)
var cancel context.CancelFunc
if o.ctx == nil {
o.ctx, cancel = context.WithTimeout(context.Background(), defaultRequestWait)
}
l := &streamLister{js: js}
l.js.opts.streamListSubject = fmt.Sprintf(objAllChunksPreTmpl, "*")
l.js.opts.ctx = o.ctx
go func() {
if cancel != nil {
defer cancel()
}
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
if !strings.HasPrefix(info.Config.Name, "OBJ_") {
continue
}
select {
case ch <- &obs{name: strings.TrimPrefix(info.Config.Name, "OBJ_"), stream: info.Config.Name, js: js}:
case <-o.ctx.Done():
return
}
}
}
}()

return ch
}
50 changes: 50 additions & 0 deletions test/object_test.go
Expand Up @@ -777,3 +777,53 @@ func TestObjectMaxBytes(t *testing.T) {
t.Fatalf("invalid object stream MaxSize %+v", info.Config.MaxBytes)
}
}

func TestBucketNames(t *testing.T) {
tests := []struct {
name string
bucketsNum int
}{
{
name: "single page",
bucketsNum: 5,
},
{
name: "multi page",
bucketsNum: 1025,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
// create stream without the chunk subject, but with OBJ_ prefix
_, err := js.AddStream(&nats.StreamConfig{Name: "OBJ_FOO", Subjects: []string{"FOO.*"}})
expectOk(t, err)
// create stream with chunk subject, but without "OBJ_" prefix
_, err = js.AddStream(&nats.StreamConfig{Name: "FOO", Subjects: []string{"$O.ABC.C.>"}})
expectOk(t, err)
for i := 0; i < test.bucketsNum; i++ {
_, err = js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: fmt.Sprintf("OBJS_%d", i), MaxBytes: 1024})
expectOk(t, err)
}
names := make([]string, 0)
for name := range js.BucketNames() {
names = append(names, name)
}
if len(names) != test.bucketsNum {
t.Fatalf("Invalid number of stream names; want: %d; got: %d", test.bucketsNum, len(names))
}
infos := make([]nats.ObjectStore, 0)
for info := range js.BucketsInfo() {
infos = append(infos, info)
}
if len(infos) != test.bucketsNum {
t.Fatalf("Invalid number of streams; want: %d; got: %d", test.bucketsNum, len(infos))
}
})
}
}