Skip to content

Commit

Permalink
Merge branch 'main' into jnm/implement_server_PR_3454
Browse files Browse the repository at this point in the history
  • Loading branch information
jnmoyne committed Sep 15, 2022
2 parents c8983a2 + 866ce08 commit 79716ec
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 18 deletions.
5 changes: 5 additions & 0 deletions example_test.go
Expand Up @@ -630,6 +630,11 @@ func ExampleSubOpt() {
js.Subscribe("foo", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.Durable("FOO"), nats.ConsumerReplicas(1))

// Force memory storage while subscribing.
js.Subscribe("foo", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.Durable("FOO"), nats.ConsumerMemoryStorage())
}

func ExampleMaxWait() {
Expand Down
11 changes: 11 additions & 0 deletions js.go
Expand Up @@ -1357,6 +1357,9 @@ func checkConfig(s, u *ConsumerConfig) error {
if u.Replicas > 0 && u.Replicas != s.Replicas {
return makeErr("replicas", u.Replicas, s.Replicas)
}
if u.MemoryStorage && !s.MemoryStorage {
return makeErr("memory storage", u.MemoryStorage, s.MemoryStorage)
}
return nil
}

Expand Down Expand Up @@ -2485,6 +2488,14 @@ func ConsumerReplicas(replicas int) SubOpt {
})
}

// ConsumerMemoryStorage sets the memory storage to true for a consumer.
func ConsumerMemoryStorage() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MemoryStorage = true
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down
16 changes: 14 additions & 2 deletions nats.go
Expand Up @@ -2323,6 +2323,19 @@ func normalizeErr(line string) string {
return s
}

// natsProtoErr represents an -ERR protocol message sent by the server.
type natsProtoErr struct {
description string
}

func (nerr *natsProtoErr) Error() string {
return fmt.Sprintf("nats: %s", nerr.description)
}

func (nerr *natsProtoErr) Is(err error) bool {
return strings.ToLower(nerr.Error()) == err.Error()
}

// Send a connect protocol message to the server, issue user/password if
// applicable. Will wait for a flush to return from the server for error
// processing.
Expand Down Expand Up @@ -2377,8 +2390,7 @@ func (nc *Conn) sendConnect() error {
// in doReconnect()).
nc.processAuthError(authErr)
}

return errors.New("nats: " + proto)
return &natsProtoErr{proto}
}

// Notify that we got an unexpected protocol.
Expand Down
94 changes: 89 additions & 5 deletions object.go
Expand Up @@ -44,6 +44,10 @@ type ObjectStoreManager interface {
CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error)
// DeleteObjectStore will delete the underlying stream for the named object.
DeleteObjectStore(bucket string) error
// ObjectStoreNames is used to retrieve a list of bucket names
ObjectStoreNames(opts ...ObjectOpt) <-chan string
// ObjectStores is used to retrieve a list of buckets
ObjectStores(opts ...ObjectOpt) <-chan ObjectStore
}

// ObjectStore is a blob store capable of storing large objects efficiently in
Expand Down Expand Up @@ -857,13 +861,13 @@ func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error {
// If the new name is different from the old, and it exists, error
// If there was an error that was not ErrObjectNotFound, error.
if name != meta.Name {
_, err = obs.GetInfo(meta.Name)
if err != ErrObjectNotFound {
if err == nil {
return ErrObjectAlreadyExists
}
existingInfo, err := obs.GetInfo(meta.Name)
if err != nil && !errors.Is(err, ErrObjectNotFound) {
return err
}
if err == nil && !existingInfo.Deleted {
return ErrObjectAlreadyExists
}
}

// Update Meta prevents update of ObjectMetaOptions (Link, ChunkSize)
Expand Down Expand Up @@ -1136,3 +1140,83 @@ func (o *objResult) Error() error {
defer o.Unlock()
return o.err
}

// ObjectStoreNames is used to retrieve a list of bucket names
func (js *js) ObjectStoreNames(opts ...ObjectOpt) <-chan string {
var o objOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configureObject(&o); err != nil {
return nil
}
}
}
ch := make(chan string)
var cancel context.CancelFunc
if o.ctx == nil {
o.ctx, cancel = context.WithTimeout(context.Background(), defaultRequestWait)
}
l := &streamLister{js: js}
l.js.opts.streamListSubject = fmt.Sprintf(objAllChunksPreTmpl, "*")
l.js.opts.ctx = o.ctx
go func() {
if cancel != nil {
defer cancel()
}
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
if !strings.HasPrefix(info.Config.Name, "OBJ_") {
continue
}
select {
case ch <- info.Config.Name:
case <-o.ctx.Done():
return
}
}
}
}()

return ch
}

// ObjectStores is used to retrieve a list of buckets
func (js *js) ObjectStores(opts ...ObjectOpt) <-chan ObjectStore {
var o objOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configureObject(&o); err != nil {
return nil
}
}
}
ch := make(chan ObjectStore)
var cancel context.CancelFunc
if o.ctx == nil {
o.ctx, cancel = context.WithTimeout(context.Background(), defaultRequestWait)
}
l := &streamLister{js: js}
l.js.opts.streamListSubject = fmt.Sprintf(objAllChunksPreTmpl, "*")
l.js.opts.ctx = o.ctx
go func() {
if cancel != nil {
defer cancel()
}
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
if !strings.HasPrefix(info.Config.Name, "OBJ_") {
continue
}
select {
case ch <- &obs{name: strings.TrimPrefix(info.Config.Name, "OBJ_"), stream: info.Config.Name, js: js}:
case <-o.ctx.Done():
return
}
}
}
}()

