
Commit

Cherry-pick #3733 (Refactor outbound queues, remove dynamic sizing, add buffer reuse) (#3965)

This brings #3733 forward from `dev` into `main`, to go into the next
release.

Signed-off-by: Neil Twigg <neil@nats.io>
derekcollison committed Apr 3, 2023
2 parents 14ad983 + f2bffec commit b2b3ed9
Showing 5 changed files with 248 additions and 243 deletions.
15 changes: 10 additions & 5 deletions server/accounts_test.go
@@ -2099,12 +2099,17 @@ func TestCrossAccountServiceResponseTypes(t *testing.T) {

cfoo.parseAsync(string(mReply))

var b [256]byte
n, err := crBar.Read(b[:])
if err != nil {
t.Fatalf("Error reading response: %v", err)
var buf []byte
for i := 0; i < 20; i++ {
b, err := crBar.ReadBytes('\n')
if err != nil {
t.Fatalf("Error reading response: %v", err)
}
buf = append(buf[:], b...)
if mraw = msgPat.FindAllStringSubmatch(string(buf), -1); len(mraw) == 10 {
break
}
}
mraw = msgPat.FindAllStringSubmatch(string(b[:n]), -1)
if len(mraw) != 10 {
t.Fatalf("Expected a response but got %d", len(mraw))
}
253 changes: 133 additions & 120 deletions server/client.go
@@ -293,13 +293,9 @@ type pinfo struct {

// outbound holds pending data for a socket.
type outbound struct {
p []byte // Primary write buffer
s []byte // Secondary for use post flush
nb net.Buffers // net.Buffers for writev IO
sz int32 // limit size per []byte, uses variable BufSize constants, start, min, max.
sws int32 // Number of short writes, used for dynamic resizing.
nb net.Buffers // Pending buffers for send, each has fixed capacity as per nbPool below.
wnb net.Buffers // Working copy of "nb", reused on each flushOutbound call, partial writes may leave entries here for next iteration.
pb int64 // Total pending/queued bytes.
pm int32 // Total pending/queued messages.
fsp int32 // Flush signals that are pending per producer from readLoop's pcd.
sg *sync.Cond // To signal writeLoop that there is data to flush.
wdl time.Duration // Snapshot of write deadline.
@@ -308,6 +304,37 @@ type outbound struct {
stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in.
}

const nbPoolSizeSmall = 4096 // Underlying array size of small buffer
const nbPoolSizeLarge = 65536 // Underlying array size of large buffer

var nbPoolSmall = &sync.Pool{
New: func() any {
b := [nbPoolSizeSmall]byte{}
return &b
},
}

var nbPoolLarge = &sync.Pool{
New: func() any {
b := [nbPoolSizeLarge]byte{}
return &b
},
}

func nbPoolPut(b []byte) {
switch cap(b) {
case nbPoolSizeSmall:
b := (*[nbPoolSizeSmall]byte)(b[0:nbPoolSizeSmall])
nbPoolSmall.Put(b)
case nbPoolSizeLarge:
b := (*[nbPoolSizeLarge]byte)(b[0:nbPoolSizeLarge])
nbPoolLarge.Put(b)
default:
// Ignore frames that are the wrong size, this might happen
// with WebSocket/MQTT messages as they are framed
}
}

type perm struct {
allow *Sublist
deny *Sublist
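
An aside, not part of the diff: the pools above hold pointers to fixed-size arrays rather than []byte values, so the item stored in the pool is pointer-sized and no slice header needs to be heap-allocated on Put, and nbPoolPut recovers the array with a slice-to-array-pointer conversion (Go 1.17+). Below is a minimal standalone sketch of the same round trip; the single pool and the bufPool/getBuf/putBuf/bufSize names are hypothetical, only the pattern mirrors the diff.

package main

import (
	"fmt"
	"sync"
)

const bufSize = 4096 // illustrative, mirrors nbPoolSizeSmall

// Pooling *[N]byte rather than []byte means the pooled value is
// pointer-sized, so no slice header has to be heap-allocated on Put.
var bufPool = &sync.Pool{
	New: func() any { return &[bufSize]byte{} },
}

// getBuf returns a zero-length slice over a pooled array.
func getBuf() []byte {
	return bufPool.Get().(*[bufSize]byte)[:0]
}

// putBuf returns the backing array to the pool, ignoring buffers of the
// wrong capacity, in the same spirit as nbPoolPut above.
func putBuf(b []byte) {
	if cap(b) != bufSize {
		return // foreign buffer, leave it to the GC
	}
	// Slice-to-array-pointer conversion recovers the original array.
	bufPool.Put((*[bufSize]byte)(b[0:bufSize]))
}

func main() {
	b := getBuf()
	b = append(b, "PING\r\n"...) // fits well within the fixed capacity
	fmt.Println(len(b), cap(b))  // 6 4096
	putBuf(b)
}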
@@ -584,7 +611,6 @@ func (c *client) initClient() {
c.cid = atomic.AddUint64(&s.gcid, 1)

// Outbound data structure setup
c.out.sz = startBufSize
c.out.sg = sync.NewCond(&(c.mu))
opts := s.getOpts()
// Snapshots to avoid mutex access in fast paths.
@@ -1344,11 +1370,6 @@ func (c *client) collapsePtoNB() (net.Buffers, int64) {
if c.isWebsocket() {
return c.wsCollapsePtoNB()
}
if c.out.p != nil {
p := c.out.p
c.out.p = nil
return append(c.out.nb, p), c.out.pb
}
return c.out.nb, c.out.pb
}

@@ -1359,9 +1380,6 @@ func (c *client) handlePartialWrite(pnb net.Buffers) {
c.ws.frames = append(pnb, c.ws.frames...)
return
}
nb, _ := c.collapsePtoNB()
// The partial needs to be first, so append nb to pnb
c.out.nb = append(pnb, nb...)
}

// flushOutbound will flush outbound buffer to a client.
@@ -1385,26 +1403,37 @@ func (c *client) flushOutbound() bool {
return true // true because no need to queue a signal.
}

// Place primary on nb, assign primary to secondary, nil out nb and secondary.
nb, attempted := c.collapsePtoNB()
c.out.p, c.out.nb, c.out.s = c.out.s, nil, nil
if nb == nil {
return true
}

// For selecting primary replacement.
cnb := nb
var lfs int
if len(cnb) > 0 {
lfs = len(cnb[0])
}
// In the case of a normal socket connection, "collapsed" is just a ref
// to "nb". In the case of WebSockets, additional framing is added to
// anything that is waiting in "nb". Also keep a note of how many bytes
// were queued before we release the mutex.
collapsed, attempted := c.collapsePtoNB()

// Frustratingly, (net.Buffers).WriteTo() modifies the receiver so we
// can't work on "nb" directly — while the mutex is unlocked during IO,
// something else might call queueOutbound and modify it. So instead we
// need a working copy — we'll operate on "wnb" instead. Note that in
// the case of a partial write, "wnb" may have remaining data from the
// previous write, and in the case of WebSockets, that data may already
// be framed, so we are careful not to re-frame "wnb" here. Instead we
// will just frame up "nb" and append it onto whatever is left on "wnb".
// "nb" will be reset back to its starting position so it can be modified
// safely by queueOutbound calls.
c.out.wnb = append(c.out.wnb, collapsed...)
orig := append(net.Buffers{}, c.out.wnb...)
c.out.nb = c.out.nb[:0]

// Since WriteTo is lopping things off the beginning, we need to remember
// the start position of the underlying array so that we can get back to it.
// Otherwise we'll always "slide forward" and that will result in reallocs.
startOfWnb := c.out.wnb[0:]

// In case it goes away after releasing the lock.
nc := c.nc
apm := c.out.pm

// Capture this (we change the value in some tests)
wdl := c.out.wdl

// Do NOT hold lock during actual IO.
c.mu.Unlock()

@@ -1416,19 +1445,43 @@ func (c *client) flushOutbound() bool {
nc.SetWriteDeadline(start.Add(wdl))

// Actual write to the socket.
n, err := nb.WriteTo(nc)
n, err := c.out.wnb.WriteTo(nc)
nc.SetWriteDeadline(time.Time{})

lft := time.Since(start)

// Re-acquire client lock.
c.mu.Lock()

// At this point, "wnb" has been mutated by WriteTo and any consumed
// buffers have been lopped off the beginning, so in order to return
// them to the pool, we need to look at the difference between "orig"
// and "wnb".
for i := 0; i < len(orig)-len(c.out.wnb); i++ {
nbPoolPut(orig[i])
}

// At this point it's possible that "nb" has been modified by another
// call to queueOutbound while the lock was released, so we'll leave
// those for the next iteration. Meanwhile it's possible that we only
// managed a partial write of "wnb", so we'll shift anything that
// remains up to the beginning of the array to prevent reallocating.
// Anything left in "wnb" has already been framed for WebSocket conns
// so leave them alone for the next call to flushOutbound.
c.out.wnb = append(startOfWnb[:0], c.out.wnb...)

// If we've written everything but the underlying array of our working
// buffer has grown excessively then free it — the GC will tidy it up
// and we can allocate a new one next time.
if len(c.out.wnb) == 0 && cap(c.out.wnb) > nbPoolSizeLarge*8 {
c.out.wnb = nil
}

// Ignore ErrShortWrite errors, they will be handled as partials.
if err != nil && err != io.ErrShortWrite {
// Handle timeout error (slow consumer) differently
if ne, ok := err.(net.Error); ok && ne.Timeout() {
if closed := c.handleWriteTimeout(n, attempted, len(cnb)); closed {
if closed := c.handleWriteTimeout(n, attempted, len(c.out.nb)); closed {
return true
}
} else {
@@ -1452,43 +1505,11 @@ func (c *client) flushOutbound() bool {
if c.isWebsocket() {
c.ws.fs -= n
}
c.out.pm -= apm // FIXME(dlc) - this will not be totally accurate on partials.

// Check for partial writes
// TODO(dlc) - zero write with no error will cause lost message and the writeloop to spin.
if n != attempted && n > 0 {
c.handlePartialWrite(nb)
} else if int32(n) >= c.out.sz {
c.out.sws = 0
}

// Adjust based on what we wrote plus any pending.
pt := n + c.out.pb

// Adjust sz as needed downward, keeping power of 2.
// We do this at a slower rate.
if pt < int64(c.out.sz) && c.out.sz > minBufSize {
c.out.sws++
if c.out.sws > shortsToShrink {
c.out.sz >>= 1
}
}
// Adjust sz as needed upward, keeping power of 2.
if pt > int64(c.out.sz) && c.out.sz < maxBufSize {
c.out.sz <<= 1
}

// Check to see if we can reuse buffers.
if lfs != 0 && n >= int64(lfs) {
oldp := cnb[0][:0]
if cap(oldp) >= int(c.out.sz) {
// Replace primary or secondary if they are nil, reusing same buffer.
if c.out.p == nil {
c.out.p = oldp
} else if c.out.s == nil || cap(c.out.s) < int(c.out.sz) {
c.out.s = oldp
}
}
c.handlePartialWrite(c.out.nb)
}

// Check that if there is still data to send and writeLoop is in wait,
@@ -1989,6 +2010,49 @@ func (c *client) queueOutbound(data []byte) {
// Add to pending bytes total.
c.out.pb += int64(len(data))

// Take a copy of the slice ref so that we can chop bits off the beginning
// without affecting the original "data" slice.
toBuffer := data

// All of the queued []byte have a fixed capacity, so if there's a []byte
// at the tail of the buffer list that isn't full yet, we should top that
// up first. This helps to ensure we aren't pulling more []bytes from the
// pool than we need to.
if len(c.out.nb) > 0 {
last := &c.out.nb[len(c.out.nb)-1]
if free := cap(*last) - len(*last); free > 0 {
if l := len(toBuffer); l < free {
free = l
}
*last = append(*last, toBuffer[:free]...)
toBuffer = toBuffer[free:]
}
}

// Now we can push the rest of the data into new []bytes from the pool
// in fixed size chunks. This ensures we don't go over the capacity of any
// of the buffers and end up reallocating.
for len(toBuffer) > 0 {
var new []byte
if len(c.out.nb) == 0 && len(toBuffer) <= nbPoolSizeSmall {
// If the buffer is empty, try to allocate a small buffer if the
// message will fit in it. This will help for cases like pings.
new = nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)[:0]
} else {
// If "nb" isn't empty, default to large buffers in all cases as
// this means we are always coalescing future messages into
// larger buffers. Reduces the number of buffers into writev.
new = nbPoolLarge.Get().(*[nbPoolSizeLarge]byte)[:0]
}
l := len(toBuffer)
if c := cap(new); l > c {
l = c
}
new = append(new, toBuffer[:l]...)
c.out.nb = append(c.out.nb, new)
toBuffer = toBuffer[l:]
}

// Check for slow consumer via pending bytes limit.
// ok to return here, client is going away.
if c.kind == CLIENT && c.out.pb > c.out.mp {
@@ -2004,58 +2068,6 @@ func (c *client) queueOutbound(data []byte) {
return
}

if c.out.p == nil && len(data) < maxBufSize {
if c.out.sz == 0 {
c.out.sz = startBufSize
}
if c.out.s != nil && cap(c.out.s) >= int(c.out.sz) {
c.out.p = c.out.s
c.out.s = nil
} else {
// FIXME(dlc) - make power of 2 if less than maxBufSize?
c.out.p = make([]byte, 0, c.out.sz)
}
}
// Determine if we copy or reference
available := cap(c.out.p) - len(c.out.p)
if len(data) > available {
// We can't fit everything into existing primary, but message will
// fit in next one we allocate or utilize from the secondary.
// So copy what we can.
if available > 0 && len(data) < int(c.out.sz) {
c.out.p = append(c.out.p, data[:available]...)
data = data[available:]
}
// Put the primary on the nb if it has a payload
if len(c.out.p) > 0 {
c.out.nb = append(c.out.nb, c.out.p)
c.out.p = nil
}
// TODO: It was found with LeafNode and Websocket that referencing
// the data buffer when > maxBufSize would cause corruption
// (reproduced with small maxBufSize=10 and TestLeafNodeWSNoBufferCorruption).
// So always make a copy for now.

// We will copy to primary.
if c.out.p == nil {
// Grow here
if (c.out.sz << 1) <= maxBufSize {
c.out.sz <<= 1
}
if len(data) > int(c.out.sz) {
c.out.p = make([]byte, 0, len(data))
} else {
if c.out.s != nil && cap(c.out.s) >= int(c.out.sz) { // TODO(dlc) - Size mismatch?
c.out.p = c.out.s
c.out.s = nil
} else {
c.out.p = make([]byte, 0, c.out.sz)
}
}
}
}
c.out.p = append(c.out.p, data...)

// Check here if we should create a stall channel if we are falling behind.
// We do this here since if we wait for consumer's writeLoop it could be
// too late with large number of fan in producers.
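
An aside, not part of the diff: for intuition on the chunking in queueOutbound above, here is a standalone sketch that splits incoming data across fixed-capacity chunks the same way, topping up the tail chunk before starting new ones. The enqueue helper and the 8-byte chunkCap are hypothetical and chosen so the behaviour is easy to see; the real code pulls 4KB/64KB buffers from the pools instead of calling make.

package main

import "fmt"

const chunkCap = 8 // illustrative; the diff uses 4KB/64KB pooled buffers

// enqueue appends data to nb in fixed-capacity chunks, topping up the
// last chunk before starting a new one.
func enqueue(nb [][]byte, data []byte) [][]byte {
	if len(nb) > 0 {
		last := &nb[len(nb)-1]
		if free := chunkCap - len(*last); free > 0 {
			if free > len(data) {
				free = len(data)
			}
			*last = append(*last, data[:free]...)
			data = data[free:]
		}
	}
	for len(data) > 0 {
		n := len(data)
		if n > chunkCap {
			n = chunkCap
		}
		nb = append(nb, append(make([]byte, 0, chunkCap), data[:n]...))
		data = data[n:]
	}
	return nb
}

func main() {
	var nb [][]byte
	nb = enqueue(nb, []byte("MSG one")) // 7 bytes -> one chunk, 1 byte free
	nb = enqueue(nb, []byte("MSG two")) // tops up the tail, then a new chunk
	for i, c := range nb {
		fmt.Printf("chunk %d: %q (len %d)\n", i, c, len(c))
	}
}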
@@ -3249,8 +3261,6 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su
client.queueOutbound([]byte(CR_LF))
}

client.out.pm++

// If we are tracking dynamic publish permissions that track reply subjects,
// do that accounting here. We only look at client.replies which will be non-nil.
if client.replies != nil && len(reply) > 0 {
@@ -3265,7 +3275,7 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su
// to intervene before this producer goes back to top of readloop. We are in the producer's
// readloop go routine at this point.
// FIXME(dlc) - We may call this alot, maybe suppress after first call?
if client.out.pm > 1 && client.out.pb > maxBufSize*2 {
if len(client.out.nb) != 0 {
client.flushSignal()
}

@@ -4672,7 +4682,10 @@ func (c *client) flushAndClose(minimalFlush bool) {
}
c.flushOutbound()
}
c.out.p, c.out.s = nil, nil
for i := range c.out.nb {
nbPoolPut(c.out.nb[i])
}
c.out.nb = nil

// Close the low level connection.
if c.nc != nil {
