Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] JetStream: Fail Ack() (and the likes) for AckNone consumer #1032

Merged
merged 1 commit into from Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 24 additions & 21 deletions js.go
Expand Up @@ -1102,6 +1102,7 @@ type jsSub struct {
deliver string
pull bool
dc bool // Delete JS consumer
ackNone bool

// This is ConsumerInfo's Pending+Consumer.Delivered that we get from the
// add consumer response. Note that some versions of the server gather the
Expand Down Expand Up @@ -1576,6 +1577,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
nms: nms,
psubj: subj,
cancel: cancel,
ackNone: o.cfg.AckPolicy == AckNonePolicy,
}

// Auto acknowledge unless manual ack is set or policy is set to AckNonePolicy
Expand Down Expand Up @@ -2785,24 +2787,14 @@ func (js *js) apiRequestWithContext(ctx context.Context, subj string, data []byt
return resp, nil
}

func (m *Msg) checkReply() (*js, *jsSub, error) {
func (m *Msg) checkReply() error {
if m == nil || m.Sub == nil {
return nil, nil, ErrMsgNotBound
return ErrMsgNotBound
}
if m.Reply == _EMPTY_ {
return nil, nil, ErrMsgNoReply
return ErrMsgNoReply
}
sub := m.Sub
if sub.jsi == nil {
// Not using a JS context.
return nil, nil, nil
}
sub.mu.Lock()
js := sub.jsi.js
jsi := sub.jsi
sub.mu.Unlock()

return js, jsi, nil
return nil
}

// ackReply handles all acks. Will do the right thing for pull and sync mode.
Expand All @@ -2816,19 +2808,29 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
}
}

js, _, err := m.checkReply()
if err != nil {
if err := m.checkReply(); err != nil {
return err
}

var ackNone bool
var js *js

sub := m.Sub
sub.mu.Lock()
nc := sub.conn
if jsi := sub.jsi; jsi != nil {
js = jsi.js
ackNone = jsi.ackNone
}
sub.mu.Unlock()

// Skip if already acked.
if atomic.LoadUint32(&m.ackd) == 1 {
return ErrMsgAlreadyAckd
}

m.Sub.mu.Lock()
nc := m.Sub.conn
m.Sub.mu.Unlock()
if ackNone {
return ErrCantAckIfConsumerAckNone
}

usesCtx := o.ctx != nil
usesWait := o.ttl > 0
Expand All @@ -2848,6 +2850,7 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
}

var body []byte
var err error
// This will be > 0 only when called from NakWithDelay()
if o.nakDelay > 0 {
body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, o.nakDelay.Nanoseconds()))
Expand Down Expand Up @@ -2996,7 +2999,7 @@ func getMetadataFields(subject string) ([]string, error) {
// Metadata retrieves the metadata from a JetStream message. This method will
// return an error for non-JetStream Msgs.
func (m *Msg) Metadata() (*MsgMetadata, error) {
if _, _, err := m.checkReply(); err != nil {
if err := m.checkReply(); err != nil {
return nil, err
}

Expand Down
1 change: 1 addition & 0 deletions nats.go
Expand Up @@ -164,6 +164,7 @@ var (
ErrConsumerNotActive = errors.New("nats: consumer not active")
ErrMsgNotFound = errors.New("nats: message not found")
ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged")
ErrCantAckIfConsumerAckNone = errors.New("nats: cannot acknowledge a message for a consumer with AckNone policy")
ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed")
ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
Expand Down
3 changes: 3 additions & 0 deletions test/conn_test.go
Expand Up @@ -1592,6 +1592,9 @@ func TestCustomFlusherTimeout(t *testing.T) {
// Notify when connection lost
nats.ClosedHandler(func(_ *nats.Conn) {
doneCh <- struct{}{}
}),
// Use error handler to silence the stderr output
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {
}))
if err != nil {
t.Fatalf("Expected to be able to connect, got: %s", err)
Expand Down
37 changes: 34 additions & 3 deletions test/js_test.go
Expand Up @@ -1534,8 +1534,8 @@ func TestAccountInfo(t *testing.T) {
accounts: {
A {
users: [{ user: "foo" }]
jetstream: {
max_mem: 64MB,
jetstream: {
max_mem: 64MB,
max_file: 32MB,
max_streams: 10,
max_consumers: 20,
Expand All @@ -1545,7 +1545,7 @@ func TestAccountInfo(t *testing.T) {
max_stream_bytes: true
}
}
}
}
`,
expected: &nats.AccountInfo{
Tier: nats.Tier{
Expand Down Expand Up @@ -7454,3 +7454,34 @@ func TestJetStreamConsumerReplicasOption(t *testing.T) {
}
})
}

func TestJetStreamMsgAckShouldErrForConsumerAckNone(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

if _, err := js.AddStream(&nats.StreamConfig{
Name: "ACKNONE",
Storage: nats.MemoryStorage,
Subjects: []string{"foo"},
}); err != nil {
t.Fatalf("Error adding stream: %v", err)
}
if _, err := js.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}

sub, err := js.SubscribeSync("foo", nats.OrderedConsumer())
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
if err := msg.Ack(); err != nats.ErrCantAckIfConsumerAckNone {
t.Fatalf("Expected error indicating that sub is AckNone, got %v", err)
}
}