diff --git a/go_test.mod b/go_test.mod index d426ef375..51a6455c8 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,7 +4,7 @@ go 1.16 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b + github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/go_test.sum b/go_test.sum index 83dfe546f..eead8215f 100644 --- a/go_test.sum +++ b/go_test.sum @@ -17,8 +17,8 @@ github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b h1:h8EYD8Q7yUbjXmMT6z1XI7SAV+aiHhkNEc1O+WImMh4= -github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8= +github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c h1:TfLMCHvaj2YSNrgiEWQiXA344lWqPmX3xOLtZj/ywlA= +github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8= github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= diff --git a/js.go b/js.go index 7444782a2..b329a1b36 100644 --- a/js.go +++ b/js.go @@ -804,8 +804,9 @@ func ExpectLastMsgId(id string) PubOpt { } type ackOpts struct { - ttl time.Duration - ctx context.Context + ttl time.Duration + ctx context.Context + nakDelay time.Duration } // AckOpt are the options that can be passed when acknowledge a message. @@ -880,29 +881,37 @@ func Context(ctx context.Context) ContextOpt { return ContextOpt{ctx} } +type nakDelay time.Duration + +func (d nakDelay) configureAck(opts *ackOpts) error { + opts.nakDelay = time.Duration(d) + return nil +} + // Subscribe // ConsumerConfig is the configuration of a JetStream consumer. type ConsumerConfig struct { - Durable string `json:"durable_name,omitempty"` - Description string `json:"description,omitempty"` - DeliverSubject string `json:"deliver_subject,omitempty"` - DeliverGroup string `json:"deliver_group,omitempty"` - DeliverPolicy DeliverPolicy `json:"deliver_policy"` - OptStartSeq uint64 `json:"opt_start_seq,omitempty"` - OptStartTime *time.Time `json:"opt_start_time,omitempty"` - AckPolicy AckPolicy `json:"ack_policy"` - AckWait time.Duration `json:"ack_wait,omitempty"` - MaxDeliver int `json:"max_deliver,omitempty"` - FilterSubject string `json:"filter_subject,omitempty"` - ReplayPolicy ReplayPolicy `json:"replay_policy"` - RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec - SampleFrequency string `json:"sample_freq,omitempty"` - MaxWaiting int `json:"max_waiting,omitempty"` - MaxAckPending int `json:"max_ack_pending,omitempty"` - FlowControl bool `json:"flow_control,omitempty"` - Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` - HeadersOnly bool `json:"headers_only,omitempty"` + Durable string `json:"durable_name,omitempty"` + Description string `json:"description,omitempty"` + DeliverSubject string `json:"deliver_subject,omitempty"` + DeliverGroup string `json:"deliver_group,omitempty"` + DeliverPolicy DeliverPolicy `json:"deliver_policy"` + OptStartSeq uint64 `json:"opt_start_seq,omitempty"` + OptStartTime *time.Time `json:"opt_start_time,omitempty"` + AckPolicy AckPolicy `json:"ack_policy"` + AckWait time.Duration `json:"ack_wait,omitempty"` + MaxDeliver int `json:"max_deliver,omitempty"` + BackOff []time.Duration `json:"backoff,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` + ReplayPolicy ReplayPolicy `json:"replay_policy"` + RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec + SampleFrequency string `json:"sample_freq,omitempty"` + MaxWaiting int `json:"max_waiting,omitempty"` + MaxAckPending int `json:"max_ack_pending,omitempty"` + FlowControl bool `json:"flow_control,omitempty"` + Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` + HeadersOnly bool `json:"headers_only,omitempty"` // Pull based options. MaxRequestBatch int `json:"max_batch,omitempty"` @@ -2638,14 +2647,22 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error { wait = js.opts.wait } + var body []byte + // This will be > 0 only when called from NakWithDelay() + if o.nakDelay > 0 { + body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, o.nakDelay.Nanoseconds())) + } else { + body = ackType + } + if sync { if usesCtx { - _, err = nc.RequestWithContext(ctx, m.Reply, ackType) + _, err = nc.RequestWithContext(ctx, m.Reply, body) } else { - _, err = nc.Request(m.Reply, ackType, wait) + _, err = nc.Request(m.Reply, body, wait) } } else { - err = nc.Publish(m.Reply, ackType) + err = nc.Publish(m.Reply, body) } // Mark that the message has been acked unless it is AckProgress @@ -2676,6 +2693,17 @@ func (m *Msg) Nak(opts ...AckOpt) error { return m.ackReply(ackNak, false, opts...) } +// Nak negatively acknowledges a message. This tells the server to redeliver +// the message after the give `delay` duration. You can configure the number +// of redeliveries by passing nats.MaxDeliver when you Subscribe. +// The default is infinite redeliveries. +func (m *Msg) NakWithDelay(delay time.Duration, opts ...AckOpt) error { + if delay > 0 { + opts = append(opts, nakDelay(delay)) + } + return m.ackReply(ackNak, false, opts...) +} + // Term tells the server to not redeliver this message, regardless of the value // of nats.MaxDeliver. func (m *Msg) Term(opts ...AckOpt) error { diff --git a/test/js_test.go b/test/js_test.go index 98a1c1933..85843e709 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -2465,7 +2465,7 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) { // Create the stream using our client API. _, err = js.AddStream(&nats.StreamConfig{ Name: "TEST", - Subjects: []string{"foo"}, + Subjects: []string{"foo", "bar"}, }) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -2741,6 +2741,100 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) { checkAcks(t, sub) }) + + t.Run("Nak with delay", func(t *testing.T) { + js.Publish("bar", []byte("msg")) + sub, err := js.SubscribeSync("bar", nats.Durable("nak_dur")) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error on NextMsg: %v", err) + } + if err := msg.NakWithDelay(500 * time.Millisecond); err != nil { + t.Fatalf("Error on Nak: %v", err) + } + // We should not get redelivery before 500ms+ + if _, err = sub.NextMsg(250 * time.Millisecond); err != nats.ErrTimeout { + t.Fatalf("Expected timeout, got %v", err) + } + msg, err = sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error on NextMsg: %v", err) + } + if err := msg.NakWithDelay(0); err != nil { + t.Fatalf("Error on Nak: %v", err) + } + msg, err = sub.NextMsg(250 * time.Millisecond) + if err != nil { + t.Fatalf("Expected timeout, got %v", err) + } + msg.Ack() + }) + + t.Run("BackOff redeliveries", func(t *testing.T) { + inbox := nats.NewInbox() + sub, err := nc.SubscribeSync(inbox) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + defer sub.Unsubscribe() + cc := nats.ConsumerConfig{ + Durable: "backoff", + AckPolicy: nats.AckExplicitPolicy, + DeliverPolicy: nats.DeliverAllPolicy, + FilterSubject: "bar", + DeliverSubject: inbox, + BackOff: []time.Duration{50 * time.Millisecond, 250 * time.Millisecond}, + } + // First, try with a MaxDeliver that is < len(BackOff), which the + // server should reject. + cc.MaxDeliver = 1 + _, err = js.AddConsumer("TEST", &cc) + if err == nil || !strings.Contains(err.Error(), "max deliver is required to be > length of backoff values") { + t.Fatalf("Expected backoff/max deliver error, got %v", err) + } + // Now put a valid value + cc.MaxDeliver = 4 + ci, err := js.AddConsumer("TEST", &cc) + if err != nil { + t.Fatalf("Error on add consumer: %v", err) + } + if !reflect.DeepEqual(ci.Config.BackOff, cc.BackOff) { + t.Fatalf("Expected backoff to be %v, got %v", cc.BackOff, ci.Config.BackOff) + } + // Consume the first delivery + _, err = sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error on nextMsg: %v", err) + } + // We should get a redelivery at around 50ms + start := time.Now() + _, err = sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error on nextMsg: %v", err) + } + if dur := time.Since(start); dur < 25*time.Millisecond || dur > 100*time.Millisecond { + t.Fatalf("Expected to be redelivered at around 50ms, took %v", dur) + } + // Now it should be every 250ms or so + for i := 0; i < 2; i++ { + start = time.Now() + _, err = sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error on nextMsg for iter=%v: %v", i+1, err) + } + if dur := time.Since(start); dur < 200*time.Millisecond || dur > 300*time.Millisecond { + t.Fatalf("Expected to be redelivered at around 250ms, took %v", dur) + } + } + // At this point, we should have go reach MaxDeliver + _, err = sub.NextMsg(300 * time.Millisecond) + if err != nats.ErrTimeout { + t.Fatalf("Expected timeout, got %v", err) + } + }) } func TestJetStreamPullSubscribe_AckPending(t *testing.T) {