diff --git a/.travis.yml b/.travis.yml index 89c5c11f4..bec2429be 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,5 +15,5 @@ before_script: - find . -type f -name "*.go" | xargs misspell -error -locale US - staticcheck ./... script: -- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -- if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast; fi +- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -vet=off +- if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off; fi diff --git a/context.go b/context.go index 53e5ebb0e..aa8c00ebf 100644 --- a/context.go +++ b/context.go @@ -122,12 +122,7 @@ func (s *Subscription) nextMsgWithContext(ctx context.Context, pullSubInternal, } s.mu.Lock() - err := s.validateNextMsgState() - // Unless this is from an internal call, reject use of this API. - // Users should use Fetch() instead. - if err == nil && !pullSubInternal && s.jsi != nil && s.jsi.pull { - err = ErrTypeSubscription - } + err := s.validateNextMsgState(pullSubInternal) if err != nil { s.mu.Unlock() return nil, err diff --git a/example_test.go b/example_test.go index 0143c8deb..ccad2a5ba 100644 --- a/example_test.go +++ b/example_test.go @@ -324,6 +324,16 @@ func ExampleJetStream() { // Async queue subscription where members load balance the // received messages together. + // If no consumer name is specified, either with nats.Bind() + // or nats.Durable() options, the queue name is used as the + // durable name (that is, as if you were passing the + // nats.Durable() option). + // It is recommended to use nats.Bind() or nats.Durable() + // and preferably create the JetStream consumer beforehand + // (using js.AddConsumer) so that the JS consumer is not + // deleted on an Unsubscribe() or Drain() when the member + // that created the consumer goes away first.
+ // Check Godoc for the QueueSubscribe() API for more details. js.QueueSubscribe("foo", "group", func(msg *nats.Msg) { msg.Ack() }, nats.ManualAck()) @@ -333,7 +343,8 @@ func ExampleJetStream() { msg, _ := sub.NextMsg(2 * time.Second) msg.Ack() - // QueueSubscribe with group or load balancing. + // We can add a member to the group, with this member using + // the synchronous version of the QueueSubscribe. sub, _ = js.QueueSubscribeSync("foo", "group") msg, _ = sub.NextMsg(2 * time.Second) msg.Ack() diff --git a/go_test.mod b/go_test.mod index a0a0b93ba..3904ad4eb 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,7 +4,7 @@ go 1.16 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.3.4 + github.com/nats-io/nats-server/v2 v2.3.5-0.20210815013007-eb8aeb217178 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/go_test.sum b/go_test.sum index 5997e8833..32fa570aa 100644 --- a/go_test.sum +++ b/go_test.sum @@ -17,9 +17,9 @@ github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= -github.com/nats-io/nats-server/v2 v2.3.4 h1:WcNa6HDFX8gjZPHb8CJ9wxRHEjJSlhWUb/MKb6/mlUY= -github.com/nats-io/nats-server/v2 v2.3.4/go.mod h1:3mtbaN5GkCo/Z5T3nNj0I0/W1fPkKzLiDC6jjWJKp98= -github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats-server/v2 v2.3.5-0.20210815013007-eb8aeb217178 h1:6/bt9zMGA1D/i3ROeq8GjF8Tig5BVFh4V3gI+qpoWIs= +github.com/nats-io/nats-server/v2 v2.3.5-0.20210815013007-eb8aeb217178/go.mod h1:7mTh0KSxKc63xAVop97cFCIGRkWCv6HoX9lMXRSNOhU= +github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19/go.mod 
h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= diff --git a/js.go b/js.go index 4309740f5..92b6e5449 100644 --- a/js.go +++ b/js.go @@ -96,6 +96,12 @@ const ( hbcThresh = 2 ) +// Types of control messages, so far heartbeat and flow control +const ( + jsCtrlHB = 1 + jsCtrlFC = 2 +) + // JetStream allows persistent messaging through JetStream. type JetStream interface { // Publish publishes a message to JetStream. @@ -267,6 +273,7 @@ type PubAck struct { Stream string `json:"stream"` Sequence uint64 `json:"seq"` Duplicate bool `json:"duplicate,omitempty"` + Domain string `json:"domain,omitempty"` } // Headers for published messages. @@ -766,6 +773,7 @@ func Context(ctx context.Context) ContextOpt { type ConsumerConfig struct { Durable string `json:"durable_name,omitempty"` DeliverSubject string `json:"deliver_subject,omitempty"` + DeliverGroup string `json:"deliver_group,omitempty"` DeliverPolicy DeliverPolicy `json:"deliver_policy"` OptStartSeq uint64 `json:"opt_start_seq,omitempty"` OptStartTime *time.Time `json:"opt_start_time,omitempty"` @@ -795,12 +803,14 @@ type ConsumerInfo struct { NumWaiting int `json:"num_waiting"` NumPending uint64 `json:"num_pending"` Cluster *ClusterInfo `json:"cluster,omitempty"` + PushBound bool `json:"push_bound,omitempty"` } // SequencePair includes the consumer and stream sequence info from a JetStream consumer. type SequencePair struct { - Consumer uint64 `json:"consumer_seq"` - Stream uint64 `json:"stream_seq"` + Consumer uint64 `json:"consumer_seq"` + Stream uint64 `json:"stream_seq"` + Last *time.Time `json:"last_active,omitempty"` } // nextRequest is for getting next messages for pull based consumers from JetStream. 
@@ -812,7 +822,6 @@ type nextRequest struct { // jsSub includes JetStream subscription info. type jsSub struct { - mu sync.RWMutex js *js // For pull subscribers, this is the next message subject to send requests to. @@ -823,8 +832,7 @@ type jsSub struct { stream string deliver string pull bool - durable bool - attached bool + dc bool // Delete JS consumer // Ordered consumers ordered bool @@ -835,29 +843,24 @@ type jsSub struct { // Heartbeats and Flow Control handling from push consumers. hbc *time.Timer hbi time.Duration - hbs bool active bool - fc bool cmeta string - fcs map[uint64]string + fcr string + fcd uint64 } -func (jsi *jsSub) unsubscribe(drainMode bool) error { - jsi.mu.Lock() - durable, attached := jsi.durable, jsi.attached - stream, consumer := jsi.stream, jsi.consumer - js := jsi.js - if jsi.hbc != nil { - jsi.hbc.Stop() - jsi.hbc = nil - } - jsi.mu.Unlock() - - if drainMode && (durable || attached) { - // Skip deleting consumer for durables/attached - // consumers when using drain mode. +// Deletes the JS Consumer. +// No connection nor subscription lock must be held on entry. +func (sub *Subscription) deleteConsumer() error { + sub.mu.Lock() + jsi := sub.jsi + if jsi == nil { + sub.mu.Unlock() return nil } + stream, consumer := jsi.stream, jsi.consumer + js := jsi.js + sub.mu.Unlock() return js.DeleteConsumer(stream, consumer) } @@ -875,6 +878,27 @@ func (opt subOptFn) configureSubscribe(opts *subOpts) error { } // Subscribe will create a subscription to the appropriate stream and consumer. +// +// The stream and consumer names can be provided with the nats.Bind() option. +// For creating an ephemeral (where the consumer name is picked by the server), +// you can provide the stream name with nats.BindStream(). +// If no stream name is specified, the library will attempt to figure out which +// stream the subscription is for. See important notes below for more details. 
+// +// IMPORTANT NOTES: +// * If Bind() and Durable() options are not specified, the library will +// send a request to the server to create an ephemeral JetStream consumer, +// which will be deleted after an Unsubscribe() or Drain(), or automatically +// by the server after a short period of time after the NATS subscription is +// gone. +// * If Durable() only is specified, the library will attempt to look up a JetStream +// consumer with this name and if found, will bind to it and not attempt to +// delete it. However, if not found, the library will send a request to create +// such a durable JetStream consumer, but will still attempt to delete it after +// an Unsubscribe() or Drain(). +// * If Bind() option is provided, the library will attempt to look up the +// consumer with the given name, and if the lookup fails, then the Subscribe() +// call will return an error. func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) { if cb == nil { return nil, ErrBadSubscription @@ -883,12 +907,15 @@ func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscripti } // SubscribeSync will create a sync subscription to the appropriate stream and consumer. +// See important note in Subscribe() func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) return js.subscribe(subj, _EMPTY_, nil, mch, true, false, opts) } // QueueSubscribe will create a subscription to the appropriate stream and consumer with queue semantics. +// If no optional durable name or binding option is specified, the queue name will be used as a durable name.
+// See important note in Subscribe() func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) { if cb == nil { return nil, ErrBadSubscription @@ -897,17 +924,22 @@ func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) } // QueueSubscribeSync will create a sync subscription to the appropriate stream and consumer with queue semantics. +// If no optional durable name or binding option is specified, the queue name will be used as a durable name. +// See important note in Subscribe() func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) return js.subscribe(subj, queue, nil, mch, true, false, opts) } // ChanSubscribe will create a subscription to the appropriate stream and consumer using a channel. +// See important note in Subscribe() func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { return js.subscribe(subj, _EMPTY_, nil, ch, false, false, opts) } // ChanQueueSubscribe will create a subscription to the appropriate stream and consumer using a channel. +// If no optional durable name or binding option is specified, the queue name will be used as a durable name. +// See important note in Subscribe() func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { return js.subscribe(subj, queue, nil, ch, false, false, opts) } @@ -918,6 +950,55 @@ func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription return js.subscribe(subj, _EMPTY_, nil, mch, true, true, append(opts, Durable(durable))) } +func processConsInfo(info *ConsumerInfo, isPullMode bool, subj, queue string) (string, error) { + ccfg := &info.Config + + // Make sure this new subject matches or is a subset.
+ if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject { + return _EMPTY_, ErrSubjectMismatch + } + + // Prevent binding a subscription against incompatible consumer types. + if isPullMode && ccfg.DeliverSubject != _EMPTY_ { + return _EMPTY_, ErrPullSubscribeToPushConsumer + } else if !isPullMode && ccfg.DeliverSubject == _EMPTY_ { + return _EMPTY_, ErrPullSubscribeRequired + } + + // If pull mode, nothing else to check here. + if isPullMode { + return _EMPTY_, nil + } + + // At this point, we know the user wants push mode, and the JS consumer is + // really push mode. + + dg := info.Config.DeliverGroup + if dg == _EMPTY_ { + // Prevent a user from attempting to create a queue subscription on + // a JS consumer that was not created with a deliver group. + if queue != _EMPTY_ { + return _EMPTY_, fmt.Errorf("cannot create a queue subscription for a consumer without a deliver group") + } else if info.PushBound { + // Need to reject a non queue subscription to a non queue consumer + // if the consumer is already bound. + return _EMPTY_, fmt.Errorf("consumer is already bound to a subscription") + } + } else { + // If the JS consumer has a deliver group, we need to fail a non queue + // subscription attempt: + if queue == _EMPTY_ { + return _EMPTY_, fmt.Errorf("cannot create a subscription for a consumer with a deliver group %q", dg) + } else if queue != dg { + // Here the user's queue group name does not match the one associated + // with the JS consumer.
+ return _EMPTY_, fmt.Errorf("cannot create a queue subscription %q for a consumer with a deliver group %q", + queue, dg) + } + } + return ccfg.DeliverSubject, nil +} + func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, isPullMode bool, opts []SubOpt) (*Subscription, error) { cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet} o := subOpts{cfg: &cfg} @@ -929,20 +1010,50 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } } - badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy + // If no stream name is specified, or if option SubjectIsDelivery is + // specified, the subject cannot be empty. + if subj == _EMPTY_ && (o.stream == _EMPTY_ || o.subjIsDelivery) { + return nil, fmt.Errorf("nats: subject required") + } + + // Note that these may change based on the consumer info response we may get. hasHeartbeats := o.cfg.Heartbeat > 0 hasFC := o.cfg.FlowControl - if isPullMode && badPullAck { - return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy) + + // Some checks for pull subscribers + if isPullMode { + // Check for bad ack policy + if o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy { + return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy) + } + // No deliver subject should be provided + if o.cfg.DeliverSubject != _EMPTY_ || o.subjIsDelivery { + return nil, ErrPullSubscribeToPushConsumer + } + } + + // Some check/setting specific to queue subs + if queue != _EMPTY_ { + // Queue subscriber cannot have HB or FC (since messages will be randomly dispatched + // to members). We may in the future have a separate NATS subscription that all members + // would subscribe to and server would send on. + if o.cfg.Heartbeat > 0 || o.cfg.FlowControl { + // Not making this a public ErrXXX in case we allow in the future. 
+ return nil, fmt.Errorf("nats: queue subscription doesn't support idle heartbeat nor flow control") + } + + // If this is a queue subscription and no consumer nor durable name was specified, + // then we will use the queue name as a durable name. + if queue != _EMPTY_ && o.consumer == _EMPTY_ && o.cfg.Durable == _EMPTY_ { + o.cfg.Durable = queue + } } var ( err error shouldCreate bool - ccfg *ConsumerConfig info *ConsumerInfo deliver string - attached bool stream = o.stream consumer = o.consumer isDurable = o.cfg.Durable != _EMPTY_ @@ -951,6 +1062,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, lookupErr bool nc = js.nc nms string + hbi time.Duration + ccreq *createConsumerRequest // In case we need to hold onto it for ordered consumers. ) // Do some quick checks here for ordered consumers. We do these here instead of spread out @@ -990,126 +1103,106 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } hasFC, hasHeartbeats = true, true o.mack = true // To avoid auto-ack wrapping call below. + hbi = o.cfg.Heartbeat } - // In case a consumer has not been set explicitly, then the - // durable name will be used as the consumer name. - if consumer == _EMPTY_ { - consumer = o.cfg.Durable - } - - // Find the stream mapped to the subject if not bound to a stream already. - if o.stream == _EMPTY_ { - stream, err = js.lookupStreamBySubject(subj) - if err != nil { - return nil, err - } + // With this option, we go directly create the NATS subscription + // and skip all lookup/create. + if o.subjIsDelivery { + deliver = subj } else { - stream = o.stream - } - - // With an explicit durable name, we can lookup the consumer first - // to which it should be attaching to. 
- if consumer != _EMPTY_ { - info, err = js.ConsumerInfo(stream, consumer) - notFoundErr = errors.Is(err, ErrConsumerNotFound) - lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded - } - - switch { - case info != nil: - // Attach using the found consumer config. - ccfg = &info.Config - attached = true - - // Make sure this new subject matches or is a subset. - if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject { - return nil, ErrSubjectMismatch + // In case a consumer has not been set explicitly, then the + // durable name will be used as the consumer name. + if consumer == _EMPTY_ { + consumer = o.cfg.Durable } - // Prevent binding a subscription against incompatible consumer types. - if isPullMode && ccfg.DeliverSubject != _EMPTY_ { - return nil, ErrPullSubscribeToPushConsumer - } else if !isPullMode && ccfg.DeliverSubject == _EMPTY_ { - return nil, ErrPullSubscribeRequired - } - if ccfg.DeliverSubject != _EMPTY_ { - deliver = ccfg.DeliverSubject - } else if !isPullMode { - deliver = nc.newInbox() - } - case (err != nil && !notFoundErr) || (notFoundErr && consumerBound): - // If the consumer is being bound and we got an error on pull subscribe then allow the error. - if !(isPullMode && lookupErr && consumerBound) { - return nil, err + // Find the stream mapped to the subject if not bound to a stream already. + if o.stream == _EMPTY_ { + stream, err = js.lookupStreamBySubject(subj) + if err != nil { + return nil, err + } + } else { + stream = o.stream } - default: - // Attempt to create consumer if not found nor using Bind. - shouldCreate = true - if o.cfg.DeliverSubject != _EMPTY_ { - deliver = o.cfg.DeliverSubject - } else if !isPullMode { - deliver = nc.newInbox() + + // With an explicit durable name, we can lookup the consumer first + // to which it should be attaching to. 
+ if consumer != _EMPTY_ { + info, err = js.ConsumerInfo(stream, consumer) + notFoundErr = errors.Is(err, ErrConsumerNotFound) + lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded } - } - var sub *Subscription + switch { + case info != nil: + deliver, err = processConsInfo(info, isPullMode, subj, queue) + if err != nil { + return nil, err + } + icfg := &info.Config + hasFC, hbi = icfg.FlowControl, icfg.Heartbeat + hasHeartbeats = hbi > 0 + case (err != nil && !notFoundErr) || (notFoundErr && consumerBound): + // If the consumer is being bound and we got an error on pull subscribe then allow the error. + if !(isPullMode && lookupErr && consumerBound) { + return nil, err + } + default: + // Attempt to create consumer if not found nor using Bind. + shouldCreate = true + if o.cfg.DeliverSubject != _EMPTY_ { + deliver = o.cfg.DeliverSubject + } else if !isPullMode { + deliver = nc.newInbox() + cfg.DeliverSubject = deliver + } - // Check if we are manual ack. - if cb != nil && !o.mack { - ocb := cb - cb = func(m *Msg) { ocb(m); m.Ack() } - } + // Do filtering always, server will clear as needed. + cfg.FilterSubject = subj - // In case we need to hold onto it for ordered consumers. - var ccreq *createConsumerRequest + // Pass the queue to the consumer config + if queue != _EMPTY_ { + cfg.DeliverGroup = queue + } - // If we are creating or updating let's update cfg. - if shouldCreate { - if !isPullMode { - cfg.DeliverSubject = deliver - } - // Do filtering always, server will clear as needed. - cfg.FilterSubject = subj - - // If not set default to ack explicit. - if cfg.AckPolicy == ackPolicyNotSet { - cfg.AckPolicy = AckExplicitPolicy - } - // If we have acks at all and the MaxAckPending is not set go ahead - // and set to the internal max. - // TODO(dlc) - We should be able to update this if client updates PendingLimits. 
- if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy { - if !isPullMode && cb != nil && hasFC { - cfg.MaxAckPending = DefaultSubPendingMsgsLimit * 16 - } else if ch != nil { - cfg.MaxAckPending = cap(ch) - } else { - cfg.MaxAckPending = DefaultSubPendingMsgsLimit + // If not set default to ack explicit. + if cfg.AckPolicy == ackPolicyNotSet { + cfg.AckPolicy = AckExplicitPolicy } + // If we have acks at all and the MaxAckPending is not set go ahead + // and set to the internal max. + // TODO(dlc) - We should be able to update this if client updates PendingLimits. + if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy { + if !isPullMode && cb != nil && hasFC { + cfg.MaxAckPending = DefaultSubPendingMsgsLimit * 16 + } else if ch != nil { + cfg.MaxAckPending = cap(ch) + } else { + cfg.MaxAckPending = DefaultSubPendingMsgsLimit + } + } + // Create request here. + ccreq = &createConsumerRequest{ + Stream: stream, + Config: &cfg, + } + hbi = cfg.Heartbeat } - // Create request here. - ccreq = &createConsumerRequest{ - Stream: stream, - Config: &cfg, - } - } - if isPullMode { - nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer) - deliver = nc.newInbox() + if isPullMode { + nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer) + deliver = nc.newInbox() + } } jsi := &jsSub{ js: js, stream: stream, consumer: consumer, - durable: isDurable, - attached: attached, deliver: deliver, - hbs: hasHeartbeats, - hbi: o.cfg.Heartbeat, - fc: hasFC, + hbi: hbi, ordered: o.ordered, ccreq: ccreq, dseq: 1, @@ -1118,7 +1211,12 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, psubj: subj, } - sub, err = nc.subscribe(deliver, queue, cb, ch, isSync, jsi) + // Check if we are manual ack. 
+ if cb != nil && !o.mack { + ocb := cb + cb = func(m *Msg) { ocb(m); m.Ack() } + } + sub, err := nc.subscribe(deliver, queue, cb, ch, isSync, jsi) if err != nil { return nil, err } @@ -1176,9 +1274,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if !isPullMode { cleanUpSub() } - // Multiple subscribers could compete in creating the first consumer - // that will be shared using the same durable name. If this happens, then - // do a lookup of the consumer info and resubscribe using the latest info. if consumer != _EMPTY_ && (strings.Contains(cinfo.Error.Description, `consumer already exists`) || strings.Contains(cinfo.Error.Description, `consumer name already in use`)) { @@ -1187,23 +1282,16 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if err != nil { return nil, err } - ccfg = &info.Config - - // Validate that the original subject does still match. - if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject { - return nil, ErrSubjectMismatch + deliver, err = processConsInfo(info, isPullMode, subj, queue) + if err != nil { + return nil, err } - - // Update attached status. - jsi.attached = true - - // Use the deliver subject from latest consumer config to attach. - if info.Config.DeliverSubject != _EMPTY_ { + if !isPullMode { // We can't reuse the channel, so if one was passed, we need to create a new one. if ch != nil { ch = make(chan *Msg, cap(ch)) } - jsi.deliver = info.Config.DeliverSubject + jsi.deliver = deliver // Recreate the subscription here. sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi) if err != nil { @@ -1216,10 +1304,15 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } return nil, fmt.Errorf("nats: %s", cinfo.Error.Description) } - } else if consumer == _EMPTY_ { - // Update our consumer name here which is filled in when we create the consumer. 
+ } else { + // Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain() sub.mu.Lock() - sub.jsi.consumer = info.Name + sub.jsi.dc = true + // If this is an ephemeral, we did not have a consumer name, we get it from the info + // after the AddConsumer returns. + if consumer == _EMPTY_ { + sub.jsi.consumer = info.Name + } sub.mu.Unlock() } } @@ -1256,23 +1349,36 @@ func (ecs *ErrConsumerSequenceMismatch) Error() string { ) } -// isControlMessage will return true if this is an empty control status message. -func isControlMessage(msg *Msg) bool { - return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg +// isJSControlMessage will return true if this is an empty control status message +// and indicate what type of control message it is, say jsCtrlHB or jsCtrlFC +func isJSControlMessage(msg *Msg) (bool, int) { + if len(msg.Data) > 0 || msg.Header.Get(statusHdr) != controlMsg { + return false, 0 + } + val := msg.Header.Get(descrHdr) + if strings.HasPrefix(val, "Idle") { + return true, jsCtrlHB + } + if strings.HasPrefix(val, "Flow") { + return true, jsCtrlFC + } + return true, 0 } -func (jsi *jsSub) trackSequences(reply string) { - jsi.mu.Lock() - jsi.cmeta = reply - jsi.mu.Unlock() +// Keeps track of the incoming message's reply subject so that the consumer's +// state (deliver sequence, etc..) can be checked against heartbeats. +// Runs under the subscription lock +func (sub *Subscription) trackSequences(reply string) { + sub.jsi.cmeta = reply } // Check to make sure messages are arriving in order. // Returns true if the sub had to be replaced. Will cause upper layers to return. +// The caller has verified that sub.jsi != nil and that this is not a control message. // Lock should be held. func (sub *Subscription) checkOrderedMsgs(m *Msg) bool { // Ignore msgs with no reply like HBs and flowcontrol, they are handled elsewhere. 
- if m.Reply == _EMPTY_ || sub.jsi == nil || isControlMessage(m) { + if m.Reply == _EMPTY_ { return false } @@ -1281,19 +1387,15 @@ func (sub *Subscription) checkOrderedMsgs(m *Msg) bool { if err != nil { return false } - sseq, dseq := uint64(parseNum(tokens[5])), uint64(parseNum(tokens[6])) + sseq, dseq := uint64(parseNum(tokens[ackStreamSeqTokenPos])), uint64(parseNum(tokens[ackConsumerSeqTokenPos])) jsi := sub.jsi - jsi.mu.Lock() if dseq != jsi.dseq { - rseq := jsi.sseq + 1 - jsi.mu.Unlock() - sub.resetOrderedConsumer(rseq) + sub.resetOrderedConsumer(jsi.sseq + 1) return true } // Update our tracking here. jsi.dseq, jsi.sseq = dseq+1, sseq - jsi.mu.Unlock() return false } @@ -1322,8 +1424,7 @@ func (sub *Subscription) applyNewSID() (osid int64) { // Lock should be held. func (sub *Subscription) resetOrderedConsumer(sseq uint64) { nc := sub.conn - closed := sub.closed - if sub.jsi == nil || nc == nil || closed { + if sub.jsi == nil || nc == nil || sub.closed { return } @@ -1334,8 +1435,8 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { newDeliver := nc.newInbox() sub.Subject = newDeliver - // Snapshot jsi under sub lock here. - jsi := sub.jsi + // Snapshot the new sid under sub lock. + nsid := sub.sid // We are still in the low level readloop for the connection so we need // to spin a go routine to try to create the new consumer. @@ -1345,7 +1446,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { // This is done here in this go routine to prevent lock inversion. nc.mu.Lock() nc.bw.appendString(fmt.Sprintf(unsubProto, osid, _EMPTY_)) - nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, sub.sid)) + nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, nsid)) nc.kickFlusher() nc.mu.Unlock() @@ -1354,11 +1455,12 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { nc.unsubscribe(sub, 0, true) } - jsi.mu.Lock() + sub.mu.Lock() + jsi := sub.jsi // Reset some items in jsi. 
jsi.dseq = 1 jsi.cmeta = _EMPTY_ - jsi.fcs = nil + jsi.fcr, jsi.fcd = _EMPTY_, 0 jsi.deliver = newDeliver // Reset consumer request for starting policy. cfg := jsi.ccreq.Config @@ -1369,7 +1471,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { ccSubj := fmt.Sprintf(apiConsumerCreateT, jsi.stream) j, err := json.Marshal(jsi.ccreq) js := jsi.js - jsi.mu.Unlock() + sub.mu.Unlock() if err != nil { pushErr(err) @@ -1397,86 +1499,70 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { return } - jsi.mu.Lock() + sub.mu.Lock() jsi.consumer = cinfo.Name - jsi.mu.Unlock() + sub.mu.Unlock() }() } // checkForFlowControlResponse will check to see if we should send a flow control response -// based on the delivered index. -// Lock should be held. -func (sub *Subscription) checkForFlowControlResponse(delivered uint64) { - jsi, nc := sub.jsi, sub.conn - if jsi == nil { - return - } - - jsi.mu.Lock() - defer jsi.mu.Unlock() - - if len(jsi.fcs) == 0 { - return - } - - if reply := jsi.fcs[delivered]; reply != _EMPTY_ { - delete(jsi.fcs, delivered) - nc.Publish(reply, nil) +// based on the subscription current delivered index and the target. +// Runs under subscription lock +func (sub *Subscription) checkForFlowControlResponse() string { + // Caller has verified that there is a sub.jsi and fc + jsi := sub.jsi + if jsi.fcd == sub.delivered { + fcr := jsi.fcr + jsi.fcr, jsi.fcd = _EMPTY_, 0 + return fcr } + return _EMPTY_ } // Record an inbound flow control message. -func (jsi *jsSub) scheduleFlowControlResponse(dfuture uint64, reply string) { - jsi.mu.Lock() - if jsi.fcs == nil { - jsi.fcs = make(map[uint64]string) - } - jsi.fcs[dfuture] = reply - jsi.mu.Unlock() +// Runs under subscription lock +func (sub *Subscription) scheduleFlowControlResponse(dfuture uint64, reply string) { + jsi := sub.jsi + jsi.fcr, jsi.fcd = reply, dfuture } // Checks for activity from our consumer. // If we do not think we are active send an async error. 
func (sub *Subscription) activityCheck() { + sub.mu.Lock() jsi := sub.jsi if jsi == nil { + sub.mu.Unlock() return } - jsi.mu.Lock() active := jsi.active jsi.hbc.Reset(jsi.hbi) jsi.active = false - jsi.mu.Unlock() - - if !active { - sub.mu.Lock() - nc := sub.conn - closed := sub.closed - sub.mu.Unlock() + nc := sub.conn + closed := sub.closed + sub.mu.Unlock() - if !closed { - nc.mu.Lock() - errCB := nc.Opts.AsyncErrorCB - if errCB != nil { - nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) }) - } - nc.mu.Unlock() + if !active && !closed { + nc.mu.Lock() + if errCB := nc.Opts.AsyncErrorCB; errCB != nil { + nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) }) } + nc.mu.Unlock() } } // scheduleHeartbeatCheck sets up the timer check to make sure we are active // or receiving idle heartbeats.. func (sub *Subscription) scheduleHeartbeatCheck() { + sub.mu.Lock() + defer sub.mu.Unlock() + jsi := sub.jsi if jsi == nil { return } - jsi.mu.Lock() - defer jsi.mu.Unlock() - if jsi.hbc == nil { jsi.hbc = time.AfterFunc(jsi.hbi*hbcThresh, sub.activityCheck) } else { @@ -1497,10 +1583,10 @@ func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) { // checkForSequenceMismatch will make sure we have not missed any messages since last seen. func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) { // Process heartbeat received, get latest control metadata if present. - jsi.mu.Lock() + s.mu.Lock() ctrl, ordered := jsi.cmeta, jsi.ordered jsi.active = true - jsi.mu.Unlock() + s.mu.Unlock() if ctrl == _EMPTY_ { return @@ -1513,7 +1599,7 @@ func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) // Consumer sequence. 
var ldseq string - dseq := tokens[6] + dseq := tokens[ackConsumerSeqTokenPos] hdr := msg.Header[lastConsumerSeqHdr] if len(hdr) == 1 { ldseq = hdr[0] @@ -1524,7 +1610,7 @@ func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) if ldseq != dseq { // Dispatch async error including details such as // from where the consumer could be restarted. - sseq := parseNum(tokens[5]) + sseq := parseNum(tokens[ackStreamSeqTokenPos]) if ordered { s.mu.Lock() s.resetOrderedConsumer(jsi.sseq + 1) @@ -1585,6 +1671,10 @@ type subOpts struct { mack bool // For an ordered consumer. ordered bool + // Means that the subject passed to subscribe call will be used + // for the low level NATS subscription and no stream nor consumer + // lookup/creation will be done. + subjIsDelivery bool } // OrderedConsumer will create a fifo direct/ephemeral consumer for in order delivery of messages. @@ -1729,6 +1819,13 @@ func RateLimit(n uint64) SubOpt { } // BindStream binds a consumer to a stream explicitly based on a name. +// When a stream name is not specified, the library uses the subscribe +// subject as a way to find the stream name. It is done by making a request +// to the server to get list of stream names that have a filter for this +// subject. If the returned list contains a single stream, then this +// stream name will be used, otherwise the `ErrNoMatchingStream` is returned. +// To avoid the stream lookup, provide the stream name with this function. +// See also `Bind()`. func BindStream(stream string) SubOpt { return subOptFn(func(opts *subOpts) error { if opts.stream != _EMPTY_ && opts.stream != stream { @@ -1782,6 +1879,35 @@ func IdleHeartbeat(duration time.Duration) SubOpt { }) } +// DeliverSubject specifies the JetStream consumer deliver subject. +// +// This option is used only in situations where the consumer does not exist +// and a creation request is sent to the server. If not provided, an inbox +// will be selected.
+// If a consumer exists, then the NATS subscription will be created on +// the JetStream consumer's DeliverSubject, not necessarily this subject. +func DeliverSubject(subject string) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.DeliverSubject = subject + return nil + }) +} + +// SubjectIsDelivery specifies that the subject parameter in the subscribe +// call shall be used to create the NATS subscription and matches the +// JetStream consumer's deliver subject. +// +// NOTE: This is an "expert" API and should only be used when consumer lookup or +// creation by the library is not possible (for instance cross accounts). +// Since no lookup of the JetStream consumer is done, there is no way for +// the library to check the validity of this JetStream subscription. +func SubjectIsDelivery() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.subjIsDelivery = true + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. @@ -2154,13 +2280,28 @@ type MsgMetadata struct { Timestamp time.Time Stream string Consumer string + Domain string } +const ( + ackDomainTokenPos = 2 + ackAccHashTokenPos = 3 + ackStreamTokenPos = 4 + ackConsumerTokenPos = 5 + ackNumDeliveredTokenPos = 6 + ackStreamSeqTokenPos = 7 + ackConsumerSeqTokenPos = 8 + ackTimestampSeqTokenPos = 9 + ackNumPendingTokenPos = 10 +) + func getMetadataFields(subject string) ([]string, error) { - const expectedTokens = 9 - const btsep = '.' + const noDomainNoHashsExpectedAckTokens = 9 + const withDomainNoHashExpectedAckTokens = 10 + const withDomainAndHashExpectedAckTokens = 11 - tsa := [expectedTokens]string{} + const btsep = '.' 
+ tsa := [withDomainAndHashExpectedAckTokens]string{} start, tokens := 0, tsa[:0] for i := 0; i < len(subject); i++ { if subject[i] == btsep { @@ -2169,9 +2310,39 @@ func getMetadataFields(subject string) ([]string, error) { } } tokens = append(tokens, subject[start:]) - if len(tokens) != expectedTokens || tokens[0] != "$JS" || tokens[1] != "ACK" { + // + // Newer server will include an account hash in the subject, and possibly the domain. + // So the subject could be: + // + // no domain: $JS.ACK...... + // with domain: $JS.ACK....... + // + // So old server number of tokens is 9, newer is 10 or 11. + // + l := len(tokens) + if l < noDomainNoHashsExpectedAckTokens || l > withDomainAndHashExpectedAckTokens { return nil, ErrNotJSMessage } + if tokens[0] != "$JS" || tokens[1] != "ACK" { + return nil, ErrNotJSMessage + } + // To make the rest of the library agnostic of that, we always return the tokens + // as if it is coming from a new server with all possible tokens. If domain or account + // hash are not specified, the tokens at those locations will simply be empty. + if l == noDomainNoHashsExpectedAckTokens || l == withDomainNoHashExpectedAckTokens { + // Extend the array (we know the backend is big enough) + // Compute how many tokens we need to insert. + itc := withDomainAndHashExpectedAckTokens - l + for i := 0; i < itc; i++ { + tokens = append(tokens, _EMPTY_) + } + // Move to the right anything that is after "ACK" token.
+ copy(tokens[ackDomainTokenPos+itc:], tokens[ackDomainTokenPos:]) + // Set the missing tokens to empty + for i := 0; i < itc; i++ { + tokens[ackDomainTokenPos+i] = _EMPTY_ + } + } return tokens, nil } @@ -2188,14 +2359,15 @@ func (m *Msg) Metadata() (*MsgMetadata, error) { } meta := &MsgMetadata{ - NumDelivered: uint64(parseNum(tokens[4])), - NumPending: uint64(parseNum(tokens[8])), - Timestamp: time.Unix(0, parseNum(tokens[7])), - Stream: tokens[2], - Consumer: tokens[3], - } - meta.Sequence.Stream = uint64(parseNum(tokens[5])) - meta.Sequence.Consumer = uint64(parseNum(tokens[6])) + Domain: tokens[ackDomainTokenPos], + NumDelivered: uint64(parseNum(tokens[ackNumDeliveredTokenPos])), + NumPending: uint64(parseNum(tokens[ackNumPendingTokenPos])), + Timestamp: time.Unix(0, parseNum(tokens[ackTimestampSeqTokenPos])), + Stream: tokens[ackStreamTokenPos], + Consumer: tokens[ackConsumerTokenPos], + } + meta.Sequence.Stream = uint64(parseNum(tokens[ackStreamSeqTokenPos])) + meta.Sequence.Consumer = uint64(parseNum(tokens[ackConsumerSeqTokenPos])) return meta, nil } diff --git a/js_test.go b/js_test.go index bad65cc62..b268c56ae 100644 --- a/js_test.go +++ b/js_test.go @@ -20,8 +20,11 @@ package nats import ( "crypto/sha256" "encoding/base64" + "fmt" + "io/ioutil" "math/rand" "os" + "reflect" "strings" "sync" "sync/atomic" @@ -39,6 +42,25 @@ func RunBasicJetStreamServer() *server.Server { return natsserver.RunServer(&opts) } +func RunServerWithConfig(configFile string) (*server.Server, *server.Options) { + return natsserver.RunServerWithConfig(configFile) +} + +func createConfFile(t *testing.T, content []byte) string { + t.Helper() + conf, err := ioutil.TempFile("", "") + if err != nil { + t.Fatalf("Error creating conf file: %v", err) + } + fName := conf.Name() + conf.Close() + if err := ioutil.WriteFile(fName, content, 0666); err != nil { + os.Remove(fName) + t.Fatalf("Error writing conf file: %v", err) + } + return fName +} + // Need access to internals for loss 
testing. func TestJetStreamOrderedConsumer(t *testing.T) { s := RunBasicJetStreamServer() @@ -344,9 +366,9 @@ func TestJetStreamOrderedConsumerWithErrors(t *testing.T) { testSubError(deleteConsumer) } -// We want to make sure we do the right thing with lots of concurrent durable consumer requests. +// We want to make sure we do the right thing with lots of concurrent queue durable consumer requests. // One should win and the others should share the delivery subject with the first one who wins. -func TestJetStreamConcurrentDurablePushConsumers(t *testing.T) { +func TestJetStreamConcurrentQueueDurablePushConsumers(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() @@ -382,7 +404,7 @@ func TestJetStreamConcurrentDurablePushConsumers(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - sub, _ := js.SubscribeSync("foo", Durable("dlc")) + sub, _ := js.QueueSubscribeSync("foo", "bar") subs <- sub }() } @@ -398,16 +420,30 @@ func TestJetStreamConcurrentDurablePushConsumers(t *testing.T) { t.Fatalf("Expected exactly one consumer, got %d", si.State.Consumers) } - // Now send one message and make sure all subs get it. - js.Publish("foo", []byte("Hello")) - time.Sleep(250 * time.Millisecond) // Allow time for delivery. + // Now send some messages and make sure they are distributed. + total := 1000 + for i := 0; i < total; i++ { + js.Publish("foo", []byte("Hello")) + } - for sub := range subs { - pending, _, _ := sub.Pending() - if pending != 1 { - t.Fatalf("Expected each durable to receive 1 msg, this sub got %d", pending) + timeout := time.Now().Add(2 * time.Second) + got := 0 + for time.Now().Before(timeout) { + got = 0 + for sub := range subs { + pending, _, _ := sub.Pending() + // If a single sub has the total, then probably something is not right. + if pending == total { + t.Fatalf("A single member should not have gotten all messages") + } + got += pending + } + if got == total { + // We are done! 
+ return } } + t.Fatalf("Expected %v messages, got only %v", total, got) } func TestJetStreamSubscribeReconnect(t *testing.T) { @@ -495,3 +531,218 @@ func TestJetStreamSubscribeReconnect(t *testing.T) { // Make sure we can send and receive the msg sendAndReceive("msg2") } + +func TestJetStreamAckTokens(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Create the stream using our client API. + _, err = js.AddStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sub, err := js.SubscribeSync("foo") + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + now := time.Now() + for _, test := range []struct { + name string + expected *MsgMetadata + str string + err bool + }{ + { + "valid token size but not js ack", + nil, + "one.two.stream.consumer.1.2.3.4.5", + true, + }, + { + "valid token size but not js ack", + nil, + "one.two.hash.stream.consumer.1.2.3.4.5", + true, + }, + { + "valid token size but not js ack", + nil, + "one.two.domain.hash.stream.consumer.1.2.3.4.5", + true, + }, + { + "invalid token size", + nil, + "$JS.ACK.stream.consumer.1.2.3.4", + true, + }, + { + "invalid token size", + nil, + "$JS.ACK.domain.hash.stream.consumer.1.2.3.4.5.6", + true, + }, + { + "no domain no hash", + &MsgMetadata{ + Stream: "TEST", + Consumer: "cons", + NumDelivered: 1, + Sequence: SequencePair{ + Stream: 2, + Consumer: 3, + }, + Timestamp: now, + NumPending: 4, + }, + "", + false, + }, + { + "no domain with hash", + &MsgMetadata{ + Stream: "TEST", + Consumer: "cons", + NumDelivered: 1, + Sequence: SequencePair{ + Stream: 2, + Consumer: 3, + }, + 
Timestamp: now, + NumPending: 4, + }, + "ACCHASH.", + false, + }, + { + "with domain with hash", + &MsgMetadata{ + Domain: "HUB", + Stream: "TEST", + Consumer: "cons", + NumDelivered: 1, + Sequence: SequencePair{ + Stream: 2, + Consumer: 3, + }, + Timestamp: now, + NumPending: 4, + }, + "HUB.ACCHASH.", + false, + }, + } { + t.Run(test.name, func(t *testing.T) { + msg := NewMsg("foo") + msg.Sub = sub + if test.err { + msg.Reply = test.str + } else { + msg.Reply = fmt.Sprintf("$JS.ACK.%sTEST.cons.1.2.3.%v.4", test.str, now.UnixNano()) + } + + meta, err := msg.Metadata() + if test.err { + if err == nil || meta != nil { + t.Fatalf("Expected error for content: %q, got meta=%+v err=%v", test.str, meta, err) + } + // Expected error, we are done + return + } + if err != nil { + t.Fatalf("Expected: %+v with reply: %q, got error %v", test.expected, msg.Reply, err) + } + if meta.Timestamp.UnixNano() != now.UnixNano() { + t.Fatalf("Timestamp is bad: %v vs %v", now.UnixNano(), meta.Timestamp.UnixNano()) + } + meta.Timestamp = time.Time{} + test.expected.Timestamp = time.Time{} + if !reflect.DeepEqual(test.expected, meta) { + t.Fatalf("Expected %+v, got %+v", test.expected, meta) + } + }) + } +} + +func TestJetStreamFlowControlStalled(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"a"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if _, err := js.SubscribeSync("a", + DeliverSubject("ds"), + Durable("dur"), + IdleHeartbeat(200*time.Millisecond), + EnableFlowControl()); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + // Drop all incoming FC 
control messages. + fcLoss := func(m *Msg) *Msg { + if _, ctrlType := isJSControlMessage(m); ctrlType == jsCtrlFC { + return nil + } + return m + } + nc.addMsgFilter("ds", fcLoss) + + // Have a subscription on the FC subject to make sure that the library + // respond to the requests for un-stall + checkSub, err := nc.SubscribeSync("$JS.FC.>") + if err != nil { + t.Fatalf("Error on sub: %v", err) + } + + // Publish bunch of messages. + payload := make([]byte, 1024) + for i := 0; i < 250; i++ { + nc.Publish("a", payload) + } + + // Now wait that we respond to a stalled FC + if _, err := checkSub.NextMsg(2 * time.Second); err != nil { + t.Fatal("Library did not send FC") + } +} diff --git a/nats.go b/nats.go index 093a11de3..6f9067c32 100644 --- a/nats.go +++ b/nats.go @@ -2574,21 +2574,22 @@ func (nc *Conn) waitForMsgs(s *Subscription) { mcb := s.mcb max = s.max closed = s.closed + var fcReply string if !s.closed { s.delivered++ delivered = s.delivered if s.jsi != nil { - s.jsi.mu.Lock() - needCheck := s.jsi.fc && len(s.jsi.fcs) > 0 + fcReply = s.checkForFlowControlResponse() s.jsi.active = true - s.jsi.mu.Unlock() - if needCheck { - s.checkForFlowControlResponse(delivered) - } } } s.mu.Unlock() + // Respond to flow control if applicable + if fcReply != _EMPTY_ { + nc.Publish(fcReply, nil) + } + if closed { break } @@ -2688,8 +2689,8 @@ func (nc *Conn) processMsg(data []byte) { var h Header var err error var ctrlMsg bool - var hasFC bool - var hasHBs bool + var ctrlType int + var fcReply string if nc.ps.ma.hdr > 0 { hbuf := msgPayload[:nc.ps.ma.hdr] @@ -2728,9 +2729,18 @@ func (nc *Conn) processMsg(data []byte) { // Skip flow control messages in case of using a JetStream context. jsi := sub.jsi if jsi != nil { - ctrlMsg, hasHBs, hasFC = isControlMessage(m), jsi.hbs, jsi.fc + // There has to be a header for it to be a control message. 
+ if h != nil { + ctrlMsg, ctrlType = isJSControlMessage(m) + if ctrlMsg && ctrlType == jsCtrlHB { + // Check if the heartbeat has a "Consumer Stalled" header, if + // so, the value is the FC reply to send a nil message to. + // We will send it at the end of this function. + fcReply = m.Header.Get(consumerStalledHdr) + } + } // Check for ordered consumer here. If checkOrdered returns true that means it detected a gap. - if jsi.ordered && sub.checkOrderedMsgs(m) { + if !ctrlMsg && jsi.ordered && sub.checkOrderedMsgs(m) { sub.mu.Unlock() return } @@ -2777,19 +2787,19 @@ func (nc *Conn) processMsg(data []byte) { sub.pTail = m } } - if jsi != nil && hasHBs { + if jsi != nil { // Store the ACK metadata from the message to // compare later on with the received heartbeat. - jsi.trackSequences(m.Reply) + sub.trackSequences(m.Reply) } - } else if hasFC && m.Reply != _EMPTY_ { + } else if ctrlType == jsCtrlFC && m.Reply != _EMPTY_ { // This is a flow control message. // If we have no pending, go ahead and send in place. if sub.pMsgs <= 0 { - nc.Publish(m.Reply, nil) + fcReply = m.Reply } else { // Schedule a reply after the previous message is delivered. - jsi.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply) + sub.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply) } } @@ -2797,8 +2807,12 @@ func (nc *Conn) processMsg(data []byte) { sub.sc = false sub.mu.Unlock() + if fcReply != _EMPTY_ { + nc.Publish(fcReply, nil) + } + // Handle control heartbeat messages.
- if ctrlMsg && hasHBs && m.Reply == _EMPTY_ { + if ctrlMsg && ctrlType == jsCtrlHB && m.Reply == _EMPTY_ { nc.checkForSequenceMismatch(m, sub, jsi) } @@ -3161,6 +3175,7 @@ const ( descrHdr = "Description" lastConsumerSeqHdr = "Nats-Last-Consumer" lastStreamSeqHdr = "Nats-Last-Stream" + consumerStalledHdr = "Nats-Consumer-Stalled" noResponders = "503" noMessagesSts = "404" reqTimeoutSts = "408" @@ -3808,6 +3823,12 @@ func (nc *Conn) removeSub(s *Subscription) { } s.mch = nil + // If JS subscription then stop HB timer. + if jsi := s.jsi; jsi != nil && jsi.hbc != nil { + jsi.hbc.Stop() + jsi.hbc = nil + } + // Mark as invalid s.closed = true if s.pCond != nil { @@ -3857,6 +3878,15 @@ func (s *Subscription) IsValid() bool { // Drain will remove interest but continue callbacks until all messages // have been processed. +// +// For a JetStream subscription, if the library has created the JetStream +// consumer, the library will send a DeleteConsumer request to the server +// when the Drain operation completes. If a failure occurs when deleting +// the JetStream consumer, an error will be reported to the asynchronous +// error callback. +// If you do not wish the JetStream consumer to be automatically deleted, +// ensure that the consumer is not created by the library, which means +// create the consumer with AddConsumer and bind to this consumer. func (s *Subscription) Drain() error { if s == nil { return ErrBadSubscription @@ -3871,6 +3901,15 @@ func (s *Subscription) Drain() error { } // Unsubscribe will remove interest in the given subject. +// +// For a JetStream subscription, if the library has created the JetStream +// consumer, it will send a DeleteConsumer request to the server (if the +// unsubscribe itself was successful). If the delete operation fails, the +// error will be returned. 
+// If you do not wish the JetStream consumer to be automatically deleted, +// ensure that the consumer is not created by the library, which means +// create the consumer with AddConsumer and bind to this consumer (using +// the nats.Bind() option). func (s *Subscription) Unsubscribe() error { if s == nil { return ErrBadSubscription @@ -3878,6 +3917,7 @@ func (s *Subscription) Unsubscribe() error { s.mu.Lock() conn := s.conn closed := s.closed + dc := s.jsi != nil && s.jsi.dc s.mu.Unlock() if conn == nil || conn.IsClosed() { return ErrConnectionClosed @@ -3888,7 +3928,11 @@ func (s *Subscription) Unsubscribe() error { if conn.IsDraining() { return ErrConnectionDraining } - return conn.unsubscribe(s, 0, false) + err := conn.unsubscribe(s, 0, false) + if err == nil && dc { + err = s.deleteConsumer() + } + return err } // checkDrained will watch for a subscription to be fully drained @@ -3902,6 +3946,12 @@ func (nc *Conn) checkDrained(sub *Subscription) { // is correct and the server will not send additional information. nc.Flush() + sub.mu.Lock() + // For JS subscriptions, check if we are going to delete the + // JS consumer when drain completes. + dc := sub.jsi != nil && sub.jsi.dc + sub.mu.Unlock() + // Once we are here we just wait for Pending to reach 0 or // any other state to exit this go routine. for { @@ -3921,6 +3971,15 @@ func (nc *Conn) checkDrained(sub *Subscription) { nc.mu.Lock() nc.removeSub(sub) nc.mu.Unlock() + if dc { + if err := sub.deleteConsumer(); err != nil { + nc.mu.Lock() + if errCB := nc.Opts.AsyncErrorCB; errCB != nil { + nc.ach.push(func() { errCB(nc, sub, err) }) + } + nc.mu.Unlock() + } + } return } @@ -3959,18 +4018,6 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error { sub.mu.Unlock() } - // For JetStream consumers, need to clean up ephemeral consumers - // or delete durable ones if called with Unsubscribe. 
- sub.mu.Lock() - jsi := sub.jsi - sub.mu.Unlock() - if jsi != nil && maxStr == _EMPTY_ { - err := jsi.unsubscribe(drainMode) - if err != nil { - return err - } - } - nc.mu.Lock() // ok here, but defer is expensive defer nc.mu.Unlock() @@ -4014,7 +4061,7 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) { } s.mu.Lock() - err := s.validateNextMsgState() + err := s.validateNextMsgState(false) if err != nil { s.mu.Unlock() return nil, err @@ -4065,7 +4112,7 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) { // validateNextMsgState checks whether the subscription is in a valid // state to call NextMsg and be delivered another message synchronously. // This should be called while holding the lock. -func (s *Subscription) validateNextMsgState() error { +func (s *Subscription) validateNextMsgState(pullSubInternal bool) error { if s.connClosed { return ErrConnectionClosed } @@ -4083,7 +4130,11 @@ func (s *Subscription) validateNextMsgState() error { s.sc = false return ErrSlowConsumer } - + // Unless this is from an internal call, reject use of this API. + // Users should use Fetch() instead. + if !pullSubInternal && s.jsi != nil && s.jsi.pull { + return ErrTypeSubscription + } return nil } @@ -4108,17 +4159,13 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { nc := s.conn max := s.max + var fcReply string // Update some stats. 
s.delivered++ delivered := s.delivered if s.jsi != nil { - s.jsi.mu.Lock() - needCheck := s.jsi.fc && len(s.jsi.fcs) > 0 + fcReply = s.checkForFlowControlResponse() s.jsi.active = true - s.jsi.mu.Unlock() - if needCheck { - s.checkForFlowControlResponse(delivered) - } } if s.typ == SyncSubscription { @@ -4127,6 +4174,10 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { } s.mu.Unlock() + if fcReply != _EMPTY_ { + nc.Publish(fcReply, nil) + } + if max > 0 { if delivered > max { return ErrMaxMessages @@ -4729,6 +4780,8 @@ func (nc *Conn) drainConnection() { // will be drained and can not publish any additional messages. Upon draining // of the publishers, the connection will be closed. Use the ClosedCB() // option to know when the connection has moved from draining to closed. +// +// See note in Subscription.Drain for JetStream subscriptions. func (nc *Conn) Drain() error { nc.mu.Lock() if nc.isClosed() { diff --git a/norace_test.go b/norace_test.go index f704de832..5864a113c 100644 --- a/norace_test.go +++ b/norace_test.go @@ -16,7 +16,10 @@ package nats import ( + "context" + "fmt" "os" + "strings" "testing" "time" ) @@ -175,3 +178,639 @@ func TestNoRaceJetStreamConsumerSlowConsumer(t *testing.T) { case <-done: } } + +func TestNoRaceJetStreamPushFlowControlHeartbeats_SubscribeSync(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + errHandler := ErrorHandler(func(c *Conn, sub *Subscription, err error) { + t.Logf("WARN: %s", err) + }) + + nc, err := Connect(s.ClientURL(), errHandler) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Burst and try to hit 
the flow control limit of the server. + const totalMsgs = 16536 + payload := strings.Repeat("A", 1024) + for i := 0; i < totalMsgs; i++ { + if _, err := js.Publish("foo", []byte(fmt.Sprintf("i:%d/", i)+payload)); err != nil { + t.Fatal(err) + } + } + + hbTimer := 100 * time.Millisecond + sub, err := js.SubscribeSync("foo", + AckWait(30*time.Second), + MaxDeliver(1), + EnableFlowControl(), + IdleHeartbeat(hbTimer), + ) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if !info.Config.FlowControl { + t.Fatal("Expected Flow Control to be enabled") + } + if info.Config.Heartbeat != hbTimer { + t.Errorf("Expected %v, got: %v", hbTimer, info.Config.Heartbeat) + } + + m, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Error getting next message: %v", err) + } + meta, err := m.Metadata() + if err != nil { + t.Fatal(err) + } + if meta.NumPending > totalMsgs { + t.Logf("WARN: More pending messages than expected (%v), got: %v", totalMsgs, meta.NumPending) + } + err = m.Ack() + if err != nil { + t.Fatal(err) + } + + recvd := 1 + timeout := time.Now().Add(10 * time.Second) + for time.Now().Before(timeout) { + m, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Error getting next message: %v", err) + } + if len(m.Data) == 0 { + t.Fatalf("Unexpected empty message: %+v", m) + } + + if err := m.AckSync(); err != nil { + t.Fatalf("Error on ack message: %v", err) + } + recvd++ + + if recvd == totalMsgs { + break + } + } + + t.Run("idle heartbeats", func(t *testing.T) { + // Delay to get a few heartbeats. + time.Sleep(4 * hbTimer) + + timeout = time.Now().Add(5 * time.Second) + for time.Now().Before(timeout) { + msg, err := sub.NextMsg(200 * time.Millisecond) + if err != nil { + if err == ErrTimeout { + // If timeout, ok to stop checking for the test. 
+ break + } + t.Fatal(err) + } + if len(msg.Data) == 0 { + t.Fatalf("Unexpected empty message: %+v", m) + } + + recvd++ + meta, err := msg.Metadata() + if err != nil { + t.Fatal(err) + } + if meta.NumPending == 0 { + break + } + } + if recvd > totalMsgs { + t.Logf("WARN: Received more messages than expected (%v), got: %v", totalMsgs, recvd) + } + }) + + t.Run("with context", func(t *testing.T) { + sub, err := js.SubscribeSync("foo", + AckWait(30*time.Second), + Durable("bar"), + EnableFlowControl(), + IdleHeartbeat(hbTimer), + ) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + info, err = sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if !info.Config.FlowControl { + t.Fatal("Expected Flow Control to be enabled") + } + + recvd = 0 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + for { + select { + case <-ctx.Done(): + t.Fatal(ctx.Err()) + default: + } + + m, err := sub.NextMsgWithContext(ctx) + if err != nil { + t.Fatalf("Error getting next message: %v", err) + } + if len(m.Data) == 0 { + t.Fatalf("Unexpected empty message: %+v", m) + } + + if err := m.Ack(); err != nil { + t.Fatalf("Error on ack message: %v", err) + } + recvd++ + + if recvd >= totalMsgs { + break + } + } + + // Delay to get a few heartbeats. 
+ time.Sleep(4 * hbTimer) + ctx, cancel = context.WithTimeout(context.Background(), time.Second) + defer cancel() + FOR_LOOP: + for { + select { + case <-ctx.Done(): + if ctx.Err() == context.DeadlineExceeded { + break FOR_LOOP + } + default: + } + + msg, err := sub.NextMsgWithContext(ctx) + if err != nil { + if err == context.DeadlineExceeded { + break + } + t.Fatal(err) + } + if len(msg.Data) == 0 { + t.Fatalf("Unexpected empty message: %+v", m) + } + recvd++ + meta, err := msg.Metadata() + if err != nil { + t.Fatal(err) + } + if meta.NumPending == 0 { + break + } + } + if recvd > totalMsgs { + t.Logf("WARN: Received more messages than expected (%v), got: %v", totalMsgs, recvd) + } + }) +} + +func TestNoRaceJetStreamPushFlowControlHeartbeats_SubscribeAsync(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Burst and try to hit the flow control limit of the server. 
+ const totalMsgs = 16536 + payload := strings.Repeat("A", 1024) + for i := 0; i < totalMsgs; i++ { + if _, err := js.Publish("foo", []byte(payload)); err != nil { + t.Fatal(err) + } + } + + recvd := make(chan *Msg, totalMsgs) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + errCh := make(chan error) + hbTimer := 100 * time.Millisecond + sub, err := js.Subscribe("foo", func(msg *Msg) { + if len(msg.Data) == 0 { + errCh <- fmt.Errorf("Unexpected empty message: %+v", msg) + } + recvd <- msg + + if len(recvd) == totalMsgs { + cancel() + } + }, EnableFlowControl(), IdleHeartbeat(hbTimer)) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if !info.Config.FlowControl { + t.Fatal("Expected Flow Control to be enabled") + } + if info.Config.Heartbeat != hbTimer { + t.Errorf("Expected %v, got: %v", hbTimer, info.Config.Heartbeat) + } + + <-ctx.Done() + + got := len(recvd) + expected := totalMsgs + if got != expected { + t.Errorf("Expected %v, got: %v", expected, got) + } + + // Wait for a couple of heartbeats to arrive and confirm there is no error. 
+ select { + case <-time.After(1 * time.Second): + case err := <-errCh: + t.Fatal(err) + } +} + +func TestNoRaceJetStreamPushFlowControlHeartbeats_ChanSubscribe(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + errHandler := ErrorHandler(func(c *Conn, sub *Subscription, err error) { + t.Logf("WARN: %s : %v", err, sub.Subject) + }) + + nc, err := Connect(s.ClientURL(), errHandler) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Burst and try to hit the flow control limit of the server. + const totalMsgs = 16536 + payload := strings.Repeat("A", 1024) + for i := 0; i < totalMsgs; i++ { + if _, err := js.Publish("foo", []byte(fmt.Sprintf("i:%d/", i)+payload)); err != nil { + t.Fatal(err) + } + } + + hbTimer := 100 * time.Millisecond + mch := make(chan *Msg, 16536) + sub, err := js.ChanSubscribe("foo", mch, + AckWait(30*time.Second), + MaxDeliver(1), + EnableFlowControl(), + IdleHeartbeat(hbTimer), + ) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if !info.Config.FlowControl { + t.Fatal("Expected Flow Control to be enabled") + } + if info.Config.Heartbeat != hbTimer { + t.Errorf("Expected %v, got: %v", hbTimer, info.Config.Heartbeat) + } + + getNextMsg := func(mch chan *Msg, timeout time.Duration) (*Msg, error) { + t.Helper() + select { + case m := <-mch: + return m, nil + case <-time.After(timeout): + return nil, ErrTimeout + } + } + + m, err := getNextMsg(mch, 1*time.Second) + if err != nil { + t.Fatalf("Error getting next message: %v", err) + } + meta, err := m.Metadata() + if 
err != nil { + t.Fatal(err) + } + if meta.NumPending > totalMsgs { + t.Logf("WARN: More pending messages than expected (%v), got: %v", totalMsgs, meta.NumPending) + } + err = m.Ack() + if err != nil { + t.Fatal(err) + } + + recvd := 1 + ctx, done := context.WithTimeout(context.Background(), 10*time.Second) + defer done() + +Loop: + for { + select { + case <-ctx.Done(): + break Loop + case m := <-mch: + if err != nil { + t.Fatalf("Error getting next message: %v", err) + } + if len(m.Data) == 0 { + t.Fatalf("Unexpected empty message: %+v", m) + } + + if err := m.Ack(); err != nil { + t.Fatalf("Error on ack message: %v", err) + } + recvd++ + + if recvd == totalMsgs { + done() + } + } + } + + t.Run("idle heartbeats", func(t *testing.T) { + // Delay to get a few heartbeats. + time.Sleep(4 * hbTimer) + + ctx, done := context.WithTimeout(context.Background(), 1*time.Second) + defer done() + Loop: + for { + select { + case <-ctx.Done(): + break Loop + case msg := <-mch: + if err != nil { + if err == ErrTimeout { + // If timeout, ok to stop checking for the test. 
+ break Loop + } + t.Fatal(err) + } + if len(msg.Data) == 0 { + t.Fatalf("Unexpected empty message: %+v", m) + } + + recvd++ + meta, err := msg.Metadata() + if err != nil { + t.Fatal(err) + } + if meta.NumPending == 0 { + break Loop + } + } + } + if recvd > totalMsgs { + t.Logf("WARN: Received more messages than expected (%v), got: %v", totalMsgs, recvd) + } + }) +} + +func TestJetStreamPushFlowControl_SubscribeAsyncAndChannel(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + errCh := make(chan error) + errHandler := ErrorHandler(func(c *Conn, sub *Subscription, err error) { + errCh <- err + }) + nc, err := Connect(s.ClientURL(), errHandler) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + const totalMsgs = 10_000 + + js, err := nc.JetStream(PublishAsyncMaxPending(totalMsgs)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + go func() { + payload := strings.Repeat("O", 4096) + for i := 0; i < totalMsgs; i++ { + js.PublishAsync("foo", []byte(payload)) + } + }() + + // Small channel that blocks and then buffered channel that can deliver all + // messages without blocking. + recvd := make(chan *Msg, 64) + delivered := make(chan *Msg, totalMsgs) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + // Dispatch channel consumer + go func() { + for m := range recvd { + select { + case <-ctx.Done(): + return + default: + } + + delivered <- m + if len(delivered) == totalMsgs { + cancel() + } + } + }() + + sub, err := js.Subscribe("foo", func(msg *Msg) { + // Cause bottleneck by having channel block when full + // because of work taking long. 
+ recvd <- msg + }, EnableFlowControl()) + + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + // Set this lower then normal to make sure we do not exceed bytes pending with FC turned on. + sub.SetPendingLimits(totalMsgs, 1024*1024) // This matches server window for flowcontrol. + + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if !info.Config.FlowControl { + t.Fatal("Expected Flow Control to be enabled") + } + <-ctx.Done() + + got := len(delivered) + expected := totalMsgs + if got != expected { + t.Errorf("Expected %d messages, got: %d", expected, got) + } + + // Wait for a couple of heartbeats to arrive and confirm there is no error. + select { + case <-time.After(1 * time.Second): + case err := <-errCh: + t.Errorf("error handler: %v", err) + } +} + +func TestNoRaceJetStreamChanSubscribeStall(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + jetstream: enabled + no_auth_user: pc + accounts: { + JS: { + jetstream: enabled + users: [ {user: pc, password: foo} ] + }, + } + `)) + defer os.Remove(conf) + + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Create a stream. + if _, err = js.AddStream(&StreamConfig{Name: "STALL"}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.StreamInfo("STALL") + if err != nil { + t.Fatalf("stream lookup failed: %v", err) + } + + msg := []byte(strings.Repeat("A", 512)) + toSend := 100_000 + for i := 0; i < toSend; i++ { + // Use plain NATS here for speed. 
+ nc.Publish("STALL", msg) + } + nc.Flush() + + batch := 100 + msgs := make(chan *Msg, batch-2) + sub, err := js.ChanSubscribe("STALL", msgs, + Durable("dlc"), + EnableFlowControl(), + MaxAckPending(batch-2), + ) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + + for received := 0; received < toSend; { + select { + case m := <-msgs: + received++ + meta, _ := m.Metadata() + if meta.Sequence.Consumer != uint64(received) { + t.Fatalf("Missed something, wanted %d but got %d", received, meta.Sequence.Consumer) + } + m.Ack() + case <-time.After(time.Second): + t.Fatalf("Timeout waiting for messages, last received was %d", received) + } + } +} diff --git a/scripts/cov.sh b/scripts/cov.sh index dfbd785e8..f7057272a 100755 --- a/scripts/cov.sh +++ b/scripts/cov.sh @@ -3,10 +3,10 @@ rm -rf ./cov mkdir cov -go test -modfile=go_test.mod --failfast -v -race -covermode=atomic -coverprofile=./cov/nats.out -go test -modfile=go_test.mod --failfast -v -race -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test -go test -modfile=go_test.mod --failfast -v -race -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin -go test -modfile=go_test.mod --failfast -v -race -covermode=atomic -coverprofile=./cov/protobuf.out -coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto +go test -modfile=go_test.mod --failfast -vet=off -v -race -covermode=atomic -coverprofile=./cov/nats.out +go test -modfile=go_test.mod --failfast -vet=off -v -race -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test +go test -modfile=go_test.mod --failfast -vet=off -v -race -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin +go test -modfile=go_test.mod --failfast -vet=off -v -race -covermode=atomic -coverprofile=./cov/protobuf.out 
-coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto gocovmerge ./cov/*.out > acc.out rm -rf ./cov diff --git a/test/js_test.go b/test/js_test.go index 9e48c74bd..619dca410 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -285,17 +285,18 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - expectConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo { + expectConsumers := func(t *testing.T, expected int) { t.Helper() - var infos []*nats.ConsumerInfo - for info := range js.ConsumersInfo("TEST") { - infos = append(infos, info) - } - if len(infos) != expected { - t.Fatalf("Expected %d consumers, got: %d", expected, len(infos)) - } - - return infos + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + var infos []*nats.ConsumerInfo + for info := range js.ConsumersInfo("TEST") { + infos = append(infos, info) + } + if len(infos) != expected { + return fmt.Errorf("Expected %d consumers, got: %d", expected, len(infos)) + } + return nil + }) } // Create the stream using our client API. @@ -313,6 +314,27 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("stream lookup failed: %v", err) } + // If stream name is not specified, then the subject is required. + if _, err := js.SubscribeSync(""); err == nil || !strings.Contains(err.Error(), "required") { + t.Fatalf("Unexpected error: %v", err) + } + // Check that if stream name is present, then technically the subject does not have to. + sub, err := js.SubscribeSync("", nats.BindStream("TEST")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + sub.Unsubscribe() + + // Check that Queue subscribe with HB or FC fails. 
+ _, err = js.QueueSubscribeSync("foo", "wq", nats.IdleHeartbeat(time.Second)) + if err == nil || !strings.Contains(err.Error(), "heartbeat") { + t.Fatalf("Unexpected error: %v", err) + } + _, err = js.QueueSubscribeSync("foo", "wq", nats.EnableFlowControl()) + if err == nil || !strings.Contains(err.Error(), "flow control") { + t.Fatalf("Unexpected error: %v", err) + } + msg := []byte("Hello JS") // Basic publish like NATS core. @@ -320,32 +342,40 @@ func TestJetStreamSubscribe(t *testing.T) { q := make(chan *nats.Msg, 4) + checkSub, err := nc.SubscribeSync("ivan") + if err != nil { + t.Fatalf("Error on sub: %v", err) + } + // Now create a simple ephemeral consumer. - sub, err := js.Subscribe("foo", func(m *nats.Msg) { + sub1, err := js.Subscribe("foo", func(m *nats.Msg) { q <- m - }) + }, nats.DeliverSubject("ivan")) if err != nil { t.Fatalf("Unexpected error: %v", err) } - defer sub.Unsubscribe() + defer sub1.Unsubscribe() select { case m := <-q: if _, err := m.Metadata(); err != nil { t.Fatalf("Unexpected error: %v", err) } + if _, err := checkSub.NextMsg(time.Second); err != nil { + t.Fatal("Wrong deliver subject") + } case <-time.After(5 * time.Second): t.Fatalf("Did not receive the messages in time") } // Now do same but sync. 
- sub, err = js.SubscribeSync("foo") + sub2, err := js.SubscribeSync("foo") if err != nil { t.Fatalf("Unexpected error: %v", err) } - defer sub.Unsubscribe() + defer sub2.Unsubscribe() - waitForPending := func(t *testing.T, n int) { + waitForPending := func(t *testing.T, sub *nats.Subscription, n int) { t.Helper() timeout := time.Now().Add(2 * time.Second) for time.Now().Before(timeout) { @@ -358,7 +388,7 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Expected to receive %d messages, but got %d", n, msgs) } - waitForPending(t, 1) + waitForPending(t, sub2, 1) toSend := 10 for i := 0; i < toSend; i++ { @@ -367,7 +397,7 @@ func TestJetStreamSubscribe(t *testing.T) { done := make(chan bool, 1) var received int - sub, err = js.Subscribe("bar", func(m *nats.Msg) { + sub3, err := js.Subscribe("bar", func(m *nats.Msg) { received++ if received == toSend { done <- true @@ -377,7 +407,7 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } expectConsumers(t, 3) - defer sub.Unsubscribe() + defer sub3.Unsubscribe() select { case <-done: @@ -387,7 +417,7 @@ func TestJetStreamSubscribe(t *testing.T) { // If we are here we have received all of the messages. // We hang the ConsumerInfo option off of the subscription, so we use that to check status. - info, _ := sub.ConsumerInfo() + info, _ := sub3.ConsumerInfo() if info.Config.AckPolicy != nats.AckExplicitPolicy { t.Fatalf("Expected ack explicit policy, got %q", info.Config.AckPolicy) } @@ -398,8 +428,10 @@ func TestJetStreamSubscribe(t *testing.T) { if info.AckFloor.Consumer != uint64(toSend) { t.Fatalf("Expected to have ack'd all %d messages, got ack floor of %d", toSend, info.AckFloor.Consumer) } - sub.Unsubscribe() - expectConsumers(t, 2) + sub3.Unsubscribe() + sub2.Unsubscribe() + sub1.Unsubscribe() + expectConsumers(t, 0) // Now create a sync subscriber that is durable. 
dname := "derek" @@ -408,7 +440,7 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() - expectConsumers(t, 3) + expectConsumers(t, 1) // Make sure we registered as a durable. info, _ = sub.ConsumerInfo() @@ -417,88 +449,87 @@ func TestJetStreamSubscribe(t *testing.T) { } deliver := info.Config.DeliverSubject - // Remove subscription, but do not delete consumer. + // Drain subscription, this will delete the consumer. + go func() { + time.Sleep(250 * time.Millisecond) + for { + if _, err := sub.NextMsg(500 * time.Millisecond); err != nil { + return + } + } + }() sub.Drain() nc.Flush() - expectConsumers(t, 3) + expectConsumers(t, 0) - // Reattach using the same consumer. + // This will recreate a new instance. sub, err = js.SubscribeSync("foo", nats.Durable(dname)) if err != nil { t.Fatalf("Unexpected error: %v", err) } - if info, err := sub.ConsumerInfo(); err != nil || info.Config.DeliverSubject != deliver { - t.Fatal("Expected delivery subject to be the same after reattach") + if info, err := sub.ConsumerInfo(); err != nil || info.Config.DeliverSubject == deliver { + t.Fatal("Expected delivery subject to be different") } - expectConsumers(t, 3) + expectConsumers(t, 1) - // Subscribing again with same subject and durable name is not an error, - // but does not create a new consumer either. - sub, err = js.SubscribeSync("foo", nats.Durable(dname)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + // Subscribing again with same subject and durable name is an error. + if _, err := js.SubscribeSync("foo", nats.Durable(dname)); err == nil { + t.Fatal("Unexpected success") } - if info, err := sub.ConsumerInfo(); err != nil || info.Config.DeliverSubject != deliver { - t.Fatal("Expected delivery subject to be the same after reattach") - } - expectConsumers(t, 3) + expectConsumers(t, 1) - // Cleanup the consumer to be able to create again with a different delivery subject. 
- // this should be the same as `sub.Unsubscribe()'. - js.DeleteConsumer("TEST", dname) - expectConsumers(t, 2) + // Delete the durable. + sub.Unsubscribe() + expectConsumers(t, 0) - // Create again and make sure that works and that we attach to the same durable with different delivery. + // Create again and make sure that works. sub, err = js.SubscribeSync("foo", nats.Durable(dname)) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() - expectConsumers(t, 3) + expectConsumers(t, 1) if deliver == sub.Subject { t.Fatalf("Expected delivery subject to be different then %q", deliver) } - deliver = sub.Subject + sub.Unsubscribe() + expectConsumers(t, 0) - // Now test that we can attach to an existing durable. - sub, err = js.SubscribeSync("foo", nats.Durable(dname)) + // Create a queue group on "bar" with no explicit durable name, which + // means that the queue name will be used as the durable name. + sub1, err = js.QueueSubscribeSync("bar", "v0") if err != nil { t.Fatalf("Unexpected error: %v", err) } - defer sub.Unsubscribe() + defer sub1.Unsubscribe() + waitForPending(t, sub1, 10) + expectConsumers(t, 1) - if deliver != sub.Subject { - t.Fatalf("Expected delivery subject to be the same when attaching, got different") + // Since the above JS consumer is created on subject "bar", trying to + // add a member to the same group but on subject "baz" should fail. + if _, err = js.QueueSubscribeSync("baz", "v0"); err == nil { + t.Fatal("Unexpected success") } - // New QueueSubscribeSync with the same durable name will not - // create new consumers. - sub, err = js.QueueSubscribeSync("foo", "v0", nats.Durable(dname)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + // If the queue group is different, but we try to attach to the existing + // JS consumer that is created for group "v0", then this should fail. 
+ if _, err = js.QueueSubscribeSync("bar", "v1", nats.Durable("v0")); err == nil { + t.Fatal("Unexpected success") } - defer sub.Unsubscribe() - waitForPending(t, 0) - expectConsumers(t, 3) - - // QueueSubscribeSync with a wrong subject from the previous consumer - // is an error. - _, err = js.QueueSubscribeSync("bar", "v0", nats.Durable(dname)) - if err == nil { - t.Fatalf("Unexpected success") - } - - // QueueSubscribeSync with a different durable name will receive - // the messages. - qsubDurable := nats.Durable(dname + "-qsub") - sub, err = js.QueueSubscribeSync("bar", "v0", qsubDurable) + // However, if a durable name is specified, creating a queue sub with + // the same queue name is ok, but will feed from a different JS consumer. + sub2, err = js.QueueSubscribeSync("bar", "v0", nats.Durable("otherQueueDurable")) if err != nil { t.Fatalf("Unexpected error: %v", err) } - defer sub.Unsubscribe() - waitForPending(t, 10) - expectConsumers(t, 4) + defer sub2.Unsubscribe() + waitForPending(t, sub2, 10) + expectConsumers(t, 2) + + sub1.Unsubscribe() + sub2.Unsubscribe() + expectConsumers(t, 0) // Now try pull based subscribers. @@ -508,26 +539,46 @@ func TestJetStreamSubscribe(t *testing.T) { } // Durable name is required for now. 
- sub, err = js.PullSubscribe("bar", "") - if err == nil { + if _, err = js.PullSubscribe("bar", ""); err == nil { t.Fatalf("Unexpected success") } - if err != nil && err.Error() != `nats: consumer in pull mode requires a durable name` { + if err.Error() != `nats: consumer in pull mode requires a durable name` { t.Errorf("Expected consumer in pull mode error, got %v", err) } - sub, err = js.PullSubscribe("bar", "foo", nats.Durable("bar")) - if err == nil { + if _, err = js.PullSubscribe("bar", "foo", nats.Durable("bar")); err == nil { t.Fatalf("Unexpected success") } - if err != nil && err.Error() != `nats: option Durable set more than once` { + if err.Error() != `nats: option Durable set more than once` { t.Errorf("Expected consumer in pull mode error, got %v", err) } + // Can't specify DeliverSubject for pull subscribers + _, err = js.PullSubscribe("bar", "foo", nats.DeliverSubject("baz")) + if err != nats.ErrPullSubscribeToPushConsumer { + t.Fatalf("Unexpected error: %v", err) + } + // Can't specify SubjectIsDelivery() for pull subscribers + _, err = js.PullSubscribe("bar", "foo", nats.SubjectIsDelivery()) + if err != nats.ErrPullSubscribeToPushConsumer { + t.Fatalf("Unexpected error: %v", err) + } + // If stream name is not specified, need the subject. + _, err = js.PullSubscribe("", "rip") + if err == nil || !strings.Contains(err.Error(), "required") { + t.Fatalf("Unexpected error: %v", err) + } + // If stream provided, it should be ok. + sub, err = js.PullSubscribe("", "rip", nats.BindStream("TEST")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + sub.Unsubscribe() batch := 5 sub, err = js.PullSubscribe("bar", "rip") if err != nil { t.Fatalf("Unexpected error: %v", err) } + expectConsumers(t, 1) // The first batch if available should be delivered and queued up. 
bmsgs, err := sub.Fetch(batch) @@ -550,7 +601,7 @@ func TestJetStreamSubscribe(t *testing.T) { if info, _ := sub.ConsumerInfo(); info.AckFloor.Consumer != uint64(batch) { t.Fatalf("Expected ack floor to be %d, got %d", batch, info.AckFloor.Consumer) } - waitForPending(t, 0) + waitForPending(t, sub, 0) // Make a request for 10 but should only receive a few. bmsgs, err = sub.Fetch(10, nats.MaxWait(2*time.Second)) @@ -567,8 +618,6 @@ func TestJetStreamSubscribe(t *testing.T) { msg.Ack() } - sub.Drain() - // Now test attaching to a pull based durable. // Test that if we are attaching that the subjects will match up. rip from @@ -588,6 +637,8 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() + // No new JS consumer was created. + expectConsumers(t, 1) // Fetch messages a couple of times. expected = 5 @@ -609,6 +660,17 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Expected ack pending of %d and pending to be %d, got %d %d", batch, toSend-batch, info.NumAckPending, info.NumPending) } + // Pull subscriptions can't use NextMsg variants. 
+ if _, err := sub.NextMsg(time.Second); err != nats.ErrTypeSubscription { + t.Fatalf("Expected error %q, got %v", nats.ErrTypeSubscription, err) + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if _, err := sub.NextMsgWithContext(ctx); err != nats.ErrTypeSubscription { + t.Fatalf("Expected error %q, got %v", nats.ErrTypeSubscription, err) + } + cancel() + // Prevent invalid durable names if _, err := js.SubscribeSync("baz", nats.Durable("test.durable")); err != nats.ErrInvalidDurableName { t.Fatalf("Expected invalid durable name error") @@ -619,6 +681,7 @@ func TestJetStreamSubscribe(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + expectConsumers(t, 2) _, err = sub.NextMsg(1 * time.Second) if err != nil { @@ -637,6 +700,7 @@ func TestJetStreamSubscribe(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + expectConsumers(t, 3) m, err := sub.NextMsg(1 * time.Second) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -652,13 +716,14 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Unexpected consumer name, got: %v", meta.Consumer) } - qsubDurable = nats.Durable(dname + "-qsub-chan") + qsubDurable := nats.Durable("qdur-chan") mch := make(chan *nats.Msg, 16536) sub, err = js.ChanQueueSubscribe("bar", "v1", mch, qsubDurable) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() + expectConsumers(t, 4) var a, b *nats.MsgMetadata select { @@ -678,6 +743,8 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() + // Not a new JS consumer + expectConsumers(t, 4) // Publish more messages so that at least one is received by // the channel queue subscriber. @@ -700,7 +767,7 @@ func TestJetStreamSubscribe(t *testing.T) { } // Both ChanQueueSubscribers use the same consumer. 
- expectConsumers(t, 8) + expectConsumers(t, 4) } func TestJetStreamAckPending_Pull(t *testing.T) { @@ -861,471 +928,13 @@ func TestJetStreamAckPending_Pull(t *testing.T) { } } - _, err = sub.Fetch(1, nats.MaxWait(100*time.Millisecond)) - if err != nats.ErrTimeout { - t.Errorf("Expected timeout, got: %v", err) - } -} - -func TestJetStreamAckPending_Push(t *testing.T) { - s := RunBasicJetStreamServer() - defer s.Shutdown() - - if config := s.JetStreamConfig(); config != nil { - defer os.RemoveAll(config.StoreDir) - } - - nc, err := nats.Connect(s.ClientURL()) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer nc.Close() - - js, err := nc.JetStream() - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - _, err = js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - }) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - const totalMsgs = 3 - for i := 0; i < totalMsgs; i++ { - if _, err := js.Publish("foo", []byte(fmt.Sprintf("msg %d", i))); err != nil { - t.Fatal(err) - } - } - - sub, err := js.SubscribeSync("foo", - nats.Durable("dname-wait"), - nats.AckWait(100*time.Millisecond), - nats.MaxDeliver(5), - nats.MaxAckPending(3), - ) - if err != nil { - t.Fatal(err) - } - defer sub.Unsubscribe() - - // 3 messages delivered 5 times. 
- expected := 15 - timeout := time.Now().Add(2 * time.Second) - pending := 0 - for time.Now().Before(timeout) { - if pending, _, _ = sub.Pending(); pending >= expected { - break - } - time.Sleep(10 * time.Millisecond) - } - if pending < expected { - t.Errorf("Expected %v, got %v", expected, pending) - } - - info, err := sub.ConsumerInfo() - if err != nil { - t.Fatal(err) - } - - got := info.NumRedelivered - expected = 3 - if got < expected { - t.Errorf("Expected %v, got: %v", expected, got) - } - - got = info.NumAckPending - expected = 3 - if got < expected { - t.Errorf("Expected %v, got: %v", expected, got) - } - - got = info.NumWaiting - expected = 0 - if got != expected { - t.Errorf("Expected %v, got: %v", expected, got) - } - - got = int(info.NumPending) - expected = 0 - if got != expected { - t.Errorf("Expected %v, got: %v", expected, got) - } - - got = info.Config.MaxAckPending - expected = 3 - if got != expected { - t.Errorf("Expected %v, got %v", expected, pending) - } - - got = info.Config.MaxDeliver - expected = 5 - if got != expected { - t.Errorf("Expected %v, got %v", expected, pending) - } - - acks := map[int]int{} - - ackPending := 3 - timeout = time.Now().Add(2 * time.Second) - for time.Now().Before(timeout) { - info, err := sub.ConsumerInfo() - if err != nil { - t.Fatal(err) - } - if got, want := info.NumAckPending, ackPending; got > 0 && got != want { - t.Fatalf("unexpected num ack pending: got=%d, want=%d", got, want) - } - - // Continue to ack all messages until no more pending. 
- pending, _, _ = sub.Pending() - if pending == 0 { - break - } - - m, err := sub.NextMsg(100 * time.Millisecond) - if err != nil { - t.Fatalf("Error getting next message: %v", err) - } - - if err := m.AckSync(); err != nil { - t.Fatalf("Error on ack message: %v", err) - } - - meta, err := m.Metadata() - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - acks[int(meta.Sequence.Stream)]++ - - if ackPending != 0 { - ackPending-- - } - if int(meta.NumPending) != ackPending { - t.Errorf("Expected %v, got %v", ackPending, meta.NumPending) - } - } - - got = len(acks) - expected = 3 - if got != expected { - t.Errorf("Expected %v, got %v", expected, got) - } - - expected = 5 - for _, got := range acks { - if got != expected { - t.Errorf("Expected %v, got %v", expected, got) - } - } - - _, err = sub.NextMsg(100 * time.Millisecond) - if err != nats.ErrTimeout { - t.Errorf("Expected timeout, got: %v", err) - } -} - -func TestJetStreamPushFlowControlHeartbeats_SubscribeSync(t *testing.T) { - s := RunBasicJetStreamServer() - defer s.Shutdown() - - if config := s.JetStreamConfig(); config != nil { - defer os.RemoveAll(config.StoreDir) - } - - errHandler := nats.ErrorHandler(func(c *nats.Conn, sub *nats.Subscription, err error) { - t.Logf("WARN: %s", err) - }) - - nc, err := nats.Connect(s.ClientURL(), errHandler) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer nc.Close() - - js, err := nc.JetStream() - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - _, err = js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - }) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // Burst and try to hit the flow control limit of the server. 
- const totalMsgs = 16536 - payload := strings.Repeat("A", 1024) - for i := 0; i < totalMsgs; i++ { - if _, err := js.Publish("foo", []byte(fmt.Sprintf("i:%d/", i)+payload)); err != nil { - t.Fatal(err) - } - } - - hbTimer := 500 * time.Millisecond - sub, err := js.SubscribeSync("foo", - nats.AckWait(30*time.Second), - nats.MaxDeliver(1), - nats.EnableFlowControl(), - nats.IdleHeartbeat(hbTimer), - ) - if err != nil { - t.Fatal(err) - } - defer sub.Unsubscribe() - - info, err := sub.ConsumerInfo() - if err != nil { - t.Fatal(err) - } - if !info.Config.FlowControl { - t.Fatal("Expected Flow Control to be enabled") - } - if info.Config.Heartbeat != hbTimer { - t.Errorf("Expected %v, got: %v", hbTimer, info.Config.Heartbeat) - } - - m, err := sub.NextMsg(1 * time.Second) - if err != nil { - t.Fatalf("Error getting next message: %v", err) - } - meta, err := m.Metadata() - if err != nil { - t.Fatal(err) - } - if meta.NumPending > totalMsgs { - t.Logf("WARN: More pending messages than expected (%v), got: %v", totalMsgs, meta.NumPending) - } - err = m.Ack() - if err != nil { - t.Fatal(err) - } - - recvd := 1 - timeout := time.Now().Add(10 * time.Second) - for time.Now().Before(timeout) { - m, err := sub.NextMsg(1 * time.Second) - if err != nil { - t.Fatalf("Error getting next message: %v", err) - } - if len(m.Data) == 0 { - t.Fatalf("Unexpected empty message: %+v", m) - } - - if err := m.AckSync(); err != nil { - t.Fatalf("Error on ack message: %v", err) - } - recvd++ - - if recvd == totalMsgs { - break - } - } - - t.Run("idle heartbeats", func(t *testing.T) { - // Delay to get a few heartbeats. - time.Sleep(2 * time.Second) - - timeout = time.Now().Add(5 * time.Second) - for time.Now().Before(timeout) { - msg, err := sub.NextMsg(200 * time.Millisecond) - if err != nil { - if err == nats.ErrTimeout { - // If timeout, ok to stop checking for the test. 
- break - } - t.Fatal(err) - } - if len(msg.Data) == 0 { - t.Fatalf("Unexpected empty message: %+v", m) - } - - recvd++ - meta, err := msg.Metadata() - if err != nil { - t.Fatal(err) - } - if meta.NumPending == 0 { - break - } - } - if recvd > totalMsgs { - t.Logf("WARN: Received more messages than expected (%v), got: %v", totalMsgs, recvd) - } - }) - - t.Run("with context", func(t *testing.T) { - sub, err := js.SubscribeSync("foo", - nats.AckWait(100*time.Millisecond), - nats.Durable("bar"), - nats.EnableFlowControl(), - nats.IdleHeartbeat(hbTimer), - ) - if err != nil { - t.Fatal(err) - } - defer sub.Unsubscribe() - - info, err = sub.ConsumerInfo() - if err != nil { - t.Fatal(err) - } - if !info.Config.FlowControl { - t.Fatal("Expected Flow Control to be enabled") - } - - recvd = 0 - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - for { - select { - case <-ctx.Done(): - t.Fatal(ctx.Err()) - default: - } - - m, err := sub.NextMsgWithContext(ctx) - if err != nil { - t.Fatalf("Error getting next message: %v", err) - } - if len(m.Data) == 0 { - t.Fatalf("Unexpected empty message: %+v", m) - } - - if err := m.Ack(); err != nil { - t.Fatalf("Error on ack message: %v", err) - } - recvd++ - - if recvd >= totalMsgs { - break - } - } - - // Delay to get a few heartbeats. 
- time.Sleep(2 * time.Second) - for { - select { - case <-ctx.Done(): - if ctx.Err() == context.DeadlineExceeded { - return - } - default: - } - - msg, err := sub.NextMsgWithContext(ctx) - if err != nil { - if err == context.DeadlineExceeded { - break - } - t.Fatal(err) - } - if len(msg.Data) == 0 { - t.Fatalf("Unexpected empty message: %+v", m) - } - - meta, err := msg.Metadata() - if err != nil { - t.Fatal(err) - } - if meta.NumPending == 0 { - break - } - } - }) -} - -func TestJetStreamPushFlowControlHeartbeats_SubscribeAsync(t *testing.T) { - s := RunBasicJetStreamServer() - defer s.Shutdown() - - if config := s.JetStreamConfig(); config != nil { - defer os.RemoveAll(config.StoreDir) - } - - nc, err := nats.Connect(s.ClientURL()) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer nc.Close() - - js, err := nc.JetStream() - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - _, err = js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - }) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // Burst and try to hit the flow control limit of the server. 
- const totalMsgs = 16536 - payload := strings.Repeat("A", 1024) - for i := 0; i < totalMsgs; i++ { - if _, err := js.Publish("foo", []byte(payload)); err != nil { - t.Fatal(err) - } - } - - recvd := make(chan *nats.Msg, totalMsgs) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - errCh := make(chan error) - hbTimer := 200 * time.Millisecond - sub, err := js.Subscribe("foo", func(msg *nats.Msg) { - if len(msg.Data) == 0 { - errCh <- fmt.Errorf("Unexpected empty message: %+v", msg) - } - recvd <- msg - - if len(recvd) == totalMsgs { - cancel() - } - }, nats.EnableFlowControl(), nats.IdleHeartbeat(hbTimer)) - if err != nil { - t.Fatal(err) - } - defer sub.Unsubscribe() - - info, err := sub.ConsumerInfo() - if err != nil { - t.Fatal(err) - } - if !info.Config.FlowControl { - t.Fatal("Expected Flow Control to be enabled") - } - if info.Config.Heartbeat != hbTimer { - t.Errorf("Expected %v, got: %v", hbTimer, info.Config.Heartbeat) - } - - <-ctx.Done() - - got := len(recvd) - expected := totalMsgs - if got != expected { - t.Errorf("Expected %v, got: %v", expected, got) - } - - // Wait for a couple of heartbeats to arrive and confirm there is no error. 
- select { - case <-time.After(1 * time.Second): - case err := <-errCh: - t.Fatal(err) + _, err = sub.Fetch(1, nats.MaxWait(100*time.Millisecond)) + if err != nats.ErrTimeout { + t.Errorf("Expected timeout, got: %v", err) } } -func TestJetStreamPushFlowControlHeartbeats_ChanSubscribe(t *testing.T) { +func TestJetStreamAckPending_Push(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() @@ -1333,11 +942,7 @@ func TestJetStreamPushFlowControlHeartbeats_ChanSubscribe(t *testing.T) { defer os.RemoveAll(config.StoreDir) } - errHandler := nats.ErrorHandler(func(c *nats.Conn, sub *nats.Subscription, err error) { - t.Logf("WARN: %s : %v", err, sub.Subject) - }) - - nc, err := nats.Connect(s.ClientURL(), errHandler) + nc, err := nats.Connect(s.ClientURL()) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1356,228 +961,137 @@ func TestJetStreamPushFlowControlHeartbeats_ChanSubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - // Burst and try to hit the flow control limit of the server. 
- const totalMsgs = 16536 - payload := strings.Repeat("A", 1024) + const totalMsgs = 3 for i := 0; i < totalMsgs; i++ { - if _, err := js.Publish("foo", []byte(fmt.Sprintf("i:%d/", i)+payload)); err != nil { + if _, err := js.Publish("foo", []byte(fmt.Sprintf("msg %d", i))); err != nil { t.Fatal(err) } } - hbTimer := 500 * time.Millisecond - mch := make(chan *nats.Msg, 16536) - sub, err := js.ChanSubscribe("foo", mch, - nats.AckWait(30*time.Second), - nats.MaxDeliver(1), - nats.EnableFlowControl(), - nats.IdleHeartbeat(hbTimer), + sub, err := js.SubscribeSync("foo", + nats.Durable("dname-wait"), + nats.AckWait(100*time.Millisecond), + nats.MaxDeliver(5), + nats.MaxAckPending(3), ) if err != nil { t.Fatal(err) } defer sub.Unsubscribe() - info, err := sub.ConsumerInfo() - if err != nil { - t.Fatal(err) - } - if !info.Config.FlowControl { - t.Fatal("Expected Flow Control to be enabled") - } - if info.Config.Heartbeat != hbTimer { - t.Errorf("Expected %v, got: %v", hbTimer, info.Config.Heartbeat) - } - - getNextMsg := func(mch chan *nats.Msg, timeout time.Duration) (*nats.Msg, error) { - t.Helper() - select { - case m := <-mch: - return m, nil - case <-time.After(timeout): - return nil, nats.ErrTimeout + // 3 messages delivered 5 times. 
+ expected := 15 + timeout := time.Now().Add(2 * time.Second) + pending := 0 + for time.Now().Before(timeout) { + if pending, _, _ = sub.Pending(); pending >= expected { + break } + time.Sleep(10 * time.Millisecond) } - - m, err := getNextMsg(mch, 1*time.Second) - if err != nil { - t.Fatalf("Error getting next message: %v", err) - } - meta, err := m.Metadata() - if err != nil { - t.Fatal(err) - } - if meta.NumPending > totalMsgs { - t.Logf("WARN: More pending messages than expected (%v), got: %v", totalMsgs, meta.NumPending) + if pending < expected { + t.Errorf("Expected %v, got %v", expected, pending) } - err = m.Ack() + + info, err := sub.ConsumerInfo() if err != nil { t.Fatal(err) } - recvd := 1 - ctx, done := context.WithTimeout(context.Background(), 10*time.Second) - defer done() - -Loop: - for { - select { - case <-ctx.Done(): - break Loop - case m := <-mch: - if err != nil { - t.Fatalf("Error getting next message: %v", err) - } - if len(m.Data) == 0 { - t.Fatalf("Unexpected empty message: %+v", m) - } - - if err := m.Ack(); err != nil { - t.Fatalf("Error on ack message: %v", err) - } - recvd++ - - if recvd == totalMsgs { - done() - } - } + got := info.NumRedelivered + expected = 3 + if got < expected { + t.Errorf("Expected %v, got: %v", expected, got) } - t.Run("idle heartbeats", func(t *testing.T) { - // Delay to get a few heartbeats. - time.Sleep(2 * time.Second) - - ctx, done := context.WithTimeout(context.Background(), 5*time.Second) - defer done() - Loop: - for { - select { - case <-ctx.Done(): - break Loop - case msg := <-mch: - if err != nil { - if err == nats.ErrTimeout { - // If timeout, ok to stop checking for the test. 
- break - } - t.Fatal(err) - } - if len(msg.Data) == 0 { - t.Fatalf("Unexpected empty message: %+v", m) - } - - recvd++ - meta, err := msg.Metadata() - if err != nil { - t.Fatal(err) - } - if meta.NumPending == 0 { - break Loop - } - } - } - if recvd > totalMsgs { - t.Logf("WARN: Received more messages than expected (%v), got: %v", totalMsgs, recvd) - } - }) -} - -func TestJetStreamPushFlowControl_SubscribeAsyncAndChannel(t *testing.T) { - s := RunBasicJetStreamServer() - defer s.Shutdown() - - if config := s.JetStreamConfig(); config != nil { - defer os.RemoveAll(config.StoreDir) + got = info.NumAckPending + expected = 3 + if got < expected { + t.Errorf("Expected %v, got: %v", expected, got) } - errCh := make(chan error) - errHandler := nats.ErrorHandler(func(c *nats.Conn, sub *nats.Subscription, err error) { - errCh <- err - }) - nc, err := nats.Connect(s.ClientURL(), errHandler) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + got = info.NumWaiting + expected = 0 + if got != expected { + t.Errorf("Expected %v, got: %v", expected, got) } - defer nc.Close() - const totalMsgs = 10_000 + got = int(info.NumPending) + expected = 0 + if got != expected { + t.Errorf("Expected %v, got: %v", expected, got) + } - js, err := nc.JetStream(nats.PublishAsyncMaxPending(totalMsgs)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + got = info.Config.MaxAckPending + expected = 3 + if got != expected { + t.Errorf("Expected %v, got %v", expected, pending) } - _, err = js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - }) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + got = info.Config.MaxDeliver + expected = 5 + if got != expected { + t.Errorf("Expected %v, got %v", expected, pending) } - go func() { - payload := strings.Repeat("O", 4096) - for i := 0; i < totalMsgs; i++ { - js.PublishAsync("foo", []byte(payload)) - } - }() - // Small channel that blocks and then buffered channel that can deliver all - // messages 
without blocking. - recvd := make(chan *nats.Msg, 64) - delivered := make(chan *nats.Msg, totalMsgs) - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() + acks := map[int]int{} - // Dispatch channel consumer - go func() { - for m := range recvd { - select { - case <-ctx.Done(): - return - default: - } + ackPending := 3 + timeout = time.Now().Add(2 * time.Second) + for time.Now().Before(timeout) { + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if got, want := info.NumAckPending, ackPending; got > 0 && got != want { + t.Fatalf("unexpected num ack pending: got=%d, want=%d", got, want) + } - delivered <- m - if len(delivered) == totalMsgs { - cancel() - } + // Continue to ack all messages until no more pending. + pending, _, _ = sub.Pending() + if pending == 0 { + break } - }() - sub, err := js.Subscribe("foo", func(msg *nats.Msg) { - // Cause bottleneck by having channel block when full - // because of work taking long. - recvd <- msg - }, nats.EnableFlowControl()) + m, err := sub.NextMsg(100 * time.Millisecond) + if err != nil { + t.Fatalf("Error getting next message: %v", err) + } - if err != nil { - t.Fatal(err) - } - defer sub.Unsubscribe() + if err := m.AckSync(); err != nil { + t.Fatalf("Error on ack message: %v", err) + } - // Set this lower then normal to make sure we do not exceed bytes pending with FC turned on. - sub.SetPendingLimits(totalMsgs, 1024*1024) // This matches server window for flowcontrol. 
+ meta, err := m.Metadata() + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + acks[int(meta.Sequence.Stream)]++ - info, err := sub.ConsumerInfo() - if err != nil { - t.Fatal(err) - } - if !info.Config.FlowControl { - t.Fatal("Expected Flow Control to be enabled") + if ackPending != 0 { + ackPending-- + } + if int(meta.NumPending) != ackPending { + t.Errorf("Expected %v, got %v", ackPending, meta.NumPending) + } } - <-ctx.Done() - got := len(delivered) - expected := totalMsgs + got = len(acks) + expected = 3 if got != expected { - t.Errorf("Expected %d messages, got: %d", expected, got) + t.Errorf("Expected %v, got %v", expected, got) } - // Wait for a couple of heartbeats to arrive and confirm there is no error. - select { - case <-time.After(1 * time.Second): - case err := <-errCh: - t.Errorf("error handler: %v", err) + expected = 5 + for _, got := range acks { + if got != expected { + t.Errorf("Expected %v, got %v", expected, got) + } + } + + _, err = sub.NextMsg(100 * time.Millisecond) + if err != nats.ErrTimeout { + t.Errorf("Expected timeout, got: %v", err) } } @@ -2479,11 +1993,17 @@ func TestJetStreamImportDirectOnly(t *testing.T) { } waitForPending(t, toSend) - // Bind has the same effect as above since it would not attempt to create and lookup will work. - sub, err = js.SubscribeSync("ORDERS", nats.Bind("ORDERS", "d4")) + // It is also possible to create a subscription with a SubjectIsDelivery() + // option that says that the given subject will be used to create the low + // level NATS subscription and no lookup/create attempt will be made. + sub, err = js.SubscribeSync("p.d4", nats.SubjectIsDelivery()) if err != nil { t.Fatalf("Unexpected error: %v", err) } + js.Publish("orders", []byte("msg")) + if _, err := sub.NextMsg(time.Second); err != nil { + t.Fatalf("Error getting message: %v", err) + } // Even if there are no permissions or import to check that a consumer exists, // it is still possible to bind subscription to it. 
@@ -2509,7 +2029,8 @@ func TestJetStreamImportDirectOnly(t *testing.T) { } defer nc.Close() - js, err = nc.JetStream() + // Since we know that the lookup will fail, we use a smaller timeout than the 5s default. + js, err = nc.JetStream(nats.MaxWait(500 * time.Millisecond)) if err != nil { t.Fatal(err) } @@ -2800,83 +2321,6 @@ func TestJetStreamInterfaces(t *testing.T) { publishMsg(js, []byte("hello world")) } -func TestJetStreamChanSubscribeStall(t *testing.T) { - conf := createConfFile(t, []byte(` - listen: 127.0.0.1:-1 - jetstream: enabled - no_auth_user: pc - accounts: { - JS: { - jetstream: enabled - users: [ {user: pc, password: foo} ] - }, - } - `)) - defer os.Remove(conf) - - s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - - if config := s.JetStreamConfig(); config != nil { - defer os.RemoveAll(config.StoreDir) - } - - nc, err := nats.Connect(s.ClientURL()) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer nc.Close() - - js, err := nc.JetStream() - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // Create a stream. - if _, err = js.AddStream(&nats.StreamConfig{Name: "STALL"}); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - _, err = js.StreamInfo("STALL") - if err != nil { - t.Fatalf("stream lookup failed: %v", err) - } - - msg := []byte(strings.Repeat("A", 512)) - toSend := 100_000 - for i := 0; i < toSend; i++ { - // Use plain NATS here for speed. 
- nc.Publish("STALL", msg) - } - nc.Flush() - - batch := 100 - msgs := make(chan *nats.Msg, batch-2) - sub, err := js.ChanSubscribe("STALL", msgs, - nats.Durable("dlc"), - nats.EnableFlowControl(), - nats.MaxAckPending(batch-2), - ) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer sub.Unsubscribe() - - for received := 0; received < toSend; { - select { - case m := <-msgs: - received++ - meta, _ := m.Metadata() - if meta.Sequence.Consumer != uint64(received) { - t.Fatalf("Missed something, wanted %d but got %d", received, meta.Sequence.Consumer) - } - m.Ack() - case <-time.After(time.Second): - t.Fatalf("Timeout waiting for messages, last received was %d", received) - } - } -} - func TestJetStreamSubscribe_DeliverPolicy(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() @@ -3618,17 +3062,25 @@ func TestJetStream_Unsubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo { + fetchConsumers := func(t *testing.T, expected int) { + t.Helper() + checkFor(t, time.Second, 15*time.Millisecond, func() error { + var infos []*nats.ConsumerInfo + for info := range js.ConsumersInfo("foo") { + infos = append(infos, info) + } + if len(infos) != expected { + return fmt.Errorf("Expected %d consumers, got: %d", expected, len(infos)) + } + return nil + }) + } + + deleteAllConsumers := func(t *testing.T) { t.Helper() - var infos []*nats.ConsumerInfo - for info := range js.ConsumersInfo("foo") { - infos = append(infos, info) - } - if len(infos) != expected { - t.Fatalf("Expected %d consumers, got: %d", expected, len(infos)) + for cn := range js.ConsumerNames("foo") { + js.DeleteConsumer("foo", cn) } - - return infos } js.Publish("foo.A", []byte("A")) @@ -3636,27 +3088,41 @@ func TestJetStream_Unsubscribe(t *testing.T) { js.Publish("foo.C", []byte("C")) t.Run("consumers deleted on unsubscribe", func(t *testing.T) { - subA, err := js.SubscribeSync("foo.A") + sub, err := 
js.SubscribeSync("foo.A") if err != nil { t.Fatal(err) } - err = subA.Unsubscribe() + if err := sub.Unsubscribe(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + sub, err = js.SubscribeSync("foo.B", nats.Durable("B")) if err != nil { + t.Fatal(err) + } + if err := sub.Unsubscribe(); err != nil { t.Errorf("Unexpected error: %v", err) } - subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) + sub, err = js.Subscribe("foo.C", func(_ *nats.Msg) {}) if err != nil { t.Fatal(err) } - err = subB.Unsubscribe() + if err := sub.Unsubscribe(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + sub, err = js.Subscribe("foo.C", func(_ *nats.Msg) {}, nats.Durable("C")) if err != nil { + t.Fatal(err) + } + if err := sub.Unsubscribe(); err != nil { t.Errorf("Unexpected error: %v", err) } fetchConsumers(t, 0) }) - t.Run("attached pull consumer deleted on unsubscribe", func(t *testing.T) { + t.Run("not deleted on unsubscribe if consumer created externally", func(t *testing.T) { // Created by JetStreamManagement if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{ Durable: "wq", @@ -3684,43 +3150,35 @@ func TestJetStream_Unsubscribe(t *testing.T) { t.Errorf("Expected %v, got %v", expected, got) } subC.Unsubscribe() - fetchConsumers(t, 0) + fetchConsumers(t, 1) + deleteAllConsumers(t) }) - t.Run("ephemeral consumers deleted on drain", func(t *testing.T) { - subA, err := js.SubscribeSync("foo.A") + t.Run("consumers deleted on drain", func(t *testing.T) { + subA, err := js.Subscribe("foo.A", func(_ *nats.Msg) {}) if err != nil { t.Fatal(err) } + fetchConsumers(t, 1) err = subA.Drain() if err != nil { t.Errorf("Unexpected error: %v", err) } fetchConsumers(t, 0) + deleteAllConsumers(t) }) - t.Run("durable consumers not deleted on drain", func(t *testing.T) { - subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) + t.Run("durable consumers deleted on drain", func(t *testing.T) { + subB, err := js.Subscribe("foo.B", func(_ *nats.Msg) {}, 
nats.Durable("B")) if err != nil { t.Fatal(err) } - err = subB.Drain() - if err != nil { - t.Errorf("Unexpected error: %v", err) - } fetchConsumers(t, 1) - }) - - t.Run("reattached durable consumers not deleted on drain", func(t *testing.T) { - subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) - if err != nil { - t.Fatal(err) - } err = subB.Drain() if err != nil { t.Errorf("Unexpected error: %v", err) } - fetchConsumers(t, 1) + fetchConsumers(t, 0) }) } @@ -3782,8 +3240,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { t.Fatal(err) } - // sub.Drain() or nc.Drain() does not delete the durable consumers, - // just makes client go away. Ephemerals will get deleted though. + // sub.Drain() or nc.Drain() delete JS consumer, same than Unsubscribe() nc.Drain() <-ctx.Done() fetchConsumers(t, 0) @@ -3804,8 +3261,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - _, err = js.SubscribeSync("foo.A") - if err != nil { + if _, err := js.SubscribeSync("foo.A"); err != nil { t.Fatalf("Unexpected error: %v", err) } subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) @@ -3834,7 +3290,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { jsm.Publish("foo.B", []byte("B.2")) jsm.Publish("foo.C", []byte("C.2")) - t.Run("reattached durables consumers can be deleted with unsubscribe", func(t *testing.T) { + t.Run("reattached durables consumers cannot be deleted with unsubscribe", func(t *testing.T) { nc, err := nats.Connect(serverURL) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -3871,20 +3327,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { jsm.Publish("foo.B", []byte("B.3")) - // Attach again to the same subject with the durable. - dupSub, err := js.SubscribeSync("foo.B", nats.Durable("B")) - if err != nil { - t.Fatal(err) - } - - // The same durable is already used, so this dup durable - // subscription won't receive the message. 
- _, err = dupSub.NextMsg(1 * time.Second) - if err == nil { - t.Fatalf("Expected error: %v", err) - } - - // Original sub can still receive the same message. + // Sub can still receive the same message. resp, err = subB.NextMsg(1 * time.Second) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -3901,17 +3344,8 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { t.Errorf("Unexpected error: %v", err) } - err = dupSub.Unsubscribe() - if err == nil { - t.Fatalf("Unexpected success") - } - if !errors.Is(err, nats.ErrConsumerNotFound) { - t.Errorf("Expected consumer not found error, got: %v", err) - } - - // Remains an ephemeral consumer that did not get deleted - // when Close() was called. - fetchConsumers(t, 1) + // Since library did not create, the JS consumers remain. + fetchConsumers(t, 2) }) } @@ -3947,7 +3381,7 @@ func TestJetStream_UnsubscribeDeleteNoPermissions(t *testing.T) { } defer nc.Close() - js, err := nc.JetStream() + js, err := nc.JetStream(nats.MaxWait(time.Second)) if err != nil { t.Fatal(err) } @@ -4315,14 +3749,14 @@ func waitForJSReady(t *testing.T, nc *nats.Conn) { var err error timeout := time.Now().Add(10 * time.Second) for time.Now().Before(timeout) { - js, err := nc.JetStream() + // Use a smaller MaxWait here since if it fails, we don't want + // to wait for too long since we are going to try again. + js, err := nc.JetStream(nats.MaxWait(250 * time.Millisecond)) if err != nil { t.Fatal(err) } _, err = js.AccountInfo() if err != nil { - // Backoff for a bit until cluster ready. 
- time.Sleep(250 * time.Millisecond) continue } return @@ -4882,15 +4316,6 @@ func TestJetStream_ClusterMultipleSubscribe(t *testing.T) { continue } - t.Run(fmt.Sprintf("sub n=%d r=%d", n, r), func(t *testing.T) { - name := fmt.Sprintf("SUB%d%d", n, r) - stream := &nats.StreamConfig{ - Name: name, - Replicas: n, - } - withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultipleSubscribe) - }) - t.Run(fmt.Sprintf("qsub n=%d r=%d", n, r), func(t *testing.T) { name := fmt.Sprintf("MSUB%d%d", n, r) stream := &nats.StreamConfig{ @@ -4912,89 +4337,6 @@ func TestJetStream_ClusterMultipleSubscribe(t *testing.T) { } } -func testJetStream_ClusterMultipleSubscribe(t *testing.T, subject string, srvs ...*jsServer) { - srv := srvs[0] - nc, err := nats.Connect(srv.ClientURL()) - if err != nil { - t.Fatal(err) - } - defer nc.Close() - - var wg sync.WaitGroup - ctx, done := context.WithTimeout(context.Background(), 2*time.Second) - defer done() - - js, err := nc.JetStream() - if err != nil { - t.Fatal(err) - } - - size := 5 - subs := make([]*nats.Subscription, size) - errCh := make(chan error, size) - - // We are testing auto-bind here so create one first and expect others to bind to it. - sub, err := js.SubscribeSync(subject, nats.Durable("shared")) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - subs[0] = sub - - for i := 1; i < size; i++ { - wg.Add(1) - go func(n int) { - defer wg.Done() - var sub *nats.Subscription - var err error - for attempt := 0; attempt < 5; attempt++ { - sub, err = js.SubscribeSync(subject, nats.Durable("shared")) - if err != nil { - time.Sleep(1 * time.Second) - continue - } - break - } - if err != nil { - errCh <- err - } else { - subs[n] = sub - } - }(i) - } - - go func() { - // Unblock the main context when done. 
- wg.Wait() - done() - }() - - wg.Wait() - for i := 0; i < size*2; i++ { - js.Publish(subject, []byte("test")) - } - - delivered := 0 - for _, sub := range subs { - if sub == nil { - continue - } - if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs > 0 { - delivered++ - } - } - if delivered < 2 { - t.Fatalf("Expected more than one subscriber to receive a message, got: %v", delivered) - } - - select { - case <-ctx.Done(): - case err := <-errCh: - if err != nil { - t.Fatalf("Unexpected error with multiple subscribers: %v", err) - } - } -} - func testJetStream_ClusterMultipleQueueSubscribe(t *testing.T, subject string, srvs ...*jsServer) { srv := srvs[0] nc, err := nats.Connect(srv.ClientURL()) @@ -5161,7 +4503,7 @@ func testJetStream_ClusterMultiplePullSubscribe(t *testing.T, subject string, sr } func TestJetStream_ClusterReconnect(t *testing.T) { - t.Skip("This test need to be revisited, fails often even without those changes") + t.Skip("This test need to be revisited") n := 3 replicas := []int{1, 3} @@ -6147,7 +5489,7 @@ func TestJetStreamBindConsumer(t *testing.T) { } // Push Consumer Bind Only - _, err = js.SubscribeSync("foo", nats.Bind("foo", "push")) + sub, err := js.SubscribeSync("foo", nats.Bind("foo", "push")) if err != nil { t.Fatal(err) } @@ -6160,12 +5502,54 @@ func TestJetStreamBindConsumer(t *testing.T) { if err == nil || !strings.Contains(err.Error(), `nats: duplicate stream name (foo and foo2)`) { t.Fatalf("Unexpected error: %v", err) } + sub.Unsubscribe() + + checkConsInactive := func() { + t.Helper() + checkFor(t, time.Second, 15*time.Millisecond, func() error { + ci, _ := js.ConsumerInfo("foo", "push") + if ci != nil && !ci.PushBound { + return nil + } + return fmt.Errorf("Conusmer %q still active", "push") + }) + } + checkConsInactive() // Duplicate stream name is fine. 
- _, err = js.SubscribeSync("foo", nats.BindStream("foo"), nats.Bind("foo", "push")) + sub, err = js.SubscribeSync("foo", nats.BindStream("foo"), nats.Bind("foo", "push")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Cannot have 2 instances for same durable + _, err = js.SubscribeSync("foo", nats.Durable("push")) + if err == nil || !strings.Contains(err.Error(), "already bound") { + t.Fatalf("Unexpected error: %v", err) + } + // Cannot start a queue sub since plain durable is active + _, err = js.QueueSubscribeSync("foo", "wq", nats.Durable("push")) + if err == nil || !strings.Contains(err.Error(), "without a deliver group") { + t.Fatalf("Unexpected error: %v", err) + } + sub.Unsubscribe() + checkConsInactive() + + // Create a queue sub + _, err = js.QueueSubscribeSync("foo", "wq1", nats.Durable("qpush")) if err != nil { t.Fatalf("Unexpected error: %v", err) } + // Can't create a plain sub on that durable + _, err = js.SubscribeSync("foo", nats.Durable("qpush")) + if err == nil || !strings.Contains(err.Error(), "cannot create a subscription for a consumer with a deliver group") { + t.Fatalf("Unexpected error: %v", err) + } + // Try to attach different queue group + _, err = js.QueueSubscribeSync("foo", "wq2", nats.Durable("qpush")) + if err == nil || !strings.Contains(err.Error(), "cannot create a queue subscription") { + t.Fatalf("Unexpected error: %v", err) + } // Pull consumer _, err = js.AddConsumer("foo", &nats.ConsumerConfig{ @@ -6201,30 +5585,36 @@ func TestJetStreamBindConsumer(t *testing.T) { t.Fatal(err) } - // Bind to ephemeral consumer by setting the consumer. - sub2, err := js.SubscribeSync("foo", nats.Bind("foo", cinfo.Name)) - if err != nil { + // Cannot bind to ephemeral consumer because it is active. 
+ _, err = js.SubscribeSync("foo", nats.Bind("foo", cinfo.Name)) + if err == nil || !strings.Contains(err.Error(), "already bound") { t.Fatalf("Unexpected error: %v", err) } - sub3, err := nc.SubscribeSync(cinfo.Config.DeliverSubject) + + // However, one can create an ephemeral Queue subscription and bind several members to it. + sub2, err := js.QueueSubscribeSync("foo", "wq3") if err != nil { t.Fatalf("Unexpected error: %v", err) } - js.Publish("foo", []byte("hi 1")) - js.Publish("foo", []byte("hi 2")) - js.Publish("foo", []byte("hi 3")) - - _, err = sub1.NextMsg(1 * time.Second) - if err != nil { - t.Fatal(err) + // Consumer all + for i := 0; i < 25; i++ { + msg, err := sub2.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error on NextMsg: %v", err) + } + msg.AckSync() } - _, err = sub2.NextMsg(1 * time.Second) + cinfo, _ = sub2.ConsumerInfo() + sub3, err := js.QueueSubscribeSync("foo", "wq3", nats.Bind("foo", cinfo.Name)) if err != nil { - t.Fatal(err) + t.Fatalf("Unexpected error: %v", err) } - _, err = sub3.NextMsg(1 * time.Second) - if err != nil { - t.Fatal(err) + for i := 0; i < 100; i++ { + js.Publish("foo", []byte("new")) + } + // We expect sub3 to at least get a message + if _, err := sub3.NextMsg(time.Second); err != nil { + t.Fatalf("Second member failed to get a message: %v", err) } } @@ -6417,3 +5807,111 @@ func TestJetStreamMaxMsgsPerSubject(t *testing.T) { }) } } + +func TestJetStreamDrainFailsToDeleteConsumer(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + errCh := make(chan error, 1) + nc, err := nats.Connect(s.ClientURL(), nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) { + select { + case errCh <- err: + default: + } + })) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if 
_, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + js.Publish("foo", []byte("hi")) + + blockCh := make(chan struct{}) + sub, err := js.Subscribe("foo", func(m *nats.Msg) { + <-blockCh + }, nats.Durable("dur")) + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + + // Initiate the drain... it won't complete because we have blocked the + // message callback. + sub.Drain() + + // Now delete the JS consumer + if err := js.DeleteConsumer("TEST", "dur"); err != nil { + t.Fatalf("Error deleting consumer: %v", err) + } + + // Now unblock and make sure we get the async error + close(blockCh) + + select { + case err := <-errCh: + if !strings.Contains(err.Error(), "consumer not found") { + t.Fatalf("Unexpected async error: %v", err) + } + case <-time.After(time.Second): + t.Fatal("Did not get async error") + } +} + +func TestJetStreamDomainInPubAck(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + jetstream: {domain: "HUB"} + `)) + defer os.Remove(conf) + + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + config := s.JetStreamConfig() + if config != nil { + defer os.RemoveAll(config.StoreDir) + } + + // Client for API requests. + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Got error during initialization %v", err) + } + + cfg := &nats.StreamConfig{ + Name: "TEST", + Storage: nats.MemoryStorage, + Subjects: []string{"foo"}, + } + if _, err := js.AddStream(cfg); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + pa, err := js.Publish("foo", []byte("msg")) + if err != nil { + t.Fatalf("Error on publish: %v", err) + } + if pa.Domain != "HUB" { + t.Fatalf("Expected PubAck to have domain of %q, got %q", "HUB", pa.Domain) + } +}