Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] JetStream: Nak delay and BackOff lists #894

Merged
merged 3 commits into from Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
4 changes: 2 additions & 2 deletions go_test.sum
Expand Up @@ -17,8 +17,8 @@ github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b h1:h8EYD8Q7yUbjXmMT6z1XI7SAV+aiHhkNEc1O+WImMh4=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c h1:TfLMCHvaj2YSNrgiEWQiXA344lWqPmX3xOLtZj/ywlA=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down
73 changes: 45 additions & 28 deletions js.go
Expand Up @@ -884,25 +884,26 @@ func Context(ctx context.Context) ContextOpt {

// ConsumerConfig is the configuration of a JetStream consumer.
type ConsumerConfig struct {
Durable string `json:"durable_name,omitempty"`
Description string `json:"description,omitempty"`
DeliverSubject string `json:"deliver_subject,omitempty"`
DeliverGroup string `json:"deliver_group,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
AckPolicy AckPolicy `json:"ack_policy"`
AckWait time.Duration `json:"ack_wait,omitempty"`
MaxDeliver int `json:"max_deliver,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`
Durable string `json:"durable_name,omitempty"`
Description string `json:"description,omitempty"`
DeliverSubject string `json:"deliver_subject,omitempty"`
DeliverGroup string `json:"deliver_group,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
AckPolicy AckPolicy `json:"ack_policy"`
AckWait time.Duration `json:"ack_wait,omitempty"`
MaxDeliver int `json:"max_deliver,omitempty"`
BackOff []time.Duration `json:"backoff,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`

// Pull based options.
MaxRequestBatch int `json:"max_batch,omitempty"`
Expand Down Expand Up @@ -2599,7 +2600,7 @@ func (m *Msg) checkReply() (*js, *jsSub, error) {
// ackReply handles all acks. Will do the right thing for pull and sync mode.
// It ensures that an ack is only sent a single time, regardless of
// how many times it is being called to avoid duplicated acks.
func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
func (m *Msg) ackReply(ackType []byte, nakDelay time.Duration, sync bool, opts ...AckOpt) error {
var o ackOpts
for _, opt := range opts {
if err := opt.configureAck(&o); err != nil {
Expand Down Expand Up @@ -2638,14 +2639,22 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
wait = js.opts.wait
}

var body []byte
// This will be > 0 only when called from NakWithDelay()
if nakDelay > 0 {
body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, nakDelay.Nanoseconds()))
} else {
body = ackType
}

if sync {
if usesCtx {
_, err = nc.RequestWithContext(ctx, m.Reply, ackType)
_, err = nc.RequestWithContext(ctx, m.Reply, body)
} else {
_, err = nc.Request(m.Reply, ackType, wait)
_, err = nc.Request(m.Reply, body, wait)
}
} else {
err = nc.Publish(m.Reply, ackType)
err = nc.Publish(m.Reply, body)
}

// Mark that the message has been acked unless it is AckProgress
Expand All @@ -2660,32 +2669,40 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
// Ack acknowledges a message. This tells the server that the message was
// successfully processed and it can move on to the next message.
func (m *Msg) Ack(opts ...AckOpt) error {
return m.ackReply(ackAck, false, opts...)
return m.ackReply(ackAck, 0, false, opts...)
}

// AckSync is the synchronous version of Ack. This indicates successful message
// processing.
func (m *Msg) AckSync(opts ...AckOpt) error {
return m.ackReply(ackAck, true, opts...)
return m.ackReply(ackAck, 0, true, opts...)
}

// Nak negatively acknowledges a message. This tells the server to redeliver
// the message. You can configure the number of redeliveries by passing
// nats.MaxDeliver when you Subscribe. The default is infinite redeliveries.
func (m *Msg) Nak(opts ...AckOpt) error {
return m.ackReply(ackNak, false, opts...)
return m.ackReply(ackNak, 0, false, opts...)
}

// Nak negatively acknowledges a message. This tells the server to redeliver
// the message after the give `delay` duration. You can configure the number
// of redeliveries by passing nats.MaxDeliver when you Subscribe.
// The default is infinite redeliveries.
func (m *Msg) NakWithDelay(delay time.Duration, opts ...AckOpt) error {
return m.ackReply(ackNak, delay, false, opts...)
}

// Term tells the server to not redeliver this message, regardless of the value
// of nats.MaxDeliver.
func (m *Msg) Term(opts ...AckOpt) error {
return m.ackReply(ackTerm, false, opts...)
return m.ackReply(ackTerm, 0, false, opts...)
}

// InProgress tells the server that this message is being worked on. It resets
// the redelivery timer on the server.
func (m *Msg) InProgress(opts ...AckOpt) error {
return m.ackReply(ackProgress, false, opts...)
return m.ackReply(ackProgress, 0, false, opts...)
}

// MsgMetadata is the JetStream metadata associated with received messages.
Expand Down
96 changes: 95 additions & 1 deletion test/js_test.go
Expand Up @@ -2465,7 +2465,7 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) {
// Create the stream using our client API.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Subjects: []string{"foo", "bar"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
Expand Down Expand Up @@ -2741,6 +2741,100 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) {

checkAcks(t, sub)
})

t.Run("Nak with delay", func(t *testing.T) {
js.Publish("bar", []byte("msg"))
sub, err := js.SubscribeSync("bar", nats.Durable("nak_dur"))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on NextMsg: %v", err)
}
if err := msg.NakWithDelay(500 * time.Millisecond); err != nil {
t.Fatalf("Error on Nak: %v", err)
}
// We should not get redelivery before 500ms+
if _, err = sub.NextMsg(250 * time.Millisecond); err != nats.ErrTimeout {
t.Fatalf("Expected timeout, got %v", err)
}
msg, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on NextMsg: %v", err)
}
if err := msg.NakWithDelay(0); err != nil {
t.Fatalf("Error on Nak: %v", err)
}
msg, err = sub.NextMsg(250 * time.Millisecond)
if err != nil {
t.Fatalf("Expected timeout, got %v", err)
}
msg.Ack()
})

