Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Optimize non-inline direct gets to not use simple go routines. #4028

Merged
merged 1 commit into from Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 0 additions & 3 deletions server/client.go
Expand Up @@ -110,9 +110,6 @@ const (
// For stalling fast producers
stallClientMinDuration = 100 * time.Millisecond
stallClientMaxDuration = time.Second

// Threshold for not knowingly doing a potential blocking operation when internal and on a route or gateway or leafnode.
noBlockThresh = 500 * time.Millisecond
)

var readLoopReportThreshold = readLoopReport
Expand Down
61 changes: 37 additions & 24 deletions server/stream.go
Expand Up @@ -193,6 +193,7 @@ type stream struct {
pubAck []byte
outq *jsOutQ
msgs *ipQueue[*inMsg]
gets *ipQueue[*directGetReq]
store StreamStore
ackq *ipQueue[uint64]
lseq uint64
Expand Down Expand Up @@ -460,6 +461,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
stype: cfg.Storage,
consumers: make(map[string]*consumer),
msgs: newIPQueue[*inMsg](s, qpfx+"messages"),
gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"),
qch: make(chan struct{}),
uch: make(chan struct{}, 4),
sch: make(chan struct{}, 1),
Expand Down Expand Up @@ -3528,12 +3530,25 @@ func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) {
mset.queueInbound(mset.msgs, subj, rply, hdr, msg)
}

var dgPool = sync.Pool{
New: func() interface{} {
return &directGetReq{}
},
}

// For when we need to not inline the request.
type directGetReq struct {
// Copy of this is correct for this.
req JSApiMsgGetRequest
reply string
}

// processDirectGetRequest handles direct get request for stream messages.
func (mset *stream) processDirectGetRequest(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
_, msg := c.msgParts(rmsg)
if len(reply) == 0 {
return
}
_, msg := c.msgParts(rmsg)
if len(msg) == 0 {
hdr := []byte("NATS/1.0 408 Empty Request\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
Expand Down Expand Up @@ -3566,26 +3581,20 @@ func (mset *stream) processDirectGetRequest(_ *subscription, c *client, _ *Accou

inlineOk := c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF
if !inlineOk {
// Check how long we have been away from the readloop for the route or gateway or leafnode.
// If too long move to a separate go routine.
if elapsed := time.Since(c.in.start); elapsed < noBlockThresh {
inlineOk = true
}
}

if inlineOk {
mset.getDirectRequest(&req, reply)
dg := dgPool.Get().(*directGetReq)
dg.req, dg.reply = req, reply
mset.gets.push(dg)
} else {
go mset.getDirectRequest(&req, reply)
mset.getDirectRequest(&req, reply)
}
}

// This is for direct get by last subject which is part of the subject itself.
func (mset *stream) processDirectGetLastBySubjectRequest(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
_, msg := c.msgParts(rmsg)
if len(reply) == 0 {
return
}
_, msg := c.msgParts(rmsg)
// This version expects no payload.
if len(msg) != 0 {
hdr := []byte("NATS/1.0 408 Bad Request\r\n\r\n")
Expand All @@ -3611,19 +3620,15 @@ func (mset *stream) processDirectGetLastBySubjectRequest(_ *subscription, c *cli
return
}

req := JSApiMsgGetRequest{LastFor: key}

inlineOk := c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF
if !inlineOk {
// Check how long we have been away from the readloop for the route or gateway or leafnode.
// If too long move to a separate go routine.
if elapsed := time.Since(c.in.start); elapsed < noBlockThresh {
inlineOk = true
}
}

if inlineOk {
mset.getDirectRequest(&JSApiMsgGetRequest{LastFor: key}, reply)
dg := dgPool.Get().(*directGetReq)
dg.req, dg.reply = req, reply
mset.gets.push(dg)
} else {
go mset.getDirectRequest(&JSApiMsgGetRequest{LastFor: key}, reply)
mset.getDirectRequest(&req, reply)
}
}

Expand Down Expand Up @@ -4332,7 +4337,7 @@ func (mset *stream) internalLoop() {
c := s.createInternalJetStreamClient()
c.registerWithAccount(mset.acc)
defer c.closeConnection(ClientClosed)
outq, qch, msgs := mset.outq, mset.qch, mset.msgs
outq, qch, msgs, gets := mset.outq, mset.qch, mset.msgs, mset.gets

// For the ack msgs queue for interest retention.
var (
Expand Down Expand Up @@ -4413,6 +4418,14 @@ func (mset *stream) internalLoop() {
}
}
msgs.recycle(&ims)
case <-gets.ch:
dgs := gets.pop()
for _, dg := range dgs {
mset.getDirectRequest(&dg.req, dg.reply)
dgPool.Put(dg)
}
gets.recycle(&dgs)

case <-amch:
seqs := ackq.pop()
for _, seq := range seqs {
Expand Down Expand Up @@ -5099,7 +5112,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
if err != nil {
return nil, err
}
if hdr.Typeflag != tar.TypeReg && hdr.Typeflag != tar.TypeRegA {
if hdr.Typeflag != tar.TypeReg {
return nil, logAndReturnError()
}
fpath := filepath.Join(sdir, filepath.Clean(hdr.Name))
Expand Down