return ch
}
5 changes: 5 additions & 0 deletions test/auth_test.go
Expand Up @@ -14,6 +14,7 @@
package test

import (
"errors"
"fmt"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -44,6 +45,10 @@ func TestAuth(t *testing.T) {
t.Fatalf("Expected error '%v', got '%v'", nats.ErrAuthorization, err)
}

if !errors.Is(err, nats.ErrAuthorization) {
t.Fatalf("Expected error '%v', got '%v'", nats.ErrAuthorization, err)
}

nc, err := nats.Connect("nats://derek:foo@127.0.0.1:8232")
if err != nil {
t.Fatal("Should have connected successfully with a token")
Expand Down
5 changes: 5 additions & 0 deletions test/cluster_test.go
Expand Up @@ -14,6 +14,7 @@
package test

import (
"errors"
"fmt"
"math"
"net"
Expand Down Expand Up @@ -193,6 +194,10 @@ func TestAuthServers(t *testing.T) {
t.Fatalf("Wrong error, wanted Auth failure, got '%s'\n", err)
}

if !errors.Is(err, nats.ErrAuthorization) {
t.Fatalf("Expected error '%v', got '%v'", nats.ErrAuthorization, err)
}

// Test that we can connect to a subsequent correct server.
var authServers = []string{
"nats://127.0.0.1:1222",
Expand Down
33 changes: 25 additions & 8 deletions test/js_test.go
Expand Up @@ -424,7 +424,7 @@ func TestJetStreamSubscribe(t *testing.T) {
t.Helper()
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
var infos []*nats.ConsumerInfo
for info := range js.ConsumersInfo("TEST") {
for info := range js.Consumers("TEST") {
infos = append(infos, info)
}
if len(infos) != expected {
Expand Down Expand Up @@ -1427,7 +1427,7 @@ func TestJetStreamManagement(t *testing.T) {

t.Run("list streams", func(t *testing.T) {
var infos []*nats.StreamInfo
for info := range js.StreamsInfo() {
for info := range js.Streams() {
infos = append(infos, info)
}
if len(infos) != 1 || infos[0].Config.Name != "foo" {
Expand All @@ -1437,20 +1437,20 @@ func TestJetStreamManagement(t *testing.T) {

t.Run("list consumers", func(t *testing.T) {
var infos []*nats.ConsumerInfo
for info := range js.ConsumersInfo("") {
for info := range js.Consumers("") {
infos = append(infos, info)
}
if len(infos) != 0 {
t.Fatalf("ConsumerInfo is not correct %+v", infos)
}
for info := range js.ConsumersInfo("bad.stream.name") {
for info := range js.Consumers("bad.stream.name") {
infos = append(infos, info)
}
if len(infos) != 0 {
t.Fatalf("ConsumerInfo is not correct %+v", infos)
}
infos = infos[:0]
for info := range js.ConsumersInfo("foo") {
for info := range js.Consumers("foo") {
infos = append(infos, info)
}
if len(infos) != 1 || infos[0].Stream != "foo" || infos[0].Config.Durable != "dlc" {
Expand Down Expand Up @@ -1682,6 +1682,14 @@ func TestStreamLister(t *testing.T) {
t.Fatalf("Invalid number of stream names; want: %d; got: %d", test.streamsNum, len(names))
}
infos := make([]*nats.StreamInfo, 0)
for info := range js.Streams() {
infos = append(infos, info)
}
if len(infos) != test.streamsNum {
t.Fatalf("Invalid number of streams; want: %d; got: %d", test.streamsNum, len(infos))
}
// test the deprecated StreamsInfo()
infos = make([]*nats.StreamInfo, 0)
for info := range js.StreamsInfo() {
infos = append(infos, info)
}
Expand Down Expand Up @@ -1751,7 +1759,7 @@ func TestStreamLister_FilterSubject(t *testing.T) {

// list streams
names = make([]string, 0)
for info := range js.StreamsInfo(nats.StreamListFilter(test.filter)) {
for info := range js.Streams(nats.StreamListFilter(test.filter)) {
names = append(names, info.Config.Name)
}
if !reflect.DeepEqual(names, test.expected) {
Expand Down Expand Up @@ -1797,6 +1805,15 @@ func TestConsumersLister(t *testing.T) {
t.Fatalf("Invalid number of consumer names; want: %d; got: %d", test.consumersNum, len(names))
}
infos := make([]*nats.ConsumerInfo, 0)
for info := range js.Consumers("foo") {
infos = append(infos, info)
}
if len(infos) != test.consumersNum {
t.Fatalf("Invalid number of consumers; want: %d; got: %d", test.consumersNum, len(infos))
}

// test the deprecated ConsumersInfo()
infos = make([]*nats.ConsumerInfo, 0)
for info := range js.ConsumersInfo("foo") {
infos = append(infos, info)
}
Expand Down Expand Up @@ -4133,7 +4150,7 @@ func TestJetStream_Unsubscribe(t *testing.T) {
t.Helper()
checkFor(t, time.Second, 15*time.Millisecond, func() error {
var infos []*nats.ConsumerInfo
for info := range js.ConsumersInfo("foo") {
for info := range js.Consumers("foo") {
infos = append(infos, info)
}
if len(infos) != expected {
Expand Down Expand Up @@ -4270,7 +4287,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) {
fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
t.Helper()
var infos []*nats.ConsumerInfo
for info := range jsm.ConsumersInfo("foo") {
for info := range jsm.Consumers("foo") {
infos = append(infos, info)
}
if len(infos) != expected {
Expand Down

0 comments on commit 79716ec

Please sign in to comment.