Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] JetStream flow control may stall in some conditions #837

Merged
merged 3 commits into from Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
80 changes: 75 additions & 5 deletions js.go
Expand Up @@ -94,6 +94,14 @@ const (

// Scale for threshold of missed HBs or lack of activity.
hbcThresh = 2

// For ChanSubscription, we can't update sub.delivered as we do for other
// type of subscriptions, since the channel is user provided.
// With flow control in play, we will check for flow control on incoming
// messages (as opposed to when they are delivered), but also from a go
// routine. Without this, the subscription would possibly stall until
// a new message or heartbeat/fc are received.
chanSubFCCheckInterval = 250 * time.Millisecond
)

// Types of control messages, so far heartbeat and flow control
Expand Down Expand Up @@ -897,6 +905,8 @@ type jsSub struct {
cmeta string
fcr string
fcd uint64
fciseq uint64
csfct *time.Timer
}

// Deletes the JS Consumer.
Expand Down Expand Up @@ -1409,15 +1419,27 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}
if !isPullMode {
// We can't reuse the channel, so if one was passed, we need to create a new one.
if ch != nil {
if isSync {
ch = make(chan *Msg, cap(ch))
} else if ch != nil {
// User provided (ChanSubscription), simply try to drain it.
for done := false; !done; {
select {
case <-ch:
default:
done = true
}
}
}
jsi.deliver = deliver
jsi.hbi = info.Config.Heartbeat
// Recreate the subscription here.
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
if err != nil {
return nil, err
}
hasFC = info.Config.FlowControl
hasHeartbeats = info.Config.Heartbeat > 0
}
} else {
if cinfo.Error.Code == 404 {
Expand All @@ -1442,10 +1464,44 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if hasHeartbeats {
sub.scheduleHeartbeatCheck()
}
// For ChanSubscriptions, if we know that there is flow control, we will
// start a go routine that evaluates the number of delivered messages
// and process flow control.
if sub.Type() == ChanSubscription && hasFC {
sub.chanSubcheckForFlowControlResponse()
}

return sub, nil
}

// This long-lived routine is used per ChanSubscription to check
// on the number of delivered messages and check for flow control response.
func (sub *Subscription) chanSubcheckForFlowControlResponse() {
sub.mu.Lock()
// We don't use defer since if we need to send an RC reply, we need
// to do it outside the sub's lock. So doing explicit unlock...
if sub.closed {
sub.mu.Unlock()
return
}
var fcReply string
var nc *Conn

jsi := sub.jsi
if jsi.csfct == nil {
jsi.csfct = time.AfterFunc(chanSubFCCheckInterval, sub.chanSubcheckForFlowControlResponse)
} else {
fcReply = sub.checkForFlowControlResponse()
nc = sub.conn
// Do the reset here under the lock, it's ok...
jsi.csfct.Reset(chanSubFCCheckInterval)
}
sub.mu.Unlock()
// This call will return an error (which we don't care here)
// if nc is nil or fcReply is empty.
nc.Publish(fcReply, nil)
}

// ErrConsumerSequenceMismatch represents an error from a consumer
// that received a Heartbeat including sequence different to the
// one expected from the view of the client.
Expand Down Expand Up @@ -1488,8 +1544,11 @@ func isJSControlMessage(msg *Msg) (bool, int) {

// Keeps track of the incoming message's reply subject so that the consumer's
// state (deliver sequence, etc..) can be checked against heartbeats.
// We will also bump the incoming data message sequence that is used in FC cases.
// Runs under the subscription lock
func (sub *Subscription) trackSequences(reply string) {
// For flow control, keep track of incoming message sequence.
sub.jsi.fciseq++
sub.jsi.cmeta = reply
}

Expand Down Expand Up @@ -1626,13 +1685,25 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
}()
}

// For jetstream subscriptions, returns the number of delivered messages.
// For ChanSubscription, this value is computed based on the known number
// of messages added to the channel minus the current size of that channel.
// Lock held on entry
func (sub *Subscription) getJSDelivered() uint64 {
if sub.typ == ChanSubscription {
return sub.jsi.fciseq - uint64(len(sub.mch))
}
return sub.delivered
}

// checkForFlowControlResponse will check to see if we should send a flow control response
// based on the subscription current delivered index and the target.
// Runs under subscription lock
func (sub *Subscription) checkForFlowControlResponse() string {
// Caller has verified that there is a sub.jsi and fc
jsi := sub.jsi
if jsi.fcd == sub.delivered {
jsi.active = true
if sub.getJSDelivered() >= jsi.fcd {
fcr := jsi.fcr
jsi.fcr, jsi.fcd = _EMPTY_, 0
return fcr
Expand All @@ -1642,9 +1713,8 @@ func (sub *Subscription) checkForFlowControlResponse() string {

// Record an inbound flow control message.
// Runs under subscription lock
func (sub *Subscription) scheduleFlowControlResponse(dfuture uint64, reply string) {
jsi := sub.jsi
jsi.fcr, jsi.fcd = reply, dfuture
func (sub *Subscription) scheduleFlowControlResponse(reply string) {
sub.jsi.fcr, sub.jsi.fcd = reply, sub.jsi.fciseq
}

// Checks for activity from our consumer.
Expand Down
18 changes: 12 additions & 6 deletions js_test.go
Expand Up @@ -105,7 +105,10 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
} else {
chunk = msg[i : i+chunkSize]
}
js.PublishAsync("a", chunk)
msg := NewMsg("a")
msg.Data = chunk
msg.Header.Set("data", "true")
js.PublishMsgAsync(msg)
}
js.PublishAsync("a", nil) // eof

Expand Down Expand Up @@ -171,7 +174,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
t.Fatalf("Objects do not match")
}
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive all chunks, only %d of %d total", atomic.LoadUint32(&received), si.State.Msgs)
t.Fatalf("Did not receive all chunks, only %d of %d total", atomic.LoadUint32(&received), si.State.Msgs-1)
}
}

