Skip to content

Commit

Permalink
Merge pull request #894 from nats-io/nak_with_delay_and_backoff
Browse files Browse the repository at this point in the history
[ADDED] JetStream: Nak delay and BackOff lists
  • Loading branch information
kozlovic committed Feb 1, 2022
2 parents 565319f + d34c12d commit b16cefd
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 28 deletions.
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
4 changes: 2 additions & 2 deletions go_test.sum
Expand Up @@ -17,8 +17,8 @@ github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b h1:h8EYD8Q7yUbjXmMT6z1XI7SAV+aiHhkNEc1O+WImMh4=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c h1:TfLMCHvaj2YSNrgiEWQiXA344lWqPmX3xOLtZj/ywlA=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down
76 changes: 52 additions & 24 deletions js.go
Expand Up @@ -804,8 +804,9 @@ func ExpectLastMsgId(id string) PubOpt {
}

type ackOpts struct {
ttl time.Duration
ctx context.Context
ttl time.Duration
ctx context.Context
nakDelay time.Duration
}

// AckOpt are the options that can be passed when acknowledge a message.
Expand Down Expand Up @@ -880,29 +881,37 @@ func Context(ctx context.Context) ContextOpt {
return ContextOpt{ctx}
}

type nakDelay time.Duration

func (d nakDelay) configureAck(opts *ackOpts) error {
opts.nakDelay = time.Duration(d)
return nil
}

// Subscribe

// ConsumerConfig is the configuration of a JetStream consumer.
type ConsumerConfig struct {
Durable string `json:"durable_name,omitempty"`
Description string `json:"description,omitempty"`
DeliverSubject string `json:"deliver_subject,omitempty"`
DeliverGroup string `json:"deliver_group,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
AckPolicy AckPolicy `json:"ack_policy"`
AckWait time.Duration `json:"ack_wait,omitempty"`
MaxDeliver int `json:"max_deliver,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`
Durable string `json:"durable_name,omitempty"`
Description string `json:"description,omitempty"`
DeliverSubject string `json:"deliver_subject,omitempty"`
DeliverGroup string `json:"deliver_group,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
AckPolicy AckPolicy `json:"ack_policy"`
AckWait time.Duration `json:"ack_wait,omitempty"`
MaxDeliver int `json:"max_deliver,omitempty"`
BackOff []time.Duration `json:"backoff,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`

// Pull based options.
MaxRequestBatch int `json:"max_batch,omitempty"`
Expand Down Expand Up @@ -2638,14 +2647,22 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
wait = js.opts.wait
}

var body []byte
// This will be > 0 only when called from NakWithDelay()
if o.nakDelay > 0 {
body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, o.nakDelay.Nanoseconds()))
} else {
body = ackType
}

if sync {
if usesCtx {
_, err = nc.RequestWithContext(ctx, m.Reply, ackType)
_, err = nc.RequestWithContext(ctx, m.Reply, body)
} else {
_, err = nc.Request(m.Reply, ackType, wait)
_, err = nc.Request(m.Reply, body, wait)
}
} else {
err = nc.Publish(m.Reply, ackType)
err = nc.Publish(m.Reply, body)
}

// Mark that the message has been acked unless it is AckProgress
Expand Down Expand Up @@ -2676,6 +2693,17 @@ func (m *Msg) Nak(opts ...AckOpt) error {
return m.ackReply(ackNak, false, opts...)
}

// Nak negatively acknowledges a message. This tells the server to redeliver
// the message after the give `delay` duration. You can configure the number
// of redeliveries by passing nats.MaxDeliver when you Subscribe.
// The default is infinite redeliveries.
func (m *Msg) NakWithDelay(delay time.Duration, opts ...AckOpt) error {
if delay > 0 {
opts = append(opts, nakDelay(delay))
}
return m.ackReply(ackNak, false, opts...)
}

// Term tells the server to not redeliver this message, regardless of the value
// of nats.MaxDeliver.
func (m *Msg) Term(opts ...AckOpt) error {
Expand Down
96 changes: 95 additions & 1 deletion test/js_test.go
Expand Up @@ -2465,7 +2465,7 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) {
// Create the stream using our client API.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Subjects: []string{"foo", "bar"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
Expand Down Expand Up @@ -2741,6 +2741,100 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) {

checkAcks(t, sub)
})

t.Run("Nak with delay", func(t *testing.T) {
js.Publish("bar", []byte("msg"))
sub, err := js.SubscribeSync("bar", nats.Durable("nak_dur"))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on NextMsg: %v", err)
}
if err := msg.NakWithDelay(500 * time.Millisecond); err != nil {
t.Fatalf("Error on Nak: %v", err)
}
// We should not get redelivery before 500ms+
if _, err = sub.NextMsg(250 * time.Millisecond); err != nats.ErrTimeout {
t.Fatalf("Expected timeout, got %v", err)
}
msg, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on NextMsg: %v", err)
}
if err := msg.NakWithDelay(0); err != nil {
t.Fatalf("Error on Nak: %v", err)
}
msg, err = sub.NextMsg(250 * time.Millisecond)
if err != nil {
t.Fatalf("Expected timeout, got %v", err)
}
msg.Ack()
})

t.Run("BackOff redeliveries", func(t *testing.T) {
inbox := nats.NewInbox()
sub, err := nc.SubscribeSync(inbox)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
defer sub.Unsubscribe()
cc := nats.ConsumerConfig{
Durable: "backoff",
AckPolicy: nats.AckExplicitPolicy,
DeliverPolicy: nats.DeliverAllPolicy,
FilterSubject: "bar",
DeliverSubject: inbox,
BackOff: []time.Duration{50 * time.Millisecond, 250 * time.Millisecond},
}
// First, try with a MaxDeliver that is < len(BackOff), which the
// server should reject.
cc.MaxDeliver = 1
_, err = js.AddConsumer("TEST", &cc)
if err == nil || !strings.Contains(err.Error(), "max deliver is required to be > length of backoff values") {
t.Fatalf("Expected backoff/max deliver error, got %v", err)
}
// Now put a valid value
cc.MaxDeliver = 4
ci, err := js.AddConsumer("TEST", &cc)
if err != nil {
t.Fatalf("Error on add consumer: %v", err)
}
if !reflect.DeepEqual(ci.Config.BackOff, cc.BackOff) {
t.Fatalf("Expected backoff to be %v, got %v", cc.BackOff, ci.Config.BackOff)
}
// Consume the first delivery
_, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on nextMsg: %v", err)
}
// We should get a redelivery at around 50ms
start := time.Now()
_, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on nextMsg: %v", err)
}
if dur := time.Since(start); dur < 25*time.Millisecond || dur > 100*time.Millisecond {
t.Fatalf("Expected to be redelivered at around 50ms, took %v", dur)
}
// Now it should be every 250ms or so
for i := 0; i < 2; i++ {
start = time.Now()
_, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on nextMsg for iter=%v: %v", i+1, err)
}
if dur := time.Since(start); dur < 200*time.Millisecond || dur > 300*time.Millisecond {
t.Fatalf("Expected to be redelivered at around 250ms, took %v", dur)
}
}
// At this point, we should have go reach MaxDeliver
_, err = sub.NextMsg(300 * time.Millisecond)
if err != nats.ErrTimeout {
t.Fatalf("Expected timeout, got %v", err)
}
})
}

func TestJetStreamPullSubscribe_AckPending(t *testing.T) {
Expand Down

0 comments on commit b16cefd

Please sign in to comment.