diff --git a/go_test.mod b/go_test.mod index 77fa67393..4d301c8a5 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,7 +4,7 @@ go 1.16 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.4.1-0.20210902224824-3aa8e63b290a + github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/go_test.sum b/go_test.sum index 1e6ac88b0..05586f46f 100644 --- a/go_test.sum +++ b/go_test.sum @@ -19,9 +19,9 @@ github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= -github.com/nats-io/nats-server/v2 v2.4.1-0.20210902224824-3aa8e63b290a h1:CTZ20lNuVCOzZxXN2s1TJBUOdoHbsJXGNaChvdfr99c= -github.com/nats-io/nats-server/v2 v2.4.1-0.20210902224824-3aa8e63b290a/go.mod h1:TUAhMFYh1VISyY/D4WKJUMuGHg8yHtoUTuxkbiej1lc= -github.com/nats-io/nats.go v1.12.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6 h1:TYI6K487xhbbpKjz4gIIVBWL6l2gFI3JHu/N0XySwRY= +github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6/go.mod h1:xZLDZ6cRUu9FCh7+mKXGEy16O66CdWVxttxNIiUuNCk= +github.com/nats-io/nats.go v1.12.2-0.20210916222008-92921544b891/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= diff --git a/nats.go b/nats.go index 196483736..ae224a566 100644 --- a/nats.go +++ b/nats.go @@ -577,9 +577,6 @@ type Subscription struct { // Type of Subscription typ SubscriptionType - // Whether subject in messages and Subject in subscription may differ - smd bool - // Async linked list pHead *Msg pTail *Msg @@ -2695,10 +2692,7 @@ func (nc *Conn) processMsg(data []byte) { } // Copy them into string - subj := sub.Subject - if sub.smd { - subj = string(nc.ps.ma.subject) - } + subj := string(nc.ps.ma.subject) reply := string(nc.ps.ma.reply) // Doing message create outside of the sub's lock to reduce contention. @@ -3752,11 +3746,6 @@ func badQueue(qname string) bool { return strings.ContainsAny(qname, " \t\r\n") } -// wildcard will check a subject name for wildcardness. -func wildcard(subj string) bool { - return strings.ContainsAny(subj, "*>") -} - // subscribe is the internal subscribe function that indicates interest in a subject. func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) { if nc == nil { @@ -3796,7 +3785,6 @@ func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, mcb: cb, conn: nc, jsi: js, - smd: wildcard(subj) || js != nil, } // Set pending limits. if ch != nil { diff --git a/norace_test.go b/norace_test.go index 5864a113c..7d16687be 100644 --- a/norace_test.go +++ b/norace_test.go @@ -793,6 +793,7 @@ func TestNoRaceJetStreamChanSubscribeStall(t *testing.T) { sub, err := js.ChanSubscribe("STALL", msgs, Durable("dlc"), EnableFlowControl(), + IdleHeartbeat(5*time.Second), MaxAckPending(batch-2), ) if err != nil { diff --git a/test/js_test.go b/test/js_test.go index b38535460..1c96656b9 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -6434,3 +6434,56 @@ func TestJetStreamStreamAndConsumerDescription(t *testing.T) { t.Fatalf("Invalid description: %q vs %q", consDesc, ci.Config.Description) } } + +func TestJetStreamMsgSubjectRewrite(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }); err != nil { + t.Fatalf("Error adding stream: %v", err) + } + + sub, err := nc.SubscribeSync(nats.NewInbox()) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + if _, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + DeliverSubject: sub.Subject, + DeliverPolicy: nats.DeliverAllPolicy, + }); err != nil { + t.Fatalf("Error adding consumer: %v", err) + } + + if _, err := js.Publish("foo", []byte("msg")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Did not get message: %v", err) + } + if msg.Subject != "foo" { + t.Fatalf("Subject should be %q, got %q", "foo", msg.Subject) + } + if string(msg.Data) != "msg" { + t.Fatalf("Unexepcted data: %q", msg.Data) + } +}