Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Subscribe returns error if consumer config does not match #803

Merged
merged 2 commits into from Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
111 changes: 104 additions & 7 deletions js.go
Expand Up @@ -973,7 +973,7 @@ func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription
return js.subscribe(subj, _EMPTY_, nil, mch, true, true, append(opts, Durable(durable)))
}

func processConsInfo(info *ConsumerInfo, isPullMode bool, subj, queue string) (string, error) {
func processConsInfo(info *ConsumerInfo, userCfg *ConsumerConfig, isPullMode bool, subj, queue string) (string, error) {
ccfg := &info.Config

// Make sure this new subject matches or is a subset.
Expand All @@ -990,7 +990,7 @@ func processConsInfo(info *ConsumerInfo, isPullMode bool, subj, queue string) (s

// If pull mode, nothing else to check here.
if isPullMode {
return _EMPTY_, nil
return _EMPTY_, checkConfig(ccfg, userCfg)
}

// At this point, we know the user wants push mode, and the JS consumer is
Expand Down Expand Up @@ -1019,14 +1019,80 @@ func processConsInfo(info *ConsumerInfo, isPullMode bool, subj, queue string) (s
queue, dg)
}
}
if err := checkConfig(ccfg, userCfg); err != nil {
return _EMPTY_, err
}
return ccfg.DeliverSubject, nil
}

func checkConfig(s, u *ConsumerConfig) error {
makeErr := func(fieldName string, usrVal, srvVal interface{}) error {
return fmt.Errorf("configuration requests %s to be %v, but consumer's value is %v", fieldName, usrVal, srvVal)
}

if u.Durable != _EMPTY_ && u.Durable != s.Durable {
return makeErr("durable", u.Durable, s.Durable)
}
if u.Description != _EMPTY_ && u.Description != s.Description {
return makeErr("description", u.Description, s.Description)
}
if u.DeliverPolicy != deliverPolicyNotSet && u.DeliverPolicy != s.DeliverPolicy {
return makeErr("deliver policy", u.DeliverPolicy, s.DeliverPolicy)
}
if u.OptStartSeq > 0 && u.OptStartSeq != s.OptStartSeq {
return makeErr("optional start sequence", u.OptStartSeq, s.OptStartSeq)
}
if u.OptStartTime != nil && !u.OptStartTime.IsZero() && u.OptStartTime != s.OptStartTime {
return makeErr("optional start time", u.OptStartTime, s.OptStartTime)
}
if u.AckPolicy != ackPolicyNotSet && u.AckPolicy != s.AckPolicy {
return makeErr("ack policy", u.AckPolicy, s.AckPolicy)
}
if u.AckWait > 0 && u.AckWait != s.AckWait {
return makeErr("ack wait", u.AckWait, s.AckWait)
}
if u.MaxDeliver > 0 && u.MaxDeliver != s.MaxDeliver {
return makeErr("max deliver", u.MaxDeliver, s.MaxDeliver)
}
if u.ReplayPolicy != replayPolicyNotSet && u.ReplayPolicy != s.ReplayPolicy {
return makeErr("replay policy", u.ReplayPolicy, s.ReplayPolicy)
}
if u.RateLimit > 0 && u.RateLimit != s.RateLimit {
return makeErr("rate limit", u.RateLimit, s.RateLimit)
}
if u.SampleFrequency != _EMPTY_ && u.SampleFrequency != s.SampleFrequency {
return makeErr("sample frequency", u.SampleFrequency, s.SampleFrequency)
}
if u.MaxWaiting > 0 && u.MaxWaiting != s.MaxWaiting {
return makeErr("max waiting", u.MaxWaiting, s.MaxWaiting)
}
if u.MaxAckPending > 0 && u.MaxAckPending != s.MaxAckPending {
return makeErr("max ack pending", u.MaxAckPending, s.MaxAckPending)
}
// For flow control, we want to fail if the user explicit wanted it, but
// it is not set in the existing consumer. If it is not asked by the user,
// the library still handles it and so no reason to fail.
if u.FlowControl && !s.FlowControl {
return makeErr("flow control", u.FlowControl, s.FlowControl)
}
if u.Heartbeat > 0 && u.Heartbeat != s.Heartbeat {
return makeErr("heartbeat", u.Heartbeat, s.Heartbeat)
}
return nil
}

func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, isPullMode bool, opts []SubOpt) (*Subscription, error) {
cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet}
cfg := ConsumerConfig{
DeliverPolicy: deliverPolicyNotSet,
AckPolicy: ackPolicyNotSet,
ReplayPolicy: replayPolicyNotSet,
}
o := subOpts{cfg: &cfg}
if len(opts) > 0 {
for _, opt := range opts {
if opt == nil {
continue
}
if err := opt.configureSubscribe(&o); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1159,7 +1225,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,

switch {
case info != nil:
deliver, err = processConsInfo(info, isPullMode, subj, queue)
deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1189,10 +1255,19 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
cfg.DeliverGroup = queue
}

// If not set default to ack explicit.
// If not set, default to deliver all
if cfg.DeliverPolicy == deliverPolicyNotSet {
cfg.DeliverPolicy = DeliverAllPolicy
}
// If not set, default to ack explicit.
if cfg.AckPolicy == ackPolicyNotSet {
cfg.AckPolicy = AckExplicitPolicy
}
// If not set, default to instant
if cfg.ReplayPolicy == replayPolicyNotSet {
cfg.ReplayPolicy = ReplayInstantPolicy
}

// If we have acks at all and the MaxAckPending is not set go ahead
// and set to the internal max.
// TODO(dlc) - We should be able to update this if client updates PendingLimits.
Expand Down Expand Up @@ -1303,7 +1378,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if err != nil {
return nil, err
}
deliver, err = processConsInfo(info, isPullMode, subj, queue)
deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1712,6 +1787,14 @@ func ManualAck() SubOpt {
})
}

// Description will set the description for the created consumer.
func Description(description string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.Description = description
return nil
})
}

// Durable defines the consumer name for JetStream durable subscribers.
func Durable(consumer string) SubOpt {
return subOptFn(func(opts *subOpts) error {
Expand Down Expand Up @@ -1836,6 +1919,14 @@ func ReplayOriginal() SubOpt {
})
}

// ReplayInstant replays the messages as fast as possible.
func ReplayInstant() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.ReplayPolicy = ReplayInstantPolicy
return nil
})
}

// RateLimit is the Bits per sec rate limit applied to a push consumer.
func RateLimit(n uint64) SubOpt {
return subOptFn(func(opts *subOpts) error {
Expand Down Expand Up @@ -2418,7 +2509,7 @@ const (
// AckExplicitPolicy requires ack or nack for all messages.
AckExplicitPolicy

// For setting
// For configuration mismatch check
ackPolicyNotSet = 99
)

Expand Down Expand Up @@ -2478,6 +2569,9 @@ const (

// ReplayOriginalPolicy will maintain the same timing as the messages were received.
ReplayOriginalPolicy

// For configuration mismatch check
replayPolicyNotSet = 99
)

func (p *ReplayPolicy) UnmarshalJSON(data []byte) error {
Expand Down Expand Up @@ -2538,6 +2632,9 @@ const (
// DeliverLastPerSubjectPolicy will start the consumer with the last message
// for all subjects received.
DeliverLastPerSubjectPolicy

// For configuration mismatch check
deliverPolicyNotSet = 99
)

func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
Expand Down