Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce allocations in WebSockets #4623

Merged
merged 3 commits into from Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 17 additions & 14 deletions server/client.go
Expand Up @@ -346,20 +346,23 @@ func nbPoolGet(sz int) []byte {
}
}

func nbPoolPut(b []byte) {
switch cap(b) {
case nbPoolSizeSmall:
b := (*[nbPoolSizeSmall]byte)(b[0:nbPoolSizeSmall])
nbPoolSmall.Put(b)
case nbPoolSizeMedium:
b := (*[nbPoolSizeMedium]byte)(b[0:nbPoolSizeMedium])
nbPoolMedium.Put(b)
case nbPoolSizeLarge:
b := (*[nbPoolSizeLarge]byte)(b[0:nbPoolSizeLarge])
nbPoolLarge.Put(b)
default:
// Ignore frames that are the wrong size, this might happen
// with WebSocket/MQTT messages as they are framed
func nbPoolPut(in []byte) {
ca := cap(in)
for in = in[:ca]; ca >= nbPoolSizeSmall; ca = cap(in) {
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
switch {
case ca >= nbPoolSizeLarge:
b := (*[nbPoolSizeLarge]byte)(in[0:nbPoolSizeLarge:nbPoolSizeLarge])
nbPoolLarge.Put(b)
in = in[nbPoolSizeLarge:]
case ca >= nbPoolSizeMedium:
b := (*[nbPoolSizeMedium]byte)(in[0:nbPoolSizeMedium:nbPoolSizeMedium])
nbPoolMedium.Put(b)
in = in[nbPoolSizeMedium:]
case ca >= nbPoolSizeSmall:
b := (*[nbPoolSizeSmall]byte)(in[0:nbPoolSizeSmall:nbPoolSizeSmall])
nbPoolSmall.Put(b)
in = in[nbPoolSizeSmall:]
}
}
}

Expand Down
33 changes: 15 additions & 18 deletions server/websocket.go
Expand Up @@ -115,6 +115,7 @@ type websocket struct {
nocompfrag bool // No fragment for compressed frames
maskread bool
maskwrite bool
compressor *flate.Writer
cookieJwt string
clientIP string
}
Expand Down Expand Up @@ -1295,7 +1296,13 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
mfs = 0
}
buf := bytes.NewBuffer(nbPoolGet(usz))
cp, _ := flate.NewWriter(buf, flate.BestSpeed)
cp := c.ws.compressor
if cp == nil {
c.ws.compressor, _ = flate.NewWriter(buf, flate.BestSpeed)
cp = c.ws.compressor
} else {
cp.Reset(buf)
}
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
var csz int
for _, b := range nb {
cp.Write(b)
Expand Down Expand Up @@ -1323,9 +1330,7 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
if mask {
wsMaskBuf(key, p[:lp])
}
new := nbPoolGet(wsFrameSizeForBrowsers)
lp = copy(new[:wsFrameSizeForBrowsers], p[:lp])
bufs = append(bufs, fh[:n], new[:lp])
bufs = append(bufs, fh[:n], p[:lp])
csz += n + lp
p = p[lp:]
}
Expand All @@ -1335,16 +1340,9 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
if mask {
wsMaskBuf(key, p)
}
bufs = append(bufs, h)
for len(p) > 0 {
new := nbPoolGet(len(p))
n := copy(new[:cap(new)], p)
bufs = append(bufs, new[:n])
p = p[n:]
}
bufs = append(bufs, h, p)
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
csz = len(h) + ol
}
nbPoolPut(b) // No longer needed as we copied from above.
// Add to pb the compressed data size (including headers), but
// remove the original uncompressed data size that was added
// during the queueing.
Expand All @@ -1355,14 +1353,15 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
if mfs > 0 {
// We are limiting the frame size.
startFrame := func() int {
bufs = append(bufs, nbPoolGet(wsMaxFrameHeaderSize)[:wsMaxFrameHeaderSize])
bufs = append(bufs, nbPoolGet(wsMaxFrameHeaderSize))
return len(bufs) - 1
}
endFrame := func(idx, size int) {
bufs[idx] = bufs[idx][:wsMaxFrameHeaderSize]
n, key := wsFillFrameHeader(bufs[idx], mask, wsFirstFrame, wsFinalFrame, wsUncompressedFrame, wsBinaryMessage, size)
bufs[idx] = bufs[idx][:n]
c.out.pb += int64(n)
c.ws.fs += int64(n + size)
bufs[idx] = bufs[idx][:n]
if mask {
wsMaskBufs(key, bufs[idx+1:])
}
Expand All @@ -1388,10 +1387,8 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
if endStart {
fhIdx = startFrame()
}
new := nbPoolGet(total)
n := copy(new[:cap(new)], b[:total])
bufs = append(bufs, new[:n])
b = b[n:]
bufs = append(bufs, b[:total])
b = b[total:]
}
}
if total > 0 {
Expand Down