Skip to content

Commit

Permalink
[FIXED] Received JetStream message may have wrong subject
Browse files Browse the repository at this point in the history
The recent PR #824 tried to optimize reducing memory copy
by using the non JS, non wildcard subscription's subject
as the message subject.

However, in case where a NATS subscription is used for
the delivery subject of a AddConsumer call, then this
breaks and the message would be received with the subscription
subject instead of the subject that we get from MSG parsing.

This PR basically reverts changes from #824 and add a subject
to make sure that we catch such issue in the future.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Sep 21, 2021
1 parent 10daedc commit 3a9b411
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 17 deletions.
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.4.1-0.20210902224824-3aa8e63b290a
github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
6 changes: 3 additions & 3 deletions go_test.sum
Expand Up @@ -19,9 +19,9 @@ github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats-server/v2 v2.4.1-0.20210902224824-3aa8e63b290a h1:CTZ20lNuVCOzZxXN2s1TJBUOdoHbsJXGNaChvdfr99c=
github.com/nats-io/nats-server/v2 v2.4.1-0.20210902224824-3aa8e63b290a/go.mod h1:TUAhMFYh1VISyY/D4WKJUMuGHg8yHtoUTuxkbiej1lc=
github.com/nats-io/nats.go v1.12.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6 h1:TYI6K487xhbbpKjz4gIIVBWL6l2gFI3JHu/N0XySwRY=
github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6/go.mod h1:xZLDZ6cRUu9FCh7+mKXGEy16O66CdWVxttxNIiUuNCk=
github.com/nats-io/nats.go v1.12.2-0.20210916222008-92921544b891/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down
14 changes: 1 addition & 13 deletions nats.go
Expand Up @@ -577,9 +577,6 @@ type Subscription struct {
// Type of Subscription
typ SubscriptionType

// Whether subject in messages and Subject in subscription may differ
smd bool

// Async linked list
pHead *Msg
pTail *Msg
Expand Down Expand Up @@ -2695,10 +2692,7 @@ func (nc *Conn) processMsg(data []byte) {
}

// Copy them into string
subj := sub.Subject
if sub.smd {
subj = string(nc.ps.ma.subject)
}
subj := string(nc.ps.ma.subject)
reply := string(nc.ps.ma.reply)

// Doing message create outside of the sub's lock to reduce contention.
Expand Down Expand Up @@ -3752,11 +3746,6 @@ func badQueue(qname string) bool {
return strings.ContainsAny(qname, " \t\r\n")
}

// wildcard will check a subject name for wildcardness.
func wildcard(subj string) bool {
return strings.ContainsAny(subj, "*>")
}

// subscribe is the internal subscribe function that indicates interest in a subject.
func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) {
if nc == nil {
Expand Down Expand Up @@ -3796,7 +3785,6 @@ func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg,
mcb: cb,
conn: nc,
jsi: js,
smd: wildcard(subj) || js != nil,
}
// Set pending limits.
if ch != nil {
Expand Down
1 change: 1 addition & 0 deletions norace_test.go
Expand Up @@ -793,6 +793,7 @@ func TestNoRaceJetStreamChanSubscribeStall(t *testing.T) {
sub, err := js.ChanSubscribe("STALL", msgs,
Durable("dlc"),
EnableFlowControl(),
IdleHeartbeat(5*time.Second),
MaxAckPending(batch-2),
)
if err != nil {
Expand Down
53 changes: 53 additions & 0 deletions test/js_test.go
Expand Up @@ -6434,3 +6434,56 @@ func TestJetStreamStreamAndConsumerDescription(t *testing.T) {
t.Fatalf("Invalid description: %q vs %q", consDesc, ci.Config.Description)
}
}

func TestJetStreamMsgSubjectRewrite(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if _, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
}); err != nil {
t.Fatalf("Error adding stream: %v", err)
}

sub, err := nc.SubscribeSync(nats.NewInbox())
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if _, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
DeliverSubject: sub.Subject,
DeliverPolicy: nats.DeliverAllPolicy,
}); err != nil {
t.Fatalf("Error adding consumer: %v", err)
}

if _, err := js.Publish("foo", []byte("msg")); err != nil {
t.Fatalf("Error on publish: %v", err)
}

msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Did not get message: %v", err)
}
if msg.Subject != "foo" {
t.Fatalf("Subject should be %q, got %q", "foo", msg.Subject)
}
if string(msg.Data) != "msg" {
t.Fatalf("Unexepcted data: %q", msg.Data)
}
}

0 comments on commit 3a9b411

Please sign in to comment.