Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Received JetStream message may have wrong subject #827

Merged
merged 1 commit into from Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.4.1-0.20210902224824-3aa8e63b290a
github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
6 changes: 3 additions & 3 deletions go_test.sum
Expand Up @@ -19,9 +19,9 @@ github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats-server/v2 v2.4.1-0.20210902224824-3aa8e63b290a h1:CTZ20lNuVCOzZxXN2s1TJBUOdoHbsJXGNaChvdfr99c=
github.com/nats-io/nats-server/v2 v2.4.1-0.20210902224824-3aa8e63b290a/go.mod h1:TUAhMFYh1VISyY/D4WKJUMuGHg8yHtoUTuxkbiej1lc=
github.com/nats-io/nats.go v1.12.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6 h1:TYI6K487xhbbpKjz4gIIVBWL6l2gFI3JHu/N0XySwRY=
github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6/go.mod h1:xZLDZ6cRUu9FCh7+mKXGEy16O66CdWVxttxNIiUuNCk=
github.com/nats-io/nats.go v1.12.2-0.20210916222008-92921544b891/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down
14 changes: 1 addition & 13 deletions nats.go
Expand Up @@ -577,9 +577,6 @@ type Subscription struct {
// Type of Subscription
typ SubscriptionType

// Whether subject in messages and Subject in subscription may differ
smd bool

// Async linked list
pHead *Msg
pTail *Msg
Expand Down Expand Up @@ -2695,10 +2692,7 @@ func (nc *Conn) processMsg(data []byte) {
}

// Copy them into string
subj := sub.Subject
if sub.smd {
subj = string(nc.ps.ma.subject)
}
subj := string(nc.ps.ma.subject)
reply := string(nc.ps.ma.reply)

// Doing message create outside of the sub's lock to reduce contention.
Expand Down Expand Up @@ -3752,11 +3746,6 @@ func badQueue(qname string) bool {
return strings.ContainsAny(qname, " \t\r\n")
}

// wildcard will check a subject name for wildcardness.
func wildcard(subj string) bool {
return strings.ContainsAny(subj, "*>")
}

// subscribe is the internal subscribe function that indicates interest in a subject.
func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) {
if nc == nil {
Expand Down Expand Up @@ -3796,7 +3785,6 @@ func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg,
mcb: cb,
conn: nc,
jsi: js,
smd: wildcard(subj) || js != nil,
}
// Set pending limits.
if ch != nil {
Expand Down
1 change: 1 addition & 0 deletions norace_test.go
Expand Up @@ -793,6 +793,7 @@ func TestNoRaceJetStreamChanSubscribeStall(t *testing.T) {
sub, err := js.ChanSubscribe("STALL", msgs,
Durable("dlc"),
EnableFlowControl(),
IdleHeartbeat(5*time.Second),
MaxAckPending(batch-2),
)
if err != nil {
Expand Down
53 changes: 53 additions & 0 deletions test/js_test.go
Expand Up @@ -6434,3 +6434,56 @@ func TestJetStreamStreamAndConsumerDescription(t *testing.T) {
t.Fatalf("Invalid description: %q vs %q", consDesc, ci.Config.Description)
}
}

func TestJetStreamMsgSubjectRewrite(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if _, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
}); err != nil {
t.Fatalf("Error adding stream: %v", err)
}

sub, err := nc.SubscribeSync(nats.NewInbox())
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if _, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
DeliverSubject: sub.Subject,
DeliverPolicy: nats.DeliverAllPolicy,
}); err != nil {
t.Fatalf("Error adding consumer: %v", err)
}

if _, err := js.Publish("foo", []byte("msg")); err != nil {
t.Fatalf("Error on publish: %v", err)
}

msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Did not get message: %v", err)
}
if msg.Subject != "foo" {
t.Fatalf("Subject should be %q, got %q", "foo", msg.Subject)
}
if string(msg.Data) != "msg" {
t.Fatalf("Unexepcted data: %q", msg.Data)
}
}