Skip to content

Commit

Permalink
Passed nakDelay as an option to internal ackReply() function
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Feb 1, 2022
1 parent f1d50b8 commit d34c12d
Showing 1 changed file with 22 additions and 11 deletions.
33 changes: 22 additions & 11 deletions js.go
Expand Up @@ -804,8 +804,9 @@ func ExpectLastMsgId(id string) PubOpt {
}

type ackOpts struct {
ttl time.Duration
ctx context.Context
ttl time.Duration
ctx context.Context
nakDelay time.Duration
}

// AckOpt are the options that can be passed when acknowledge a message.
Expand Down Expand Up @@ -880,6 +881,13 @@ func Context(ctx context.Context) ContextOpt {
return ContextOpt{ctx}
}

type nakDelay time.Duration

func (d nakDelay) configureAck(opts *ackOpts) error {
opts.nakDelay = time.Duration(d)
return nil
}

// Subscribe

// ConsumerConfig is the configuration of a JetStream consumer.
Expand Down Expand Up @@ -2600,7 +2608,7 @@ func (m *Msg) checkReply() (*js, *jsSub, error) {
// ackReply handles all acks. Will do the right thing for pull and sync mode.
// It ensures that an ack is only sent a single time, regardless of
// how many times it is being called to avoid duplicated acks.
func (m *Msg) ackReply(ackType []byte, nakDelay time.Duration, sync bool, opts ...AckOpt) error {
func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
var o ackOpts
for _, opt := range opts {
if err := opt.configureAck(&o); err != nil {
Expand Down Expand Up @@ -2641,8 +2649,8 @@ func (m *Msg) ackReply(ackType []byte, nakDelay time.Duration, sync bool, opts .

var body []byte
// This will be > 0 only when called from NakWithDelay()
if nakDelay > 0 {
body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, nakDelay.Nanoseconds()))
if o.nakDelay > 0 {
body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, o.nakDelay.Nanoseconds()))
} else {
body = ackType
}
Expand All @@ -2669,40 +2677,43 @@ func (m *Msg) ackReply(ackType []byte, nakDelay time.Duration, sync bool, opts .
// Ack acknowledges a message. This tells the server that the message was
// successfully processed and it can move on to the next message.
func (m *Msg) Ack(opts ...AckOpt) error {
return m.ackReply(ackAck, 0, false, opts...)
return m.ackReply(ackAck, false, opts...)
}

// AckSync is the synchronous version of Ack. This indicates successful message
// processing.
func (m *Msg) AckSync(opts ...AckOpt) error {
return m.ackReply(ackAck, 0, true, opts...)
return m.ackReply(ackAck, true, opts...)
}

// Nak negatively acknowledges a message. This tells the server to redeliver
// the message. You can configure the number of redeliveries by passing
// nats.MaxDeliver when you Subscribe. The default is infinite redeliveries.
func (m *Msg) Nak(opts ...AckOpt) error {
return m.ackReply(ackNak, 0, false, opts...)
return m.ackReply(ackNak, false, opts...)
}

// Nak negatively acknowledges a message. This tells the server to redeliver
// the message after the give `delay` duration. You can configure the number
// of redeliveries by passing nats.MaxDeliver when you Subscribe.
// The default is infinite redeliveries.
func (m *Msg) NakWithDelay(delay time.Duration, opts ...AckOpt) error {
return m.ackReply(ackNak, delay, false, opts...)
if delay > 0 {
opts = append(opts, nakDelay(delay))
}
return m.ackReply(ackNak, false, opts...)
}

// Term tells the server to not redeliver this message, regardless of the value
// of nats.MaxDeliver.
func (m *Msg) Term(opts ...AckOpt) error {
return m.ackReply(ackTerm, 0, false, opts...)
return m.ackReply(ackTerm, false, opts...)
}

// InProgress tells the server that this message is being worked on. It resets
// the redelivery timer on the server.
func (m *Msg) InProgress(opts ...AckOpt) error {
return m.ackReply(ackProgress, 0, false, opts...)
return m.ackReply(ackProgress, false, opts...)
}

// MsgMetadata is the JetStream metadata associated with received messages.
Expand Down

0 comments on commit d34c12d

Please sign in to comment.