Expand Down Expand Up @@ -202,7 +205,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
rmsg = append(rmsg, m.Data...)
}
if !done {
t.Fatalf("Did not receive all chunks, only %d of %d total", received, si.State.Msgs)
t.Fatalf("Did not receive all chunks, only %d of %d total", received, si.State.Msgs-1)
}
if rsum := sha256.Sum256(rmsg); rsum != sum {
t.Fatalf("Objects do not match")
Expand All @@ -215,18 +218,19 @@ func TestJetStreamOrderedConsumer(t *testing.T) {

// Now introduce some loss.
singleLoss := func(m *Msg) *Msg {
if rand.Intn(100) <= 10 {
if rand.Intn(100) <= 10 && m.Header.Get("data") != _EMPTY_ {
nc.removeMsgFilter("a")
return nil
}
return m
}
nc.addMsgFilter("a", singleLoss)
testConsumer()
nc.addMsgFilter("a", singleLoss)
testSyncConsumer()

multiLoss := func(m *Msg) *Msg {
if rand.Intn(100) <= 10 {
if rand.Intn(100) <= 10 && m.Header.Get("data") != _EMPTY_ {
return nil
}
return m
Expand All @@ -246,11 +250,12 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
}
nc.addMsgFilter("a", firstOnly)
testConsumer()
nc.addMsgFilter("a", firstOnly)
testSyncConsumer()

lastOnly := func(m *Msg) *Msg {
if meta, err := m.Metadata(); err == nil {
if meta.Sequence.Stream >= si.State.LastSeq {
if meta.Sequence.Stream >= si.State.LastSeq-1 {
nc.removeMsgFilter("a")
return nil
}
Expand All @@ -259,6 +264,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
}
nc.addMsgFilter("a", lastOnly)
testConsumer()
nc.addMsgFilter("a", lastOnly)
testSyncConsumer()
}

Expand Down
34 changes: 26 additions & 8 deletions nats.go
Expand Up @@ -2600,7 +2600,6 @@ func (nc *Conn) waitForMsgs(s *Subscription) {
delivered = s.delivered
if s.jsi != nil {
fcReply = s.checkForFlowControlResponse()
s.jsi.active = true
}
}
s.mu.Unlock()
Expand Down Expand Up @@ -2768,6 +2767,7 @@ func (nc *Conn) processMsg(data []byte) {

// Skip processing if this is a control message.
if !ctrlMsg {
var chanSubCheckFC bool
// Subscription internal stats (applicable only for non ChanSubscription's)
if sub.typ != ChanSubscription {
sub.pMsgs++
Expand All @@ -2784,6 +2784,8 @@ func (nc *Conn) processMsg(data []byte) {
(sub.pBytesLimit > 0 && sub.pBytes > sub.pBytesLimit) {
goto slowConsumer
}
} else if jsi != nil {
chanSubCheckFC = true
}

// We have two modes of delivery. One is the channel, used by channel
Expand Down Expand Up @@ -2811,15 +2813,26 @@ func (nc *Conn) processMsg(data []byte) {
// Store the ACK metadata from the message to
// compare later on with the received heartbeat.
sub.trackSequences(m.Reply)
if chanSubCheckFC {
// For ChanSubscription, since we can't call this when a message
// is "delivered" (since user is pull from their own channel),
// we have a go routine that does this check, however, we do it
// also here to make it much more responsive. The go routine is
// really to avoid stalling when there is no new messages coming.
fcReply = sub.checkForFlowControlResponse()
}
}
} else if ctrlType == jsCtrlFC && m.Reply != _EMPTY_ {
// This is a flow control message.
// If we have no pending, go ahead and send in place.
if sub.pMsgs <= 0 {
// We will schedule the send of the FC reply once we have delivered the
// DATA message that was received before this flow control message, which
// has sequence `jsi.fciseq`. However, it is possible that this message
// has already been delivered, in that case, we need to send the FC reply now.
if sub.getJSDelivered() >= jsi.fciseq {
fcReply = m.Reply
} else {
// Schedule a reply after the previous message is delivered.
sub.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply)
sub.scheduleFlowControlResponse(m.Reply)
}
}

Expand Down Expand Up @@ -3851,9 +3864,15 @@ func (nc *Conn) removeSub(s *Subscription) {
s.mch = nil

// If JS subscription then stop HB timer.
if jsi := s.jsi; jsi != nil && jsi.hbc != nil {
jsi.hbc.Stop()
jsi.hbc = nil
if jsi := s.jsi; jsi != nil {
if jsi.hbc != nil {
jsi.hbc.Stop()
jsi.hbc = nil
}
if jsi.csfct != nil {
jsi.csfct.Stop()
jsi.csfct = nil
}
}

// Mark as invalid
Expand Down Expand Up @@ -4192,7 +4211,6 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error {
delivered := s.delivered
if s.jsi != nil {
fcReply = s.checkForFlowControlResponse()
s.jsi.active = true
}

if s.typ == SyncSubscription {
Expand Down