Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix consumer reply subject escaping #4040

Merged
merged 1 commit into from Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions server/consumer.go
Expand Up @@ -823,8 +823,10 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// Set up the ack subscription for this consumer. Will use wildcard for all acks.
// We will remember the template to generate replies with sequence numbers and use
// that to scanf them back in.
mn := mset.cfg.Name
pre := fmt.Sprintf(jsAckT, mn, o.name)
// Escape '%' in consumer and stream names, as `pre` is used as a template later
// in consumer.ackReply(), resulting in erroneous formatting of the ack subject.
mn := strings.ReplaceAll(mset.cfg.Name, "%", "%%")
pre := fmt.Sprintf(jsAckT, mn, strings.ReplaceAll(o.name, "%", "%%"))
o.ackReplyT = fmt.Sprintf("%s.%%d.%%d.%%d.%%d.%%d", pre)
o.ackSubj = fmt.Sprintf("%s.*.*.*.*.*", pre)
o.nextMsgSubj = fmt.Sprintf(JSApiRequestNextT, mn, o.name)
wallyqs marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
31 changes: 31 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -19784,3 +19784,34 @@ func TestJetStreamConsumerAckFloorWithExpired(t *testing.T) {
require_True(t, ci.NumPending == 0)
require_True(t, ci.NumRedelivered == 0)
}

func TestJetStreamConsumerWithFormattingSymbol(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "Test%123",
Subjects: []string{"foo"},
})
require_NoError(t, err)

for i := 0; i < 10; i++ {
sendStreamMsg(t, nc, "foo", "OK")
}

_, err = js.AddConsumer("Test%123", &nats.ConsumerConfig{
Durable: "Test%123",
FilterSubject: "foo",
DeliverSubject: "bar",
})
require_NoError(t, err)

sub, err := js.SubscribeSync("foo", nats.Bind("Test%123", "Test%123"))
require_NoError(t, err)

_, err = sub.NextMsg(time.Second * 5)
require_NoError(t, err)
}