t.Run("BackOff redeliveries", func(t *testing.T) {
inbox := nats.NewInbox()
sub, err := nc.SubscribeSync(inbox)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
defer sub.Unsubscribe()
cc := nats.ConsumerConfig{
Durable: "backoff",
AckPolicy: nats.AckExplicitPolicy,
DeliverPolicy: nats.DeliverAllPolicy,
FilterSubject: "bar",
DeliverSubject: inbox,
BackOff: []time.Duration{50 * time.Millisecond, 250 * time.Millisecond},
}
// First, try with a MaxDeliver that is < len(BackOff), which the
// server should reject.
cc.MaxDeliver = 1
_, err = js.AddConsumer("TEST", &cc)
if err == nil || !strings.Contains(err.Error(), "max deliver is required to be > length of backoff values") {
t.Fatalf("Expected backoff/max deliver error, got %v", err)
}
// Now put a valid value
cc.MaxDeliver = 4
ci, err := js.AddConsumer("TEST", &cc)
if err != nil {
t.Fatalf("Error on add consumer: %v", err)
}
if !reflect.DeepEqual(ci.Config.BackOff, cc.BackOff) {
t.Fatalf("Expected backoff to be %v, got %v", cc.BackOff, ci.Config.BackOff)
}
// Consume the first delivery
_, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on nextMsg: %v", err)
}
// We should get a redelivery at around 50ms
start := time.Now()
_, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on nextMsg: %v", err)
}
if dur := time.Since(start); dur < 25*time.Millisecond || dur > 100*time.Millisecond {
t.Fatalf("Expected to be redelivered at around 50ms, took %v", dur)
}
// Now it should be every 250ms or so
for i := 0; i < 2; i++ {
start = time.Now()
_, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on nextMsg for iter=%v: %v", i+1, err)
}
if dur := time.Since(start); dur < 200*time.Millisecond || dur > 300*time.Millisecond {
t.Fatalf("Expected to be redelivered at around 250ms, took %v", dur)
}
}
// At this point, we should have go reach MaxDeliver
_, err = sub.NextMsg(300 * time.Millisecond)
if err != nats.ErrTimeout {
t.Fatalf("Expected timeout, got %v", err)
}
})
}

func TestJetStreamPullSubscribe_AckPending(t *testing.T) {
Expand Down