Skip to content

Commit

Permalink
Merge pull request #803 from nats-io/fix_796
Browse files Browse the repository at this point in the history
[IMPROVED] Subscribe returns error if consumer config does not match
  • Loading branch information
kozlovic committed Aug 24, 2021
2 parents fc13b0b + e2c6dff commit 0a2e4a4
Show file tree
Hide file tree
Showing 2 changed files with 286 additions and 9 deletions.
111 changes: 104 additions & 7 deletions js.go
Expand Up @@ -973,7 +973,7 @@ func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription
return js.subscribe(subj, _EMPTY_, nil, mch, true, true, append(opts, Durable(durable)))
}

func processConsInfo(info *ConsumerInfo, isPullMode bool, subj, queue string) (string, error) {
func processConsInfo(info *ConsumerInfo, userCfg *ConsumerConfig, isPullMode bool, subj, queue string) (string, error) {
ccfg := &info.Config

// Make sure this new subject matches or is a subset.
Expand All @@ -990,7 +990,7 @@ func processConsInfo(info *ConsumerInfo, isPullMode bool, subj, queue string) (s

// If pull mode, nothing else to check here.
if isPullMode {
return _EMPTY_, nil
return _EMPTY_, checkConfig(ccfg, userCfg)
}

// At this point, we know the user wants push mode, and the JS consumer is
Expand Down Expand Up @@ -1019,14 +1019,80 @@ func processConsInfo(info *ConsumerInfo, isPullMode bool, subj, queue string) (s
queue, dg)
}
}
if err := checkConfig(ccfg, userCfg); err != nil {
return _EMPTY_, err
}
return ccfg.DeliverSubject, nil
}

func checkConfig(s, u *ConsumerConfig) error {
makeErr := func(fieldName string, usrVal, srvVal interface{}) error {
return fmt.Errorf("configuration requests %s to be %v, but consumer's value is %v", fieldName, usrVal, srvVal)
}

if u.Durable != _EMPTY_ && u.Durable != s.Durable {
return makeErr("durable", u.Durable, s.Durable)
}
if u.Description != _EMPTY_ && u.Description != s.Description {
return makeErr("description", u.Description, s.Description)
}
if u.DeliverPolicy != deliverPolicyNotSet && u.DeliverPolicy != s.DeliverPolicy {
return makeErr("deliver policy", u.DeliverPolicy, s.DeliverPolicy)
}
if u.OptStartSeq > 0 && u.OptStartSeq != s.OptStartSeq {
return makeErr("optional start sequence", u.OptStartSeq, s.OptStartSeq)
}
if u.OptStartTime != nil && !u.OptStartTime.IsZero() && u.OptStartTime != s.OptStartTime {
return makeErr("optional start time", u.OptStartTime, s.OptStartTime)
}
if u.AckPolicy != ackPolicyNotSet && u.AckPolicy != s.AckPolicy {
return makeErr("ack policy", u.AckPolicy, s.AckPolicy)
}
if u.AckWait > 0 && u.AckWait != s.AckWait {
return makeErr("ack wait", u.AckWait, s.AckWait)
}
if u.MaxDeliver > 0 && u.MaxDeliver != s.MaxDeliver {
return makeErr("max deliver", u.MaxDeliver, s.MaxDeliver)
}
if u.ReplayPolicy != replayPolicyNotSet && u.ReplayPolicy != s.ReplayPolicy {
return makeErr("replay policy", u.ReplayPolicy, s.ReplayPolicy)
}
if u.RateLimit > 0 && u.RateLimit != s.RateLimit {
return makeErr("rate limit", u.RateLimit, s.RateLimit)
}
if u.SampleFrequency != _EMPTY_ && u.SampleFrequency != s.SampleFrequency {
return makeErr("sample frequency", u.SampleFrequency, s.SampleFrequency)
}
if u.MaxWaiting > 0 && u.MaxWaiting != s.MaxWaiting {
return makeErr("max waiting", u.MaxWaiting, s.MaxWaiting)
}
if u.MaxAckPending > 0 && u.MaxAckPending != s.MaxAckPending {
return makeErr("max ack pending", u.MaxAckPending, s.MaxAckPending)
}
// For flow control, we want to fail if the user explicit wanted it, but
// it is not set in the existing consumer. If it is not asked by the user,
// the library still handles it and so no reason to fail.
if u.FlowControl && !s.FlowControl {
return makeErr("flow control", u.FlowControl, s.FlowControl)
}
if u.Heartbeat > 0 && u.Heartbeat != s.Heartbeat {
return makeErr("heartbeat", u.Heartbeat, s.Heartbeat)
}
return nil
}

