Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] JetStream flow control may stall in some conditions #837

Merged
merged 3 commits into from Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
76 changes: 71 additions & 5 deletions js.go
Expand Up @@ -94,6 +94,14 @@ const (

// Scale for threshold of missed HBs or lack of activity.
hbcThresh = 2

// For ChanSubscription, we can't update sub.delivered as we do for other
// type of subscriptions, since the channel is user provided.
// With flow control in play, we will check for flow control on incoming
// messages (as opposed to when they are delivered), but also from a go
// routine. Without this, the subscription would possibly stall until
// a new message or heartbeat/fc are received.
chanSubFCCheckInterval = 250 * time.Millisecond
)

// Types of control messages, so far heartbeat and flow control
Expand Down Expand Up @@ -897,6 +905,7 @@ type jsSub struct {
cmeta string
fcr string
fcd uint64
fciseq uint64
}

// Deletes the JS Consumer.
Expand Down Expand Up @@ -1409,15 +1418,27 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}
if !isPullMode {
// We can't reuse the channel, so if one was passed, we need to create a new one.
if ch != nil {
if isSync {
ch = make(chan *Msg, cap(ch))
} else if ch != nil {
// User provided (ChanSubscription), simply try to drain it.
for done := false; !done; {
select {
case <-ch:
default:
done = true
}
}
}
jsi.deliver = deliver
jsi.hbi = info.Config.Heartbeat
// Recreate the subscription here.
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
if err != nil {
return nil, err
}
hasFC = info.Config.FlowControl
hasHeartbeats = info.Config.Heartbeat > 0
}
} else {
if cinfo.Error.Code == 404 {
Expand All @@ -1442,10 +1463,41 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if hasHeartbeats {
sub.scheduleHeartbeatCheck()
}
// For ChanSubscriptions, if we know that there is flow control, we will
// start a go routine that evaluates the number of delivered messages
// and process flow control.
if sub.Type() == ChanSubscription && hasFC {
go sub.chanSubcheckForFlowControlResponse()
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
}

return sub, nil
}

// This long-lived routine is used per ChanSubscription to check
// on the number of delivered messages and check for flow control response.
func (sub *Subscription) chanSubcheckForFlowControlResponse() {
sub.mu.Lock()
nc := sub.conn
sub.mu.Unlock()
if nc == nil {
return
}
t := time.NewTicker(chanSubFCCheckInterval)
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
for range t.C {
sub.mu.Lock()
if sub.closed {
sub.mu.Unlock()
t.Stop()
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
return
}
fcReply := sub.checkForFlowControlResponse()
sub.mu.Unlock()
if fcReply != _EMPTY_ {
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
nc.Publish(fcReply, nil)
}
}
}

// ErrConsumerSequenceMismatch represents an error from a consumer
// that received a Heartbeat including sequence different to the
// one expected from the view of the client.
Expand Down Expand Up @@ -1488,8 +1540,11 @@ func isJSControlMessage(msg *Msg) (bool, int) {

// Keeps track of the incoming message's reply subject so that the consumer's
// state (deliver sequence, etc..) can be checked against heartbeats.
// We will also bump the incoming data message sequence that is used in FC cases.
// Runs under the subscription lock
func (sub *Subscription) trackSequences(reply string) {
// For flow control, keep track of incoming message sequence.
sub.jsi.fciseq++
sub.jsi.cmeta = reply
}

Expand Down Expand Up @@ -1626,13 +1681,25 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
}()
}

// For jetstream subscriptions, returns the number of delivered messages.
// For ChanSubscription, this value is computed based on the known number
// of messages added to the channel minus the current size of that channel.
// Lock held on entry
func (sub *Subscription) getJSDelivered() uint64 {
if sub.typ == ChanSubscription {
return sub.jsi.fciseq - uint64(len(sub.mch))
}
return sub.delivered
}

// checkForFlowControlResponse will check to see if we should send a flow control response
// based on the subscription current delivered index and the target.
// Runs under subscription lock
func (sub *Subscription) checkForFlowControlResponse() string {
// Caller has verified that there is a sub.jsi and fc
jsi := sub.jsi
if jsi.fcd == sub.delivered {
jsi.active = true
if sub.getJSDelivered() >= jsi.fcd {
fcr := jsi.fcr
jsi.fcr, jsi.fcd = _EMPTY_, 0
return fcr
Expand All @@ -1642,9 +1709,8 @@ func (sub *Subscription) checkForFlowControlResponse() string {

// Record an inbound flow control message.
// Runs under subscription lock
func (sub *Subscription) scheduleFlowControlResponse(dfuture uint64, reply string) {
jsi := sub.jsi
jsi.fcr, jsi.fcd = reply, dfuture
func (sub *Subscription) scheduleFlowControlResponse(reply string) {
sub.jsi.fcr, sub.jsi.fcd = reply, sub.jsi.fciseq
}

// Checks for activity from our consumer.
Expand Down
22 changes: 17 additions & 5 deletions nats.go
Expand Up @@ -2600,7 +2600,6 @@ func (nc *Conn) waitForMsgs(s *Subscription) {
delivered = s.delivered
if s.jsi != nil {
fcReply = s.checkForFlowControlResponse()
s.jsi.active = true
}
}
s.mu.Unlock()
Expand Down Expand Up @@ -2768,6 +2767,7 @@ func (nc *Conn) processMsg(data []byte) {

// Skip processing if this is a control message.
if !ctrlMsg {
var chanSubCheckFC bool
// Subscription internal stats (applicable only for non ChanSubscription's)
if sub.typ != ChanSubscription {
sub.pMsgs++
Expand All @@ -2784,6 +2784,8 @@ func (nc *Conn) processMsg(data []byte) {
(sub.pBytesLimit > 0 && sub.pBytes > sub.pBytesLimit) {
goto slowConsumer
}
} else if jsi != nil {
chanSubCheckFC = true
}

// We have two modes of delivery. One is the channel, used by channel
Expand Down Expand Up @@ -2811,15 +2813,26 @@ func (nc *Conn) processMsg(data []byte) {
// Store the ACK metadata from the message to
// compare later on with the received heartbeat.
sub.trackSequences(m.Reply)
if chanSubCheckFC {
// For ChanSubscription, since we can't call this when a message
// is "delivered" (since user is pull from their own channel),
// we have a go routine that does this check, however, we do it
// also here to make it much more responsive. The go routine is
// really to avoid stalling when there is no new messages coming.
fcReply = sub.checkForFlowControlResponse()
}
}
} else if ctrlType == jsCtrlFC && m.Reply != _EMPTY_ {
// This is a flow control message.
// If we have no pending, go ahead and send in place.
if sub.pMsgs <= 0 {
// We will schedule the send of the FC reply once we have delivered the
// DATA message that was received before this flow control message, which
// has sequence `jsi.fciseq`. However, it is possible that this message
// has already been delivered, in that case, we need to send the FC reply now.
if sub.getJSDelivered() >= jsi.fciseq {
fcReply = m.Reply
} else {
// Schedule a reply after the previous message is delivered.
sub.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply)
sub.scheduleFlowControlResponse(m.Reply)
}
}

Expand Down Expand Up @@ -4192,7 +4205,6 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error {
delivered := s.delivered
if s.jsi != nil {
fcReply = s.checkForFlowControlResponse()
s.jsi.active = true
}

if s.typ == SyncSubscription {
Expand Down