Re-add coalescing to outbound queues
Originally I thought there was a race condition happening here, but it turns
out this code is safe after all; the race I was seeing was caused by other
problems in the WebSocket code.

Signed-off-by: Neil Twigg <neil@nats.io>
neilalexander committed Apr 24, 2023
1 parent b0d98df commit e3f9b1f
Showing 2 changed files with 80 additions and 22 deletions.
server/client.go: 42 changes (20 additions, 22 deletions)
@@ -302,8 +302,7 @@ type outbound struct {
     stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in.
 }
 
-const nbPoolSizeSmall = 512    // Underlying array size of small buffer
-const nbPoolSizeMedium = 4096  // Underlying array size of medium buffer
+const nbPoolSizeSmall = 4096   // Underlying array size of small buffer
 const nbPoolSizeLarge = 65536  // Underlying array size of large buffer
 
 var nbPoolSmall = &sync.Pool{
@@ -313,13 +312,6 @@ var nbPoolSmall = &sync.Pool{
     },
 }
 
-var nbPoolMedium = &sync.Pool{
-    New: func() any {
-        b := [nbPoolSizeMedium]byte{}
-        return &b
-    },
-}
-
 var nbPoolLarge = &sync.Pool{
     New: func() any {
         b := [nbPoolSizeLarge]byte{}
@@ -328,29 +320,19 @@ var nbPoolLarge = &sync.Pool{
 }
 
 func nbPoolGet(sz int) []byte {
-    var new []byte
     switch {
     case sz <= nbPoolSizeSmall:
-        ptr := nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)
-        new = ptr[:0]
-    case sz <= nbPoolSizeMedium:
-        ptr := nbPoolMedium.Get().(*[nbPoolSizeMedium]byte)
-        new = ptr[:0]
+        return nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)[:0]
     default:
-        ptr := nbPoolLarge.Get().(*[nbPoolSizeLarge]byte)
-        new = ptr[:0]
+        return nbPoolLarge.Get().(*[nbPoolSizeLarge]byte)[:0]
     }
-    return new
 }
 
 func nbPoolPut(b []byte) {
     switch cap(b) {
     case nbPoolSizeSmall:
         b := (*[nbPoolSizeSmall]byte)(b[0:nbPoolSizeSmall])
         nbPoolSmall.Put(b)
-    case nbPoolSizeMedium:
-        b := (*[nbPoolSizeMedium]byte)(b[0:nbPoolSizeMedium])
-        nbPoolMedium.Put(b)
     case nbPoolSizeLarge:
         b := (*[nbPoolSizeLarge]byte)(b[0:nbPoolSizeLarge])
         nbPoolLarge.Put(b)
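For reference, the nbPoolGet/nbPoolPut pair above implements a fixed-size buffer pool: a sync.Pool hands out pointers to fixed-size arrays, callers reslice them to zero length before appending, and only slices that still have the pool's exact capacity are recycled. Below is a minimal standalone sketch of that pattern, assuming a single 4096-byte tier and hypothetical names (poolGet/poolPut); it is an illustration, not the server's API, and the slice-to-array-pointer conversion needs Go 1.17 or newer.

package main

import (
    "fmt"
    "sync"
)

const poolSize = 4096 // illustrative; the server uses separate small/large tiers

var pool = &sync.Pool{
    New: func() any {
        b := [poolSize]byte{}
        return &b
    },
}

// poolGet returns a zero-length slice backed by a pooled fixed-size array,
// so appends of up to poolSize bytes never reallocate.
func poolGet() []byte {
    return pool.Get().(*[poolSize]byte)[:0]
}

// poolPut recycles the backing array, but only if the slice still has the
// pool's exact capacity; anything else is left for the garbage collector.
func poolPut(b []byte) {
    if cap(b) == poolSize {
        pool.Put((*[poolSize]byte)(b[0:poolSize]))
    }
}

func main() {
    buf := poolGet()
    buf = append(buf, "hello"...)
    fmt.Println(len(buf), cap(buf)) // 5 4096
    poolPut(buf)
}

Keeping a small tier (4096 bytes) alongside the large one (65536 bytes), as the server does, avoids handing a 64 KiB array to a producer that only queues a few bytes.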
@@ -1447,7 +1429,8 @@ func (c *client) flushOutbound() bool {
     // "nb" will be reset back to its starting position so it can be modified
     // safely by queueOutbound calls.
     c.out.wnb = append(c.out.wnb, collapsed...)
-    orig := append(net.Buffers{}, c.out.wnb...)
+    var _orig [1024][]byte
+    orig := append(_orig[:0], c.out.wnb...)
     c.out.nb = c.out.nb[:0]
 
     // Since WriteTo is lopping things off the beginning, we need to remember
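The flushOutbound change swaps orig := append(net.Buffers{}, c.out.wnb...) for a copy backed by a fixed [1024][]byte array, so taking a snapshot of the queued buffers does not have to allocate a fresh slice of slice headers on every flush in the common case. A rough, self-contained sketch of the idea follows; the helper name is an assumption and io.Discard stands in for the client connection.

package main

import (
    "fmt"
    "io"
    "net"
)

// flushSketch mimics the shape of the change: snapshot the queued buffers
// into a slice backed by a fixed-size array before WriteTo consumes them.
func flushSketch(w io.Writer, wnb net.Buffers) {
    // The intent of the fixed backing array is to avoid allocating a new
    // header slice per flush while the snapshot stays local and fits.
    var _orig [1024][]byte
    orig := append(_orig[:0], wnb...)

    // WriteTo advances wnb as it writes, so afterwards the original slice
    // headers are only reachable through orig (the real code walks them to
    // return buffers to their pool).
    n, _ := wnb.WriteTo(w)
    fmt.Println(n, len(wnb), len(orig)) // 3 0 2
}

func main() {
    flushSketch(io.Discard, net.Buffers{[]byte("a"), []byte("bc")})
}

If the write queue ever holds more than 1024 buffers, append simply spills onto the heap, so correctness does not depend on the fixed size.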
@@ -2041,6 +2024,21 @@ func (c *client) queueOutbound(data []byte) {
     // without affecting the original "data" slice.
     toBuffer := data
 
+    // All of the queued []byte have a fixed capacity, so if there's a []byte
+    // at the tail of the buffer list that isn't full yet, we should top that
+    // up first. This helps to ensure we aren't pulling more []bytes from the
+    // pool than we need to.
+    if len(c.out.nb) > 0 {
+        last := &c.out.nb[len(c.out.nb)-1]
+        if free := cap(*last) - len(*last); free > 0 {
+            if l := len(toBuffer); l < free {
+                free = l
+            }
+            *last = append(*last, toBuffer[:free]...)
+            toBuffer = toBuffer[free:]
+        }
+    }
+
     // Now we can push the rest of the data into new []bytes from the pool
     // in fixed size chunks. This ensures we don't go over the capacity of any
     // of the buffers and end up reallocating.
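This queueOutbound hunk is the coalescing being re-added: before pulling another buffer from the pool, any spare capacity left in the tail buffer is filled first, so many small writes share one pooled chunk. A simplified, self-contained sketch of the same logic, using a fixed chunkSize and plain make in place of the server's pool (both are assumptions for illustration):

package main

import "fmt"

const chunkSize = 4096 // stands in for the pooled buffer size

// coalesce queues data into nb, topping up the tail chunk first and only
// then starting new fixed-size chunks.
func coalesce(nb [][]byte, data []byte) [][]byte {
    // Top up the last chunk if it still has spare capacity.
    if n := len(nb); n > 0 {
        last := &nb[n-1]
        if free := cap(*last) - len(*last); free > 0 {
            if free > len(data) {
                free = len(data)
            }
            *last = append(*last, data[:free]...)
            data = data[free:]
        }
    }
    // Push the remainder into new fixed-size chunks (the server takes
    // these from nbPoolGet instead of calling make directly).
    for len(data) > 0 {
        n := len(data)
        if n > chunkSize {
            n = chunkSize
        }
        buf := make([]byte, 0, chunkSize)
        nb = append(nb, append(buf, data[:n]...))
        data = data[n:]
    }
    return nb
}

func main() {
    var nb [][]byte
    nb = coalesce(nb, []byte{1, 2, 3, 4, 5})
    nb = coalesce(nb, []byte{6, 7, 8, 9, 10})
    fmt.Println(len(nb), len(nb[0])) // 1 10: the second write reused the first chunk
    nb = coalesce(nb, make([]byte, chunkSize))
    fmt.Println(len(nb), len(nb[0]), len(nb[1])) // 2 4096 10
}

The main function mirrors the new test below: two 5-byte writes share one chunk, and a full-chunk write overflows into a second buffer.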
server/client_test.go: 60 changes (60 additions, 0 deletions)
@@ -1483,6 +1483,66 @@ func TestWildcardCharsInLiteralSubjectWorks(t *testing.T) {
     }
 }
 
+// This test ensures that coalescing into the fixed-size output
+// queues works as expected. When bytes are queued up, they should
+// not overflow a buffer until the capacity is exceeded, at which
+// point a new buffer should be added.
+func TestClientOutboundQueueCoalesce(t *testing.T) {
+    opts := DefaultOptions()
+    s := RunServer(opts)
+    defer s.Shutdown()
+
+    nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
+    if err != nil {
+        t.Fatalf("Error on connect: %v", err)
+    }
+    defer nc.Close()
+
+    clients := s.GlobalAccount().getClients()
+    if len(clients) != 1 {
+        t.Fatal("Expecting a client to exist")
+    }
+    client := clients[0]
+    client.mu.Lock()
+    defer client.mu.Unlock()
+
+    // First up, queue something small into the queue.
+    client.queueOutbound([]byte{1, 2, 3, 4, 5})
+
+    if len(client.out.nb) != 1 {
+        t.Fatal("Expecting a single queued buffer")
+    }
+    if l := len(client.out.nb[0]); l != 5 {
+        t.Fatalf("Expecting only 5 bytes in the first queued buffer, found %d instead", l)
+    }
+
+    // Then queue up a few more bytes, but not enough
+    // to overflow into the next buffer.
+    client.queueOutbound([]byte{6, 7, 8, 9, 10})
+
+    if len(client.out.nb) != 1 {
+        t.Fatal("Expecting a single queued buffer")
+    }
+    if l := len(client.out.nb[0]); l != 10 {
+        t.Fatalf("Expecting 10 bytes in the first queued buffer, found %d instead", l)
+    }
+
+    // Finally, queue up something that is guaranteed
+    // to overflow.
+    b := nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)[:]
+    b = b[:cap(b)]
+    client.queueOutbound(b)
+    if len(client.out.nb) != 2 {
+        t.Fatal("Expecting buffer to have overflowed")
+    }
+    if l := len(client.out.nb[0]); l != cap(b) {
+        t.Fatalf("Expecting %d bytes in the first queued buffer, found %d instead", cap(b), l)
+    }
+    if l := len(client.out.nb[1]); l != 10 {
+        t.Fatalf("Expecting 10 bytes in the second queued buffer, found %d instead", l)
+    }
+}
+
 // This test ensures that outbound queues don't cause a run on
 // memory when sending something to lots of clients.
 func TestClientOutboundQueueMemory(t *testing.T) {
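To exercise just the new behaviour, the added test can be run on its own from the repository root with the standard Go tooling, e.g.:

go test -run TestClientOutboundQueueCoalesce ./server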