func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, isPullMode bool, opts []SubOpt) (*Subscription, error) {
cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet}
cfg := ConsumerConfig{
DeliverPolicy: deliverPolicyNotSet,
AckPolicy: ackPolicyNotSet,
ReplayPolicy: replayPolicyNotSet,
}
o := subOpts{cfg: &cfg}
if len(opts) > 0 {
for _, opt := range opts {
if opt == nil {
continue
}
if err := opt.configureSubscribe(&o); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1159,7 +1225,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,

switch {
case info != nil:
deliver, err = processConsInfo(info, isPullMode, subj, queue)
deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1189,10 +1255,19 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
cfg.DeliverGroup = queue
}

// If not set default to ack explicit.
// If not set, default to deliver all
if cfg.DeliverPolicy == deliverPolicyNotSet {
cfg.DeliverPolicy = DeliverAllPolicy
}
// If not set, default to ack explicit.
if cfg.AckPolicy == ackPolicyNotSet {
cfg.AckPolicy = AckExplicitPolicy
}
// If not set, default to instant
if cfg.ReplayPolicy == replayPolicyNotSet {
cfg.ReplayPolicy = ReplayInstantPolicy
}

// If we have acks at all and the MaxAckPending is not set go ahead
// and set to the internal max.
// TODO(dlc) - We should be able to update this if client updates PendingLimits.
Expand Down Expand Up @@ -1303,7 +1378,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if err != nil {
return nil, err
}
deliver, err = processConsInfo(info, isPullMode, subj, queue)
deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1712,6 +1787,14 @@ func ManualAck() SubOpt {
})
}

// Description will set the description for the created consumer.
func Description(description string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.Description = description
return nil
})
}

// Durable defines the consumer name for JetStream durable subscribers.
func Durable(consumer string) SubOpt {
return subOptFn(func(opts *subOpts) error {
Expand Down Expand Up @@ -1836,6 +1919,14 @@ func ReplayOriginal() SubOpt {
})
}

// ReplayInstant replays the messages as fast as possible.
func ReplayInstant() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.ReplayPolicy = ReplayInstantPolicy
return nil
})
}

// RateLimit is the Bits per sec rate limit applied to a push consumer.
func RateLimit(n uint64) SubOpt {
return subOptFn(func(opts *subOpts) error {
Expand Down Expand Up @@ -2418,7 +2509,7 @@ const (
// AckExplicitPolicy requires ack or nack for all messages.
AckExplicitPolicy

// For setting
// For configuration mismatch check
ackPolicyNotSet = 99
)

Expand Down Expand Up @@ -2478,6 +2569,9 @@ const (

// ReplayOriginalPolicy will maintain the same timing as the messages were received.
ReplayOriginalPolicy

// For configuration mismatch check
replayPolicyNotSet = 99
)

func (p *ReplayPolicy) UnmarshalJSON(data []byte) error {
Expand Down Expand Up @@ -2538,6 +2632,9 @@ const (
// DeliverLastPerSubjectPolicy will start the consumer with the last message
// for all subjects received.
DeliverLastPerSubjectPolicy

// For configuration mismatch check
deliverPolicyNotSet = 99
)

func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
Expand Down

0 comments on commit 0a2e4a4

Please sign in to comment.