Skip to content

Commit

Permalink
Merge pull request #1032 from nats-io/fix_1027
Browse files Browse the repository at this point in the history
[FIXED] JetStream: Fail Ack() (and the likes) for AckNone consumer
  • Loading branch information
kozlovic committed Aug 4, 2022
2 parents cc189da + c040b4f commit d4eeb20
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 24 deletions.
45 changes: 24 additions & 21 deletions js.go
Expand Up @@ -1102,6 +1102,7 @@ type jsSub struct {
deliver string
pull bool
dc bool // Delete JS consumer
ackNone bool

// This is ConsumerInfo's Pending+Consumer.Delivered that we get from the
// add consumer response. Note that some versions of the server gather the
Expand Down Expand Up @@ -1576,6 +1577,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
nms: nms,
psubj: subj,
cancel: cancel,
ackNone: o.cfg.AckPolicy == AckNonePolicy,
}

// Auto acknowledge unless manual ack is set or policy is set to AckNonePolicy
Expand Down Expand Up @@ -2785,24 +2787,14 @@ func (js *js) apiRequestWithContext(ctx context.Context, subj string, data []byt
return resp, nil
}

func (m *Msg) checkReply() (*js, *jsSub, error) {
func (m *Msg) checkReply() error {
if m == nil || m.Sub == nil {
return nil, nil, ErrMsgNotBound
return ErrMsgNotBound
}
if m.Reply == _EMPTY_ {
return nil, nil, ErrMsgNoReply
return ErrMsgNoReply
}
sub := m.Sub
if sub.jsi == nil {
// Not using a JS context.
return nil, nil, nil
}
sub.mu.Lock()
js := sub.jsi.js
jsi := sub.jsi
sub.mu.Unlock()

return js, jsi, nil
return nil
}

// ackReply handles all acks. Will do the right thing for pull and sync mode.
Expand All @@ -2816,19 +2808,29 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
}
}

js, _, err := m.checkReply()
if err != nil {
if err := m.checkReply(); err != nil {
return err
}

var ackNone bool
var js *js

sub := m.Sub
sub.mu.Lock()
nc := sub.conn
if jsi := sub.jsi; jsi != nil {
js = jsi.js
ackNone = jsi.ackNone
}
sub.mu.Unlock()

// Skip if already acked.
if atomic.LoadUint32(&m.ackd) == 1 {
return ErrMsgAlreadyAckd
}

m.Sub.mu.Lock()
nc := m.Sub.conn
m.Sub.mu.Unlock()
if ackNone {
return ErrCantAckIfConsumerAckNone
}

usesCtx := o.ctx != nil
usesWait := o.ttl > 0
Expand All @@ -2848,6 +2850,7 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
}

var body []byte
var err error
// This will be > 0 only when called from NakWithDelay()
if o.nakDelay > 0 {
body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, o.nakDelay.Nanoseconds()))
Expand Down Expand Up @@ -2996,7 +2999,7 @@ func getMetadataFields(subject string) ([]string, error) {
// Metadata retrieves the metadata from a JetStream message. This method will
// return an error for non-JetStream Msgs.
func (m *Msg) Metadata() (*MsgMetadata, error) {
if _, _, err := m.checkReply(); err != nil {
if err := m.checkReply(); err != nil {
return nil, err
}

Expand Down
1 change: 1 addition & 0 deletions nats.go
Expand Up @@ -164,6 +164,7 @@ var (
ErrConsumerNotActive = errors.New("nats: consumer not active")
ErrMsgNotFound = errors.New("nats: message not found")
ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged")
ErrCantAckIfConsumerAckNone = errors.New("nats: cannot acknowledge a message for a consumer with AckNone policy")
ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed")
ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
Expand Down
3 changes: 3 additions & 0 deletions test/conn_test.go
Expand Up @@ -1592,6 +1592,9 @@ func TestCustomFlusherTimeout(t *testing.T) {
// Notify when connection lost
nats.ClosedHandler(func(_ *nats.Conn) {
doneCh <- struct{}{}
}),
// Use error handler to silence the stderr output
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {
}))
if err != nil {
t.Fatalf("Expected to be able to connect, got: %s", err)
Expand Down
37 changes: 34 additions & 3 deletions test/js_test.go
Expand Up @@ -1534,8 +1534,8 @@ func TestAccountInfo(t *testing.T) {
accounts: {
A {
users: [{ user: "foo" }]
jetstream: {
max_mem: 64MB,
jetstream: {
max_mem: 64MB,
max_file: 32MB,
max_streams: 10,
max_consumers: 20,
Expand All @@ -1545,7 +1545,7 @@ func TestAccountInfo(t *testing.T) {
max_stream_bytes: true
}
}
}
}
`,
expected: &nats.AccountInfo{
Tier: nats.Tier{
Expand Down Expand Up @@ -7454,3 +7454,34 @@ func TestJetStreamConsumerReplicasOption(t *testing.T) {
}
})
}

func TestJetStreamMsgAckShouldErrForConsumerAckNone(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

if _, err := js.AddStream(&nats.StreamConfig{
Name: "ACKNONE",
Storage: nats.MemoryStorage,
Subjects: []string{"foo"},
}); err != nil {
t.Fatalf("Error adding stream: %v", err)
}
if _, err := js.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}

sub, err := js.SubscribeSync("foo", nats.OrderedConsumer())
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
if err := msg.Ack(); err != nats.ErrCantAckIfConsumerAckNone {
t.Fatalf("Expected error indicating that sub is AckNone, got %v", err)
}
}

0 comments on commit d4eeb20

Please sign in to comment.