From 3a9b41157510db8b6ef912ecb9458e00de77b4b5 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 21 Sep 2021 12:43:49 -0600 Subject: [PATCH] [FIXED] Received JetStream message may have wrong subject The recent PR #824 tried to optimize reducing memory copy by using the non JS, non wildcard subscription's subject as the message subject. However, in case where a NATS subscription is used for the delivery subject of a AddConsumer call, then this breaks and the message would be received with the subscription subject instead of the subject that we get from MSG parsing. This PR basically reverts changes from #824 and add a subject to make sure that we catch such issue in the future. Signed-off-by: Ivan Kozlovic --- go_test.mod | 2 +- go_test.sum | 6 +++--- nats.go | 14 +------------ norace_test.go | 1 + test/js_test.go | 53 +++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 59 insertions(+), 17 deletions(-) diff --git a/go_test.mod b/go_test.mod index 77fa67393..4d301c8a5 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,7 +4,7 @@ go 1.16 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.4.1-0.20210902224824-3aa8e63b290a + github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/go_test.sum b/go_test.sum index 1e6ac88b0..05586f46f 100644 --- a/go_test.sum +++ b/go_test.sum @@ -19,9 +19,9 @@ github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= -github.com/nats-io/nats-server/v2 v2.4.1-0.20210902224824-3aa8e63b290a h1:CTZ20lNuVCOzZxXN2s1TJBUOdoHbsJXGNaChvdfr99c= -github.com/nats-io/nats-server/v2 v2.4.1-0.20210902224824-3aa8e63b290a/go.mod h1:TUAhMFYh1VISyY/D4WKJUMuGHg8yHtoUTuxkbiej1lc= -github.com/nats-io/nats.go v1.12.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6 h1:TYI6K487xhbbpKjz4gIIVBWL6l2gFI3JHu/N0XySwRY= +github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6/go.mod h1:xZLDZ6cRUu9FCh7+mKXGEy16O66CdWVxttxNIiUuNCk= +github.com/nats-io/nats.go v1.12.2-0.20210916222008-92921544b891/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= diff --git a/nats.go b/nats.go index 196483736..ae224a566 100644 --- a/nats.go +++ b/nats.go @@ -577,9 +577,6 @@ type Subscription struct { // Type of Subscription typ SubscriptionType - // Whether subject in messages and Subject in subscription may differ - smd bool - // Async linked list pHead *Msg pTail *Msg @@ -2695,10 +2692,7 @@ func (nc *Conn) processMsg(data []byte) { } // Copy them into string - subj := sub.Subject - if sub.smd { - subj = string(nc.ps.ma.subject) - } + subj := string(nc.ps.ma.subject) reply := string(nc.ps.ma.reply) // Doing message create outside of the sub's lock to reduce contention. @@ -3752,11 +3746,6 @@ func badQueue(qname string) bool { return strings.ContainsAny(qname, " \t\r\n") } -// wildcard will check a subject name for wildcardness. -func wildcard(subj string) bool { - return strings.ContainsAny(subj, "*>") -} - // subscribe is the internal subscribe function that indicates interest in a subject. func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) { if nc == nil { @@ -3796,7 +3785,6 @@ func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, mcb: cb, conn: nc, jsi: js, - smd: wildcard(subj) || js != nil, } // Set pending limits. if ch != nil { diff --git a/norace_test.go b/norace_test.go index 5864a113c..7d16687be 100644 --- a/norace_test.go +++ b/norace_test.go @@ -793,6 +793,7 @@ func TestNoRaceJetStreamChanSubscribeStall(t *testing.T) { sub, err := js.ChanSubscribe("STALL", msgs, Durable("dlc"), EnableFlowControl(), + IdleHeartbeat(5*time.Second), MaxAckPending(batch-2), ) if err != nil { diff --git a/test/js_test.go b/test/js_test.go index b38535460..1c96656b9 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -6434,3 +6434,56 @@ func TestJetStreamStreamAndConsumerDescription(t *testing.T) { t.Fatalf("Invalid description: %q vs %q", consDesc, ci.Config.Description) } } + +func TestJetStreamMsgSubjectRewrite(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }); err != nil { + t.Fatalf("Error adding stream: %v", err) + } + + sub, err := nc.SubscribeSync(nats.NewInbox()) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + if _, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + DeliverSubject: sub.Subject, + DeliverPolicy: nats.DeliverAllPolicy, + }); err != nil { + t.Fatalf("Error adding consumer: %v", err) + } + + if _, err := js.Publish("foo", []byte("msg")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Did not get message: %v", err) + } + if msg.Subject != "foo" { + t.Fatalf("Subject should be %q, got %q", "foo", msg.Subject) + } + if string(msg.Data) != "msg" { + t.Fatalf("Unexepcted data: %q", msg.Data) + } +}