From d34c12d8a5817d32d5c6ad043fe302bf8ddc1730 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 31 Jan 2022 18:48:48 -0700 Subject: [PATCH] Passed nakDelay as an option to internal ackReply() function Signed-off-by: Ivan Kozlovic --- js.go | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/js.go b/js.go index e3a863aa9..b329a1b36 100644 --- a/js.go +++ b/js.go @@ -804,8 +804,9 @@ func ExpectLastMsgId(id string) PubOpt { } type ackOpts struct { - ttl time.Duration - ctx context.Context + ttl time.Duration + ctx context.Context + nakDelay time.Duration } // AckOpt are the options that can be passed when acknowledge a message. @@ -880,6 +881,13 @@ func Context(ctx context.Context) ContextOpt { return ContextOpt{ctx} } +type nakDelay time.Duration + +func (d nakDelay) configureAck(opts *ackOpts) error { + opts.nakDelay = time.Duration(d) + return nil +} + // Subscribe // ConsumerConfig is the configuration of a JetStream consumer. @@ -2600,7 +2608,7 @@ func (m *Msg) checkReply() (*js, *jsSub, error) { // ackReply handles all acks. Will do the right thing for pull and sync mode. // It ensures that an ack is only sent a single time, regardless of // how many times it is being called to avoid duplicated acks. -func (m *Msg) ackReply(ackType []byte, nakDelay time.Duration, sync bool, opts ...AckOpt) error { +func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error { var o ackOpts for _, opt := range opts { if err := opt.configureAck(&o); err != nil { @@ -2641,8 +2649,8 @@ func (m *Msg) ackReply(ackType []byte, nakDelay time.Duration, sync bool, opts . var body []byte // This will be > 0 only when called from NakWithDelay() - if nakDelay > 0 { - body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, nakDelay.Nanoseconds())) + if o.nakDelay > 0 { + body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, o.nakDelay.Nanoseconds())) } else { body = ackType } @@ -2669,20 +2677,20 @@ func (m *Msg) ackReply(ackType []byte, nakDelay time.Duration, sync bool, opts . // Ack acknowledges a message. This tells the server that the message was // successfully processed and it can move on to the next message. func (m *Msg) Ack(opts ...AckOpt) error { - return m.ackReply(ackAck, 0, false, opts...) + return m.ackReply(ackAck, false, opts...) } // AckSync is the synchronous version of Ack. This indicates successful message // processing. func (m *Msg) AckSync(opts ...AckOpt) error { - return m.ackReply(ackAck, 0, true, opts...) + return m.ackReply(ackAck, true, opts...) } // Nak negatively acknowledges a message. This tells the server to redeliver // the message. You can configure the number of redeliveries by passing // nats.MaxDeliver when you Subscribe. The default is infinite redeliveries. func (m *Msg) Nak(opts ...AckOpt) error { - return m.ackReply(ackNak, 0, false, opts...) + return m.ackReply(ackNak, false, opts...) } // Nak negatively acknowledges a message. This tells the server to redeliver @@ -2690,19 +2698,22 @@ func (m *Msg) Nak(opts ...AckOpt) error { // of redeliveries by passing nats.MaxDeliver when you Subscribe. // The default is infinite redeliveries. func (m *Msg) NakWithDelay(delay time.Duration, opts ...AckOpt) error { - return m.ackReply(ackNak, delay, false, opts...) + if delay > 0 { + opts = append(opts, nakDelay(delay)) + } + return m.ackReply(ackNak, false, opts...) } // Term tells the server to not redeliver this message, regardless of the value // of nats.MaxDeliver. func (m *Msg) Term(opts ...AckOpt) error { - return m.ackReply(ackTerm, 0, false, opts...) + return m.ackReply(ackTerm, false, opts...) } // InProgress tells the server that this message is being worked on. It resets // the redelivery timer on the server. func (m *Msg) InProgress(opts ...AckOpt) error { - return m.ackReply(ackProgress, 0, false, opts...) + return m.ackReply(ackProgress, false, opts...) } // MsgMetadata is the JetStream metadata associated with received messages.