Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore outbound queue coalescing #4093

Merged
merged 1 commit into from Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 20 additions & 9 deletions server/client.go
Expand Up @@ -328,19 +328,14 @@ var nbPoolLarge = &sync.Pool{
}

func nbPoolGet(sz int) []byte {
var new []byte
switch {
case sz <= nbPoolSizeSmall:
ptr := nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)
new = ptr[:0]
return nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)[:0]
case sz <= nbPoolSizeMedium:
ptr := nbPoolMedium.Get().(*[nbPoolSizeMedium]byte)
new = ptr[:0]
return nbPoolMedium.Get().(*[nbPoolSizeMedium]byte)[:0]
default:
ptr := nbPoolLarge.Get().(*[nbPoolSizeLarge]byte)
new = ptr[:0]
return nbPoolLarge.Get().(*[nbPoolSizeLarge]byte)[:0]
}
return new
}

func nbPoolPut(b []byte) {
Expand Down Expand Up @@ -1447,7 +1442,8 @@ func (c *client) flushOutbound() bool {
// "nb" will be reset back to its starting position so it can be modified
// safely by queueOutbound calls.
c.out.wnb = append(c.out.wnb, collapsed...)
orig := append(net.Buffers{}, c.out.wnb...)
var _orig [1024][]byte
orig := append(_orig[:0], c.out.wnb...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L1479-1485 to return consumed buffers to the pool. The net.Buffers.WriteTo() function modifies the receiver slice by trimming sent buffers off the beginning, so we need to keep a copy of those references to recycle them.

c.out.nb = c.out.nb[:0]

// Since WriteTo is lopping things off the beginning, we need to remember
Expand Down Expand Up @@ -2041,6 +2037,21 @@ func (c *client) queueOutbound(data []byte) {
// without affecting the original "data" slice.
toBuffer := data

// All of the queued []byte have a fixed capacity, so if there's a []byte
// at the tail of the buffer list that isn't full yet, we should top that
// up first. This helps to ensure we aren't pulling more []bytes from the
// pool than we need to.
if len(c.out.nb) > 0 {
last := &c.out.nb[len(c.out.nb)-1]
if free := cap(*last) - len(*last); free > 0 {
if l := len(toBuffer); l < free {
free = l
}
*last = append(*last, toBuffer[:free]...)
toBuffer = toBuffer[free:]
}
}

// Now we can push the rest of the data into new []bytes from the pool
// in fixed size chunks. This ensures we don't go over the capacity of any
// of the buffers and end up reallocating.
Expand Down
60 changes: 60 additions & 0 deletions server/client_test.go
Expand Up @@ -1483,6 +1483,66 @@ func TestWildcardCharsInLiteralSubjectWorks(t *testing.T) {
}
}

// This test ensures that coalescing into the fixed-size output
// queues works as expected. When bytes are queued up, they should
// not overflow a buffer until the capacity is exceeded, at which
// point a new buffer should be added.
func TestClientOutboundQueueCoalesce(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()

nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()

clients := s.GlobalAccount().getClients()
if len(clients) != 1 {
t.Fatal("Expecting a client to exist")
}
client := clients[0]
client.mu.Lock()
defer client.mu.Unlock()

// First up, queue something small into the queue.
client.queueOutbound([]byte{1, 2, 3, 4, 5})

if len(client.out.nb) != 1 {
t.Fatal("Expecting a single queued buffer")
}
if l := len(client.out.nb[0]); l != 5 {
t.Fatalf("Expecting only 5 bytes in the first queued buffer, found %d instead", l)
}

// Then queue up a few more bytes, but not enough
// to overflow into the next buffer.
client.queueOutbound([]byte{6, 7, 8, 9, 10})

if len(client.out.nb) != 1 {
t.Fatal("Expecting a single queued buffer")
}
if l := len(client.out.nb[0]); l != 10 {
t.Fatalf("Expecting 10 bytes in the first queued buffer, found %d instead", l)
}

// Finally, queue up something that is guaranteed
// to overflow.
b := nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)[:]
b = b[:cap(b)]
client.queueOutbound(b)
if len(client.out.nb) != 2 {
t.Fatal("Expecting buffer to have overflowed")
}
if l := len(client.out.nb[0]); l != cap(b) {
t.Fatalf("Expecting %d bytes in the first queued buffer, found %d instead", cap(b), l)
}
if l := len(client.out.nb[1]); l != 10 {
t.Fatalf("Expecting 10 bytes in the second queued buffer, found %d instead", l)
}
}

// This test ensures that outbound queues don't cause a run on
// memory when sending something to lots of clients.
func TestClientOutboundQueueMemory(t *testing.T) {
Expand Down