Skip to content

Commit

Permalink
[IMPROVED] Make sure to never process next message requests inline. (#…
Browse files Browse the repository at this point in the history
…4022)

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Apr 4, 2023
2 parents c9350a2 + ebe4f89 commit 34922df
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 15 deletions.
90 changes: 77 additions & 13 deletions server/consumer.go
Expand Up @@ -254,6 +254,7 @@ type consumer struct {
ackReplyT string
ackSubj string
nextMsgSubj string
nextMsgReqs *ipQueue[*nextMsgReq]
maxp int
pblimit int
maxpb int
Expand Down Expand Up @@ -750,6 +751,8 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// Create our request waiting queue.
if o.isPullMode() {
o.waiting = newWaitQueue(config.MaxWaiting)
// Create our internal queue for next msg requests.
o.nextMsgReqs = newIPQueue[*nextMsgReq](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' pull requests", accName, o.name, mset.cfg.Name))
}

// Check if we have filtered subject that is a wildcard.
Expand Down Expand Up @@ -1099,6 +1102,7 @@ func (o *consumer) setLeader(isLeader bool) {
if node != nil && o.pch == nil {
o.pch = make(chan struct{}, 1)
}
pullMode := o.isPullMode()
o.mu.Unlock()

// Snapshot initial info.
Expand All @@ -1110,6 +1114,12 @@ func (o *consumer) setLeader(isLeader bool) {
// Now start up Go routine to process acks.
go o.processInboundAcks(qch)

if pullMode {
// Now start up Go routine to process inbound next message requests.
go o.processInboundNextMsgReqs(qch)

}

// If we are R>1 spin up our proposal loop.
if node != nil {
// Determine if we can send pending requests info to the group.
Expand Down Expand Up @@ -1145,6 +1155,7 @@ func (o *consumer) setLeader(isLeader bool) {
if !o.isDurable() {
stopAndClearTimer(&o.dtmr)
}
o.nextMsgReqs.drain()
} else if o.srv.gateway.enabled {
stopAndClearTimer(&o.gwdtmr)
}
Expand Down Expand Up @@ -2833,28 +2844,54 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
return nil
}

// Next message request.
type nextMsgReq struct {
reply string
msg []byte
}

var nextMsgReqPool sync.Pool

func newNextMsgReq(reply string, msg []byte) *nextMsgReq {
var nmr *nextMsgReq
m := nextMsgReqPool.Get()
if m != nil {
nmr = m.(*nextMsgReq)
} else {
nmr = &nextMsgReq{}
}
// When getting something from a pool it is critical that all fields are
// initialized. Doing this way guarantees that if someone adds a field to
// the structure, the compiler will fail the build if this line is not updated.
(*nmr) = nextMsgReq{reply, msg}
return nmr
}

func (nmr *nextMsgReq) returnToPool() {
if nmr == nil {
return
}
nmr.reply, nmr.msg = _EMPTY_, nil
nextMsgReqPool.Put(nmr)
}

// processNextMsgReq will process a request for the next message available. A nil message payload means deliver
// a single message. If the payload is a formal request or a number parseable with Atoi(), then we will send a
// batch of messages without requiring another request to this endpoint, or an ACK.
func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _, reply string, msg []byte) {
if reply == _EMPTY_ {
return
}
_, msg = c.msgParts(msg)

inlineOk := c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF
if !inlineOk {
// Check how long we have been away from the readloop for the route or gateway or leafnode.
// If too long move to a separate go routine.
if elapsed := time.Since(c.in.start); elapsed < noBlockThresh {
inlineOk = true
}
}
if inlineOk {
o.processNextMsgRequest(reply, msg)
} else {
go o.processNextMsgRequest(reply, copyBytes(msg))
// Short circuit error here.
if o.nextMsgReqs == nil {
hdr := []byte("NATS/1.0 409 Consumer is push based\r\n\r\n")
o.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}

_, msg = c.msgParts(msg)
o.nextMsgReqs.push(newNextMsgReq(reply, copyBytes(msg)))
}

func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
Expand Down Expand Up @@ -3224,6 +3261,30 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
}
}

// processInboundNextMsgReqs drains the consumer's queue of inbound next
// message requests and processes each one, returning pooled requests for
// reuse. Runs until either qch is closed or the server shuts down.
func (o *consumer) processInboundNextMsgReqs(qch chan struct{}) {
	// Snapshot the server reference under the lock so we can also watch
	// for server quit below.
	o.mu.RLock()
	s := o.srv
	o.mu.RUnlock()

	for {
		select {
		case <-qch:
			return
		case <-s.quitCh:
			return
		case <-o.nextMsgReqs.ch:
			reqs := o.nextMsgReqs.pop()
			for _, req := range reqs {
				o.processNextMsgRequest(req.reply, req.msg)
				req.returnToPool()
			}
			// Hand the popped slice back to the queue for reuse.
			o.nextMsgReqs.recycle(&reqs)
		}
	}
}

// Suppress auto cleanup on ack activity of any kind.
func (o *consumer) suppressDeletion() {
o.mu.Lock()
Expand Down Expand Up @@ -4310,6 +4371,9 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
n := o.node
qgroup := o.cfg.DeliverGroup
o.ackMsgs.unregister()
if o.nextMsgReqs != nil {
o.nextMsgReqs.unregister()
}

// For cleaning up the node assignment.
var ca *consumerAssignment
Expand Down
2 changes: 1 addition & 1 deletion server/norace_test.go
Expand Up @@ -6559,7 +6559,7 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
setHighStartSequence := false
simulateMaxRedeliveries := false
maxBadPubTimes := uint32(20)
badPubThresh := 5 * time.Second
badPubThresh := 500 * time.Millisecond
testTime := 5 * time.Minute // make sure to do --timeout=65m

t.Logf("Starting Test: Total Test Time %v", testTime)
Expand Down
2 changes: 1 addition & 1 deletion server/stream.go
Expand Up @@ -4223,7 +4223,7 @@ func newJSPubMsg(dsubj, subj, reply string, hdr, msg []byte, o *consumer, seq ui
} else {
m = new(jsPubMsg)
}
// When getting something from a pool it is criticical that all fields are
// When getting something from a pool it is critical that all fields are
// initialized. Doing this way guarantees that if someone adds a field to
// the structure, the compiler will fail the build if this line is not updated.
(*m) = jsPubMsg{dsubj, reply, StoreMsg{subj, hdr, msg, buf, seq, 0}, o}
Expand Down

0 comments on commit 34922df

Please sign in to comment.