Skip to content

Commit

Permalink
Remove NakDelay() option and instead add Msg.NakWithDelay()
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Feb 1, 2022
1 parent fd2a476 commit f1d50b8
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 20 deletions.
37 changes: 19 additions & 18 deletions js.go
Expand Up @@ -804,9 +804,8 @@ func ExpectLastMsgId(id string) PubOpt {
}

type ackOpts struct {
ttl time.Duration
ctx context.Context
nakDelay time.Duration
ttl time.Duration
ctx context.Context
}

// AckOpt are the options that can be passed when acknowledge a message.
Expand Down Expand Up @@ -845,13 +844,6 @@ func (ttl AckWait) configureAck(opts *ackOpts) error {
return nil
}

type NakDelay time.Duration

func (delay NakDelay) configureAck(opts *ackOpts) error {
opts.nakDelay = time.Duration(delay)
return nil
}

// ContextOpt is an option used to set a context.Context.
type ContextOpt struct {
context.Context
Expand Down Expand Up @@ -2608,7 +2600,7 @@ func (m *Msg) checkReply() (*js, *jsSub, error) {
// ackReply handles all acks. Will do the right thing for pull and sync mode.
// It ensures that an ack is only sent a single time, regardless of
// how many times it is being called to avoid duplicated acks.
func (m *Msg) ackReply(ackType []byte, nak, sync bool, opts ...AckOpt) error {
func (m *Msg) ackReply(ackType []byte, nakDelay time.Duration, sync bool, opts ...AckOpt) error {

This comment has been minimized.

Copy link
@derekcollison

derekcollison Feb 1, 2022

Member

Can we put this into opts somehow vs explicit arg?

var o ackOpts
for _, opt := range opts {
if err := opt.configureAck(&o); err != nil {
Expand Down Expand Up @@ -2648,8 +2640,9 @@ func (m *Msg) ackReply(ackType []byte, nak, sync bool, opts ...AckOpt) error {
}

var body []byte
if nak && o.nakDelay > 0 {
body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, o.nakDelay.Nanoseconds()))
// This will be > 0 only when called from NakWithDelay()
if nakDelay > 0 {
body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, nakDelay.Nanoseconds()))
} else {
body = ackType
}
Expand All @@ -2676,32 +2669,40 @@ func (m *Msg) ackReply(ackType []byte, nak, sync bool, opts ...AckOpt) error {
// Ack acknowledges a message. This tells the server that the message was
// successfully processed and it can move on to the next message.
func (m *Msg) Ack(opts ...AckOpt) error {
return m.ackReply(ackAck, false, false, opts...)
return m.ackReply(ackAck, 0, false, opts...)
}

// AckSync is the synchronous version of Ack. This indicates successful message
// processing.
func (m *Msg) AckSync(opts ...AckOpt) error {
return m.ackReply(ackAck, false, true, opts...)
return m.ackReply(ackAck, 0, true, opts...)
}

// Nak negatively acknowledges a message. This tells the server to redeliver
// the message. You can configure the number of redeliveries by passing
// nats.MaxDeliver when you Subscribe. The default is infinite redeliveries.
func (m *Msg) Nak(opts ...AckOpt) error {
return m.ackReply(ackNak, true, false, opts...)
return m.ackReply(ackNak, 0, false, opts...)
}

// Nak negatively acknowledges a message. This tells the server to redeliver
// the message after the give `delay` duration. You can configure the number
// of redeliveries by passing nats.MaxDeliver when you Subscribe.
// The default is infinite redeliveries.
func (m *Msg) NakWithDelay(delay time.Duration, opts ...AckOpt) error {
return m.ackReply(ackNak, delay, false, opts...)
}

// Term tells the server to not redeliver this message, regardless of the value
// of nats.MaxDeliver.
func (m *Msg) Term(opts ...AckOpt) error {
return m.ackReply(ackTerm, false, false, opts...)
return m.ackReply(ackTerm, 0, false, opts...)
}

// InProgress tells the server that this message is being worked on. It resets
// the redelivery timer on the server.
func (m *Msg) InProgress(opts ...AckOpt) error {
return m.ackReply(ackProgress, false, false, opts...)
return m.ackReply(ackProgress, 0, false, opts...)
}

// MsgMetadata is the JetStream metadata associated with received messages.
Expand Down
4 changes: 2 additions & 2 deletions test/js_test.go
Expand Up @@ -2752,7 +2752,7 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) {
if err != nil {
t.Fatalf("Error on NextMsg: %v", err)
}
if err := msg.Nak(nats.NakDelay(500 * time.Millisecond)); err != nil {
if err := msg.NakWithDelay(500 * time.Millisecond); err != nil {
t.Fatalf("Error on Nak: %v", err)
}
// We should not get redelivery before 500ms+
Expand All @@ -2763,7 +2763,7 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) {
if err != nil {
t.Fatalf("Error on NextMsg: %v", err)
}
if err := msg.Nak(nats.NakDelay(0)); err != nil {
if err := msg.NakWithDelay(0); err != nil {
t.Fatalf("Error on Nak: %v", err)
}
msg, err = sub.NextMsg(250 * time.Millisecond)
Expand Down

0 comments on commit f1d50b8

Please sign in to comment.