From 45b7e7725b19ea24e02d7461896f04f162141691 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Sun, 15 Aug 2021 11:18:22 -0600 Subject: [PATCH 1/4] JetStream: lot of changes They will be described in the release notes, but gist: Added: - `DeliverSubject()` option to configure the deliver subject of a JetStream consumer created by the `js.Subscribe()` call (and variants) - `BindDeliverSubject()` option to subscribe directly to a JetStream consumer deliver subject (bypassing any lookup or JetStream consumer creation) - Fields `DeliverGroup` in `ConsumerConfig`, `PushBound` in `ConsumerInfo`. They help making prevent incorrect subscriptions to JetStream consumers - Field `Last` in `SequencePair` Changed: - With a `PullSubscription`, calling `NextMsg()` or `NextMsgWithContext()` will now return `ErrTypeSubscription`. You must use the `Fetch()` API - If the library created internally a JetStream consumer, the consumer will be deleted on `Unsubscribe()` or when the `Drain()` completes - Fail multiple instances of a subscription on the same durable push consumer (only one active at a time). Also, consumers now have the concept of `DeliverGroup`, which is the queue group name they are created for. Only queue member from the same group can attach to this consumer, and a non queue subscription cannot attach to it. Note that this requires server v2.3.5 - Attempting to create a queue subscription with a consumer configuration that has idle heartbeats and/or flow control will now result in an error Fixed: - Possible lock inversion - JetStream consumers could be incorrectly deleted on subscription's `Unsubscribe()` Resolves #785 Resolves #776 Resolves #775 Resolves #748 Resolves #747 Signed-off-by: Ivan Kozlovic --- .travis.yml | 4 +- context.go | 7 +- example_test.go | 8 +- go_test.mod | 2 +- go_test.sum | 6 +- js.go | 612 ++++++++++++-------- js_test.go | 271 ++++++++- nats.go | 128 +++-- norace_test.go | 639 +++++++++++++++++++++ scripts/cov.sh | 8 +- test/js_test.go | 1445 ++++++++++++++--------------------------------- 11 files changed, 1829 insertions(+), 1301 deletions(-) diff --git a/.travis.yml b/.travis.yml index 89c5c11f4..bec2429be 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,5 +15,5 @@ before_script: - find . -type f -name "*.go" | xargs misspell -error -locale US - staticcheck ./... script: -- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -- if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast; fi +- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -vet=off +- if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off; fi diff --git a/context.go b/context.go index 53e5ebb0e..aa8c00ebf 100644 --- a/context.go +++ b/context.go @@ -122,12 +122,7 @@ func (s *Subscription) nextMsgWithContext(ctx context.Context, pullSubInternal, } s.mu.Lock() - err := s.validateNextMsgState() - // Unless this is from an internal call, reject use of this API. - // Users should use Fetch() instead. - if err == nil && !pullSubInternal && s.jsi != nil && s.jsi.pull { - err = ErrTypeSubscription - } + err := s.validateNextMsgState(pullSubInternal) if err != nil { s.mu.Unlock() return nil, err diff --git a/example_test.go b/example_test.go index 0143c8deb..73cdbc776 100644 --- a/example_test.go +++ b/example_test.go @@ -323,7 +323,8 @@ func ExampleJetStream() { }, nats.ManualAck()) // Async queue subscription where members load balance the - // received messages together. + // received messages together. Since no consumer name is specified, + // the queue name will be used as a durable name. js.QueueSubscribe("foo", "group", func(msg *nats.Msg) { msg.Ack() }, nats.ManualAck()) @@ -333,7 +334,10 @@ func ExampleJetStream() { msg, _ := sub.NextMsg(2 * time.Second) msg.Ack() - // QueueSubscribe with group or load balancing. + // We can add a member to the group, with this member using + // the synchronous version of the QueueSubscribe. + // Since no consumer name is specified, the queue name will be + // used as a durable name. sub, _ = js.QueueSubscribeSync("foo", "group") msg, _ = sub.NextMsg(2 * time.Second) msg.Ack() diff --git a/go_test.mod b/go_test.mod index a0a0b93ba..3904ad4eb 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,7 +4,7 @@ go 1.16 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.3.4 + github.com/nats-io/nats-server/v2 v2.3.5-0.20210815013007-eb8aeb217178 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/go_test.sum b/go_test.sum index 5997e8833..32fa570aa 100644 --- a/go_test.sum +++ b/go_test.sum @@ -17,9 +17,9 @@ github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= -github.com/nats-io/nats-server/v2 v2.3.4 h1:WcNa6HDFX8gjZPHb8CJ9wxRHEjJSlhWUb/MKb6/mlUY= -github.com/nats-io/nats-server/v2 v2.3.4/go.mod h1:3mtbaN5GkCo/Z5T3nNj0I0/W1fPkKzLiDC6jjWJKp98= -github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats-server/v2 v2.3.5-0.20210815013007-eb8aeb217178 h1:6/bt9zMGA1D/i3ROeq8GjF8Tig5BVFh4V3gI+qpoWIs= +github.com/nats-io/nats-server/v2 v2.3.5-0.20210815013007-eb8aeb217178/go.mod h1:7mTh0KSxKc63xAVop97cFCIGRkWCv6HoX9lMXRSNOhU= +github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= diff --git a/js.go b/js.go index 4309740f5..6740659e2 100644 --- a/js.go +++ b/js.go @@ -96,6 +96,12 @@ const ( hbcThresh = 2 ) +// Types of control messages, so far heartbeat and flow control +const ( + jsCtrlHB = 1 + jsCtrlFC = 2 +) + // JetStream allows persistent messaging through JetStream. type JetStream interface { // Publish publishes a message to JetStream. @@ -766,6 +772,7 @@ func Context(ctx context.Context) ContextOpt { type ConsumerConfig struct { Durable string `json:"durable_name,omitempty"` DeliverSubject string `json:"deliver_subject,omitempty"` + DeliverGroup string `json:"deliver_group,omitempty"` DeliverPolicy DeliverPolicy `json:"deliver_policy"` OptStartSeq uint64 `json:"opt_start_seq,omitempty"` OptStartTime *time.Time `json:"opt_start_time,omitempty"` @@ -795,12 +802,14 @@ type ConsumerInfo struct { NumWaiting int `json:"num_waiting"` NumPending uint64 `json:"num_pending"` Cluster *ClusterInfo `json:"cluster,omitempty"` + PushBound bool `json:"push_bound,omitempty"` } // SequencePair includes the consumer and stream sequence info from a JetStream consumer. type SequencePair struct { - Consumer uint64 `json:"consumer_seq"` - Stream uint64 `json:"stream_seq"` + Consumer uint64 `json:"consumer_seq"` + Stream uint64 `json:"stream_seq"` + Last *time.Time `json:"last_active,omitempty"` } // nextRequest is for getting next messages for pull based consumers from JetStream. @@ -812,7 +821,6 @@ type nextRequest struct { // jsSub includes JetStream subscription info. type jsSub struct { - mu sync.RWMutex js *js // For pull subscribers, this is the next message subject to send requests to. @@ -823,8 +831,7 @@ type jsSub struct { stream string deliver string pull bool - durable bool - attached bool + dc bool // Delete JS consumer // Ordered consumers ordered bool @@ -835,29 +842,24 @@ type jsSub struct { // Heartbeats and Flow Control handling from push consumers. hbc *time.Timer hbi time.Duration - hbs bool active bool - fc bool cmeta string - fcs map[uint64]string + fcr string + fcd uint64 } -func (jsi *jsSub) unsubscribe(drainMode bool) error { - jsi.mu.Lock() - durable, attached := jsi.durable, jsi.attached - stream, consumer := jsi.stream, jsi.consumer - js := jsi.js - if jsi.hbc != nil { - jsi.hbc.Stop() - jsi.hbc = nil - } - jsi.mu.Unlock() - - if drainMode && (durable || attached) { - // Skip deleting consumer for durables/attached - // consumers when using drain mode. +// Deletes the JS Consumer. +// No connection nor subscription lock must be held on entry. +func (sub *Subscription) deleteConsumer() error { + sub.mu.Lock() + jsi := sub.jsi + if jsi == nil { + sub.mu.Unlock() return nil } + stream, consumer := jsi.stream, jsi.consumer + js := jsi.js + sub.mu.Unlock() return js.DeleteConsumer(stream, consumer) } @@ -875,6 +877,21 @@ func (opt subOptFn) configureSubscribe(opts *subOpts) error { } // Subscribe will create a subscription to the appropriate stream and consumer. +// +// IMPORTANT NOTES: +// * If Bind() and Durable() options are not specified, the library will +// send a request to the server to create an ephemeral JetStream consumer, +// which will be deleted after an Unsubscribe() or Drain(), or automatically +// by the server after a short period of time after the NATS subscription is +// gone. +// * If Durable() only is specified, the library will attempt to lookup a JetStream +// consumer with this name and if found, will bind to it and not attempt to +// delete it. However, if not found, the library will send a request to create +// such durable JetStream consumer, but will still attempt to delete it after +// an Unsubscribe() or Drain(). +// * If Bind() option is provided, the library will attempt to lookup the +// consumer with the given name, and if the lookup fails, then the Subscribe() +// call will return an error. func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) { if cb == nil { return nil, ErrBadSubscription @@ -883,12 +900,15 @@ func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscripti } // SubscribeSync will create a sync subscription to the appropriate stream and consumer. +// See important note in Subscribe() func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) return js.subscribe(subj, _EMPTY_, nil, mch, true, false, opts) } // QueueSubscribe will create a subscription to the appropriate stream and consumer with queue semantics. +// If not optional durable name or binding option is specified, the queue name will be used as a durable name. +// See important note in Subscribe() func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) { if cb == nil { return nil, ErrBadSubscription @@ -897,17 +917,22 @@ func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) } // QueueSubscribeSync will create a sync subscription to the appropriate stream and consumer with queue semantics. +// If not optional durable name or binding option is specified, the queue name will be used as a durable name. +// See important note in Subscribe() func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) return js.subscribe(subj, queue, nil, mch, true, false, opts) } // ChanSubscribe will create a subscription to the appropriate stream and consumer using a channel. +// See important note in Subscribe() func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { return js.subscribe(subj, _EMPTY_, nil, ch, false, false, opts) } // ChanQueueSubscribe will create a subscription to the appropriate stream and consumer using a channel. +// If not optional durable name or binding option is specified, the queue name will be used as a durable name. +// See important note in Subscribe() func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { return js.subscribe(subj, queue, nil, ch, false, false, opts) } @@ -918,6 +943,55 @@ func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription return js.subscribe(subj, _EMPTY_, nil, mch, true, true, append(opts, Durable(durable))) } +func processConsInfo(info *ConsumerInfo, isPullMode bool, subj, queue string) (string, error) { + ccfg := &info.Config + + // Make sure this new subject matches or is a subset. + if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject { + return _EMPTY_, ErrSubjectMismatch + } + + // Prevent binding a subscription against incompatible consumer types. + if isPullMode && ccfg.DeliverSubject != _EMPTY_ { + return _EMPTY_, ErrPullSubscribeToPushConsumer + } else if !isPullMode && ccfg.DeliverSubject == _EMPTY_ { + return _EMPTY_, ErrPullSubscribeRequired + } + + // If pull mode, nothing else to check here. + if isPullMode { + return _EMPTY_, nil + } + + // At this point, we know the user wants push mode, and the JS consumer is + // really push mode. + + dg := info.Config.DeliverGroup + if dg == _EMPTY_ { + // Prevent an user from attempting to create a queue subscription on + // a JS consumer that was not created with a deliver group. + if queue != _EMPTY_ { + return _EMPTY_, fmt.Errorf("cannot create a queue subscription for a consumer without a deliver group") + } else if info.PushBound { + // Need to reject a non queue subscription to a non queue consumer + // if the consumer is already bound. + return _EMPTY_, fmt.Errorf("consumer is already bound to a subscription") + } + } else { + // If the JS consumer has a deliver group, we need to fail a non queue + // subscription attempt: + if queue == _EMPTY_ { + return _EMPTY_, fmt.Errorf("cannot create a subscription for a consumer with a deliver group %q", dg) + } else if queue != dg { + // Here the user's queue group name does not match the one associated + // with the JS consumer. + return _EMPTY_, fmt.Errorf("cannot create a queue subscription %q for a consumer with a deliver group %q", + queue, dg) + } + } + return ccfg.DeliverSubject, nil +} + func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, isPullMode bool, opts []SubOpt) (*Subscription, error) { cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet} o := subOpts{cfg: &cfg} @@ -929,20 +1003,44 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } } - badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy + // Note that these may change based on the consumer info response we may get. hasHeartbeats := o.cfg.Heartbeat > 0 hasFC := o.cfg.FlowControl - if isPullMode && badPullAck { - return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy) + + // Some checks for pull subscribers + if isPullMode { + // Check for bad ack policy + if o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy { + return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy) + } + // No deliver subject should be provided + if o.cfg.DeliverSubject != _EMPTY_ || o.boundSubject != _EMPTY_ { + return nil, ErrPullSubscribeToPushConsumer + } + } + + // Some check/setting specific to queue subs + if queue != _EMPTY_ { + // Queue subscriber cannot have HB or FC (since messages will be randomly dispatched + // to members). We may in the future have a separate NATS subscription that all members + // would subscribe to and server would send on. + if o.cfg.Heartbeat > 0 || o.cfg.FlowControl { + // Not making this a public ErrXXX in case we allow in the future. + return nil, fmt.Errorf("nats: queue subscription doesn't support idle heartbeat nor flow control") + } + + // If this is a queue subscription and no consumer nor durable name was specified, + // then we will use the queue name as a durable name. + if queue != _EMPTY_ && o.consumer == _EMPTY_ && o.cfg.Durable == _EMPTY_ { + o.cfg.Durable = queue + } } var ( err error shouldCreate bool - ccfg *ConsumerConfig info *ConsumerInfo deliver string - attached bool stream = o.stream consumer = o.consumer isDurable = o.cfg.Durable != _EMPTY_ @@ -951,6 +1049,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, lookupErr bool nc = js.nc nms string + hbi time.Duration + ccreq *createConsumerRequest // In case we need to hold onto it for ordered consumers. ) // Do some quick checks here for ordered consumers. We do these here instead of spread out @@ -990,126 +1090,106 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } hasFC, hasHeartbeats = true, true o.mack = true // To avoid auto-ack wrapping call below. + hbi = o.cfg.Heartbeat } - // In case a consumer has not been set explicitly, then the - // durable name will be used as the consumer name. - if consumer == _EMPTY_ { - consumer = o.cfg.Durable - } - - // Find the stream mapped to the subject if not bound to a stream already. - if o.stream == _EMPTY_ { - stream, err = js.lookupStreamBySubject(subj) - if err != nil { - return nil, err - } + // With a bound subject, we go directly create the NATS subscription + // and skip all lookup/create. + if o.boundSubject != _EMPTY_ { + deliver = o.boundSubject } else { - stream = o.stream - } - - // With an explicit durable name, we can lookup the consumer first - // to which it should be attaching to. - if consumer != _EMPTY_ { - info, err = js.ConsumerInfo(stream, consumer) - notFoundErr = errors.Is(err, ErrConsumerNotFound) - lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded - } - - switch { - case info != nil: - // Attach using the found consumer config. - ccfg = &info.Config - attached = true - - // Make sure this new subject matches or is a subset. - if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject { - return nil, ErrSubjectMismatch + // In case a consumer has not been set explicitly, then the + // durable name will be used as the consumer name. + if consumer == _EMPTY_ { + consumer = o.cfg.Durable } - // Prevent binding a subscription against incompatible consumer types. - if isPullMode && ccfg.DeliverSubject != _EMPTY_ { - return nil, ErrPullSubscribeToPushConsumer - } else if !isPullMode && ccfg.DeliverSubject == _EMPTY_ { - return nil, ErrPullSubscribeRequired - } - if ccfg.DeliverSubject != _EMPTY_ { - deliver = ccfg.DeliverSubject - } else if !isPullMode { - deliver = nc.newInbox() - } - case (err != nil && !notFoundErr) || (notFoundErr && consumerBound): - // If the consumer is being bound and we got an error on pull subscribe then allow the error. - if !(isPullMode && lookupErr && consumerBound) { - return nil, err + // Find the stream mapped to the subject if not bound to a stream already. + if o.stream == _EMPTY_ { + stream, err = js.lookupStreamBySubject(subj) + if err != nil { + return nil, err + } + } else { + stream = o.stream } - default: - // Attempt to create consumer if not found nor using Bind. - shouldCreate = true - if o.cfg.DeliverSubject != _EMPTY_ { - deliver = o.cfg.DeliverSubject - } else if !isPullMode { - deliver = nc.newInbox() + + // With an explicit durable name, we can lookup the consumer first + // to which it should be attaching to. + if consumer != _EMPTY_ { + info, err = js.ConsumerInfo(stream, consumer) + notFoundErr = errors.Is(err, ErrConsumerNotFound) + lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded } - } - var sub *Subscription + switch { + case info != nil: + deliver, err = processConsInfo(info, isPullMode, subj, queue) + if err != nil { + return nil, err + } + icfg := &info.Config + hasFC, hbi = icfg.FlowControl, icfg.Heartbeat + hasHeartbeats = hbi > 0 + case (err != nil && !notFoundErr) || (notFoundErr && consumerBound): + // If the consumer is being bound and we got an error on pull subscribe then allow the error. + if !(isPullMode && lookupErr && consumerBound) { + return nil, err + } + default: + // Attempt to create consumer if not found nor using Bind. + shouldCreate = true + if o.cfg.DeliverSubject != _EMPTY_ { + deliver = o.cfg.DeliverSubject + } else if !isPullMode { + deliver = nc.newInbox() + cfg.DeliverSubject = deliver + } - // Check if we are manual ack. - if cb != nil && !o.mack { - ocb := cb - cb = func(m *Msg) { ocb(m); m.Ack() } - } + // Do filtering always, server will clear as needed. + cfg.FilterSubject = subj - // In case we need to hold onto it for ordered consumers. - var ccreq *createConsumerRequest + // Pass the queue to the consumer config + if queue != _EMPTY_ { + cfg.DeliverGroup = queue + } - // If we are creating or updating let's update cfg. - if shouldCreate { - if !isPullMode { - cfg.DeliverSubject = deliver - } - // Do filtering always, server will clear as needed. - cfg.FilterSubject = subj - - // If not set default to ack explicit. - if cfg.AckPolicy == ackPolicyNotSet { - cfg.AckPolicy = AckExplicitPolicy - } - // If we have acks at all and the MaxAckPending is not set go ahead - // and set to the internal max. - // TODO(dlc) - We should be able to update this if client updates PendingLimits. - if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy { - if !isPullMode && cb != nil && hasFC { - cfg.MaxAckPending = DefaultSubPendingMsgsLimit * 16 - } else if ch != nil { - cfg.MaxAckPending = cap(ch) - } else { - cfg.MaxAckPending = DefaultSubPendingMsgsLimit + // If not set default to ack explicit. + if cfg.AckPolicy == ackPolicyNotSet { + cfg.AckPolicy = AckExplicitPolicy } + // If we have acks at all and the MaxAckPending is not set go ahead + // and set to the internal max. + // TODO(dlc) - We should be able to update this if client updates PendingLimits. + if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy { + if !isPullMode && cb != nil && hasFC { + cfg.MaxAckPending = DefaultSubPendingMsgsLimit * 16 + } else if ch != nil { + cfg.MaxAckPending = cap(ch) + } else { + cfg.MaxAckPending = DefaultSubPendingMsgsLimit + } + } + // Create request here. + ccreq = &createConsumerRequest{ + Stream: stream, + Config: &cfg, + } + hbi = cfg.Heartbeat } - // Create request here. - ccreq = &createConsumerRequest{ - Stream: stream, - Config: &cfg, - } - } - if isPullMode { - nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer) - deliver = nc.newInbox() + if isPullMode { + nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer) + deliver = nc.newInbox() + } } jsi := &jsSub{ js: js, stream: stream, consumer: consumer, - durable: isDurable, - attached: attached, deliver: deliver, - hbs: hasHeartbeats, - hbi: o.cfg.Heartbeat, - fc: hasFC, + hbi: hbi, ordered: o.ordered, ccreq: ccreq, dseq: 1, @@ -1118,7 +1198,12 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, psubj: subj, } - sub, err = nc.subscribe(deliver, queue, cb, ch, isSync, jsi) + // Check if we are manual ack. + if cb != nil && !o.mack { + ocb := cb + cb = func(m *Msg) { ocb(m); m.Ack() } + } + sub, err := nc.subscribe(deliver, queue, cb, ch, isSync, jsi) if err != nil { return nil, err } @@ -1176,9 +1261,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if !isPullMode { cleanUpSub() } - // Multiple subscribers could compete in creating the first consumer - // that will be shared using the same durable name. If this happens, then - // do a lookup of the consumer info and resubscribe using the latest info. if consumer != _EMPTY_ && (strings.Contains(cinfo.Error.Description, `consumer already exists`) || strings.Contains(cinfo.Error.Description, `consumer name already in use`)) { @@ -1187,23 +1269,16 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if err != nil { return nil, err } - ccfg = &info.Config - - // Validate that the original subject does still match. - if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject { - return nil, ErrSubjectMismatch + deliver, err = processConsInfo(info, isPullMode, subj, queue) + if err != nil { + return nil, err } - - // Update attached status. - jsi.attached = true - - // Use the deliver subject from latest consumer config to attach. - if info.Config.DeliverSubject != _EMPTY_ { + if !isPullMode { // We can't reuse the channel, so if one was passed, we need to create a new one. if ch != nil { ch = make(chan *Msg, cap(ch)) } - jsi.deliver = info.Config.DeliverSubject + jsi.deliver = deliver // Recreate the subscription here. sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi) if err != nil { @@ -1216,10 +1291,15 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } return nil, fmt.Errorf("nats: %s", cinfo.Error.Description) } - } else if consumer == _EMPTY_ { - // Update our consumer name here which is filled in when we create the consumer. + } else { + // Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain() sub.mu.Lock() - sub.jsi.consumer = info.Name + sub.jsi.dc = true + // If this is an ephemeral, we did not have a consumer name, we get it from the info + // after the AddConsumer returns. + if consumer == _EMPTY_ { + sub.jsi.consumer = info.Name + } sub.mu.Unlock() } } @@ -1256,23 +1336,36 @@ func (ecs *ErrConsumerSequenceMismatch) Error() string { ) } -// isControlMessage will return true if this is an empty control status message. -func isControlMessage(msg *Msg) bool { - return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg +// isJSControlMessage will return true if this is an empty control status message +// and indicate what type of control message it is, say jsCtrlHB or jsCtrlFC +func isJSControlMessage(msg *Msg) (bool, int) { + if len(msg.Data) > 0 || msg.Header.Get(statusHdr) != controlMsg { + return false, 0 + } + val := msg.Header.Get(descrHdr) + if strings.HasPrefix(val, "Idle") { + return true, jsCtrlHB + } + if strings.HasPrefix(val, "Flow") { + return true, jsCtrlFC + } + return true, 0 } -func (jsi *jsSub) trackSequences(reply string) { - jsi.mu.Lock() - jsi.cmeta = reply - jsi.mu.Unlock() +// Keeps track of the incoming message's reply subject so that the consumer's +// state (deliver sequence, etc..) can be checked against heartbeats. +// Runs under the subscription lock +func (sub *Subscription) trackSequences(reply string) { + sub.jsi.cmeta = reply } // Check to make sure messages are arriving in order. // Returns true if the sub had to be replaced. Will cause upper layers to return. +// The caller has verified that sub.jsi != nil and that this is not a control message. // Lock should be held. func (sub *Subscription) checkOrderedMsgs(m *Msg) bool { // Ignore msgs with no reply like HBs and flowcontrol, they are handled elsewhere. - if m.Reply == _EMPTY_ || sub.jsi == nil || isControlMessage(m) { + if m.Reply == _EMPTY_ { return false } @@ -1281,19 +1374,15 @@ func (sub *Subscription) checkOrderedMsgs(m *Msg) bool { if err != nil { return false } - sseq, dseq := uint64(parseNum(tokens[5])), uint64(parseNum(tokens[6])) + sseq, dseq := uint64(parseNum(tokens[ackStreamSeqTokenPos])), uint64(parseNum(tokens[ackConsumerSeqTokenPos])) jsi := sub.jsi - jsi.mu.Lock() if dseq != jsi.dseq { - rseq := jsi.sseq + 1 - jsi.mu.Unlock() - sub.resetOrderedConsumer(rseq) + sub.resetOrderedConsumer(jsi.sseq + 1) return true } // Update our tracking here. jsi.dseq, jsi.sseq = dseq+1, sseq - jsi.mu.Unlock() return false } @@ -1322,8 +1411,7 @@ func (sub *Subscription) applyNewSID() (osid int64) { // Lock should be held. func (sub *Subscription) resetOrderedConsumer(sseq uint64) { nc := sub.conn - closed := sub.closed - if sub.jsi == nil || nc == nil || closed { + if sub.jsi == nil || nc == nil || sub.closed { return } @@ -1334,8 +1422,8 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { newDeliver := nc.newInbox() sub.Subject = newDeliver - // Snapshot jsi under sub lock here. - jsi := sub.jsi + // Snapshot the new sid under sub lock. + nsid := sub.sid // We are still in the low level readloop for the connection so we need // to spin a go routine to try to create the new consumer. @@ -1345,7 +1433,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { // This is done here in this go routine to prevent lock inversion. nc.mu.Lock() nc.bw.appendString(fmt.Sprintf(unsubProto, osid, _EMPTY_)) - nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, sub.sid)) + nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, nsid)) nc.kickFlusher() nc.mu.Unlock() @@ -1354,11 +1442,12 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { nc.unsubscribe(sub, 0, true) } - jsi.mu.Lock() + sub.mu.Lock() + jsi := sub.jsi // Reset some items in jsi. jsi.dseq = 1 jsi.cmeta = _EMPTY_ - jsi.fcs = nil + jsi.fcr, jsi.fcd = _EMPTY_, 0 jsi.deliver = newDeliver // Reset consumer request for starting policy. cfg := jsi.ccreq.Config @@ -1369,7 +1458,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { ccSubj := fmt.Sprintf(apiConsumerCreateT, jsi.stream) j, err := json.Marshal(jsi.ccreq) js := jsi.js - jsi.mu.Unlock() + sub.mu.Unlock() if err != nil { pushErr(err) @@ -1397,86 +1486,70 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { return } - jsi.mu.Lock() + sub.mu.Lock() jsi.consumer = cinfo.Name - jsi.mu.Unlock() + sub.mu.Unlock() }() } // checkForFlowControlResponse will check to see if we should send a flow control response -// based on the delivered index. -// Lock should be held. -func (sub *Subscription) checkForFlowControlResponse(delivered uint64) { - jsi, nc := sub.jsi, sub.conn - if jsi == nil { - return - } - - jsi.mu.Lock() - defer jsi.mu.Unlock() - - if len(jsi.fcs) == 0 { - return - } - - if reply := jsi.fcs[delivered]; reply != _EMPTY_ { - delete(jsi.fcs, delivered) - nc.Publish(reply, nil) +// based on the subscription current delivered index and the target. +// Runs under subscription lock +func (sub *Subscription) checkForFlowControlResponse() string { + // Caller has verified that there is a sub.jsi and fc + jsi := sub.jsi + if jsi.fcd == sub.delivered { + fcr := jsi.fcr + jsi.fcr, jsi.fcd = _EMPTY_, 0 + return fcr } + return _EMPTY_ } // Record an inbound flow control message. -func (jsi *jsSub) scheduleFlowControlResponse(dfuture uint64, reply string) { - jsi.mu.Lock() - if jsi.fcs == nil { - jsi.fcs = make(map[uint64]string) - } - jsi.fcs[dfuture] = reply - jsi.mu.Unlock() +// Runs under subscription lock +func (sub *Subscription) scheduleFlowControlResponse(dfuture uint64, reply string) { + jsi := sub.jsi + jsi.fcr, jsi.fcd = reply, dfuture } // Checks for activity from our consumer. // If we do not think we are active send an async error. func (sub *Subscription) activityCheck() { + sub.mu.Lock() jsi := sub.jsi if jsi == nil { + sub.mu.Unlock() return } - jsi.mu.Lock() active := jsi.active jsi.hbc.Reset(jsi.hbi) jsi.active = false - jsi.mu.Unlock() - - if !active { - sub.mu.Lock() - nc := sub.conn - closed := sub.closed - sub.mu.Unlock() + nc := sub.conn + closed := sub.closed + sub.mu.Unlock() - if !closed { - nc.mu.Lock() - errCB := nc.Opts.AsyncErrorCB - if errCB != nil { - nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) }) - } - nc.mu.Unlock() + if !active && !closed { + nc.mu.Lock() + if errCB := nc.Opts.AsyncErrorCB; errCB != nil { + nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) }) } + nc.mu.Unlock() } } // scheduleHeartbeatCheck sets up the timer check to make sure we are active // or receiving idle heartbeats.. func (sub *Subscription) scheduleHeartbeatCheck() { + sub.mu.Lock() + defer sub.mu.Unlock() + jsi := sub.jsi if jsi == nil { return } - jsi.mu.Lock() - defer jsi.mu.Unlock() - if jsi.hbc == nil { jsi.hbc = time.AfterFunc(jsi.hbi*hbcThresh, sub.activityCheck) } else { @@ -1497,10 +1570,10 @@ func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) { // checkForSequenceMismatch will make sure we have not missed any messages since last seen. func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) { // Process heartbeat received, get latest control metadata if present. - jsi.mu.Lock() + s.mu.Lock() ctrl, ordered := jsi.cmeta, jsi.ordered jsi.active = true - jsi.mu.Unlock() + s.mu.Unlock() if ctrl == _EMPTY_ { return @@ -1513,7 +1586,7 @@ func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) // Consumer sequence. var ldseq string - dseq := tokens[6] + dseq := tokens[ackConsumerSeqTokenPos] hdr := msg.Header[lastConsumerSeqHdr] if len(hdr) == 1 { ldseq = hdr[0] @@ -1524,7 +1597,7 @@ func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) if ldseq != dseq { // Dispatch async error including details such as // from where the consumer could be restarted. - sseq := parseNum(tokens[5]) + sseq := parseNum(tokens[ackStreamSeqTokenPos]) if ordered { s.mu.Lock() s.resetOrderedConsumer(jsi.sseq + 1) @@ -1585,6 +1658,10 @@ type subOpts struct { mack bool // For an ordered consumer. ordered bool + // For specifying simply the subject the NATS susbcription should + // be created on. No stream or consumer name lookup/creation will + // be done. + boundSubject string } // OrderedConsumer will create a fifo direct/ephemeral consumer for in order delivery of messages. @@ -1729,6 +1806,13 @@ func RateLimit(n uint64) SubOpt { } // BindStream binds a consumer to a stream explicitly based on a name. +// When a stream name is not specified, the library uses the subscribe +// subject as a way to find the stream name. It is done by making a request +// to the server to get list of stream names that have a fileter for this +// subject. If the returned list contains a single stream, then this +// stream name will be used, otherwise the `ErrNoMatchingStream` is returned. +// To avoid the stream lookup, provide the stream name with this function. +// See also `Bind()`. func BindStream(stream string) SubOpt { return subOptFn(func(opts *subOpts) error { if opts.stream != _EMPTY_ && opts.stream != stream { @@ -1782,6 +1866,32 @@ func IdleHeartbeat(duration time.Duration) SubOpt { }) } +// DeliverSubject specifies the JetStream consumer deliver subject. +// This applies only in cases where the no consumer exists and it will be +// created by the library by the subscribe API. +// If a consumer exists, then the NATS subscription will be created to +// the JetStream consumer's DeliverSubject, not necessarily this subject. +func DeliverSubject(subject string) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.DeliverSubject = subject + return nil + }) +} + +// BindDeliverSubject specifies the deliver subject of a JetStream consumer +// that the subscription should subscribe to. +// +// NOTE: This is an "expert" API and should only be used when consumer lookup or +// creation by the library is not possible (for instance cross accounts). +// Since no lookup of the JetStream consumer is done, there is no way for +// the library to check the validity of this JetStream subscription. +func BindDeliverSubject(subject string) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.boundSubject = subject + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. @@ -2154,13 +2264,28 @@ type MsgMetadata struct { Timestamp time.Time Stream string Consumer string + Domain string } +const ( + ackDomainTokenPos = 2 + ackAccHashTokenPos = 3 + ackStreamTokenPos = 4 + ackConsumerTokenPos = 5 + ackNumDeliveredTokenPos = 6 + ackStreamSeqTokenPos = 7 + ackConsumerSeqTokenPos = 8 + ackTimestampSeqTokenPos = 9 + ackNumPendingTokenPos = 10 +) + func getMetadataFields(subject string) ([]string, error) { - const expectedTokens = 9 - const btsep = '.' + const noDomainNoHashsExpectedAckTokens = 9 + const withDomainNoHashExpectedAckTokens = 10 + const withDomainAndHashExpectedAckTokens = 11 - tsa := [expectedTokens]string{} + const btsep = '.' + tsa := [withDomainAndHashExpectedAckTokens]string{} start, tokens := 0, tsa[:0] for i := 0; i < len(subject); i++ { if subject[i] == btsep { @@ -2169,9 +2294,39 @@ func getMetadataFields(subject string) ([]string, error) { } } tokens = append(tokens, subject[start:]) - if len(tokens) != expectedTokens || tokens[0] != "$JS" || tokens[1] != "ACK" { + // + // Newer server will include an account hash in the subject, and possibly the domain. + // So the subject could be: + // + // no domain: $JS.ACK...... + // with domain: $JS.ACK....... + // + // So old server number of tokens is 9, newer is 10 or 11. + // + l := len(tokens) + if l < noDomainNoHashsExpectedAckTokens || l > withDomainAndHashExpectedAckTokens { return nil, ErrNotJSMessage } + if tokens[0] != "$JS" || tokens[1] != "ACK" { + return nil, ErrNotJSMessage + } + // To make the rest of the library agnostic of that, we always return the tokens + // as if it is coming from a new server will all possible tokens. If domain or account + // hash are not specified, the tokens at those locations will simply be empty. + if l == noDomainNoHashsExpectedAckTokens || l == withDomainNoHashExpectedAckTokens { + // Extend the array (we know the backend is big enough) + // Compute how many tokens we need to insert. + itc := withDomainAndHashExpectedAckTokens - l + for i := 0; i < itc; i++ { + tokens = append(tokens, _EMPTY_) + } + // Move to the right anything that is after "ACK" token. + copy(tokens[ackDomainTokenPos+itc:], tokens[ackDomainTokenPos:]) + // Set the missing tokens to empty + for i := 0; i < itc; i++ { + tokens[ackDomainTokenPos+i] = _EMPTY_ + } + } return tokens, nil } @@ -2188,14 +2343,15 @@ func (m *Msg) Metadata() (*MsgMetadata, error) { } meta := &MsgMetadata{ - NumDelivered: uint64(parseNum(tokens[4])), - NumPending: uint64(parseNum(tokens[8])), - Timestamp: time.Unix(0, parseNum(tokens[7])), - Stream: tokens[2], - Consumer: tokens[3], - } - meta.Sequence.Stream = uint64(parseNum(tokens[5])) - meta.Sequence.Consumer = uint64(parseNum(tokens[6])) + Domain: tokens[ackDomainTokenPos], + NumDelivered: uint64(parseNum(tokens[ackNumDeliveredTokenPos])), + NumPending: uint64(parseNum(tokens[ackNumPendingTokenPos])), + Timestamp: time.Unix(0, parseNum(tokens[ackTimestampSeqTokenPos])), + Stream: tokens[ackStreamTokenPos], + Consumer: tokens[ackConsumerTokenPos], + } + meta.Sequence.Stream = uint64(parseNum(tokens[ackStreamSeqTokenPos])) + meta.Sequence.Consumer = uint64(parseNum(tokens[ackConsumerSeqTokenPos])) return meta, nil } diff --git a/js_test.go b/js_test.go index bad65cc62..b268c56ae 100644 --- a/js_test.go +++ b/js_test.go @@ -20,8 +20,11 @@ package nats import ( "crypto/sha256" "encoding/base64" + "fmt" + "io/ioutil" "math/rand" "os" + "reflect" "strings" "sync" "sync/atomic" @@ -39,6 +42,25 @@ func RunBasicJetStreamServer() *server.Server { return natsserver.RunServer(&opts) } +func RunServerWithConfig(configFile string) (*server.Server, *server.Options) { + return natsserver.RunServerWithConfig(configFile) +} + +func createConfFile(t *testing.T, content []byte) string { + t.Helper() + conf, err := ioutil.TempFile("", "") + if err != nil { + t.Fatalf("Error creating conf file: %v", err) + } + fName := conf.Name() + conf.Close() + if err := ioutil.WriteFile(fName, content, 0666); err != nil { + os.Remove(fName) + t.Fatalf("Error writing conf file: %v", err) + } + return fName +} + // Need access to internals for loss testing. func TestJetStreamOrderedConsumer(t *testing.T) { s := RunBasicJetStreamServer() @@ -344,9 +366,9 @@ func TestJetStreamOrderedConsumerWithErrors(t *testing.T) { testSubError(deleteConsumer) } -// We want to make sure we do the right thing with lots of concurrent durable consumer requests. +// We want to make sure we do the right thing with lots of concurrent queue durable consumer requests. // One should win and the others should share the delivery subject with the first one who wins. -func TestJetStreamConcurrentDurablePushConsumers(t *testing.T) { +func TestJetStreamConcurrentQueueDurablePushConsumers(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() @@ -382,7 +404,7 @@ func TestJetStreamConcurrentDurablePushConsumers(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - sub, _ := js.SubscribeSync("foo", Durable("dlc")) + sub, _ := js.QueueSubscribeSync("foo", "bar") subs <- sub }() } @@ -398,16 +420,30 @@ func TestJetStreamConcurrentDurablePushConsumers(t *testing.T) { t.Fatalf("Expected exactly one consumer, got %d", si.State.Consumers) } - // Now send one message and make sure all subs get it. - js.Publish("foo", []byte("Hello")) - time.Sleep(250 * time.Millisecond) // Allow time for delivery. + // Now send some messages and make sure they are distributed. + total := 1000 + for i := 0; i < total; i++ { + js.Publish("foo", []byte("Hello")) + } - for sub := range subs { - pending, _, _ := sub.Pending() - if pending != 1 { - t.Fatalf("Expected each durable to receive 1 msg, this sub got %d", pending) + timeout := time.Now().Add(2 * time.Second) + got := 0 + for time.Now().Before(timeout) { + got = 0 + for sub := range subs { + pending, _, _ := sub.Pending() + // If a single sub has the total, then probably something is not right. + if pending == total { + t.Fatalf("A single member should not have gotten all messages") + } + got += pending + } + if got == total { + // We are done! + return } } + t.Fatalf("Expected %v messages, got only %v", total, got) } func TestJetStreamSubscribeReconnect(t *testing.T) { @@ -495,3 +531,218 @@ func TestJetStreamSubscribeReconnect(t *testing.T) { // Make sure we can send and receive the msg sendAndReceive("msg2") } + +func TestJetStreamAckTokens(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Create the stream using our client API. + _, err = js.AddStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sub, err := js.SubscribeSync("foo") + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + now := time.Now() + for _, test := range []struct { + name string + expected *MsgMetadata + str string + err bool + }{ + { + "valid token size but not js ack", + nil, + "one.two.stream.consumer.1.2.3.4.5", + true, + }, + { + "valid token size but not js ack", + nil, + "one.two.hash.stream.consumer.1.2.3.4.5", + true, + }, + { + "valid token size but not js ack", + nil, + "one.two.domain.hash.stream.consumer.1.2.3.4.5", + true, + }, + { + "invalid token size", + nil, + "$JS.ACK.stream.consumer.1.2.3.4", + true, + }, + { + "invalid token size", + nil, + "$JS.ACK.domain.hash.stream.consumer.1.2.3.4.5.6", + true, + }, + { + "no domain no hash", + &MsgMetadata{ + Stream: "TEST", + Consumer: "cons", + NumDelivered: 1, + Sequence: SequencePair{ + Stream: 2, + Consumer: 3, + }, + Timestamp: now, + NumPending: 4, + }, + "", + false, + }, + { + "no domain with hash", + &MsgMetadata{ + Stream: "TEST", + Consumer: "cons", + NumDelivered: 1, + Sequence: SequencePair{ + Stream: 2, + Consumer: 3, + }, + Timestamp: now, + NumPending: 4, + }, + "ACCHASH.", + false, + }, + { + "with domain with hash", + &MsgMetadata{ + Domain: "HUB", + Stream: "TEST", + Consumer: "cons", + NumDelivered: 1, + Sequence: SequencePair{ + Stream: 2, + Consumer: 3, + }, + Timestamp: now, + NumPending: 4, + }, + "HUB.ACCHASH.", + false, + }, + } { + t.Run(test.name, func(t *testing.T) { + msg := NewMsg("foo") + msg.Sub = sub + if test.err { + msg.Reply = test.str + } else { + msg.Reply = fmt.Sprintf("$JS.ACK.%sTEST.cons.1.2.3.%v.4", test.str, now.UnixNano()) + } + + meta, err := msg.Metadata() + if test.err { + if err == nil || meta != nil { + t.Fatalf("Expected error for content: %q, got meta=%+v err=%v", test.str, meta, err) + } + // Expected error, we are done + return + } + if err != nil { + t.Fatalf("Expected: %+v with reply: %q, got error %v", test.expected, msg.Reply, err) + } + if meta.Timestamp.UnixNano() != now.UnixNano() { + t.Fatalf("Timestamp is bad: %v vs %v", now.UnixNano(), meta.Timestamp.UnixNano()) + } + meta.Timestamp = time.Time{} + test.expected.Timestamp = time.Time{} + if !reflect.DeepEqual(test.expected, meta) { + t.Fatalf("Expected %+v, got %+v", test.expected, meta) + } + }) + } +} + +func TestJetStreamFlowControlStalled(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"a"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if _, err := js.SubscribeSync("a", + DeliverSubject("ds"), + Durable("dur"), + IdleHeartbeat(200*time.Millisecond), + EnableFlowControl()); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + // Drop all incoming FC control messages. + fcLoss := func(m *Msg) *Msg { + if _, ctrlType := isJSControlMessage(m); ctrlType == jsCtrlFC { + return nil + } + return m + } + nc.addMsgFilter("ds", fcLoss) + + // Have a subscription on the FC subject to make sure that the library + // respond to the requests for un-stall + checkSub, err := nc.SubscribeSync("$JS.FC.>") + if err != nil { + t.Fatalf("Error on sub: %v", err) + } + + // Publish bunch of messages. + payload := make([]byte, 1024) + for i := 0; i < 250; i++ { + nc.Publish("a", payload) + } + + // Now wait that we respond to a stalled FC + if _, err := checkSub.NextMsg(2 * time.Second); err != nil { + t.Fatal("Library did not send FC") + } +} diff --git a/nats.go b/nats.go index 093a11de3..f04817e85 100644 --- a/nats.go +++ b/nats.go @@ -2574,21 +2574,22 @@ func (nc *Conn) waitForMsgs(s *Subscription) { mcb := s.mcb max = s.max closed = s.closed + var fcReply string if !s.closed { s.delivered++ delivered = s.delivered if s.jsi != nil { - s.jsi.mu.Lock() - needCheck := s.jsi.fc && len(s.jsi.fcs) > 0 + fcReply = s.checkForFlowControlResponse() s.jsi.active = true - s.jsi.mu.Unlock() - if needCheck { - s.checkForFlowControlResponse(delivered) - } } } s.mu.Unlock() + // Respond to flow control if applicable + if fcReply != _EMPTY_ { + nc.Publish(fcReply, nil) + } + if closed { break } @@ -2688,8 +2689,8 @@ func (nc *Conn) processMsg(data []byte) { var h Header var err error var ctrlMsg bool - var hasFC bool - var hasHBs bool + var ctrlType int + var fcReply string if nc.ps.ma.hdr > 0 { hbuf := msgPayload[:nc.ps.ma.hdr] @@ -2728,9 +2729,18 @@ func (nc *Conn) processMsg(data []byte) { // Skip flow control messages in case of using a JetStream context. jsi := sub.jsi if jsi != nil { - ctrlMsg, hasHBs, hasFC = isControlMessage(m), jsi.hbs, jsi.fc + // There has to be a header for it to be a control message. + if h != nil { + ctrlMsg, ctrlType = isJSControlMessage(m) + if ctrlMsg && ctrlType == jsCtrlHB { + // Check if the hearbeat has a "Consumer Stalled" header, if + // so, the value is the FC reply to send a nil message to. + // We will send it at the end of this function. + fcReply = m.Header.Get(consumerStalledHdr) + } + } // Check for ordered consumer here. If checkOrdered returns true that means it detected a gap. - if jsi.ordered && sub.checkOrderedMsgs(m) { + if !ctrlMsg && jsi.ordered && sub.checkOrderedMsgs(m) { sub.mu.Unlock() return } @@ -2777,19 +2787,19 @@ func (nc *Conn) processMsg(data []byte) { sub.pTail = m } } - if jsi != nil && hasHBs { + if jsi != nil { // Store the ACK metadata from the message to // compare later on with the received heartbeat. - jsi.trackSequences(m.Reply) + sub.trackSequences(m.Reply) } - } else if hasFC && m.Reply != _EMPTY_ { + } else if ctrlType == jsCtrlFC && m.Reply != _EMPTY_ { // This is a flow control message. // If we have no pending, go ahead and send in place. if sub.pMsgs <= 0 { - nc.Publish(m.Reply, nil) + fcReply = m.Reply } else { // Schedule a reply after the previous message is delivered. - jsi.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply) + sub.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply) } } @@ -2797,8 +2807,12 @@ func (nc *Conn) processMsg(data []byte) { sub.sc = false sub.mu.Unlock() + if fcReply != _EMPTY_ { + nc.Publish(fcReply, nil) + } + // Handle control heartbeat messages. - if ctrlMsg && hasHBs && m.Reply == _EMPTY_ { + if ctrlMsg && ctrlType == jsCtrlHB && m.Reply == _EMPTY_ { nc.checkForSequenceMismatch(m, sub, jsi) } @@ -3161,6 +3175,7 @@ const ( descrHdr = "Description" lastConsumerSeqHdr = "Nats-Last-Consumer" lastStreamSeqHdr = "Nats-Last-Stream" + consumerStalledHdr = "Nats-Consumer-Stalled" noResponders = "503" noMessagesSts = "404" reqTimeoutSts = "408" @@ -3808,6 +3823,12 @@ func (nc *Conn) removeSub(s *Subscription) { } s.mch = nil + // If JS subscription then stop HB timer. + if jsi := s.jsi; jsi != nil && jsi.hbc != nil { + jsi.hbc.Stop() + jsi.hbc = nil + } + // Mark as invalid s.closed = true if s.pCond != nil { @@ -3857,6 +3878,15 @@ func (s *Subscription) IsValid() bool { // Drain will remove interest but continue callbacks until all messages // have been processed. +// +// For a JetStream subscription, if the library has created the JetStream +// consumer, the library will send a DeleteConsumer request to the server +// when the Drain operation completes. If a failure occurs when deleting +// the JetStream consumer, an error will be reported to the asynchronous +// error callback. +// If you do not wish the JetStream consumer to be automatically deleted, +// ensure that the consumer is not created by the library, which means +// create the consumer with AddConsumer and bind to this consumer. func (s *Subscription) Drain() error { if s == nil { return ErrBadSubscription @@ -3871,6 +3901,14 @@ func (s *Subscription) Drain() error { } // Unsubscribe will remove interest in the given subject. +// +// For a JetStream subscription, if the library has created the JetStream +// consumer, it will send a DeleteConsumer request to the server (if the +// unsubscribe itself was successful). If the delete operation fails, the +// error will be returned. +// If you do not wish the JetStream consumer to be automatically deleted, +// ensure that the consumer is not created by the library, which means +// create the consumer with AddConsumer and bind to this consumer. func (s *Subscription) Unsubscribe() error { if s == nil { return ErrBadSubscription @@ -3878,6 +3916,7 @@ func (s *Subscription) Unsubscribe() error { s.mu.Lock() conn := s.conn closed := s.closed + dc := s.jsi != nil && s.jsi.dc s.mu.Unlock() if conn == nil || conn.IsClosed() { return ErrConnectionClosed @@ -3888,7 +3927,11 @@ func (s *Subscription) Unsubscribe() error { if conn.IsDraining() { return ErrConnectionDraining } - return conn.unsubscribe(s, 0, false) + err := conn.unsubscribe(s, 0, false) + if err == nil && dc { + err = s.deleteConsumer() + } + return err } // checkDrained will watch for a subscription to be fully drained @@ -3902,6 +3945,12 @@ func (nc *Conn) checkDrained(sub *Subscription) { // is correct and the server will not send additional information. nc.Flush() + sub.mu.Lock() + // For JS subscriptions, check if we are going to delete the + // JS consumer when drain completes. + dc := sub.jsi != nil && sub.jsi.dc + sub.mu.Unlock() + // Once we are here we just wait for Pending to reach 0 or // any other state to exit this go routine. for { @@ -3921,6 +3970,15 @@ func (nc *Conn) checkDrained(sub *Subscription) { nc.mu.Lock() nc.removeSub(sub) nc.mu.Unlock() + if dc { + if err := sub.deleteConsumer(); err != nil { + nc.mu.Lock() + if errCB := nc.Opts.AsyncErrorCB; errCB != nil { + nc.ach.push(func() { errCB(nc, sub, err) }) + } + nc.mu.Unlock() + } + } return } @@ -3959,18 +4017,6 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error { sub.mu.Unlock() } - // For JetStream consumers, need to clean up ephemeral consumers - // or delete durable ones if called with Unsubscribe. - sub.mu.Lock() - jsi := sub.jsi - sub.mu.Unlock() - if jsi != nil && maxStr == _EMPTY_ { - err := jsi.unsubscribe(drainMode) - if err != nil { - return err - } - } - nc.mu.Lock() // ok here, but defer is expensive defer nc.mu.Unlock() @@ -4014,7 +4060,7 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) { } s.mu.Lock() - err := s.validateNextMsgState() + err := s.validateNextMsgState(false) if err != nil { s.mu.Unlock() return nil, err @@ -4065,7 +4111,7 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) { // validateNextMsgState checks whether the subscription is in a valid // state to call NextMsg and be delivered another message synchronously. // This should be called while holding the lock. -func (s *Subscription) validateNextMsgState() error { +func (s *Subscription) validateNextMsgState(pullSubInternal bool) error { if s.connClosed { return ErrConnectionClosed } @@ -4083,7 +4129,11 @@ func (s *Subscription) validateNextMsgState() error { s.sc = false return ErrSlowConsumer } - + // Unless this is from an internal call, reject use of this API. + // Users should use Fetch() instead. + if !pullSubInternal && s.jsi != nil && s.jsi.pull { + return ErrTypeSubscription + } return nil } @@ -4108,17 +4158,13 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { nc := s.conn max := s.max + var fcReply string // Update some stats. s.delivered++ delivered := s.delivered if s.jsi != nil { - s.jsi.mu.Lock() - needCheck := s.jsi.fc && len(s.jsi.fcs) > 0 + fcReply = s.checkForFlowControlResponse() s.jsi.active = true - s.jsi.mu.Unlock() - if needCheck { - s.checkForFlowControlResponse(delivered) - } } if s.typ == SyncSubscription { @@ -4127,6 +4173,10 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { } s.mu.Unlock() + if fcReply != _EMPTY_ { + nc.Publish(fcReply, nil) + } + if max > 0 { if delivered > max { return ErrMaxMessages @@ -4729,6 +4779,8 @@ func (nc *Conn) drainConnection() { // will be drained and can not publish any additional messages. Upon draining // of the publishers, the connection will be closed. Use the ClosedCB() // option to know when the connection has moved from draining to closed. +// +// See note in Subscription.Drain for JetStream subscriptions. func (nc *Conn) Drain() error { nc.mu.Lock() if nc.isClosed() { diff --git a/norace_test.go b/norace_test.go index f704de832..5864a113c 100644 --- a/norace_test.go +++ b/norace_test.go @@ -16,7 +16,10 @@ package nats import ( + "context" + "fmt" "os" + "strings" "testing" "time" ) @@ -175,3 +178,639 @@ func TestNoRaceJetStreamConsumerSlowConsumer(t *testing.T) { case <-done: } } + +func TestNoRaceJetStreamPushFlowControlHeartbeats_SubscribeSync(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + errHandler := ErrorHandler(func(c *Conn, sub *Subscription, err error) { + t.Logf("WARN: %s", err) + }) + + nc, err := Connect(s.ClientURL(), errHandler) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Burst and try to hit the flow control limit of the server. + const totalMsgs = 16536 + payload := strings.Repeat("A", 1024) + for i := 0; i < totalMsgs; i++ { + if _, err := js.Publish("foo", []byte(fmt.Sprintf("i:%d/", i)+payload)); err != nil { + t.Fatal(err) + } + } + + hbTimer := 100 * time.Millisecond + sub, err := js.SubscribeSync("foo", + AckWait(30*time.Second), + MaxDeliver(1), + EnableFlowControl(), + IdleHeartbeat(hbTimer), + ) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if !info.Config.FlowControl { + t.Fatal("Expected Flow Control to be enabled") + } + if info.Config.Heartbeat != hbTimer { + t.Errorf("Expected %v, got: %v", hbTimer, info.Config.Heartbeat) + } + + m, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Error getting next message: %v", err) + } + meta, err := m.Metadata() + if err != nil { + t.Fatal(err) + } + if meta.NumPending > totalMsgs { + t.Logf("WARN: More pending messages than expected (%v), got: %v", totalMsgs, meta.NumPending) + } + err = m.Ack() + if err != nil { + t.Fatal(err) + } + + recvd := 1 + timeout := time.Now().Add(10 * time.Second) + for time.Now().Before(timeout) { + m, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Error getting next message: %v", err) + } + if len(m.Data) == 0 { + t.Fatalf("Unexpected empty message: %+v", m) + } + + if err := m.AckSync(); err != nil { + t.Fatalf("Error on ack message: %v", err) + } + recvd++ + + if recvd == totalMsgs { + break + } + } + + t.Run("idle heartbeats", func(t *testing.T) { + // Delay to get a few heartbeats. + time.Sleep(4 * hbTimer) + + timeout = time.Now().Add(5 * time.Second) + for time.Now().Before(timeout) { + msg, err := sub.NextMsg(200 * time.Millisecond) + if err != nil { + if err == ErrTimeout { + // If timeout, ok to stop checking for the test. + break + } + t.Fatal(err) + } + if len(msg.Data) == 0 { + t.Fatalf("Unexpected empty message: %+v", m) + } + + recvd++ + meta, err := msg.Metadata() + if err != nil { + t.Fatal(err) + } + if meta.NumPending == 0 { + break + } + } + if recvd > totalMsgs { + t.Logf("WARN: Received more messages than expected (%v), got: %v", totalMsgs, recvd) + } + }) + + t.Run("with context", func(t *testing.T) { + sub, err := js.SubscribeSync("foo", + AckWait(30*time.Second), + Durable("bar"), + EnableFlowControl(), + IdleHeartbeat(hbTimer), + ) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + info, err = sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if !info.Config.FlowControl { + t.Fatal("Expected Flow Control to be enabled") + } + + recvd = 0 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + for { + select { + case <-ctx.Done(): + t.Fatal(ctx.Err()) + default: + } + + m, err := sub.NextMsgWithContext(ctx) + if err != nil { + t.Fatalf("Error getting next message: %v", err) + } + if len(m.Data) == 0 { + t.Fatalf("Unexpected empty message: %+v", m) + } + + if err := m.Ack(); err != nil { + t.Fatalf("Error on ack message: %v", err) + } + recvd++ + + if recvd >= totalMsgs { + break + } + } + + // Delay to get a few heartbeats. + time.Sleep(4 * hbTimer) + ctx, cancel = context.WithTimeout(context.Background(), time.Second) + defer cancel() + FOR_LOOP: + for { + select { + case <-ctx.Done(): + if ctx.Err() == context.DeadlineExceeded { + break FOR_LOOP + } + default: + } + + msg, err := sub.NextMsgWithContext(ctx) + if err != nil { + if err == context.DeadlineExceeded { + break + } + t.Fatal(err) + } + if len(msg.Data) == 0 { + t.Fatalf("Unexpected empty message: %+v", m) + } + recvd++ + meta, err := msg.Metadata() + if err != nil { + t.Fatal(err) + } + if meta.NumPending == 0 { + break + } + } + if recvd > totalMsgs { + t.Logf("WARN: Received more messages than expected (%v), got: %v", totalMsgs, recvd) + } + }) +} + +func TestNoRaceJetStreamPushFlowControlHeartbeats_SubscribeAsync(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Burst and try to hit the flow control limit of the server. + const totalMsgs = 16536 + payload := strings.Repeat("A", 1024) + for i := 0; i < totalMsgs; i++ { + if _, err := js.Publish("foo", []byte(payload)); err != nil { + t.Fatal(err) + } + } + + recvd := make(chan *Msg, totalMsgs) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + errCh := make(chan error) + hbTimer := 100 * time.Millisecond + sub, err := js.Subscribe("foo", func(msg *Msg) { + if len(msg.Data) == 0 { + errCh <- fmt.Errorf("Unexpected empty message: %+v", msg) + } + recvd <- msg + + if len(recvd) == totalMsgs { + cancel() + } + }, EnableFlowControl(), IdleHeartbeat(hbTimer)) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if !info.Config.FlowControl { + t.Fatal("Expected Flow Control to be enabled") + } + if info.Config.Heartbeat != hbTimer { + t.Errorf("Expected %v, got: %v", hbTimer, info.Config.Heartbeat) + } + + <-ctx.Done() + + got := len(recvd) + expected := totalMsgs + if got != expected { + t.Errorf("Expected %v, got: %v", expected, got) + } + + // Wait for a couple of heartbeats to arrive and confirm there is no error. + select { + case <-time.After(1 * time.Second): + case err := <-errCh: + t.Fatal(err) + } +} + +func TestNoRaceJetStreamPushFlowControlHeartbeats_ChanSubscribe(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + errHandler := ErrorHandler(func(c *Conn, sub *Subscription, err error) { + t.Logf("WARN: %s : %v", err, sub.Subject) + }) + + nc, err := Connect(s.ClientURL(), errHandler) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Burst and try to hit the flow control limit of the server. + const totalMsgs = 16536 + payload := strings.Repeat("A", 1024) + for i := 0; i < totalMsgs; i++ { + if _, err := js.Publish("foo", []byte(fmt.Sprintf("i:%d/", i)+payload)); err != nil { + t.Fatal(err) + } + } + + hbTimer := 100 * time.Millisecond + mch := make(chan *Msg, 16536) + sub, err := js.ChanSubscribe("foo", mch, + AckWait(30*time.Second), + MaxDeliver(1), + EnableFlowControl(), + IdleHeartbeat(hbTimer), + ) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if !info.Config.FlowControl { + t.Fatal("Expected Flow Control to be enabled") + } + if info.Config.Heartbeat != hbTimer { + t.Errorf("Expected %v, got: %v", hbTimer, info.Config.Heartbeat) + } + + getNextMsg := func(mch chan *Msg, timeout time.Duration) (*Msg, error) { + t.Helper() + select { + case m := <-mch: + return m, nil + case <-time.After(timeout): + return nil, ErrTimeout + } + } + + m, err := getNextMsg(mch, 1*time.Second) + if err != nil { + t.Fatalf("Error getting next message: %v", err) + } + meta, err := m.Metadata() + if err != nil { + t.Fatal(err) + } + if meta.NumPending > totalMsgs { + t.Logf("WARN: More pending messages than expected (%v), got: %v", totalMsgs, meta.NumPending) + } + err = m.Ack() + if err != nil { + t.Fatal(err) + } + + recvd := 1 + ctx, done := context.WithTimeout(context.Background(), 10*time.Second) + defer done() + +Loop: + for { + select { + case <-ctx.Done(): + break Loop + case m := <-mch: + if err != nil { + t.Fatalf("Error getting next message: %v", err) + } + if len(m.Data) == 0 { + t.Fatalf("Unexpected empty message: %+v", m) + } + + if err := m.Ack(); err != nil { + t.Fatalf("Error on ack message: %v", err) + } + recvd++ + + if recvd == totalMsgs { + done() + } + } + } + + t.Run("idle heartbeats", func(t *testing.T) { + // Delay to get a few heartbeats. + time.Sleep(4 * hbTimer) + + ctx, done := context.WithTimeout(context.Background(), 1*time.Second) + defer done() + Loop: + for { + select { + case <-ctx.Done(): + break Loop + case msg := <-mch: + if err != nil { + if err == ErrTimeout { + // If timeout, ok to stop checking for the test. + break Loop + } + t.Fatal(err) + } + if len(msg.Data) == 0 { + t.Fatalf("Unexpected empty message: %+v", m) + } + + recvd++ + meta, err := msg.Metadata() + if err != nil { + t.Fatal(err) + } + if meta.NumPending == 0 { + break Loop + } + } + } + if recvd > totalMsgs { + t.Logf("WARN: Received more messages than expected (%v), got: %v", totalMsgs, recvd) + } + }) +} + +func TestJetStreamPushFlowControl_SubscribeAsyncAndChannel(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + errCh := make(chan error) + errHandler := ErrorHandler(func(c *Conn, sub *Subscription, err error) { + errCh <- err + }) + nc, err := Connect(s.ClientURL(), errHandler) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + const totalMsgs = 10_000 + + js, err := nc.JetStream(PublishAsyncMaxPending(totalMsgs)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + go func() { + payload := strings.Repeat("O", 4096) + for i := 0; i < totalMsgs; i++ { + js.PublishAsync("foo", []byte(payload)) + } + }() + + // Small channel that blocks and then buffered channel that can deliver all + // messages without blocking. + recvd := make(chan *Msg, 64) + delivered := make(chan *Msg, totalMsgs) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + // Dispatch channel consumer + go func() { + for m := range recvd { + select { + case <-ctx.Done(): + return + default: + } + + delivered <- m + if len(delivered) == totalMsgs { + cancel() + } + } + }() + + sub, err := js.Subscribe("foo", func(msg *Msg) { + // Cause bottleneck by having channel block when full + // because of work taking long. + recvd <- msg + }, EnableFlowControl()) + + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + // Set this lower then normal to make sure we do not exceed bytes pending with FC turned on. + sub.SetPendingLimits(totalMsgs, 1024*1024) // This matches server window for flowcontrol. + + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if !info.Config.FlowControl { + t.Fatal("Expected Flow Control to be enabled") + } + <-ctx.Done() + + got := len(delivered) + expected := totalMsgs + if got != expected { + t.Errorf("Expected %d messages, got: %d", expected, got) + } + + // Wait for a couple of heartbeats to arrive and confirm there is no error. + select { + case <-time.After(1 * time.Second): + case err := <-errCh: + t.Errorf("error handler: %v", err) + } +} + +func TestNoRaceJetStreamChanSubscribeStall(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + jetstream: enabled + no_auth_user: pc + accounts: { + JS: { + jetstream: enabled + users: [ {user: pc, password: foo} ] + }, + } + `)) + defer os.Remove(conf) + + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Create a stream. + if _, err = js.AddStream(&StreamConfig{Name: "STALL"}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.StreamInfo("STALL") + if err != nil { + t.Fatalf("stream lookup failed: %v", err) + } + + msg := []byte(strings.Repeat("A", 512)) + toSend := 100_000 + for i := 0; i < toSend; i++ { + // Use plain NATS here for speed. + nc.Publish("STALL", msg) + } + nc.Flush() + + batch := 100 + msgs := make(chan *Msg, batch-2) + sub, err := js.ChanSubscribe("STALL", msgs, + Durable("dlc"), + EnableFlowControl(), + MaxAckPending(batch-2), + ) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + + for received := 0; received < toSend; { + select { + case m := <-msgs: + received++ + meta, _ := m.Metadata() + if meta.Sequence.Consumer != uint64(received) { + t.Fatalf("Missed something, wanted %d but got %d", received, meta.Sequence.Consumer) + } + m.Ack() + case <-time.After(time.Second): + t.Fatalf("Timeout waiting for messages, last received was %d", received) + } + } +} diff --git a/scripts/cov.sh b/scripts/cov.sh index dfbd785e8..f7057272a 100755 --- a/scripts/cov.sh +++ b/scripts/cov.sh @@ -3,10 +3,10 @@ rm -rf ./cov mkdir cov -go test -modfile=go_test.mod --failfast -v -race -covermode=atomic -coverprofile=./cov/nats.out -go test -modfile=go_test.mod --failfast -v -race -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test -go test -modfile=go_test.mod --failfast -v -race -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin -go test -modfile=go_test.mod --failfast -v -race -covermode=atomic -coverprofile=./cov/protobuf.out -coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto +go test -modfile=go_test.mod --failfast -vet=off -v -race -covermode=atomic -coverprofile=./cov/nats.out +go test -modfile=go_test.mod --failfast -vet=off -v -race -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test +go test -modfile=go_test.mod --failfast -vet=off -v -race -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin +go test -modfile=go_test.mod --failfast -vet=off -v -race -covermode=atomic -coverprofile=./cov/protobuf.out -coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto gocovmerge ./cov/*.out > acc.out rm -rf ./cov diff --git a/test/js_test.go b/test/js_test.go index 9e48c74bd..1f89ba4fa 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -285,17 +285,18 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - expectConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo { + expectConsumers := func(t *testing.T, expected int) { t.Helper() - var infos []*nats.ConsumerInfo - for info := range js.ConsumersInfo("TEST") { - infos = append(infos, info) - } - if len(infos) != expected { - t.Fatalf("Expected %d consumers, got: %d", expected, len(infos)) - } - - return infos + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + var infos []*nats.ConsumerInfo + for info := range js.ConsumersInfo("TEST") { + infos = append(infos, info) + } + if len(infos) != expected { + return fmt.Errorf("Expected %d consumers, got: %d", expected, len(infos)) + } + return nil + }) } // Create the stream using our client API. @@ -313,6 +314,16 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("stream lookup failed: %v", err) } + // Check that Queue subscribe with HB or FC fails. + _, err = js.QueueSubscribeSync("foo", "wq", nats.IdleHeartbeat(time.Second)) + if err == nil || !strings.Contains(err.Error(), "heartbeat") { + t.Fatalf("Unexpected error: %v", err) + } + _, err = js.QueueSubscribeSync("foo", "wq", nats.EnableFlowControl()) + if err == nil || !strings.Contains(err.Error(), "flow control") { + t.Fatalf("Unexpected error: %v", err) + } + msg := []byte("Hello JS") // Basic publish like NATS core. @@ -320,32 +331,40 @@ func TestJetStreamSubscribe(t *testing.T) { q := make(chan *nats.Msg, 4) + checkSub, err := nc.SubscribeSync("ivan") + if err != nil { + t.Fatalf("Error on sub: %v", err) + } + // Now create a simple ephemeral consumer. - sub, err := js.Subscribe("foo", func(m *nats.Msg) { + sub1, err := js.Subscribe("foo", func(m *nats.Msg) { q <- m - }) + }, nats.DeliverSubject("ivan")) if err != nil { t.Fatalf("Unexpected error: %v", err) } - defer sub.Unsubscribe() + defer sub1.Unsubscribe() select { case m := <-q: if _, err := m.Metadata(); err != nil { t.Fatalf("Unexpected error: %v", err) } + if _, err := checkSub.NextMsg(time.Second); err != nil { + t.Fatal("Wrong deliver subject") + } case <-time.After(5 * time.Second): t.Fatalf("Did not receive the messages in time") } // Now do same but sync. - sub, err = js.SubscribeSync("foo") + sub2, err := js.SubscribeSync("foo") if err != nil { t.Fatalf("Unexpected error: %v", err) } - defer sub.Unsubscribe() + defer sub2.Unsubscribe() - waitForPending := func(t *testing.T, n int) { + waitForPending := func(t *testing.T, sub *nats.Subscription, n int) { t.Helper() timeout := time.Now().Add(2 * time.Second) for time.Now().Before(timeout) { @@ -358,7 +377,7 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Expected to receive %d messages, but got %d", n, msgs) } - waitForPending(t, 1) + waitForPending(t, sub2, 1) toSend := 10 for i := 0; i < toSend; i++ { @@ -367,7 +386,7 @@ func TestJetStreamSubscribe(t *testing.T) { done := make(chan bool, 1) var received int - sub, err = js.Subscribe("bar", func(m *nats.Msg) { + sub3, err := js.Subscribe("bar", func(m *nats.Msg) { received++ if received == toSend { done <- true @@ -377,7 +396,7 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } expectConsumers(t, 3) - defer sub.Unsubscribe() + defer sub3.Unsubscribe() select { case <-done: @@ -387,7 +406,7 @@ func TestJetStreamSubscribe(t *testing.T) { // If we are here we have received all of the messages. // We hang the ConsumerInfo option off of the subscription, so we use that to check status. - info, _ := sub.ConsumerInfo() + info, _ := sub3.ConsumerInfo() if info.Config.AckPolicy != nats.AckExplicitPolicy { t.Fatalf("Expected ack explicit policy, got %q", info.Config.AckPolicy) } @@ -398,17 +417,19 @@ func TestJetStreamSubscribe(t *testing.T) { if info.AckFloor.Consumer != uint64(toSend) { t.Fatalf("Expected to have ack'd all %d messages, got ack floor of %d", toSend, info.AckFloor.Consumer) } - sub.Unsubscribe() - expectConsumers(t, 2) + sub3.Unsubscribe() + sub2.Unsubscribe() + sub1.Unsubscribe() + expectConsumers(t, 0) // Now create a sync subscriber that is durable. dname := "derek" - sub, err = js.SubscribeSync("foo", nats.Durable(dname)) + sub, err := js.SubscribeSync("foo", nats.Durable(dname)) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() - expectConsumers(t, 3) + expectConsumers(t, 1) // Make sure we registered as a durable. info, _ = sub.ConsumerInfo() @@ -417,88 +438,87 @@ func TestJetStreamSubscribe(t *testing.T) { } deliver := info.Config.DeliverSubject - // Remove subscription, but do not delete consumer. + // Drain subscription, this will delete the consumer. + go func() { + time.Sleep(250 * time.Millisecond) + for { + if _, err := sub.NextMsg(500 * time.Millisecond); err != nil { + return + } + } + }() sub.Drain() nc.Flush() - expectConsumers(t, 3) + expectConsumers(t, 0) - // Reattach using the same consumer. + // This will recreate a new instance. sub, err = js.SubscribeSync("foo", nats.Durable(dname)) if err != nil { t.Fatalf("Unexpected error: %v", err) } - if info, err := sub.ConsumerInfo(); err != nil || info.Config.DeliverSubject != deliver { - t.Fatal("Expected delivery subject to be the same after reattach") + if info, err := sub.ConsumerInfo(); err != nil || info.Config.DeliverSubject == deliver { + t.Fatal("Expected delivery subject to be different") } - expectConsumers(t, 3) + expectConsumers(t, 1) - // Subscribing again with same subject and durable name is not an error, - // but does not create a new consumer either. - sub, err = js.SubscribeSync("foo", nats.Durable(dname)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + // Subscribing again with same subject and durable name is an error. + if _, err := js.SubscribeSync("foo", nats.Durable(dname)); err == nil { + t.Fatal("Unexpected success") } - if info, err := sub.ConsumerInfo(); err != nil || info.Config.DeliverSubject != deliver { - t.Fatal("Expected delivery subject to be the same after reattach") - } - expectConsumers(t, 3) + expectConsumers(t, 1) - // Cleanup the consumer to be able to create again with a different delivery subject. - // this should be the same as `sub.Unsubscribe()'. - js.DeleteConsumer("TEST", dname) - expectConsumers(t, 2) + // Delete the durable. + sub.Unsubscribe() + expectConsumers(t, 0) - // Create again and make sure that works and that we attach to the same durable with different delivery. + // Create again and make sure that works. sub, err = js.SubscribeSync("foo", nats.Durable(dname)) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() - expectConsumers(t, 3) + expectConsumers(t, 1) if deliver == sub.Subject { t.Fatalf("Expected delivery subject to be different then %q", deliver) } - deliver = sub.Subject + sub.Unsubscribe() + expectConsumers(t, 0) - // Now test that we can attach to an existing durable. - sub, err = js.SubscribeSync("foo", nats.Durable(dname)) + // Create a queue group on "bar" with no explicit durable name, which + // means that the queue name will be used as the durable name. + sub1, err = js.QueueSubscribeSync("bar", "v0") if err != nil { t.Fatalf("Unexpected error: %v", err) } - defer sub.Unsubscribe() + defer sub1.Unsubscribe() + waitForPending(t, sub1, 10) + expectConsumers(t, 1) - if deliver != sub.Subject { - t.Fatalf("Expected delivery subject to be the same when attaching, got different") + // Since the above JS consumer is created on subject "bar", trying to + // add a member to the same group but on subject "baz" should fail. + if _, err = js.QueueSubscribeSync("baz", "v0"); err == nil { + t.Fatal("Unexpected success") } - // New QueueSubscribeSync with the same durable name will not - // create new consumers. - sub, err = js.QueueSubscribeSync("foo", "v0", nats.Durable(dname)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer sub.Unsubscribe() - waitForPending(t, 0) - expectConsumers(t, 3) - - // QueueSubscribeSync with a wrong subject from the previous consumer - // is an error. - _, err = js.QueueSubscribeSync("bar", "v0", nats.Durable(dname)) - if err == nil { - t.Fatalf("Unexpected success") + // If the queue group is different, but we try to attach to the existing + // JS consumer that is created for group "v0", then this should fail. + if _, err = js.QueueSubscribeSync("bar", "v1", nats.Durable("v0")); err == nil { + t.Fatal("Unexpected success") } - - // QueueSubscribeSync with a different durable name will receive - // the messages. - qsubDurable := nats.Durable(dname + "-qsub") - sub, err = js.QueueSubscribeSync("bar", "v0", qsubDurable) + // However, if a durable name is specified, creating a queue sub with + // the same queue name is ok, but will feed from a different JS consumer. + sub2, err = js.QueueSubscribeSync("bar", "v0", nats.Durable("otherQueueDurable")) if err != nil { t.Fatalf("Unexpected error: %v", err) } - defer sub.Unsubscribe() - waitForPending(t, 10) - expectConsumers(t, 4) + defer sub2.Unsubscribe() + waitForPending(t, sub2, 10) + expectConsumers(t, 2) + + sub1.Unsubscribe() + sub2.Unsubscribe() + expectConsumers(t, 0) // Now try pull based subscribers. @@ -508,26 +528,35 @@ func TestJetStreamSubscribe(t *testing.T) { } // Durable name is required for now. - sub, err = js.PullSubscribe("bar", "") - if err == nil { + if _, err = js.PullSubscribe("bar", ""); err == nil { t.Fatalf("Unexpected success") } - if err != nil && err.Error() != `nats: consumer in pull mode requires a durable name` { + if err.Error() != `nats: consumer in pull mode requires a durable name` { t.Errorf("Expected consumer in pull mode error, got %v", err) } - sub, err = js.PullSubscribe("bar", "foo", nats.Durable("bar")) - if err == nil { + if _, err = js.PullSubscribe("bar", "foo", nats.Durable("bar")); err == nil { t.Fatalf("Unexpected success") } - if err != nil && err.Error() != `nats: option Durable set more than once` { + if err.Error() != `nats: option Durable set more than once` { t.Errorf("Expected consumer in pull mode error, got %v", err) } + // Can't specify DeliverSubject for pull subscribers + _, err = js.PullSubscribe("bar", "foo", nats.DeliverSubject("baz")) + if err != nats.ErrPullSubscribeToPushConsumer { + t.Fatalf("Unexpected error: %v", err) + } + // Can't specify BindDeliverSubject + _, err = js.PullSubscribe("bar", "foo", nats.BindDeliverSubject("baz")) + if err != nats.ErrPullSubscribeToPushConsumer { + t.Fatalf("Unexpected error: %v", err) + } batch := 5 sub, err = js.PullSubscribe("bar", "rip") if err != nil { t.Fatalf("Unexpected error: %v", err) } + expectConsumers(t, 1) // The first batch if available should be delivered and queued up. bmsgs, err := sub.Fetch(batch) @@ -550,7 +579,7 @@ func TestJetStreamSubscribe(t *testing.T) { if info, _ := sub.ConsumerInfo(); info.AckFloor.Consumer != uint64(batch) { t.Fatalf("Expected ack floor to be %d, got %d", batch, info.AckFloor.Consumer) } - waitForPending(t, 0) + waitForPending(t, sub, 0) // Make a request for 10 but should only receive a few. bmsgs, err = sub.Fetch(10, nats.MaxWait(2*time.Second)) @@ -567,8 +596,6 @@ func TestJetStreamSubscribe(t *testing.T) { msg.Ack() } - sub.Drain() - // Now test attaching to a pull based durable. // Test that if we are attaching that the subjects will match up. rip from @@ -588,6 +615,8 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() + // No new JS consumer was created. + expectConsumers(t, 1) // Fetch messages a couple of times. expected = 5 @@ -609,6 +638,17 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Expected ack pending of %d and pending to be %d, got %d %d", batch, toSend-batch, info.NumAckPending, info.NumPending) } + // Pull subscriptions can't use NextMsg variants. + if _, err := sub.NextMsg(time.Second); err != nats.ErrTypeSubscription { + t.Fatalf("Expected error %q, got %v", nats.ErrTypeSubscription, err) + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if _, err := sub.NextMsgWithContext(ctx); err != nats.ErrTypeSubscription { + t.Fatalf("Expected error %q, got %v", nats.ErrTypeSubscription, err) + } + cancel() + // Prevent invalid durable names if _, err := js.SubscribeSync("baz", nats.Durable("test.durable")); err != nats.ErrInvalidDurableName { t.Fatalf("Expected invalid durable name error") @@ -619,6 +659,7 @@ func TestJetStreamSubscribe(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + expectConsumers(t, 2) _, err = sub.NextMsg(1 * time.Second) if err != nil { @@ -637,6 +678,7 @@ func TestJetStreamSubscribe(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + expectConsumers(t, 3) m, err := sub.NextMsg(1 * time.Second) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -652,13 +694,14 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Unexpected consumer name, got: %v", meta.Consumer) } - qsubDurable = nats.Durable(dname + "-qsub-chan") + qsubDurable := nats.Durable("qdur-chan") mch := make(chan *nats.Msg, 16536) sub, err = js.ChanQueueSubscribe("bar", "v1", mch, qsubDurable) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() + expectConsumers(t, 4) var a, b *nats.MsgMetadata select { @@ -678,6 +721,8 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() + // Not a new JS consumer + expectConsumers(t, 4) // Publish more messages so that at least one is received by // the channel queue subscriber. @@ -700,7 +745,7 @@ func TestJetStreamSubscribe(t *testing.T) { } // Both ChanQueueSubscribers use the same consumer. - expectConsumers(t, 8) + expectConsumers(t, 4) } func TestJetStreamAckPending_Pull(t *testing.T) { @@ -861,471 +906,13 @@ func TestJetStreamAckPending_Pull(t *testing.T) { } } - _, err = sub.Fetch(1, nats.MaxWait(100*time.Millisecond)) - if err != nats.ErrTimeout { - t.Errorf("Expected timeout, got: %v", err) - } -} - -func TestJetStreamAckPending_Push(t *testing.T) { - s := RunBasicJetStreamServer() - defer s.Shutdown() - - if config := s.JetStreamConfig(); config != nil { - defer os.RemoveAll(config.StoreDir) - } - - nc, err := nats.Connect(s.ClientURL()) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer nc.Close() - - js, err := nc.JetStream() - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - _, err = js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - }) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - const totalMsgs = 3 - for i := 0; i < totalMsgs; i++ { - if _, err := js.Publish("foo", []byte(fmt.Sprintf("msg %d", i))); err != nil { - t.Fatal(err) - } - } - - sub, err := js.SubscribeSync("foo", - nats.Durable("dname-wait"), - nats.AckWait(100*time.Millisecond), - nats.MaxDeliver(5), - nats.MaxAckPending(3), - ) - if err != nil { - t.Fatal(err) - } - defer sub.Unsubscribe() - - // 3 messages delivered 5 times. - expected := 15 - timeout := time.Now().Add(2 * time.Second) - pending := 0 - for time.Now().Before(timeout) { - if pending, _, _ = sub.Pending(); pending >= expected { - break - } - time.Sleep(10 * time.Millisecond) - } - if pending < expected { - t.Errorf("Expected %v, got %v", expected, pending) - } - - info, err := sub.ConsumerInfo() - if err != nil { - t.Fatal(err) - } - - got := info.NumRedelivered - expected = 3 - if got < expected { - t.Errorf("Expected %v, got: %v", expected, got) - } - - got = info.NumAckPending - expected = 3 - if got < expected { - t.Errorf("Expected %v, got: %v", expected, got) - } - - got = info.NumWaiting - expected = 0 - if got != expected { - t.Errorf("Expected %v, got: %v", expected, got) - } - - got = int(info.NumPending) - expected = 0 - if got != expected { - t.Errorf("Expected %v, got: %v", expected, got) - } - - got = info.Config.MaxAckPending - expected = 3 - if got != expected { - t.Errorf("Expected %v, got %v", expected, pending) - } - - got = info.Config.MaxDeliver - expected = 5 - if got != expected { - t.Errorf("Expected %v, got %v", expected, pending) - } - - acks := map[int]int{} - - ackPending := 3 - timeout = time.Now().Add(2 * time.Second) - for time.Now().Before(timeout) { - info, err := sub.ConsumerInfo() - if err != nil { - t.Fatal(err) - } - if got, want := info.NumAckPending, ackPending; got > 0 && got != want { - t.Fatalf("unexpected num ack pending: got=%d, want=%d", got, want) - } - - // Continue to ack all messages until no more pending. - pending, _, _ = sub.Pending() - if pending == 0 { - break - } - - m, err := sub.NextMsg(100 * time.Millisecond) - if err != nil { - t.Fatalf("Error getting next message: %v", err) - } - - if err := m.AckSync(); err != nil { - t.Fatalf("Error on ack message: %v", err) - } - - meta, err := m.Metadata() - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - acks[int(meta.Sequence.Stream)]++ - - if ackPending != 0 { - ackPending-- - } - if int(meta.NumPending) != ackPending { - t.Errorf("Expected %v, got %v", ackPending, meta.NumPending) - } - } - - got = len(acks) - expected = 3 - if got != expected { - t.Errorf("Expected %v, got %v", expected, got) - } - - expected = 5 - for _, got := range acks { - if got != expected { - t.Errorf("Expected %v, got %v", expected, got) - } - } - - _, err = sub.NextMsg(100 * time.Millisecond) - if err != nats.ErrTimeout { - t.Errorf("Expected timeout, got: %v", err) - } -} - -func TestJetStreamPushFlowControlHeartbeats_SubscribeSync(t *testing.T) { - s := RunBasicJetStreamServer() - defer s.Shutdown() - - if config := s.JetStreamConfig(); config != nil { - defer os.RemoveAll(config.StoreDir) - } - - errHandler := nats.ErrorHandler(func(c *nats.Conn, sub *nats.Subscription, err error) { - t.Logf("WARN: %s", err) - }) - - nc, err := nats.Connect(s.ClientURL(), errHandler) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer nc.Close() - - js, err := nc.JetStream() - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - _, err = js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - }) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // Burst and try to hit the flow control limit of the server. - const totalMsgs = 16536 - payload := strings.Repeat("A", 1024) - for i := 0; i < totalMsgs; i++ { - if _, err := js.Publish("foo", []byte(fmt.Sprintf("i:%d/", i)+payload)); err != nil { - t.Fatal(err) - } - } - - hbTimer := 500 * time.Millisecond - sub, err := js.SubscribeSync("foo", - nats.AckWait(30*time.Second), - nats.MaxDeliver(1), - nats.EnableFlowControl(), - nats.IdleHeartbeat(hbTimer), - ) - if err != nil { - t.Fatal(err) - } - defer sub.Unsubscribe() - - info, err := sub.ConsumerInfo() - if err != nil { - t.Fatal(err) - } - if !info.Config.FlowControl { - t.Fatal("Expected Flow Control to be enabled") - } - if info.Config.Heartbeat != hbTimer { - t.Errorf("Expected %v, got: %v", hbTimer, info.Config.Heartbeat) - } - - m, err := sub.NextMsg(1 * time.Second) - if err != nil { - t.Fatalf("Error getting next message: %v", err) - } - meta, err := m.Metadata() - if err != nil { - t.Fatal(err) - } - if meta.NumPending > totalMsgs { - t.Logf("WARN: More pending messages than expected (%v), got: %v", totalMsgs, meta.NumPending) - } - err = m.Ack() - if err != nil { - t.Fatal(err) - } - - recvd := 1 - timeout := time.Now().Add(10 * time.Second) - for time.Now().Before(timeout) { - m, err := sub.NextMsg(1 * time.Second) - if err != nil { - t.Fatalf("Error getting next message: %v", err) - } - if len(m.Data) == 0 { - t.Fatalf("Unexpected empty message: %+v", m) - } - - if err := m.AckSync(); err != nil { - t.Fatalf("Error on ack message: %v", err) - } - recvd++ - - if recvd == totalMsgs { - break - } - } - - t.Run("idle heartbeats", func(t *testing.T) { - // Delay to get a few heartbeats. - time.Sleep(2 * time.Second) - - timeout = time.Now().Add(5 * time.Second) - for time.Now().Before(timeout) { - msg, err := sub.NextMsg(200 * time.Millisecond) - if err != nil { - if err == nats.ErrTimeout { - // If timeout, ok to stop checking for the test. - break - } - t.Fatal(err) - } - if len(msg.Data) == 0 { - t.Fatalf("Unexpected empty message: %+v", m) - } - - recvd++ - meta, err := msg.Metadata() - if err != nil { - t.Fatal(err) - } - if meta.NumPending == 0 { - break - } - } - if recvd > totalMsgs { - t.Logf("WARN: Received more messages than expected (%v), got: %v", totalMsgs, recvd) - } - }) - - t.Run("with context", func(t *testing.T) { - sub, err := js.SubscribeSync("foo", - nats.AckWait(100*time.Millisecond), - nats.Durable("bar"), - nats.EnableFlowControl(), - nats.IdleHeartbeat(hbTimer), - ) - if err != nil { - t.Fatal(err) - } - defer sub.Unsubscribe() - - info, err = sub.ConsumerInfo() - if err != nil { - t.Fatal(err) - } - if !info.Config.FlowControl { - t.Fatal("Expected Flow Control to be enabled") - } - - recvd = 0 - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - for { - select { - case <-ctx.Done(): - t.Fatal(ctx.Err()) - default: - } - - m, err := sub.NextMsgWithContext(ctx) - if err != nil { - t.Fatalf("Error getting next message: %v", err) - } - if len(m.Data) == 0 { - t.Fatalf("Unexpected empty message: %+v", m) - } - - if err := m.Ack(); err != nil { - t.Fatalf("Error on ack message: %v", err) - } - recvd++ - - if recvd >= totalMsgs { - break - } - } - - // Delay to get a few heartbeats. - time.Sleep(2 * time.Second) - for { - select { - case <-ctx.Done(): - if ctx.Err() == context.DeadlineExceeded { - return - } - default: - } - - msg, err := sub.NextMsgWithContext(ctx) - if err != nil { - if err == context.DeadlineExceeded { - break - } - t.Fatal(err) - } - if len(msg.Data) == 0 { - t.Fatalf("Unexpected empty message: %+v", m) - } - - meta, err := msg.Metadata() - if err != nil { - t.Fatal(err) - } - if meta.NumPending == 0 { - break - } - } - }) -} - -func TestJetStreamPushFlowControlHeartbeats_SubscribeAsync(t *testing.T) { - s := RunBasicJetStreamServer() - defer s.Shutdown() - - if config := s.JetStreamConfig(); config != nil { - defer os.RemoveAll(config.StoreDir) - } - - nc, err := nats.Connect(s.ClientURL()) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer nc.Close() - - js, err := nc.JetStream() - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - _, err = js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - }) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // Burst and try to hit the flow control limit of the server. - const totalMsgs = 16536 - payload := strings.Repeat("A", 1024) - for i := 0; i < totalMsgs; i++ { - if _, err := js.Publish("foo", []byte(payload)); err != nil { - t.Fatal(err) - } - } - - recvd := make(chan *nats.Msg, totalMsgs) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - errCh := make(chan error) - hbTimer := 200 * time.Millisecond - sub, err := js.Subscribe("foo", func(msg *nats.Msg) { - if len(msg.Data) == 0 { - errCh <- fmt.Errorf("Unexpected empty message: %+v", msg) - } - recvd <- msg - - if len(recvd) == totalMsgs { - cancel() - } - }, nats.EnableFlowControl(), nats.IdleHeartbeat(hbTimer)) - if err != nil { - t.Fatal(err) - } - defer sub.Unsubscribe() - - info, err := sub.ConsumerInfo() - if err != nil { - t.Fatal(err) - } - if !info.Config.FlowControl { - t.Fatal("Expected Flow Control to be enabled") - } - if info.Config.Heartbeat != hbTimer { - t.Errorf("Expected %v, got: %v", hbTimer, info.Config.Heartbeat) - } - - <-ctx.Done() - - got := len(recvd) - expected := totalMsgs - if got != expected { - t.Errorf("Expected %v, got: %v", expected, got) - } - - // Wait for a couple of heartbeats to arrive and confirm there is no error. - select { - case <-time.After(1 * time.Second): - case err := <-errCh: - t.Fatal(err) + _, err = sub.Fetch(1, nats.MaxWait(100*time.Millisecond)) + if err != nats.ErrTimeout { + t.Errorf("Expected timeout, got: %v", err) } } -func TestJetStreamPushFlowControlHeartbeats_ChanSubscribe(t *testing.T) { +func TestJetStreamAckPending_Push(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() @@ -1333,11 +920,7 @@ func TestJetStreamPushFlowControlHeartbeats_ChanSubscribe(t *testing.T) { defer os.RemoveAll(config.StoreDir) } - errHandler := nats.ErrorHandler(func(c *nats.Conn, sub *nats.Subscription, err error) { - t.Logf("WARN: %s : %v", err, sub.Subject) - }) - - nc, err := nats.Connect(s.ClientURL(), errHandler) + nc, err := nats.Connect(s.ClientURL()) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1356,228 +939,137 @@ func TestJetStreamPushFlowControlHeartbeats_ChanSubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - // Burst and try to hit the flow control limit of the server. - const totalMsgs = 16536 - payload := strings.Repeat("A", 1024) + const totalMsgs = 3 for i := 0; i < totalMsgs; i++ { - if _, err := js.Publish("foo", []byte(fmt.Sprintf("i:%d/", i)+payload)); err != nil { + if _, err := js.Publish("foo", []byte(fmt.Sprintf("msg %d", i))); err != nil { t.Fatal(err) } } - hbTimer := 500 * time.Millisecond - mch := make(chan *nats.Msg, 16536) - sub, err := js.ChanSubscribe("foo", mch, - nats.AckWait(30*time.Second), - nats.MaxDeliver(1), - nats.EnableFlowControl(), - nats.IdleHeartbeat(hbTimer), + sub, err := js.SubscribeSync("foo", + nats.Durable("dname-wait"), + nats.AckWait(100*time.Millisecond), + nats.MaxDeliver(5), + nats.MaxAckPending(3), ) if err != nil { t.Fatal(err) } defer sub.Unsubscribe() - info, err := sub.ConsumerInfo() - if err != nil { - t.Fatal(err) - } - if !info.Config.FlowControl { - t.Fatal("Expected Flow Control to be enabled") - } - if info.Config.Heartbeat != hbTimer { - t.Errorf("Expected %v, got: %v", hbTimer, info.Config.Heartbeat) - } - - getNextMsg := func(mch chan *nats.Msg, timeout time.Duration) (*nats.Msg, error) { - t.Helper() - select { - case m := <-mch: - return m, nil - case <-time.After(timeout): - return nil, nats.ErrTimeout + // 3 messages delivered 5 times. + expected := 15 + timeout := time.Now().Add(2 * time.Second) + pending := 0 + for time.Now().Before(timeout) { + if pending, _, _ = sub.Pending(); pending >= expected { + break } + time.Sleep(10 * time.Millisecond) } - - m, err := getNextMsg(mch, 1*time.Second) - if err != nil { - t.Fatalf("Error getting next message: %v", err) - } - meta, err := m.Metadata() - if err != nil { - t.Fatal(err) - } - if meta.NumPending > totalMsgs { - t.Logf("WARN: More pending messages than expected (%v), got: %v", totalMsgs, meta.NumPending) + if pending < expected { + t.Errorf("Expected %v, got %v", expected, pending) } - err = m.Ack() + + info, err := sub.ConsumerInfo() if err != nil { t.Fatal(err) } - recvd := 1 - ctx, done := context.WithTimeout(context.Background(), 10*time.Second) - defer done() - -Loop: - for { - select { - case <-ctx.Done(): - break Loop - case m := <-mch: - if err != nil { - t.Fatalf("Error getting next message: %v", err) - } - if len(m.Data) == 0 { - t.Fatalf("Unexpected empty message: %+v", m) - } - - if err := m.Ack(); err != nil { - t.Fatalf("Error on ack message: %v", err) - } - recvd++ - - if recvd == totalMsgs { - done() - } - } + got := info.NumRedelivered + expected = 3 + if got < expected { + t.Errorf("Expected %v, got: %v", expected, got) } - t.Run("idle heartbeats", func(t *testing.T) { - // Delay to get a few heartbeats. - time.Sleep(2 * time.Second) - - ctx, done := context.WithTimeout(context.Background(), 5*time.Second) - defer done() - Loop: - for { - select { - case <-ctx.Done(): - break Loop - case msg := <-mch: - if err != nil { - if err == nats.ErrTimeout { - // If timeout, ok to stop checking for the test. - break - } - t.Fatal(err) - } - if len(msg.Data) == 0 { - t.Fatalf("Unexpected empty message: %+v", m) - } - - recvd++ - meta, err := msg.Metadata() - if err != nil { - t.Fatal(err) - } - if meta.NumPending == 0 { - break Loop - } - } - } - if recvd > totalMsgs { - t.Logf("WARN: Received more messages than expected (%v), got: %v", totalMsgs, recvd) - } - }) -} - -func TestJetStreamPushFlowControl_SubscribeAsyncAndChannel(t *testing.T) { - s := RunBasicJetStreamServer() - defer s.Shutdown() - - if config := s.JetStreamConfig(); config != nil { - defer os.RemoveAll(config.StoreDir) + got = info.NumAckPending + expected = 3 + if got < expected { + t.Errorf("Expected %v, got: %v", expected, got) } - errCh := make(chan error) - errHandler := nats.ErrorHandler(func(c *nats.Conn, sub *nats.Subscription, err error) { - errCh <- err - }) - nc, err := nats.Connect(s.ClientURL(), errHandler) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + got = info.NumWaiting + expected = 0 + if got != expected { + t.Errorf("Expected %v, got: %v", expected, got) } - defer nc.Close() - const totalMsgs = 10_000 + got = int(info.NumPending) + expected = 0 + if got != expected { + t.Errorf("Expected %v, got: %v", expected, got) + } - js, err := nc.JetStream(nats.PublishAsyncMaxPending(totalMsgs)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + got = info.Config.MaxAckPending + expected = 3 + if got != expected { + t.Errorf("Expected %v, got %v", expected, pending) } - _, err = js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - }) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + got = info.Config.MaxDeliver + expected = 5 + if got != expected { + t.Errorf("Expected %v, got %v", expected, pending) } - go func() { - payload := strings.Repeat("O", 4096) - for i := 0; i < totalMsgs; i++ { - js.PublishAsync("foo", []byte(payload)) - } - }() - // Small channel that blocks and then buffered channel that can deliver all - // messages without blocking. - recvd := make(chan *nats.Msg, 64) - delivered := make(chan *nats.Msg, totalMsgs) - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() + acks := map[int]int{} - // Dispatch channel consumer - go func() { - for m := range recvd { - select { - case <-ctx.Done(): - return - default: - } + ackPending := 3 + timeout = time.Now().Add(2 * time.Second) + for time.Now().Before(timeout) { + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if got, want := info.NumAckPending, ackPending; got > 0 && got != want { + t.Fatalf("unexpected num ack pending: got=%d, want=%d", got, want) + } - delivered <- m - if len(delivered) == totalMsgs { - cancel() - } + // Continue to ack all messages until no more pending. + pending, _, _ = sub.Pending() + if pending == 0 { + break } - }() - sub, err := js.Subscribe("foo", func(msg *nats.Msg) { - // Cause bottleneck by having channel block when full - // because of work taking long. - recvd <- msg - }, nats.EnableFlowControl()) + m, err := sub.NextMsg(100 * time.Millisecond) + if err != nil { + t.Fatalf("Error getting next message: %v", err) + } - if err != nil { - t.Fatal(err) - } - defer sub.Unsubscribe() + if err := m.AckSync(); err != nil { + t.Fatalf("Error on ack message: %v", err) + } - // Set this lower then normal to make sure we do not exceed bytes pending with FC turned on. - sub.SetPendingLimits(totalMsgs, 1024*1024) // This matches server window for flowcontrol. + meta, err := m.Metadata() + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + acks[int(meta.Sequence.Stream)]++ - info, err := sub.ConsumerInfo() - if err != nil { - t.Fatal(err) - } - if !info.Config.FlowControl { - t.Fatal("Expected Flow Control to be enabled") + if ackPending != 0 { + ackPending-- + } + if int(meta.NumPending) != ackPending { + t.Errorf("Expected %v, got %v", ackPending, meta.NumPending) + } } - <-ctx.Done() - got := len(delivered) - expected := totalMsgs + got = len(acks) + expected = 3 if got != expected { - t.Errorf("Expected %d messages, got: %d", expected, got) + t.Errorf("Expected %v, got %v", expected, got) } - // Wait for a couple of heartbeats to arrive and confirm there is no error. - select { - case <-time.After(1 * time.Second): - case err := <-errCh: - t.Errorf("error handler: %v", err) + expected = 5 + for _, got := range acks { + if got != expected { + t.Errorf("Expected %v, got %v", expected, got) + } + } + + _, err = sub.NextMsg(100 * time.Millisecond) + if err != nats.ErrTimeout { + t.Errorf("Expected timeout, got: %v", err) } } @@ -2479,11 +1971,16 @@ func TestJetStreamImportDirectOnly(t *testing.T) { } waitForPending(t, toSend) - // Bind has the same effect as above since it would not attempt to create and lookup will work. - sub, err = js.SubscribeSync("ORDERS", nats.Bind("ORDERS", "d4")) + // It is also possible to create a subscription with a BindDeliverSubject() API + // that will not try to do lookup nor create a JS consumer object. + sub, err = js.SubscribeSync("ignored", nats.BindDeliverSubject("p.d4")) if err != nil { t.Fatalf("Unexpected error: %v", err) } + js.Publish("orders", []byte("msg")) + if _, err := sub.NextMsg(time.Second); err != nil { + t.Fatalf("Error getting message: %v", err) + } // Even if there are no permissions or import to check that a consumer exists, // it is still possible to bind subscription to it. @@ -2509,7 +2006,8 @@ func TestJetStreamImportDirectOnly(t *testing.T) { } defer nc.Close() - js, err = nc.JetStream() + // Since we know that the lookup will fail, we use a smaller timeout than the 5s default. + js, err = nc.JetStream(nats.MaxWait(500 * time.Millisecond)) if err != nil { t.Fatal(err) } @@ -2768,113 +2266,36 @@ func TestJetStreamInterfaces(t *testing.T) { defer nc.Close() var js nats.JetStream - var jsm nats.JetStreamManager - var jsctx nats.JetStreamContext - - // JetStream that can publish/subscribe but cannot manage streams. - js, err = nc.JetStream() - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - js.Publish("foo", []byte("hello")) - - // JetStream context that can manage streams/consumers but cannot produce messages. - jsm, err = nc.JetStream() - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - jsm.AddStream(&nats.StreamConfig{Name: "FOO"}) - - // JetStream context that can both manage streams/consumers - // as well as publish/subscribe. - jsctx, err = nc.JetStream() - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - jsctx.AddStream(&nats.StreamConfig{Name: "BAR"}) - jsctx.Publish("bar", []byte("hello world")) - - publishMsg := func(js nats.JetStream, payload []byte) { - js.Publish("foo", payload) - } - publishMsg(js, []byte("hello world")) -} - -func TestJetStreamChanSubscribeStall(t *testing.T) { - conf := createConfFile(t, []byte(` - listen: 127.0.0.1:-1 - jetstream: enabled - no_auth_user: pc - accounts: { - JS: { - jetstream: enabled - users: [ {user: pc, password: foo} ] - }, - } - `)) - defer os.Remove(conf) - - s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - - if config := s.JetStreamConfig(); config != nil { - defer os.RemoveAll(config.StoreDir) - } - - nc, err := nats.Connect(s.ClientURL()) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer nc.Close() - - js, err := nc.JetStream() - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + var jsm nats.JetStreamManager + var jsctx nats.JetStreamContext - // Create a stream. - if _, err = js.AddStream(&nats.StreamConfig{Name: "STALL"}); err != nil { + // JetStream that can publish/subscribe but cannot manage streams. + js, err = nc.JetStream() + if err != nil { t.Fatalf("Unexpected error: %v", err) } + js.Publish("foo", []byte("hello")) - _, err = js.StreamInfo("STALL") + // JetStream context that can manage streams/consumers but cannot produce messages. + jsm, err = nc.JetStream() if err != nil { - t.Fatalf("stream lookup failed: %v", err) - } - - msg := []byte(strings.Repeat("A", 512)) - toSend := 100_000 - for i := 0; i < toSend; i++ { - // Use plain NATS here for speed. - nc.Publish("STALL", msg) + t.Fatalf("Unexpected error: %v", err) } - nc.Flush() + jsm.AddStream(&nats.StreamConfig{Name: "FOO"}) - batch := 100 - msgs := make(chan *nats.Msg, batch-2) - sub, err := js.ChanSubscribe("STALL", msgs, - nats.Durable("dlc"), - nats.EnableFlowControl(), - nats.MaxAckPending(batch-2), - ) + // JetStream context that can both manage streams/consumers + // as well as publish/subscribe. + jsctx, err = nc.JetStream() if err != nil { t.Fatalf("Unexpected error: %v", err) } - defer sub.Unsubscribe() + jsctx.AddStream(&nats.StreamConfig{Name: "BAR"}) + jsctx.Publish("bar", []byte("hello world")) - for received := 0; received < toSend; { - select { - case m := <-msgs: - received++ - meta, _ := m.Metadata() - if meta.Sequence.Consumer != uint64(received) { - t.Fatalf("Missed something, wanted %d but got %d", received, meta.Sequence.Consumer) - } - m.Ack() - case <-time.After(time.Second): - t.Fatalf("Timeout waiting for messages, last received was %d", received) - } + publishMsg := func(js nats.JetStream, payload []byte) { + js.Publish("foo", payload) } + publishMsg(js, []byte("hello world")) } func TestJetStreamSubscribe_DeliverPolicy(t *testing.T) { @@ -3618,17 +3039,25 @@ func TestJetStream_Unsubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo { + fetchConsumers := func(t *testing.T, expected int) { t.Helper() - var infos []*nats.ConsumerInfo - for info := range js.ConsumersInfo("foo") { - infos = append(infos, info) - } - if len(infos) != expected { - t.Fatalf("Expected %d consumers, got: %d", expected, len(infos)) - } + checkFor(t, time.Second, 15*time.Millisecond, func() error { + var infos []*nats.ConsumerInfo + for info := range js.ConsumersInfo("foo") { + infos = append(infos, info) + } + if len(infos) != expected { + return fmt.Errorf("Expected %d consumers, got: %d", expected, len(infos)) + } + return nil + }) + } - return infos + deleteAllConsumers := func(t *testing.T) { + t.Helper() + for cn := range js.ConsumerNames("foo") { + js.DeleteConsumer("foo", cn) + } } js.Publish("foo.A", []byte("A")) @@ -3636,27 +3065,41 @@ func TestJetStream_Unsubscribe(t *testing.T) { js.Publish("foo.C", []byte("C")) t.Run("consumers deleted on unsubscribe", func(t *testing.T) { - subA, err := js.SubscribeSync("foo.A") + sub, err := js.SubscribeSync("foo.A") if err != nil { t.Fatal(err) } - err = subA.Unsubscribe() + if err := sub.Unsubscribe(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + sub, err = js.SubscribeSync("foo.B", nats.Durable("B")) if err != nil { + t.Fatal(err) + } + if err := sub.Unsubscribe(); err != nil { t.Errorf("Unexpected error: %v", err) } - subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) + sub, err = js.Subscribe("foo.C", func(_ *nats.Msg) {}) if err != nil { t.Fatal(err) } - err = subB.Unsubscribe() + if err := sub.Unsubscribe(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + sub, err = js.Subscribe("foo.C", func(_ *nats.Msg) {}, nats.Durable("C")) if err != nil { + t.Fatal(err) + } + if err := sub.Unsubscribe(); err != nil { t.Errorf("Unexpected error: %v", err) } fetchConsumers(t, 0) }) - t.Run("attached pull consumer deleted on unsubscribe", func(t *testing.T) { + t.Run("not deleted on unsubscribe if consumer created externally", func(t *testing.T) { // Created by JetStreamManagement if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{ Durable: "wq", @@ -3684,43 +3127,35 @@ func TestJetStream_Unsubscribe(t *testing.T) { t.Errorf("Expected %v, got %v", expected, got) } subC.Unsubscribe() - fetchConsumers(t, 0) + fetchConsumers(t, 1) + deleteAllConsumers(t) }) - t.Run("ephemeral consumers deleted on drain", func(t *testing.T) { - subA, err := js.SubscribeSync("foo.A") + t.Run("consumers deleted on drain", func(t *testing.T) { + subA, err := js.Subscribe("foo.A", func(_ *nats.Msg) {}) if err != nil { t.Fatal(err) } + fetchConsumers(t, 1) err = subA.Drain() if err != nil { t.Errorf("Unexpected error: %v", err) } fetchConsumers(t, 0) + deleteAllConsumers(t) }) - t.Run("durable consumers not deleted on drain", func(t *testing.T) { - subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) + t.Run("durable consumers deleted on drain", func(t *testing.T) { + subB, err := js.Subscribe("foo.B", func(_ *nats.Msg) {}, nats.Durable("B")) if err != nil { t.Fatal(err) } - err = subB.Drain() - if err != nil { - t.Errorf("Unexpected error: %v", err) - } fetchConsumers(t, 1) - }) - - t.Run("reattached durable consumers not deleted on drain", func(t *testing.T) { - subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) - if err != nil { - t.Fatal(err) - } err = subB.Drain() if err != nil { t.Errorf("Unexpected error: %v", err) } - fetchConsumers(t, 1) + fetchConsumers(t, 0) }) } @@ -3782,8 +3217,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { t.Fatal(err) } - // sub.Drain() or nc.Drain() does not delete the durable consumers, - // just makes client go away. Ephemerals will get deleted though. + // sub.Drain() or nc.Drain() delete JS consumer, same than Unsubscribe() nc.Drain() <-ctx.Done() fetchConsumers(t, 0) @@ -3804,8 +3238,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - _, err = js.SubscribeSync("foo.A") - if err != nil { + if _, err := js.SubscribeSync("foo.A"); err != nil { t.Fatalf("Unexpected error: %v", err) } subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) @@ -3834,7 +3267,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { jsm.Publish("foo.B", []byte("B.2")) jsm.Publish("foo.C", []byte("C.2")) - t.Run("reattached durables consumers can be deleted with unsubscribe", func(t *testing.T) { + t.Run("reattached durables consumers cannot be deleted with unsubscribe", func(t *testing.T) { nc, err := nats.Connect(serverURL) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -3871,20 +3304,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { jsm.Publish("foo.B", []byte("B.3")) - // Attach again to the same subject with the durable. - dupSub, err := js.SubscribeSync("foo.B", nats.Durable("B")) - if err != nil { - t.Fatal(err) - } - - // The same durable is already used, so this dup durable - // subscription won't receive the message. - _, err = dupSub.NextMsg(1 * time.Second) - if err == nil { - t.Fatalf("Expected error: %v", err) - } - - // Original sub can still receive the same message. + // Sub can still receive the same message. resp, err = subB.NextMsg(1 * time.Second) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -3901,17 +3321,8 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { t.Errorf("Unexpected error: %v", err) } - err = dupSub.Unsubscribe() - if err == nil { - t.Fatalf("Unexpected success") - } - if !errors.Is(err, nats.ErrConsumerNotFound) { - t.Errorf("Expected consumer not found error, got: %v", err) - } - - // Remains an ephemeral consumer that did not get deleted - // when Close() was called. - fetchConsumers(t, 1) + // Since library did not create, the JS consumers remain. + fetchConsumers(t, 2) }) } @@ -3947,7 +3358,7 @@ func TestJetStream_UnsubscribeDeleteNoPermissions(t *testing.T) { } defer nc.Close() - js, err := nc.JetStream() + js, err := nc.JetStream(nats.MaxWait(time.Second)) if err != nil { t.Fatal(err) } @@ -4315,14 +3726,14 @@ func waitForJSReady(t *testing.T, nc *nats.Conn) { var err error timeout := time.Now().Add(10 * time.Second) for time.Now().Before(timeout) { - js, err := nc.JetStream() + // Use a smaller MaxWait here since if it fails, we don't want + // to wait for too long since we are going to try again. + js, err := nc.JetStream(nats.MaxWait(250 * time.Millisecond)) if err != nil { t.Fatal(err) } _, err = js.AccountInfo() if err != nil { - // Backoff for a bit until cluster ready. - time.Sleep(250 * time.Millisecond) continue } return @@ -4882,15 +4293,6 @@ func TestJetStream_ClusterMultipleSubscribe(t *testing.T) { continue } - t.Run(fmt.Sprintf("sub n=%d r=%d", n, r), func(t *testing.T) { - name := fmt.Sprintf("SUB%d%d", n, r) - stream := &nats.StreamConfig{ - Name: name, - Replicas: n, - } - withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultipleSubscribe) - }) - t.Run(fmt.Sprintf("qsub n=%d r=%d", n, r), func(t *testing.T) { name := fmt.Sprintf("MSUB%d%d", n, r) stream := &nats.StreamConfig{ @@ -4912,89 +4314,6 @@ func TestJetStream_ClusterMultipleSubscribe(t *testing.T) { } } -func testJetStream_ClusterMultipleSubscribe(t *testing.T, subject string, srvs ...*jsServer) { - srv := srvs[0] - nc, err := nats.Connect(srv.ClientURL()) - if err != nil { - t.Fatal(err) - } - defer nc.Close() - - var wg sync.WaitGroup - ctx, done := context.WithTimeout(context.Background(), 2*time.Second) - defer done() - - js, err := nc.JetStream() - if err != nil { - t.Fatal(err) - } - - size := 5 - subs := make([]*nats.Subscription, size) - errCh := make(chan error, size) - - // We are testing auto-bind here so create one first and expect others to bind to it. - sub, err := js.SubscribeSync(subject, nats.Durable("shared")) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - subs[0] = sub - - for i := 1; i < size; i++ { - wg.Add(1) - go func(n int) { - defer wg.Done() - var sub *nats.Subscription - var err error - for attempt := 0; attempt < 5; attempt++ { - sub, err = js.SubscribeSync(subject, nats.Durable("shared")) - if err != nil { - time.Sleep(1 * time.Second) - continue - } - break - } - if err != nil { - errCh <- err - } else { - subs[n] = sub - } - }(i) - } - - go func() { - // Unblock the main context when done. - wg.Wait() - done() - }() - - wg.Wait() - for i := 0; i < size*2; i++ { - js.Publish(subject, []byte("test")) - } - - delivered := 0 - for _, sub := range subs { - if sub == nil { - continue - } - if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs > 0 { - delivered++ - } - } - if delivered < 2 { - t.Fatalf("Expected more than one subscriber to receive a message, got: %v", delivered) - } - - select { - case <-ctx.Done(): - case err := <-errCh: - if err != nil { - t.Fatalf("Unexpected error with multiple subscribers: %v", err) - } - } -} - func testJetStream_ClusterMultipleQueueSubscribe(t *testing.T, subject string, srvs ...*jsServer) { srv := srvs[0] nc, err := nats.Connect(srv.ClientURL()) @@ -5161,7 +4480,7 @@ func testJetStream_ClusterMultiplePullSubscribe(t *testing.T, subject string, sr } func TestJetStream_ClusterReconnect(t *testing.T) { - t.Skip("This test need to be revisited, fails often even without those changes") + t.Skip("This test need to be revisited") n := 3 replicas := []int{1, 3} @@ -6147,7 +5466,7 @@ func TestJetStreamBindConsumer(t *testing.T) { } // Push Consumer Bind Only - _, err = js.SubscribeSync("foo", nats.Bind("foo", "push")) + sub, err := js.SubscribeSync("foo", nats.Bind("foo", "push")) if err != nil { t.Fatal(err) } @@ -6160,12 +5479,54 @@ func TestJetStreamBindConsumer(t *testing.T) { if err == nil || !strings.Contains(err.Error(), `nats: duplicate stream name (foo and foo2)`) { t.Fatalf("Unexpected error: %v", err) } + sub.Unsubscribe() + + checkConsInactive := func() { + t.Helper() + checkFor(t, time.Second, 15*time.Millisecond, func() error { + ci, _ := js.ConsumerInfo("foo", "push") + if ci != nil && !ci.PushBound { + return nil + } + return fmt.Errorf("Conusmer %q still active", "push") + }) + } + checkConsInactive() // Duplicate stream name is fine. - _, err = js.SubscribeSync("foo", nats.BindStream("foo"), nats.Bind("foo", "push")) + sub, err = js.SubscribeSync("foo", nats.BindStream("foo"), nats.Bind("foo", "push")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Cannot have 2 instances for same durable + _, err = js.SubscribeSync("foo", nats.Durable("push")) + if err == nil || !strings.Contains(err.Error(), "already bound") { + t.Fatalf("Unexpected error: %v", err) + } + // Cannot start a queue sub since plain durable is active + _, err = js.QueueSubscribeSync("foo", "wq", nats.Durable("push")) + if err == nil || !strings.Contains(err.Error(), "without a deliver group") { + t.Fatalf("Unexpected error: %v", err) + } + sub.Unsubscribe() + checkConsInactive() + + // Create a queue sub + _, err = js.QueueSubscribeSync("foo", "wq1", nats.Durable("qpush")) if err != nil { t.Fatalf("Unexpected error: %v", err) } + // Can't create a plain sub on that durable + _, err = js.SubscribeSync("foo", nats.Durable("qpush")) + if err == nil || !strings.Contains(err.Error(), "cannot create a subscription for a consumer with a deliver group") { + t.Fatalf("Unexpected error: %v", err) + } + // Try to attach different queue group + _, err = js.QueueSubscribeSync("foo", "wq2", nats.Durable("qpush")) + if err == nil || !strings.Contains(err.Error(), "cannot create a queue subscription") { + t.Fatalf("Unexpected error: %v", err) + } // Pull consumer _, err = js.AddConsumer("foo", &nats.ConsumerConfig{ @@ -6201,30 +5562,36 @@ func TestJetStreamBindConsumer(t *testing.T) { t.Fatal(err) } - // Bind to ephemeral consumer by setting the consumer. - sub2, err := js.SubscribeSync("foo", nats.Bind("foo", cinfo.Name)) - if err != nil { + // Cannot bind to ephemeral consumer because it is active. + _, err = js.SubscribeSync("foo", nats.Bind("foo", cinfo.Name)) + if err == nil || !strings.Contains(err.Error(), "already bound") { t.Fatalf("Unexpected error: %v", err) } - sub3, err := nc.SubscribeSync(cinfo.Config.DeliverSubject) + + // However, one can create an ephemeral Queue subscription and bind several members to it. + sub2, err := js.QueueSubscribeSync("foo", "wq3") if err != nil { t.Fatalf("Unexpected error: %v", err) } - js.Publish("foo", []byte("hi 1")) - js.Publish("foo", []byte("hi 2")) - js.Publish("foo", []byte("hi 3")) - - _, err = sub1.NextMsg(1 * time.Second) - if err != nil { - t.Fatal(err) + // Consumer all + for i := 0; i < 25; i++ { + msg, err := sub2.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error on NextMsg: %v", err) + } + msg.AckSync() } - _, err = sub2.NextMsg(1 * time.Second) + cinfo, _ = sub2.ConsumerInfo() + sub3, err := js.QueueSubscribeSync("foo", "wq3", nats.Bind("foo", cinfo.Name)) if err != nil { - t.Fatal(err) + t.Fatalf("Unexpected error: %v", err) } - _, err = sub3.NextMsg(1 * time.Second) - if err != nil { - t.Fatal(err) + for i := 0; i < 100; i++ { + js.Publish("foo", []byte("new")) + } + // We expect sub3 to at least get a message + if _, err := sub3.NextMsg(time.Second); err != nil { + t.Fatalf("Second member failed to get a message: %v", err) } } @@ -6417,3 +5784,67 @@ func TestJetStreamMaxMsgsPerSubject(t *testing.T) { }) } } + +func TestJetStreamDrainFailsToDeleteConsumer(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + errCh := make(chan error, 1) + nc, err := nats.Connect(s.ClientURL(), nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) { + select { + case errCh <- err: + default: + } + })) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + js.Publish("foo", []byte("hi")) + + blockCh := make(chan struct{}) + sub, err := js.Subscribe("foo", func(m *nats.Msg) { + <-blockCh + }, nats.Durable("dur")) + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + + // Initiate the drain... it won't complete because we have blocked the + // message callback. + sub.Drain() + + // Now delete the JS consumer + if err := js.DeleteConsumer("TEST", "dur"); err != nil { + t.Fatalf("Error deleting consumer: %v", err) + } + + // Now unblock and make sure we get the async error + close(blockCh) + + select { + case err := <-errCh: + if !strings.Contains(err.Error(), "consumer not found") { + t.Fatalf("Unexpected async error: %v", err) + } + case <-time.After(time.Second): + t.Fatal("Did not get async error") + } +} From f730c0230b25510eff77049674044dba52de9b91 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 16 Aug 2021 09:23:53 -0600 Subject: [PATCH 2/4] Add Domain to PubAck Signed-off-by: Ivan Kozlovic --- js.go | 1 + test/js_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/js.go b/js.go index 6740659e2..cee4fd7fd 100644 --- a/js.go +++ b/js.go @@ -273,6 +273,7 @@ type PubAck struct { Stream string `json:"stream"` Sequence uint64 `json:"seq"` Duplicate bool `json:"duplicate,omitempty"` + Domain string `json:"domain,omitempty"` } // Headers for published messages. diff --git a/test/js_test.go b/test/js_test.go index 1f89ba4fa..24109ab95 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -5848,3 +5848,47 @@ func TestJetStreamDrainFailsToDeleteConsumer(t *testing.T) { t.Fatal("Did not get async error") } } + +func TestJetStreamDomainInPubAck(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + jetstream: {domain: "HUB"} + `)) + defer os.Remove(conf) + + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + config := s.JetStreamConfig() + if config != nil { + defer os.RemoveAll(config.StoreDir) + } + + // Client for API requests. + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Got error during initialization %v", err) + } + + cfg := &nats.StreamConfig{ + Name: "TEST", + Storage: nats.MemoryStorage, + Subjects: []string{"foo"}, + } + if _, err := js.AddStream(cfg); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + pa, err := js.Publish("foo", []byte("msg")) + if err != nil { + t.Fatalf("Error on publish: %v", err) + } + if pa.Domain != "HUB" { + t.Fatalf("Expected PubAck to have domain of %q, got %q", "HUB", pa.Domain) + } +} From e077154668d6a25d54330b5d3898923d9458050b Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 16 Aug 2021 12:18:15 -0600 Subject: [PATCH 3/4] Addresses comments change requests based on code review Signed-off-by: Ivan Kozlovic --- example_test.go | 15 +++++++++++---- js.go | 14 +++++++++++--- nats.go | 3 ++- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/example_test.go b/example_test.go index 73cdbc776..ccad2a5ba 100644 --- a/example_test.go +++ b/example_test.go @@ -323,8 +323,17 @@ func ExampleJetStream() { }, nats.ManualAck()) // Async queue subscription where members load balance the - // received messages together. Since no consumer name is specified, - // the queue name will be used as a durable name. + // received messages together. + // If no consumer name is specified, either with nats.Bind() + // or nats.Durable() options, the queue name is used as the + // durable name (that is, as if you were passing the + // nats.Durable() option. + // It is recommended to use nats.Bind() or nats.Durable() + // and preferably create the JetStream consumer beforehand + // (using js.AddConsumer) so that the JS consumer is not + // deleted on an Unsubscribe() or Drain() when the member + // that created the consumer goes away first. + // Check Godoc for the QueueSubscribe() API for more details. js.QueueSubscribe("foo", "group", func(msg *nats.Msg) { msg.Ack() }, nats.ManualAck()) @@ -336,8 +345,6 @@ func ExampleJetStream() { // We can add a member to the group, with this member using // the synchronous version of the QueueSubscribe. - // Since no consumer name is specified, the queue name will be - // used as a durable name. sub, _ = js.QueueSubscribeSync("foo", "group") msg, _ = sub.NextMsg(2 * time.Second) msg.Ack() diff --git a/js.go b/js.go index cee4fd7fd..0218126b8 100644 --- a/js.go +++ b/js.go @@ -879,6 +879,12 @@ func (opt subOptFn) configureSubscribe(opts *subOpts) error { // Subscribe will create a subscription to the appropriate stream and consumer. // +// The stream and consumer names can be provided with the nats.Bind() option. +// For creating an ephemeral (where the consumer name is picked by the server), +// you can provide the stream name with nats.BindStream(). +// If no stream name is specified, the library will attempt to figure out which +// stream the subscription is for. See important notes below for more details. +// // IMPORTANT NOTES: // * If Bind() and Durable() options are not specified, the library will // send a request to the server to create an ephemeral JetStream consumer, @@ -1868,9 +1874,11 @@ func IdleHeartbeat(duration time.Duration) SubOpt { } // DeliverSubject specifies the JetStream consumer deliver subject. -// This applies only in cases where the no consumer exists and it will be -// created by the library by the subscribe API. -// If a consumer exists, then the NATS subscription will be created to +// +// This option is used only in situations where the consumer does not exist +// and a creation request is sent to the server. If not provided, an inbox +// will be selected. +// If a consumer exists, then the NATS subscription will be created on // the JetStream consumer's DeliverSubject, not necessarily this subject. func DeliverSubject(subject string) SubOpt { return subOptFn(func(opts *subOpts) error { diff --git a/nats.go b/nats.go index f04817e85..6f9067c32 100644 --- a/nats.go +++ b/nats.go @@ -3908,7 +3908,8 @@ func (s *Subscription) Drain() error { // error will be returned. // If you do not wish the JetStream consumer to be automatically deleted, // ensure that the consumer is not created by the library, which means -// create the consumer with AddConsumer and bind to this consumer. +// create the consumer with AddConsumer and bind to this consumer (using +// the nats.Bind() option). func (s *Subscription) Unsubscribe() error { if s == nil { return ErrBadSubscription From 4b8ebba8a326b145f4f3c40fb44b5f84d136faf9 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 16 Aug 2021 18:54:34 -0600 Subject: [PATCH 4/4] Replaced BindDeliverSubject with SubjectIsDelivery As discussed with Matthias who came up with the idea, this is better because then we make use of the provided subject. Otherwise it was looking weird to have something which meaning was: ``` js.SubscribeSync("ignored", nats.BindDeliverSubject("p.d4")) ``` Instead you would now have: ``` sub, err = js.SubscribeSync("p.d4", nats.SubjectIsDelivery()) ``` Signed-off-by: Ivan Kozlovic --- js.go | 31 +++++++++++++++++++------------ test/js_test.go | 35 +++++++++++++++++++++++++++++------ 2 files changed, 48 insertions(+), 18 deletions(-) diff --git a/js.go b/js.go index 0218126b8..92b6e5449 100644 --- a/js.go +++ b/js.go @@ -1010,6 +1010,12 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } } + // If no stream name is specified, or if option SubjectIsDelivery is + // specified, the subject cannot be empty. + if subj == _EMPTY_ && (o.stream == _EMPTY_ || o.subjIsDelivery) { + return nil, fmt.Errorf("nats: subject required") + } + // Note that these may change based on the consumer info response we may get. hasHeartbeats := o.cfg.Heartbeat > 0 hasFC := o.cfg.FlowControl @@ -1021,7 +1027,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy) } // No deliver subject should be provided - if o.cfg.DeliverSubject != _EMPTY_ || o.boundSubject != _EMPTY_ { + if o.cfg.DeliverSubject != _EMPTY_ || o.subjIsDelivery { return nil, ErrPullSubscribeToPushConsumer } } @@ -1100,10 +1106,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, hbi = o.cfg.Heartbeat } - // With a bound subject, we go directly create the NATS subscription + // With this option, we go directly create the NATS subscription // and skip all lookup/create. - if o.boundSubject != _EMPTY_ { - deliver = o.boundSubject + if o.subjIsDelivery { + deliver = subj } else { // In case a consumer has not been set explicitly, then the // durable name will be used as the consumer name. @@ -1665,10 +1671,10 @@ type subOpts struct { mack bool // For an ordered consumer. ordered bool - // For specifying simply the subject the NATS susbcription should - // be created on. No stream or consumer name lookup/creation will - // be done. - boundSubject string + // Means that the subject passed to subscribe call will be used + // for the low level NATS subscription and no stream nor consumer + // lookup/creation will be done. + subjIsDelivery bool } // OrderedConsumer will create a fifo direct/ephemeral consumer for in order delivery of messages. @@ -1887,16 +1893,17 @@ func DeliverSubject(subject string) SubOpt { }) } -// BindDeliverSubject specifies the deliver subject of a JetStream consumer -// that the subscription should subscribe to. +// SubjectIsDelivery specifies that the subject parameter in the subscribe +// call shall be used to create the NATS subscription and matches the +// JetStream consumer's deliver subject. // // NOTE: This is an "expert" API and should only be used when consumer lookup or // creation by the library is not possible (for instance cross accounts). // Since no lookup of the JetStream consumer is done, there is no way for // the library to check the validity of this JetStream subscription. -func BindDeliverSubject(subject string) SubOpt { +func SubjectIsDelivery() SubOpt { return subOptFn(func(opts *subOpts) error { - opts.boundSubject = subject + opts.subjIsDelivery = true return nil }) } diff --git a/test/js_test.go b/test/js_test.go index 24109ab95..619dca410 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -314,6 +314,17 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("stream lookup failed: %v", err) } + // If stream name is not specified, then the subject is required. + if _, err := js.SubscribeSync(""); err == nil || !strings.Contains(err.Error(), "required") { + t.Fatalf("Unexpected error: %v", err) + } + // Check that if stream name is present, then technically the subject does not have to. + sub, err := js.SubscribeSync("", nats.BindStream("TEST")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + sub.Unsubscribe() + // Check that Queue subscribe with HB or FC fails. _, err = js.QueueSubscribeSync("foo", "wq", nats.IdleHeartbeat(time.Second)) if err == nil || !strings.Contains(err.Error(), "heartbeat") { @@ -424,7 +435,7 @@ func TestJetStreamSubscribe(t *testing.T) { // Now create a sync subscriber that is durable. dname := "derek" - sub, err := js.SubscribeSync("foo", nats.Durable(dname)) + sub, err = js.SubscribeSync("foo", nats.Durable(dname)) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -545,11 +556,22 @@ func TestJetStreamSubscribe(t *testing.T) { if err != nats.ErrPullSubscribeToPushConsumer { t.Fatalf("Unexpected error: %v", err) } - // Can't specify BindDeliverSubject - _, err = js.PullSubscribe("bar", "foo", nats.BindDeliverSubject("baz")) + // Can't specify SubjectIsDelivery() for pull subscribers + _, err = js.PullSubscribe("bar", "foo", nats.SubjectIsDelivery()) if err != nats.ErrPullSubscribeToPushConsumer { t.Fatalf("Unexpected error: %v", err) } + // If stream name is not specified, need the subject. + _, err = js.PullSubscribe("", "rip") + if err == nil || !strings.Contains(err.Error(), "required") { + t.Fatalf("Unexpected error: %v", err) + } + // If stream provided, it should be ok. + sub, err = js.PullSubscribe("", "rip", nats.BindStream("TEST")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + sub.Unsubscribe() batch := 5 sub, err = js.PullSubscribe("bar", "rip") @@ -1971,9 +1993,10 @@ func TestJetStreamImportDirectOnly(t *testing.T) { } waitForPending(t, toSend) - // It is also possible to create a subscription with a BindDeliverSubject() API - // that will not try to do lookup nor create a JS consumer object. - sub, err = js.SubscribeSync("ignored", nats.BindDeliverSubject("p.d4")) + // It is also possible to create a subscription with a SubjectIsDelivery() + // option that says that the given subject will be used to create the low + // level NATS subscription and no lookup/create attempt will be made. + sub, err = js.SubscribeSync("p.d4", nats.SubjectIsDelivery()) if err != nil { t.Fatalf("Unexpected error: %v", err) }