Skip to content

Commit

Permalink
[IMPROVED] Optimize non-inline direct gets to not use simple go routi…
Browse files Browse the repository at this point in the history
…nes. (#4028)

This was causing regression in certain KV performance tests.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Apr 6, 2023
2 parents 7b82384 + 2da5051 commit 1e80ea6
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 27 deletions.
3 changes: 0 additions & 3 deletions server/client.go
Expand Up @@ -110,9 +110,6 @@ const (
// For stalling fast producers
stallClientMinDuration = 100 * time.Millisecond
stallClientMaxDuration = time.Second

// Threshold for not knowingly doing a potential blocking operation when internal and on a route or gateway or leafnode.
noBlockThresh = 500 * time.Millisecond
)

var readLoopReportThreshold = readLoopReport
Expand Down
61 changes: 37 additions & 24 deletions server/stream.go
Expand Up @@ -193,6 +193,7 @@ type stream struct {
pubAck []byte
outq *jsOutQ
msgs *ipQueue[*inMsg]
gets *ipQueue[*directGetReq]
store StreamStore
ackq *ipQueue[uint64]
lseq uint64
Expand Down Expand Up @@ -460,6 +461,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
stype: cfg.Storage,
consumers: make(map[string]*consumer),
msgs: newIPQueue[*inMsg](s, qpfx+"messages"),
gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"),
qch: make(chan struct{}),
uch: make(chan struct{}, 4),
sch: make(chan struct{}, 1),
Expand Down Expand Up @@ -3528,12 +3530,25 @@ func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) {
mset.queueInbound(mset.msgs, subj, rply, hdr, msg)
}

var dgPool = sync.Pool{
New: func() interface{} {
return &directGetReq{}
},
}

// For when we need to not inline the request.
type directGetReq struct {
// Copy of this is correct for this.
req JSApiMsgGetRequest
reply string
}

// processDirectGetRequest handles direct get request for stream messages.
func (mset *stream) processDirectGetRequest(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
_, msg := c.msgParts(rmsg)
if len(reply) == 0 {
return
}
_, msg := c.msgParts(rmsg)
if len(msg) == 0 {
hdr := []byte("NATS/1.0 408 Empty Request\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
Expand Down Expand Up @@ -3566,26 +3581,20 @@ func (mset *stream) processDirectGetRequest(_ *subscription, c *client, _ *Accou

inlineOk := c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF
if !inlineOk {
// Check how long we have been away from the readloop for the route or gateway or leafnode.
// If too long move to a separate go routine.
if elapsed := time.Since(c.in.start); elapsed < noBlockThresh {
inlineOk = true
}
}

if inlineOk {
mset.getDirectRequest(&req, reply)
dg := dgPool.Get().(*directGetReq)
dg.req, dg.reply = req, reply
mset.gets.push(dg)
} else {
go mset.getDirectRequest(&req, reply)
mset.getDirectRequest(&req, reply)
}
}

// This is for direct get by last subject which is part of the subject itself.
func (mset *stream) processDirectGetLastBySubjectRequest(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
_, msg := c.msgParts(rmsg)
if len(reply) == 0 {
return
}
_, msg := c.msgParts(rmsg)
// This version expects no payload.
if len(msg) != 0 {
hdr := []byte("NATS/1.0 408 Bad Request\r\n\r\n")
Expand All @@ -3611,19 +3620,15 @@ func (mset *stream) processDirectGetLastBySubjectRequest(_ *subscription, c *cli
return
}

req := JSApiMsgGetRequest{LastFor: key}

inlineOk := c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF
if !inlineOk {
// Check how long we have been away from the readloop for the route or gateway or leafnode.
// If too long move to a separate go routine.
if elapsed := time.Since(c.in.start); elapsed < noBlockThresh {
inlineOk = true
}
}

if inlineOk {
mset.getDirectRequest(&JSApiMsgGetRequest{LastFor: key}, reply)
dg := dgPool.Get().(*directGetReq)
dg.req, dg.reply = req, reply
mset.gets.push(dg)
} else {
go mset.getDirectRequest(&JSApiMsgGetRequest{LastFor: key}, reply)
mset.getDirectRequest(&req, reply)
}
}

Expand Down Expand Up @@ -4332,7 +4337,7 @@ func (mset *stream) internalLoop() {
c := s.createInternalJetStreamClient()
c.registerWithAccount(mset.acc)
defer c.closeConnection(ClientClosed)
outq, qch, msgs := mset.outq, mset.qch, mset.msgs
outq, qch, msgs, gets := mset.outq, mset.qch, mset.msgs, mset.gets

// For the ack msgs queue for interest retention.
var (
Expand Down Expand Up @@ -4413,6 +4418,14 @@ func (mset *stream) internalLoop() {
}
}
msgs.recycle(&ims)
case <-gets.ch:
dgs := gets.pop()
for _, dg := range dgs {
mset.getDirectRequest(&dg.req, dg.reply)
dgPool.Put(dg)
}
gets.recycle(&dgs)

case <-amch:
seqs := ackq.pop()
for _, seq := range seqs {
Expand Down Expand Up @@ -5099,7 +5112,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
if err != nil {
return nil, err
}
if hdr.Typeflag != tar.TypeReg && hdr.Typeflag != tar.TypeRegA {
if hdr.Typeflag != tar.TypeReg {
return nil, logAndReturnError()
}
fpath := filepath.Join(sdir, filepath.Clean(hdr.Name))
Expand Down

0 comments on commit 1e80ea6

Please sign in to comment.