Skip to content

Commit

Permalink
Merge pull request #827 from nats-io/fix_js_subject_rewrite
Browse files Browse the repository at this point in the history
[FIXED] Received JetStream message may have wrong subject
  • Loading branch information
kozlovic committed Sep 21, 2021
2 parents 10daedc + 3a9b411 commit 68de9e1
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 17 deletions.
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.4.1-0.20210902224824-3aa8e63b290a
github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
6 changes: 3 additions & 3 deletions go_test.sum
Expand Up @@ -19,9 +19,9 @@ github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats-server/v2 v2.4.1-0.20210902224824-3aa8e63b290a h1:CTZ20lNuVCOzZxXN2s1TJBUOdoHbsJXGNaChvdfr99c=
github.com/nats-io/nats-server/v2 v2.4.1-0.20210902224824-3aa8e63b290a/go.mod h1:TUAhMFYh1VISyY/D4WKJUMuGHg8yHtoUTuxkbiej1lc=
github.com/nats-io/nats.go v1.12.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6 h1:TYI6K487xhbbpKjz4gIIVBWL6l2gFI3JHu/N0XySwRY=
github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6/go.mod h1:xZLDZ6cRUu9FCh7+mKXGEy16O66CdWVxttxNIiUuNCk=
github.com/nats-io/nats.go v1.12.2-0.20210916222008-92921544b891/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down
14 changes: 1 addition & 13 deletions nats.go
Expand Up @@ -577,9 +577,6 @@ type Subscription struct {
// Type of Subscription
typ SubscriptionType

// Whether subject in messages and Subject in subscription may differ
smd bool

// Async linked list
pHead *Msg
pTail *Msg
Expand Down Expand Up @@ -2695,10 +2692,7 @@ func (nc *Conn) processMsg(data []byte) {
}

// Copy them into string
subj := sub.Subject
if sub.smd {
subj = string(nc.ps.ma.subject)
}
subj := string(nc.ps.ma.subject)
reply := string(nc.ps.ma.reply)

// Doing message create outside of the sub's lock to reduce contention.
Expand Down Expand Up @@ -3752,11 +3746,6 @@ func badQueue(qname string) bool {
return strings.ContainsAny(qname, " \t\r\n")
}

// wildcard will check a subject name for wildcardness.
func wildcard(subj string) bool {
return strings.ContainsAny(subj, "*>")
}

// subscribe is the internal subscribe function that indicates interest in a subject.
func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) {
if nc == nil {
Expand Down Expand Up @@ -3796,7 +3785,6 @@ func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg,
mcb: cb,
conn: nc,
jsi: js,
smd: wildcard(subj) || js != nil,
}
// Set pending limits.
if ch != nil {
Expand Down
1 change: 1 addition & 0 deletions norace_test.go
Expand Up @@ -793,6 +793,7 @@ func TestNoRaceJetStreamChanSubscribeStall(t *testing.T) {
sub, err := js.ChanSubscribe("STALL", msgs,
Durable("dlc"),
EnableFlowControl(),
IdleHeartbeat(5*time.Second),
MaxAckPending(batch-2),
)
if err != nil {
Expand Down
53 changes: 53 additions & 0 deletions test/js_test.go
Expand Up @@ -6434,3 +6434,56 @@ func TestJetStreamStreamAndConsumerDescription(t *testing.T) {
t.Fatalf("Invalid description: %q vs %q", consDesc, ci.Config.Description)
}
}

func TestJetStreamMsgSubjectRewrite(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if _, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
}); err != nil {
t.Fatalf("Error adding stream: %v", err)
}

sub, err := nc.SubscribeSync(nats.NewInbox())
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if _, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
DeliverSubject: sub.Subject,
DeliverPolicy: nats.DeliverAllPolicy,
}); err != nil {
t.Fatalf("Error adding consumer: %v", err)
}

if _, err := js.Publish("foo", []byte("msg")); err != nil {
t.Fatalf("Error on publish: %v", err)
}

msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Did not get message: %v", err)
}
if msg.Subject != "foo" {
t.Fatalf("Subject should be %q, got %q", "foo", msg.Subject)
}
if string(msg.Data) != "msg" {
t.Fatalf("Unexepcted data: %q", msg.Data)
}
}

0 comments on commit 68de9e1

Please sign in to comment.