Skip to content

Commit

Permalink
Re-add coalescing to outbound queues
Browse files Browse the repository at this point in the history
Originally I thought there was a race condition happening here,
but it turns out it is safe after all and the race condition I
was seeing was due to other problems in the WebSocket code.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Apr 25, 2023
1 parent b0d98df commit f04336e
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 9 deletions.
29 changes: 20 additions & 9 deletions server/client.go
Expand Up @@ -328,19 +328,14 @@ var nbPoolLarge = &sync.Pool{
}

func nbPoolGet(sz int) []byte {
var new []byte
switch {
case sz <= nbPoolSizeSmall:
ptr := nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)
new = ptr[:0]
return nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)[:0]
case sz <= nbPoolSizeMedium:
ptr := nbPoolMedium.Get().(*[nbPoolSizeMedium]byte)
new = ptr[:0]
return nbPoolMedium.Get().(*[nbPoolSizeMedium]byte)[:0]
default:
ptr := nbPoolLarge.Get().(*[nbPoolSizeLarge]byte)
new = ptr[:0]
return nbPoolLarge.Get().(*[nbPoolSizeLarge]byte)[:0]
}
return new
}

func nbPoolPut(b []byte) {
Expand Down Expand Up @@ -1447,7 +1442,8 @@ func (c *client) flushOutbound() bool {
// "nb" will be reset back to its starting position so it can be modified
// safely by queueOutbound calls.
c.out.wnb = append(c.out.wnb, collapsed...)
orig := append(net.Buffers{}, c.out.wnb...)
var _orig [1024][]byte
orig := append(_orig[:0], c.out.wnb...)
c.out.nb = c.out.nb[:0]

// Since WriteTo is lopping things off the beginning, we need to remember
Expand Down Expand Up @@ -2041,6 +2037,21 @@ func (c *client) queueOutbound(data []byte) {
// without affecting the original "data" slice.
toBuffer := data

// All of the queued []byte have a fixed capacity, so if there's a []byte
// at the tail of the buffer list that isn't full yet, we should top that
// up first. This helps to ensure we aren't pulling more []bytes from the
// pool than we need to.
if len(c.out.nb) > 0 {
last := &c.out.nb[len(c.out.nb)-1]
if free := cap(*last) - len(*last); free > 0 {
if l := len(toBuffer); l < free {
free = l
}
*last = append(*last, toBuffer[:free]...)
toBuffer = toBuffer[free:]
}
}

// Now we can push the rest of the data into new []bytes from the pool
// in fixed size chunks. This ensures we don't go over the capacity of any
// of the buffers and end up reallocating.
Expand Down
60 changes: 60 additions & 0 deletions server/client_test.go
Expand Up @@ -1483,6 +1483,66 @@ func TestWildcardCharsInLiteralSubjectWorks(t *testing.T) {
}
}

// This test ensures that coalescing into the fixed-size output
// queues works as expected. When bytes are queued up, they should
// not overflow a buffer until the capacity is exceeded, at which
// point a new buffer should be added.
func TestClientOutboundQueueCoalesce(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()

nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()

clients := s.GlobalAccount().getClients()
if len(clients) != 1 {
t.Fatal("Expecting a client to exist")
}
client := clients[0]
client.mu.Lock()
defer client.mu.Unlock()

// First up, queue something small into the queue.
client.queueOutbound([]byte{1, 2, 3, 4, 5})

if len(client.out.nb) != 1 {
t.Fatal("Expecting a single queued buffer")
}
if l := len(client.out.nb[0]); l != 5 {
t.Fatalf("Expecting only 5 bytes in the first queued buffer, found %d instead", l)
}

// Then queue up a few more bytes, but not enough
// to overflow into the next buffer.
client.queueOutbound([]byte{6, 7, 8, 9, 10})

if len(client.out.nb) != 1 {
t.Fatal("Expecting a single queued buffer")
}
if l := len(client.out.nb[0]); l != 10 {
t.Fatalf("Expecting 10 bytes in the first queued buffer, found %d instead", l)
}

// Finally, queue up something that is guaranteed
// to overflow.
b := nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)[:]
b = b[:cap(b)]
client.queueOutbound(b)
if len(client.out.nb) != 2 {
t.Fatal("Expecting buffer to have overflowed")
}
if l := len(client.out.nb[0]); l != cap(b) {
t.Fatalf("Expecting %d bytes in the first queued buffer, found %d instead", cap(b), l)
}
if l := len(client.out.nb[1]); l != 10 {
t.Fatalf("Expecting 10 bytes in the second queued buffer, found %d instead", l)
}
}

// This test ensures that outbound queues don't cause a run on
// memory when sending something to lots of clients.
func TestClientOutboundQueueMemory(t *testing.T) {
Expand Down

0 comments on commit f04336e

Please sign in to comment.