diff --git a/js.go b/js.go index 4d4adbdac..626fcabfd 100644 --- a/js.go +++ b/js.go @@ -94,6 +94,14 @@ const ( // Scale for threshold of missed HBs or lack of activity. hbcThresh = 2 + + // For ChanSubscription, we can't update sub.delivered as we do for other + // types of subscriptions, since the channel is user provided. + // With flow control in play, we will check for flow control on incoming + // messages (as opposed to when they are delivered), but also from a go + // routine. Without this, the subscription would possibly stall until + // a new message or heartbeat/fc are received. + chanSubFCCheckInterval = 250 * time.Millisecond ) // Types of control messages, so far heartbeat and flow control @@ -897,6 +905,8 @@ type jsSub struct { cmeta string fcr string fcd uint64 + fciseq uint64 + csfct *time.Timer } // Deletes the JS Consumer. @@ -1409,15 +1419,27 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } if !isPullMode { // We can't reuse the channel, so if one was passed, we need to create a new one. - if ch != nil { + if isSync { ch = make(chan *Msg, cap(ch)) + } else if ch != nil { + // User provided (ChanSubscription), simply try to drain it. + for done := false; !done; { + select { + case <-ch: + default: + done = true + } + } } jsi.deliver = deliver + jsi.hbi = info.Config.Heartbeat // Recreate the subscription here. sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi) if err != nil { return nil, err } + hasFC = info.Config.FlowControl + hasHeartbeats = info.Config.Heartbeat > 0 } } else { if cinfo.Error.Code == 404 { @@ -1442,10 +1464,44 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if hasHeartbeats { sub.scheduleHeartbeatCheck() } + // For ChanSubscriptions, if we know that there is flow control, we will + // start a go routine that evaluates the number of delivered messages + // and processes flow control. 
+ if sub.Type() == ChanSubscription && hasFC { + sub.chanSubcheckForFlowControlResponse() + } return sub, nil } +// This long-lived routine is used per ChanSubscription to check +// on the number of delivered messages and check for flow control response. +func (sub *Subscription) chanSubcheckForFlowControlResponse() { + sub.mu.Lock() + // We don't use defer since if we need to send an FC reply, we need + // to do it outside the sub's lock. So doing explicit unlock... + if sub.closed { + sub.mu.Unlock() + return + } + var fcReply string + var nc *Conn + + jsi := sub.jsi + if jsi.csfct == nil { + jsi.csfct = time.AfterFunc(chanSubFCCheckInterval, sub.chanSubcheckForFlowControlResponse) + } else { + fcReply = sub.checkForFlowControlResponse() + nc = sub.conn + // Do the reset here under the lock, it's ok... + jsi.csfct.Reset(chanSubFCCheckInterval) + } + sub.mu.Unlock() + // This call will return an error (which we don't care about here) + // if nc is nil or fcReply is empty. + nc.Publish(fcReply, nil) +} + // ErrConsumerSequenceMismatch represents an error from a consumer // that received a Heartbeat including sequence different to the // one expected from the view of the client. @@ -1488,8 +1544,11 @@ func isJSControlMessage(msg *Msg) (bool, int) { // Keeps track of the incoming message's reply subject so that the consumer's // state (deliver sequence, etc..) can be checked against heartbeats. +// We will also bump the incoming data message sequence that is used in FC cases. // Runs under the subscription lock func (sub *Subscription) trackSequences(reply string) { + // For flow control, keep track of incoming message sequence. + sub.jsi.fciseq++ sub.jsi.cmeta = reply } @@ -1626,13 +1685,25 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { }() } +// For jetstream subscriptions, returns the number of delivered messages. 
+// For ChanSubscription, this value is computed based on the known number +// of messages added to the channel minus the current size of that channel. +// Lock held on entry +func (sub *Subscription) getJSDelivered() uint64 { + if sub.typ == ChanSubscription { + return sub.jsi.fciseq - uint64(len(sub.mch)) + } + return sub.delivered +} + // checkForFlowControlResponse will check to see if we should send a flow control response // based on the subscription current delivered index and the target. // Runs under subscription lock func (sub *Subscription) checkForFlowControlResponse() string { // Caller has verified that there is a sub.jsi and fc jsi := sub.jsi - if jsi.fcd == sub.delivered { + jsi.active = true + if sub.getJSDelivered() >= jsi.fcd { fcr := jsi.fcr jsi.fcr, jsi.fcd = _EMPTY_, 0 return fcr @@ -1642,9 +1713,8 @@ func (sub *Subscription) checkForFlowControlResponse() string { // Record an inbound flow control message. // Runs under subscription lock -func (sub *Subscription) scheduleFlowControlResponse(dfuture uint64, reply string) { - jsi := sub.jsi - jsi.fcr, jsi.fcd = reply, dfuture +func (sub *Subscription) scheduleFlowControlResponse(reply string) { + sub.jsi.fcr, sub.jsi.fcd = reply, sub.jsi.fciseq } // Checks for activity from our consumer. 
diff --git a/js_test.go b/js_test.go index 90c03fe64..9685ce154 100644 --- a/js_test.go +++ b/js_test.go @@ -105,7 +105,10 @@ func TestJetStreamOrderedConsumer(t *testing.T) { } else { chunk = msg[i : i+chunkSize] } - js.PublishAsync("a", chunk) + msg := NewMsg("a") + msg.Data = chunk + msg.Header.Set("data", "true") + js.PublishMsgAsync(msg) } js.PublishAsync("a", nil) // eof @@ -171,7 +174,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) { t.Fatalf("Objects do not match") } case <-time.After(5 * time.Second): - t.Fatalf("Did not receive all chunks, only %d of %d total", atomic.LoadUint32(&received), si.State.Msgs) + t.Fatalf("Did not receive all chunks, only %d of %d total", atomic.LoadUint32(&received), si.State.Msgs-1) } } @@ -202,7 +205,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) { rmsg = append(rmsg, m.Data...) } if !done { - t.Fatalf("Did not receive all chunks, only %d of %d total", received, si.State.Msgs) + t.Fatalf("Did not receive all chunks, only %d of %d total", received, si.State.Msgs-1) } if rsum := sha256.Sum256(rmsg); rsum != sum { t.Fatalf("Objects do not match") @@ -215,7 +218,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) { // Now introduce some loss. 
singleLoss := func(m *Msg) *Msg { - if rand.Intn(100) <= 10 { + if rand.Intn(100) <= 10 && m.Header.Get("data") != _EMPTY_ { nc.removeMsgFilter("a") return nil } @@ -223,10 +226,11 @@ func TestJetStreamOrderedConsumer(t *testing.T) { } nc.addMsgFilter("a", singleLoss) testConsumer() + nc.addMsgFilter("a", singleLoss) testSyncConsumer() multiLoss := func(m *Msg) *Msg { - if rand.Intn(100) <= 10 { + if rand.Intn(100) <= 10 && m.Header.Get("data") != _EMPTY_ { return nil } return m @@ -246,11 +250,12 @@ func TestJetStreamOrderedConsumer(t *testing.T) { } nc.addMsgFilter("a", firstOnly) testConsumer() + nc.addMsgFilter("a", firstOnly) testSyncConsumer() lastOnly := func(m *Msg) *Msg { if meta, err := m.Metadata(); err == nil { - if meta.Sequence.Stream >= si.State.LastSeq { + if meta.Sequence.Stream >= si.State.LastSeq-1 { nc.removeMsgFilter("a") return nil } @@ -259,6 +264,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) { } nc.addMsgFilter("a", lastOnly) testConsumer() + nc.addMsgFilter("a", lastOnly) testSyncConsumer() } diff --git a/nats.go b/nats.go index ed8506eb2..95a5577df 100644 --- a/nats.go +++ b/nats.go @@ -2600,7 +2600,6 @@ func (nc *Conn) waitForMsgs(s *Subscription) { delivered = s.delivered if s.jsi != nil { fcReply = s.checkForFlowControlResponse() - s.jsi.active = true } } s.mu.Unlock() @@ -2768,6 +2767,7 @@ func (nc *Conn) processMsg(data []byte) { // Skip processing if this is a control message. if !ctrlMsg { + var chanSubCheckFC bool // Subscription internal stats (applicable only for non ChanSubscription's) if sub.typ != ChanSubscription { sub.pMsgs++ @@ -2784,6 +2784,8 @@ func (nc *Conn) processMsg(data []byte) { (sub.pBytesLimit > 0 && sub.pBytes > sub.pBytesLimit) { goto slowConsumer } + } else if jsi != nil { + chanSubCheckFC = true } // We have two modes of delivery. 
One is the channel, used by channel @@ -2811,15 +2813,26 @@ func (nc *Conn) processMsg(data []byte) { // Store the ACK metadata from the message to // compare later on with the received heartbeat. sub.trackSequences(m.Reply) + if chanSubCheckFC { + // For ChanSubscription, since we can't call this when a message + // is "delivered" (since the user is pulling from their own channel), + // we have a go routine that does this check, however, we do it + // also here to make it much more responsive. The go routine is + // really to avoid stalling when there are no new messages coming. + fcReply = sub.checkForFlowControlResponse() + } } } else if ctrlType == jsCtrlFC && m.Reply != _EMPTY_ { // This is a flow control message. - // If we have no pending, go ahead and send in place. - if sub.pMsgs <= 0 { + // We will schedule the send of the FC reply once we have delivered the + // DATA message that was received before this flow control message, which + // has sequence `jsi.fciseq`. However, it is possible that this message + // has already been delivered, in that case, we need to send the FC reply now. + if sub.getJSDelivered() >= jsi.fciseq { fcReply = m.Reply } else { // Schedule a reply after the previous message is delivered. - sub.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply) + sub.scheduleFlowControlResponse(m.Reply) } } @@ -3851,9 +3864,15 @@ func (nc *Conn) removeSub(s *Subscription) { s.mch = nil // If JS subscription then stop HB timer. - if jsi := s.jsi; jsi != nil && jsi.hbc != nil { - jsi.hbc.Stop() - jsi.hbc = nil + if jsi := s.jsi; jsi != nil { + if jsi.hbc != nil { + jsi.hbc.Stop() + jsi.hbc = nil + } + if jsi.csfct != nil { + jsi.csfct.Stop() + jsi.csfct = nil + } } // Mark as invalid @@ -4192,7 +4211,6 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { delivered := s.delivered if s.jsi != nil { fcReply = s.checkForFlowControlResponse() - s.jsi.active = true } if s.typ == SyncSubscription {