Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Make sure to never process next message requests inline. #4022

Merged
merged 3 commits into from Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
90 changes: 77 additions & 13 deletions server/consumer.go
Expand Up @@ -254,6 +254,7 @@ type consumer struct {
ackReplyT string
ackSubj string
nextMsgSubj string
nextMsgReqs *ipQueue[*nextMsgReq]
maxp int
pblimit int
maxpb int
Expand Down Expand Up @@ -750,6 +751,8 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// Create our request waiting queue.
if o.isPullMode() {
o.waiting = newWaitQueue(config.MaxWaiting)
// Create our internal queue for next msg requests.
o.nextMsgReqs = newIPQueue[*nextMsgReq](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' pull requests", accName, o.name, mset.cfg.Name))
}

// Check if we have filtered subject that is a wildcard.
Expand Down Expand Up @@ -1099,6 +1102,7 @@ func (o *consumer) setLeader(isLeader bool) {
if node != nil && o.pch == nil {
o.pch = make(chan struct{}, 1)
}
pullMode := o.isPullMode()
o.mu.Unlock()

// Snapshot initial info.
Expand All @@ -1110,6 +1114,12 @@ func (o *consumer) setLeader(isLeader bool) {
// Now start up Go routine to process acks.
go o.processInboundAcks(qch)

if pullMode {
// Now start up Go routine to process inbound next message requests.
go o.processInboundNextMsgReqs(qch)

}

// If we are R>1 spin up our proposal loop.
if node != nil {
// Determine if we can send pending requests info to the group.
Expand Down Expand Up @@ -1145,6 +1155,7 @@ func (o *consumer) setLeader(isLeader bool) {
if !o.isDurable() {
stopAndClearTimer(&o.dtmr)
}
o.nextMsgReqs.drain()
} else if o.srv.gateway.enabled {
stopAndClearTimer(&o.gwdtmr)
}
Expand Down Expand Up @@ -2833,28 +2844,54 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
return nil
}

// Next message request.
type nextMsgReq struct {
reply string
msg []byte
}

var nextMsgReqPool sync.Pool

func newNextMsgReq(reply string, msg []byte) *nextMsgReq {
var nmr *nextMsgReq
m := nextMsgReqPool.Get()
if m != nil {
nmr = m.(*nextMsgReq)
} else {
nmr = &nextMsgReq{}
}
// When getting something from a pool it is critical that all fields are
// initialized. Doing this way guarantees that if someone adds a field to
// the structure, the compiler will fail the build if this line is not updated.
(*nmr) = nextMsgReq{reply, msg}
return nmr
}

func (nmr *nextMsgReq) returnToPool() {
if nmr == nil {
return
}
nmr.reply, nmr.msg = _EMPTY_, nil
nextMsgReqPool.Put(nmr)
}

// processNextMsgReq will process a request for the next message available. A nil message payload means deliver
// a single message. If the payload is a formal request or a number parseable with Atoi(), then we will send a
// batch of messages without requiring another request to this endpoint, or an ACK.
func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _, reply string, msg []byte) {
if reply == _EMPTY_ {
return
}
_, msg = c.msgParts(msg)

inlineOk := c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF
if !inlineOk {
// Check how long we have been away from the readloop for the route or gateway or leafnode.
// If too long move to a separate go routine.
if elapsed := time.Since(c.in.start); elapsed < noBlockThresh {
inlineOk = true
}
}
if inlineOk {
o.processNextMsgRequest(reply, msg)
} else {
go o.processNextMsgRequest(reply, copyBytes(msg))
// Short circuit error here.
if o.nextMsgReqs == nil {
hdr := []byte("NATS/1.0 409 Consumer is push based\r\n\r\n")
o.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}

_, msg = c.msgParts(msg)
o.nextMsgReqs.push(newNextMsgReq(reply, copyBytes(msg)))
}

func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
Expand Down Expand Up @@ -3224,6 +3261,30 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
}
}

// Process inbound next message requests.
func (o *consumer) processInboundNextMsgReqs(qch chan struct{}) {
// Grab the server lock to watch for server quit.
o.mu.RLock()
s := o.srv
o.mu.RUnlock()

for {
select {
case <-o.nextMsgReqs.ch:
reqs := o.nextMsgReqs.pop()
for _, req := range reqs {
o.processNextMsgRequest(req.reply, req.msg)
req.returnToPool()
}
o.nextMsgReqs.recycle(&reqs)
case <-qch:
return
case <-s.quitCh:
return
}
}
}

// Suppress auto cleanup on ack activity of any kind.
func (o *consumer) suppressDeletion() {
o.mu.Lock()
Expand Down Expand Up @@ -4310,6 +4371,9 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
n := o.node
qgroup := o.cfg.DeliverGroup
o.ackMsgs.unregister()
if o.nextMsgReqs != nil {
o.nextMsgReqs.unregister()
}

// For cleaning up the node assignment.
var ca *consumerAssignment
Expand Down
2 changes: 1 addition & 1 deletion server/norace_test.go
Expand Up @@ -6559,7 +6559,7 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
setHighStartSequence := false
simulateMaxRedeliveries := false
maxBadPubTimes := uint32(20)
badPubThresh := 5 * time.Second
badPubThresh := 500 * time.Millisecond
testTime := 5 * time.Minute // make sure to do --timeout=65m

t.Logf("Starting Test: Total Test Time %v", testTime)
Expand Down
2 changes: 1 addition & 1 deletion server/stream.go
Expand Up @@ -4223,7 +4223,7 @@ func newJSPubMsg(dsubj, subj, reply string, hdr, msg []byte, o *consumer, seq ui
} else {
m = new(jsPubMsg)
}
// When getting something from a pool it is criticical that all fields are
// When getting something from a pool it is critical that all fields are
// initialized. Doing this way guarantees that if someone adds a field to
// the structure, the compiler will fail the build if this line is not updated.
(*m) = jsPubMsg{dsubj, reply, StoreMsg{subj, hdr, msg, buf, seq, 0}, o}
Expand Down