Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed issue where JS would change subscription subject #793

Merged
merged 1 commit into from Aug 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 2 additions & 8 deletions js.go
Expand Up @@ -818,6 +818,7 @@ type jsSub struct {
// For pull subscribers, this is the next message subject to send requests to.
nms string

psubj string // the subject that was passed by user to the subscribe calls
consumer string
stream string
deliver string
Expand Down Expand Up @@ -1114,13 +1115,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
dseq: 1,
pull: isPullMode,
nms: nms,
psubj: subj,
}

sub, err = nc.subscribe(deliver, queue, cb, ch, isSync, jsi)
// Since JetStream sends on different subject, make sure this reflects the user's intentions.
sub.mu.Lock()
sub.Subject = subj
sub.mu.Unlock()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1211,10 +1209,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if err != nil {
return nil, err
}
// Since JetStream sends on different subject, make sure this reflects the user's intentions.
sub.mu.Lock()
sub.Subject = subj
sub.mu.Unlock()
}
} else {
if cinfo.Error.Code == 404 {
Expand Down
86 changes: 86 additions & 0 deletions js_test.go
Expand Up @@ -409,3 +409,89 @@ func TestJetStreamConcurrentDurablePushConsumers(t *testing.T) {
}
}
}

func TestJetStreamSubscribeReconnect(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

rch := make(chan struct{}, 1)
nc, err := Connect(s.ClientURL(),
ReconnectWait(50*time.Millisecond),
ReconnectHandler(func(_ *Conn) {
select {
case rch <- struct{}{}:
default:
}
}))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream(MaxWait(250 * time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Create the stream using our client API.
_, err = js.AddStream(&StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

sub, err := js.SubscribeSync("foo", Durable("bar"))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

sendAndReceive := func(msgContent string) {
t.Helper()
var ok bool
var err error
for i := 0; i < 5; i++ {
if _, err = js.Publish("foo", []byte(msgContent)); err != nil {
time.Sleep(250 * time.Millisecond)
continue
}
ok = true
break
}
if !ok {
t.Fatalf("Error on publish: %v", err)
}
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatal("Did not get message")
}
if string(msg.Data) != msgContent {
t.Fatalf("Unexpected content: %q", msg.Data)
}
if err := msg.AckSync(); err != nil {
t.Fatalf("Error on ack: %v", err)
}
}

sendAndReceive("msg1")

// Cause a disconnect...
nc.mu.Lock()
nc.conn.Close()
nc.mu.Unlock()

// Wait for reconnect
select {
case <-rch:
case <-time.After(time.Second):
t.Fatal("Did not reconnect")
}

// Make sure we can send and receive the msg
sendAndReceive("msg2")
}
15 changes: 12 additions & 3 deletions nats.go
Expand Up @@ -1270,7 +1270,15 @@ func defaultErrHandler(nc *Conn, sub *Subscription, err error) {
}
var errStr string
if sub != nil {
errStr = fmt.Sprintf("%s on connection [%d] for subscription on %q\n", err.Error(), cid, sub.Subject)
var subject string
sub.mu.Lock()
if sub.jsi != nil {
subject = sub.jsi.psubj
} else {
subject = sub.Subject
}
sub.mu.Unlock()
errStr = fmt.Sprintf("%s on connection [%d] for subscription on %q\n", err.Error(), cid, subject)
} else {
errStr = fmt.Sprintf("%s on connection [%d]\n", err.Error(), cid)
}
Expand Down Expand Up @@ -4451,12 +4459,13 @@ func (nc *Conn) resendSubscriptions() {
continue
}
}
subj, queue, sid := s.Subject, s.Queue, s.sid
s.mu.Unlock()

nc.bw.writeDirect(fmt.Sprintf(subProto, s.Subject, s.Queue, s.sid))
nc.bw.writeDirect(fmt.Sprintf(subProto, subj, queue, sid))
if adjustedMax > 0 {
maxStr := strconv.Itoa(int(adjustedMax))
nc.bw.writeDirect(fmt.Sprintf(unsubProto, s.sid, maxStr))
nc.bw.writeDirect(fmt.Sprintf(unsubProto, sid, maxStr))
}
}
}
Expand Down