Skip to content

Commit

Permalink
Merge pull request #987 from nats-io/ack-none-policy
Browse files Browse the repository at this point in the history
Fix sending ack when AckNonePolicy is set
  • Loading branch information
wallyqs committed Jun 2, 2022
2 parents b795e86 + ffc3c71 commit ce0cf21
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 2 deletions.
4 changes: 2 additions & 2 deletions js.go
Expand Up @@ -1520,8 +1520,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
cancel: cancel,
}

// Check if we are manual ack.
if cb != nil && !o.mack {
// Auto acknowledge unless manual ack is set or policy is set to AckNonePolicy
if cb != nil && !o.mack && o.cfg.AckPolicy != AckNonePolicy {
ocb := cb
cb = func(m *Msg) { ocb(m); m.Ack() }
}
Expand Down
72 changes: 72 additions & 0 deletions test/js_test.go
Expand Up @@ -3030,6 +3030,78 @@ func TestJetStreamSubscribe_AckDup(t *testing.T) {
}
}

func TestJetStreamSubscribe_AutoAck(t *testing.T) {
tests := []struct {
name string
opt nats.SubOpt
expectedAck bool
}{
{
name: "with ack explicit",
opt: nats.AckExplicit(),
expectedAck: true,
},
{
name: "with ack all",
opt: nats.AckAll(),
expectedAck: true,
},
{
name: "with ack none",
opt: nats.AckNone(),
expectedAck: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

var err error

// Create the stream using our client API.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js.Publish("foo", []byte("hello"))

ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()

acks := make(chan struct{}, 2)
nc.Subscribe("$JS.ACK.TEST.>", func(msg *nats.Msg) {
acks <- struct{}{}
})
nc.Flush()

_, err = js.Subscribe("foo", func(m *nats.Msg) {
}, test.opt)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
<-ctx.Done()

if test.expectedAck {
if len(acks) != 1 {
t.Fatalf("Expected to receive a single ack, got: %v", len(acks))
}
return
}
if len(acks) != 0 {
t.Fatalf("Expected no acks, got: %v", len(acks))
}
})
}
}

func TestJetStreamSubscribe_AckDupInProgress(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down

0 comments on commit ce0cf21

Please sign in to comment.