diff --git a/js.go b/js.go index 9a2f4652a..0f2cb855c 100644 --- a/js.go +++ b/js.go @@ -973,7 +973,7 @@ func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription return js.subscribe(subj, _EMPTY_, nil, mch, true, true, append(opts, Durable(durable))) } -func processConsInfo(info *ConsumerInfo, isPullMode bool, subj, queue string) (string, error) { +func processConsInfo(info *ConsumerInfo, userCfg *ConsumerConfig, isPullMode bool, subj, queue string) (string, error) { ccfg := &info.Config // Make sure this new subject matches or is a subset. @@ -990,7 +990,7 @@ func processConsInfo(info *ConsumerInfo, isPullMode bool, subj, queue string) (s // If pull mode, nothing else to check here. if isPullMode { - return _EMPTY_, nil + return _EMPTY_, checkConfig(ccfg, userCfg) } // At this point, we know the user wants push mode, and the JS consumer is @@ -1019,14 +1019,80 @@ func processConsInfo(info *ConsumerInfo, isPullMode bool, subj, queue string) (s queue, dg) } } + if err := checkConfig(ccfg, userCfg); err != nil { + return _EMPTY_, err + } return ccfg.DeliverSubject, nil } +func checkConfig(s, u *ConsumerConfig) error { + makeErr := func(fieldName string, usrVal, srvVal interface{}) error { + return fmt.Errorf("configuration requests %s to be %v, but consumer's value is %v", fieldName, usrVal, srvVal) + } + + if u.Durable != _EMPTY_ && u.Durable != s.Durable { + return makeErr("durable", u.Durable, s.Durable) + } + if u.Description != _EMPTY_ && u.Description != s.Description { + return makeErr("description", u.Description, s.Description) + } + if u.DeliverPolicy != deliverPolicyNotSet && u.DeliverPolicy != s.DeliverPolicy { + return makeErr("deliver policy", u.DeliverPolicy, s.DeliverPolicy) + } + if u.OptStartSeq > 0 && u.OptStartSeq != s.OptStartSeq { + return makeErr("optional start sequence", u.OptStartSeq, s.OptStartSeq) + } + if u.OptStartTime != nil && !u.OptStartTime.IsZero() && u.OptStartTime != s.OptStartTime { + return makeErr("optional start time", u.OptStartTime, s.OptStartTime) + } + if u.AckPolicy != ackPolicyNotSet && u.AckPolicy != s.AckPolicy { + return makeErr("ack policy", u.AckPolicy, s.AckPolicy) + } + if u.AckWait > 0 && u.AckWait != s.AckWait { + return makeErr("ack wait", u.AckWait, s.AckWait) + } + if u.MaxDeliver > 0 && u.MaxDeliver != s.MaxDeliver { + return makeErr("max deliver", u.MaxDeliver, s.MaxDeliver) + } + if u.ReplayPolicy != replayPolicyNotSet && u.ReplayPolicy != s.ReplayPolicy { + return makeErr("replay policy", u.ReplayPolicy, s.ReplayPolicy) + } + if u.RateLimit > 0 && u.RateLimit != s.RateLimit { + return makeErr("rate limit", u.RateLimit, s.RateLimit) + } + if u.SampleFrequency != _EMPTY_ && u.SampleFrequency != s.SampleFrequency { + return makeErr("sample frequency", u.SampleFrequency, s.SampleFrequency) + } + if u.MaxWaiting > 0 && u.MaxWaiting != s.MaxWaiting { + return makeErr("max waiting", u.MaxWaiting, s.MaxWaiting) + } + if u.MaxAckPending > 0 && u.MaxAckPending != s.MaxAckPending { + return makeErr("max ack pending", u.MaxAckPending, s.MaxAckPending) + } + // For flow control, we want to fail if the user explicit wanted it, but + // it is not set in the existing consumer. If it is not asked by the user, + // the library still handles it and so no reason to fail. + if u.FlowControl && !s.FlowControl { + return makeErr("flow control", u.FlowControl, s.FlowControl) + } + if u.Heartbeat > 0 && u.Heartbeat != s.Heartbeat { + return makeErr("heartbeat", u.Heartbeat, s.Heartbeat) + } + return nil +} + func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, isPullMode bool, opts []SubOpt) (*Subscription, error) { - cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet} + cfg := ConsumerConfig{ + DeliverPolicy: deliverPolicyNotSet, + AckPolicy: ackPolicyNotSet, + ReplayPolicy: replayPolicyNotSet, + } o := subOpts{cfg: &cfg} if len(opts) > 0 { for _, opt := range opts { + if opt == nil { + continue + } if err := opt.configureSubscribe(&o); err != nil { return nil, err } @@ -1159,7 +1225,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, switch { case info != nil: - deliver, err = processConsInfo(info, isPullMode, subj, queue) + deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue) if err != nil { return nil, err } @@ -1189,10 +1255,19 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, cfg.DeliverGroup = queue } - // If not set default to ack explicit. + // If not set, default to deliver all + if cfg.DeliverPolicy == deliverPolicyNotSet { + cfg.DeliverPolicy = DeliverAllPolicy + } + // If not set, default to ack explicit. if cfg.AckPolicy == ackPolicyNotSet { cfg.AckPolicy = AckExplicitPolicy } + // If not set, default to instant + if cfg.ReplayPolicy == replayPolicyNotSet { + cfg.ReplayPolicy = ReplayInstantPolicy + } + // If we have acks at all and the MaxAckPending is not set go ahead // and set to the internal max. // TODO(dlc) - We should be able to update this if client updates PendingLimits. @@ -1303,7 +1378,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if err != nil { return nil, err } - deliver, err = processConsInfo(info, isPullMode, subj, queue) + deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue) if err != nil { return nil, err } @@ -1712,6 +1787,14 @@ func ManualAck() SubOpt { }) } +// Description will set the description for the created consumer. +func Description(description string) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.Description = description + return nil + }) +} + // Durable defines the consumer name for JetStream durable subscribers. func Durable(consumer string) SubOpt { return subOptFn(func(opts *subOpts) error { @@ -1836,6 +1919,14 @@ func ReplayOriginal() SubOpt { }) } +// ReplayInstant replays the messages as fast as possible. +func ReplayInstant() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.ReplayPolicy = ReplayInstantPolicy + return nil + }) +} + // RateLimit is the Bits per sec rate limit applied to a push consumer. func RateLimit(n uint64) SubOpt { return subOptFn(func(opts *subOpts) error { @@ -2418,7 +2509,7 @@ const ( // AckExplicitPolicy requires ack or nack for all messages. AckExplicitPolicy - // For setting + // For configuration mismatch check ackPolicyNotSet = 99 ) @@ -2478,6 +2569,9 @@ const ( // ReplayOriginalPolicy will maintain the same timing as the messages were received. ReplayOriginalPolicy + + // For configuration mismatch check + replayPolicyNotSet = 99 ) func (p *ReplayPolicy) UnmarshalJSON(data []byte) error { @@ -2538,6 +2632,9 @@ const ( // DeliverLastPerSubjectPolicy will start the consumer with the last message // for all subjects received. DeliverLastPerSubjectPolicy + + // For configuration mismatch check + deliverPolicyNotSet = 99 ) func (p *DeliverPolicy) UnmarshalJSON(data []byte) error { diff --git a/test/js_test.go b/test/js_test.go index 864dae80f..f7cc4ed43 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -30,6 +30,7 @@ import ( "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" + "github.com/nats-io/nuid" natsserver "github.com/nats-io/nats-server/v2/test" ) @@ -2819,7 +2820,7 @@ func TestJetStreamPullSubscribe_AckPending(t *testing.T) { expectedPending(1, 7) meta = getMetadata(msg) if meta.Sequence.Stream != prevSeq { - t.Errorf("Expected to get message at seq=%v, got seq=%v", prevSeq, meta.Stream) + t.Errorf("Expected to get message at seq=%v, got seq=%v", prevSeq, meta.Sequence.Stream) } if string(msg.Data) != prevPayload { t.Errorf("Expected: %q, got: %q", string(prevPayload), string(msg.Data)) @@ -2837,7 +2838,7 @@ func TestJetStreamPullSubscribe_AckPending(t *testing.T) { expectedPending(0, 7) meta = getMetadata(msg) if meta.Sequence.Stream != prevSeq { - t.Errorf("Expected to get message at seq=%v, got seq=%v", prevSeq, meta.Stream) + t.Errorf("Expected to get message at seq=%v, got seq=%v", prevSeq, meta.Sequence.Stream) } if string(msg.Data) != prevPayload { t.Errorf("Expected: %q, got: %q", string(prevPayload), string(msg.Data)) @@ -3596,6 +3597,185 @@ func TestJetStreamSubscribe_RateLimit(t *testing.T) { } } +func TestJetStreamSubscribe_ConfigCantChange(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Create the stream using our client API. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + for _, test := range []struct { + name string + first nats.SubOpt + second nats.SubOpt + }{ + {"description", nats.Description("a"), nats.Description("b")}, + {"deliver policy", nats.DeliverAll(), nats.DeliverLast()}, + {"optional start sequence", nats.StartSequence(1), nats.StartSequence(10)}, + {"optional start time", nats.StartTime(time.Now()), nats.StartTime(time.Now().Add(-2 * time.Hour))}, + {"ack wait", nats.AckWait(10 * time.Second), nats.AckWait(15 * time.Second)}, + {"max deliver", nats.MaxDeliver(3), nats.MaxDeliver(5)}, + {"replay policy", nats.ReplayOriginal(), nats.ReplayInstant()}, + {"max waiting", nats.PullMaxWaiting(10), nats.PullMaxWaiting(20)}, + {"max ack pending", nats.MaxAckPending(10), nats.MaxAckPending(20)}, + } { + t.Run(test.name, func(t *testing.T) { + durName := nuid.Next() + sub, err := js.PullSubscribe("foo", durName, test.first) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + // Once it is created, options can't be changed. + _, err = js.PullSubscribe("foo", durName, test.second) + if err == nil || !strings.Contains(err.Error(), test.name) { + t.Fatalf("Unexpected error: %v", err) + } + sub.Unsubscribe() + }) + } + + for _, test := range []struct { + name string + cc *nats.ConsumerConfig + opt nats.SubOpt + }{ + {"ack policy", &nats.ConsumerConfig{AckPolicy: nats.AckAllPolicy}, nats.AckNone()}, + {"rate limit", &nats.ConsumerConfig{RateLimit: 10}, nats.RateLimit(100)}, + {"flow control", &nats.ConsumerConfig{FlowControl: false}, nats.EnableFlowControl()}, + {"heartbeat", &nats.ConsumerConfig{Heartbeat: 10 * time.Second}, nats.IdleHeartbeat(20 * time.Second)}, + } { + t.Run(test.name, func(t *testing.T) { + durName := nuid.Next() + + cc := test.cc + cc.Durable = durName + cc.DeliverSubject = nuid.Next() + if _, err := js.AddConsumer("TEST", cc); err != nil { + t.Fatalf("Error creating consumer: %v", err) + } + + sub, err := js.SubscribeSync("foo", nats.Durable(durName), test.opt) + if err == nil || !strings.Contains(err.Error(), test.name) { + t.Fatalf("Unexpected error: %v", err) + } + sub.Unsubscribe() + }) + } + + // Verify that we don't fail if user did not set it. + for _, test := range []struct { + name string + opt nats.SubOpt + }{ + {"description", nats.Description("a")}, + {"deliver policy", nats.DeliverAll()}, + {"optional start sequence", nats.StartSequence(10)}, + {"optional start time", nats.StartTime(time.Now())}, + {"ack wait", nats.AckWait(10 * time.Second)}, + {"max deliver", nats.MaxDeliver(3)}, + {"replay policy", nats.ReplayOriginal()}, + {"max waiting", nats.PullMaxWaiting(10)}, + {"max ack pending", nats.MaxAckPending(10)}, + } { + t.Run(test.name+" not set", func(t *testing.T) { + durName := nuid.Next() + sub, err := js.PullSubscribe("foo", durName, test.opt) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + // If not explicitly asked by the user, we are ok + _, err = js.PullSubscribe("foo", durName) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + sub.Unsubscribe() + }) + } + + for _, test := range []struct { + name string + opt nats.SubOpt + }{ + {"default deliver policy", nats.DeliverAll()}, + {"default ack wait", nats.AckWait(30 * time.Second)}, + {"default replay policy", nats.ReplayInstant()}, + {"default max waiting", nats.PullMaxWaiting(512)}, + {"default ack pending", nats.MaxAckPending(65536)}, + } { + t.Run(test.name, func(t *testing.T) { + durName := nuid.Next() + sub, err := js.PullSubscribe("foo", durName) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + // If the option is the same as the server default, it is not an error either. + _, err = js.PullSubscribe("foo", durName, test.opt) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + sub.Unsubscribe() + }) + } + + for _, test := range []struct { + name string + opt nats.SubOpt + }{ + {"policy", nats.DeliverNew()}, + {"ack wait", nats.AckWait(31 * time.Second)}, + {"replay policy", nats.ReplayOriginal()}, + {"max waiting", nats.PullMaxWaiting(513)}, + {"ack pending", nats.MaxAckPending(2)}, + } { + t.Run(test.name+" changed from default", func(t *testing.T) { + durName := nuid.Next() + sub, err := js.PullSubscribe("foo", durName) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + // First time it was created with defaults and the + // second time a change is attempted, so it is an error. + _, err = js.PullSubscribe("foo", durName, test.opt) + if err == nil || !strings.Contains(err.Error(), test.name) { + t.Fatalf("Unexpected error: %v", err) + } + sub.Unsubscribe() + }) + } + + // Check that binding to a durable (without specifying durable option) works + if _, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "BindDurable", + DeliverSubject: "bar", + }); err != nil { + t.Fatalf("Failed to create consumer: %v", err) + } + if _, err := js.SubscribeSync("foo", nats.Bind("TEST", "BindDurable")); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } +} + type jsServer struct { *server.Server myopts *server.Options