diff --git a/example_test.go b/example_test.go index 42e3f322d..9651785b8 100644 --- a/example_test.go +++ b/example_test.go @@ -630,6 +630,11 @@ func ExampleSubOpt() { js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.Durable("FOO"), nats.ConsumerReplicas(1)) + + // Force memory storage while subscribing. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.Durable("FOO"), nats.ConsumerMemoryStorage()) } func ExampleMaxWait() { diff --git a/js.go b/js.go index 2ef685a14..e0022015c 100644 --- a/js.go +++ b/js.go @@ -1357,6 +1357,9 @@ func checkConfig(s, u *ConsumerConfig) error { if u.Replicas > 0 && u.Replicas != s.Replicas { return makeErr("replicas", u.Replicas, s.Replicas) } + if u.MemoryStorage && !s.MemoryStorage { + return makeErr("memory storage", u.MemoryStorage, s.MemoryStorage) + } return nil } @@ -2485,6 +2488,14 @@ func ConsumerReplicas(replicas int) SubOpt { }) } +// ConsumerMemoryStorage sets the memory storage to true for a consumer. +func ConsumerMemoryStorage() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.MemoryStorage = true + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. diff --git a/nats.go b/nats.go index 988924d24..5edbdd3d3 100644 --- a/nats.go +++ b/nats.go @@ -2323,6 +2323,19 @@ func normalizeErr(line string) string { return s } +// natsProtoErr represents an -ERR protocol message sent by the server. +type natsProtoErr struct { + description string +} + +func (nerr *natsProtoErr) Error() string { + return fmt.Sprintf("nats: %s", nerr.description) +} + +func (nerr *natsProtoErr) Is(err error) bool { + return strings.ToLower(nerr.Error()) == err.Error() +} + // Send a connect protocol message to the server, issue user/password if // applicable. Will wait for a flush to return from the server for error // processing. @@ -2377,8 +2390,7 @@ func (nc *Conn) sendConnect() error { // in doReconnect()). nc.processAuthError(authErr) } - - return errors.New("nats: " + proto) + return &natsProtoErr{proto} } // Notify that we got an unexpected protocol. diff --git a/object.go b/object.go index a9a93b77e..57a6364ea 100644 --- a/object.go +++ b/object.go @@ -44,6 +44,10 @@ type ObjectStoreManager interface { CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) // DeleteObjectStore will delete the underlying stream for the named object. DeleteObjectStore(bucket string) error + // ObjectStoreNames is used to retrieve a list of bucket names + ObjectStoreNames(opts ...ObjectOpt) <-chan string + // ObjectStores is used to retrieve a list of buckets + ObjectStores(opts ...ObjectOpt) <-chan ObjectStore } // ObjectStore is a blob store capable of storing large objects efficiently in @@ -857,13 +861,13 @@ func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error { // If the new name is different from the old, and it exists, error // If there was an error that was not ErrObjectNotFound, error. if name != meta.Name { - _, err = obs.GetInfo(meta.Name) - if err != ErrObjectNotFound { - if err == nil { - return ErrObjectAlreadyExists - } + existingInfo, err := obs.GetInfo(meta.Name) + if err != nil && !errors.Is(err, ErrObjectNotFound) { return err } + if err == nil && !existingInfo.Deleted { + return ErrObjectAlreadyExists + } } // Update Meta prevents update of ObjectMetaOptions (Link, ChunkSize) @@ -1136,3 +1140,83 @@ func (o *objResult) Error() error { defer o.Unlock() return o.err } + +// ObjectStoreNames is used to retrieve a list of bucket names +func (js *js) ObjectStoreNames(opts ...ObjectOpt) <-chan string { + var o objOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureObject(&o); err != nil { + return nil + } + } + } + ch := make(chan string) + var cancel context.CancelFunc + if o.ctx == nil { + o.ctx, cancel = context.WithTimeout(context.Background(), defaultRequestWait) + } + l := &streamLister{js: js} + l.js.opts.streamListSubject = fmt.Sprintf(objAllChunksPreTmpl, "*") + l.js.opts.ctx = o.ctx + go func() { + if cancel != nil { + defer cancel() + } + defer close(ch) + for l.Next() { + for _, info := range l.Page() { + if !strings.HasPrefix(info.Config.Name, "OBJ_") { + continue + } + select { + case ch <- info.Config.Name: + case <-o.ctx.Done(): + return + } + } + } + }() + + return ch +} + +// ObjectStores is used to retrieve a list of buckets +func (js *js) ObjectStores(opts ...ObjectOpt) <-chan ObjectStore { + var o objOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureObject(&o); err != nil { + return nil + } + } + } + ch := make(chan ObjectStore) + var cancel context.CancelFunc + if o.ctx == nil { + o.ctx, cancel = context.WithTimeout(context.Background(), defaultRequestWait) + } + l := &streamLister{js: js} + l.js.opts.streamListSubject = fmt.Sprintf(objAllChunksPreTmpl, "*") + l.js.opts.ctx = o.ctx + go func() { + if cancel != nil { + defer cancel() + } + defer close(ch) + for l.Next() { + for _, info := range l.Page() { + if !strings.HasPrefix(info.Config.Name, "OBJ_") { + continue + } + select { + case ch <- &obs{name: strings.TrimPrefix(info.Config.Name, "OBJ_"), stream: info.Config.Name, js: js}: + case <-o.ctx.Done(): + return + } + } + } + }() + + return ch +} diff --git a/test/auth_test.go b/test/auth_test.go index 27559a750..32d80e5b4 100644 --- a/test/auth_test.go +++ b/test/auth_test.go @@ -14,6 +14,7 @@ package test import ( + "errors" "fmt" "strings" "sync/atomic" @@ -44,6 +45,10 @@ func TestAuth(t *testing.T) { t.Fatalf("Expected error '%v', got '%v'", nats.ErrAuthorization, err) } + if !errors.Is(err, nats.ErrAuthorization) { + t.Fatalf("Expected error '%v', got '%v'", nats.ErrAuthorization, err) + } + nc, err := nats.Connect("nats://derek:foo@127.0.0.1:8232") if err != nil { t.Fatal("Should have connected successfully with a token") diff --git a/test/cluster_test.go b/test/cluster_test.go index bbe3b9075..5f7595f98 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -14,6 +14,7 @@ package test import ( + "errors" "fmt" "math" "net" @@ -193,6 +194,10 @@ func TestAuthServers(t *testing.T) { t.Fatalf("Wrong error, wanted Auth failure, got '%s'\n", err) } + if !errors.Is(err, nats.ErrAuthorization) { + t.Fatalf("Expected error '%v', got '%v'", nats.ErrAuthorization, err) + } + // Test that we can connect to a subsequent correct server. var authServers = []string{ "nats://127.0.0.1:1222", diff --git a/test/js_test.go b/test/js_test.go index 362f00645..ee450d396 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -424,7 +424,7 @@ func TestJetStreamSubscribe(t *testing.T) { t.Helper() checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { var infos []*nats.ConsumerInfo - for info := range js.ConsumersInfo("TEST") { + for info := range js.Consumers("TEST") { infos = append(infos, info) } if len(infos) != expected { @@ -1427,7 +1427,7 @@ func TestJetStreamManagement(t *testing.T) { t.Run("list streams", func(t *testing.T) { var infos []*nats.StreamInfo - for info := range js.StreamsInfo() { + for info := range js.Streams() { infos = append(infos, info) } if len(infos) != 1 || infos[0].Config.Name != "foo" { @@ -1437,20 +1437,20 @@ func TestJetStreamManagement(t *testing.T) { t.Run("list consumers", func(t *testing.T) { var infos []*nats.ConsumerInfo - for info := range js.ConsumersInfo("") { + for info := range js.Consumers("") { infos = append(infos, info) } if len(infos) != 0 { t.Fatalf("ConsumerInfo is not correct %+v", infos) } - for info := range js.ConsumersInfo("bad.stream.name") { + for info := range js.Consumers("bad.stream.name") { infos = append(infos, info) } if len(infos) != 0 { t.Fatalf("ConsumerInfo is not correct %+v", infos) } infos = infos[:0] - for info := range js.ConsumersInfo("foo") { + for info := range js.Consumers("foo") { infos = append(infos, info) } if len(infos) != 1 || infos[0].Stream != "foo" || infos[0].Config.Durable != "dlc" { @@ -1682,6 +1682,14 @@ func TestStreamLister(t *testing.T) { t.Fatalf("Invalid number of stream names; want: %d; got: %d", test.streamsNum, len(names)) } infos := make([]*nats.StreamInfo, 0) + for info := range js.Streams() { + infos = append(infos, info) + } + if len(infos) != test.streamsNum { + t.Fatalf("Invalid number of streams; want: %d; got: %d", test.streamsNum, len(infos)) + } + // test the deprecated StreamsInfo() + infos = make([]*nats.StreamInfo, 0) for info := range js.StreamsInfo() { infos = append(infos, info) } @@ -1751,7 +1759,7 @@ func TestStreamLister_FilterSubject(t *testing.T) { // list streams names = make([]string, 0) - for info := range js.StreamsInfo(nats.StreamListFilter(test.filter)) { + for info := range js.Streams(nats.StreamListFilter(test.filter)) { names = append(names, info.Config.Name) } if !reflect.DeepEqual(names, test.expected) { @@ -1797,6 +1805,15 @@ func TestConsumersLister(t *testing.T) { t.Fatalf("Invalid number of consumer names; want: %d; got: %d", test.consumersNum, len(names)) } infos := make([]*nats.ConsumerInfo, 0) + for info := range js.Consumers("foo") { + infos = append(infos, info) + } + if len(infos) != test.consumersNum { + t.Fatalf("Invalid number of consumers; want: %d; got: %d", test.consumersNum, len(infos)) + } + + // test the deprecated ConsumersInfo() + infos = make([]*nats.ConsumerInfo, 0) for info := range js.ConsumersInfo("foo") { infos = append(infos, info) } @@ -4133,7 +4150,7 @@ func TestJetStream_Unsubscribe(t *testing.T) { t.Helper() checkFor(t, time.Second, 15*time.Millisecond, func() error { var infos []*nats.ConsumerInfo - for info := range js.ConsumersInfo("foo") { + for info := range js.Consumers("foo") { infos = append(infos, info) } if len(infos) != expected { @@ -4270,7 +4287,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo { t.Helper() var infos []*nats.ConsumerInfo - for info := range jsm.ConsumersInfo("foo") { + for info := range jsm.Consumers("foo") { infos = append(infos, info) } if len(infos) != expected { diff --git a/test/object_test.go b/test/object_test.go index 5baa861ec..ef8caa37e 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -391,6 +391,8 @@ func TestObjectMetadata(t *testing.T) { // Simple with no Meta. _, err = obs.PutString("A", "AAA") expectOk(t, err) + _, err = obs.PutString("C", "CCC") + expectOk(t, err) meta := &nats.ObjectMeta{Name: "A"} meta.Description = "descA" @@ -423,15 +425,25 @@ func TestObjectMetadata(t *testing.T) { info, err = obs.GetInfo("B") expectOk(t, err) - if info.Name != "B" || info.Description != "descB" || info.Headers == nil || info.Headers.Get("color") != "red" { t.Fatalf("Update failed: %+v", info) } - // delete the object to test updating against a deleted object - err = obs.Delete("B") + // Change meta name to existing object's name + meta = &nats.ObjectMeta{Name: "C"} + + err = obs.UpdateMeta("B", meta) + expectErr(t, err, nats.ErrObjectAlreadyExists) + + err = obs.Delete("C") expectOk(t, err) err = obs.UpdateMeta("B", meta) + expectOk(t, err) + + // delete the object to test updating against a deleted object + err = obs.Delete("C") + expectOk(t, err) + err = obs.UpdateMeta("C", meta) expectErr(t, err, nats.ErrUpdateMetaDeleted) err = obs.UpdateMeta("X", meta) @@ -777,3 +789,53 @@ func TestObjectMaxBytes(t *testing.T) { t.Fatalf("invalid object stream MaxSize %+v", info.Config.MaxBytes) } } + +func TestBucketNames(t *testing.T) { + tests := []struct { + name string + bucketsNum int + }{ + { + name: "single page", + bucketsNum: 5, + }, + { + name: "multi page", + bucketsNum: 1025, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + // create stream without the chunk subject, but with OBJ_ prefix + _, err := js.AddStream(&nats.StreamConfig{Name: "OBJ_FOO", Subjects: []string{"FOO.*"}}) + expectOk(t, err) + // create stream with chunk subject, but without "OBJ_" prefix + _, err = js.AddStream(&nats.StreamConfig{Name: "FOO", Subjects: []string{"$O.ABC.C.>"}}) + expectOk(t, err) + for i := 0; i < test.bucketsNum; i++ { + _, err = js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: fmt.Sprintf("OBJS_%d", i), MaxBytes: 1024}) + expectOk(t, err) + } + names := make([]string, 0) + for name := range js.ObjectStoreNames() { + names = append(names, name) + } + if len(names) != test.bucketsNum { + t.Fatalf("Invalid number of stream names; want: %d; got: %d", test.bucketsNum, len(names)) + } + infos := make([]nats.ObjectStore, 0) + for info := range js.ObjectStores() { + infos = append(infos, info) + } + if len(infos) != test.bucketsNum { + t.Fatalf("Invalid number of streams; want: %d; got: %d", test.bucketsNum, len(infos)) + } + }) + } +}