From ffc3c71d21565012ede71420c8a6df6a21e74f02 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Thu, 2 Jun 2022 15:30:49 +0200 Subject: [PATCH] Fix sending ack when AckNonePolicy is set --- js.go | 4 +-- test/js_test.go | 72 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 2 deletions(-) diff --git a/js.go b/js.go index dafa53ab5..773748aad 100644 --- a/js.go +++ b/js.go @@ -1520,8 +1520,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, cancel: cancel, } - // Check if we are manual ack. - if cb != nil && !o.mack { + // Auto acknowledge unless manual ack is set or policy is set to AckNonePolicy + if cb != nil && !o.mack && o.cfg.AckPolicy != AckNonePolicy { ocb := cb cb = func(m *Msg) { ocb(m); m.Ack() } } diff --git a/test/js_test.go b/test/js_test.go index 1fb2a3074..0ea848b0f 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -3030,6 +3030,78 @@ func TestJetStreamSubscribe_AckDup(t *testing.T) { } } +func TestJetStreamSubscribe_AutoAck(t *testing.T) { + tests := []struct { + name string + opt nats.SubOpt + expectedAck bool + }{ + { + name: "with ack explicit", + opt: nats.AckExplicit(), + expectedAck: true, + }, + { + name: "with ack all", + opt: nats.AckAll(), + expectedAck: true, + }, + { + name: "with ack none", + opt: nats.AckNone(), + expectedAck: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + var err error + + // Create the stream using our client API. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + js.Publish("foo", []byte("hello")) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + acks := make(chan struct{}, 2) + nc.Subscribe("$JS.ACK.TEST.>", func(msg *nats.Msg) { + acks <- struct{}{} + }) + nc.Flush() + + _, err = js.Subscribe("foo", func(m *nats.Msg) { + }, test.opt) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + <-ctx.Done() + + if test.expectedAck { + if len(acks) != 1 { + t.Fatalf("Expected to receive a single ack, got: %v", len(acks)) + } + return + } + if len(acks) != 0 { + t.Fatalf("Expected no acks, got: %v", len(acks)) + } + }) + } +} + func TestJetStreamSubscribe_AckDupInProgress(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s)