Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce allocations in WebSockets #4623

Merged
merged 3 commits into from Oct 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 21 additions & 17 deletions server/websocket.go
Expand Up @@ -115,6 +115,7 @@ type websocket struct {
nocompfrag bool // No fragment for compressed frames
maskread bool
maskwrite bool
compressor *flate.Writer
cookieJwt string
clientIP string
}
Expand Down Expand Up @@ -1295,7 +1296,13 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
mfs = 0
}
buf := bytes.NewBuffer(nbPoolGet(usz))
cp, _ := flate.NewWriter(buf, flate.BestSpeed)
cp := c.ws.compressor
if cp == nil {
c.ws.compressor, _ = flate.NewWriter(buf, flate.BestSpeed)
cp = c.ws.compressor
} else {
cp.Reset(buf)
}
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
var csz int
for _, b := range nb {
cp.Write(b)
Expand Down Expand Up @@ -1323,9 +1330,7 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
if mask {
wsMaskBuf(key, p[:lp])
}
new := nbPoolGet(wsFrameSizeForBrowsers)
lp = copy(new[:wsFrameSizeForBrowsers], p[:lp])
bufs = append(bufs, fh[:n], new[:lp])
bufs = append(bufs, fh[:n], p[:lp])
csz += n + lp
p = p[lp:]
}
Expand All @@ -1335,16 +1340,16 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
if mask {
wsMaskBuf(key, p)
}
bufs = append(bufs, h)
for len(p) > 0 {
new := nbPoolGet(len(p))
n := copy(new[:cap(new)], p)
bufs = append(bufs, new[:n])
p = p[n:]
if ol > 0 {
bufs = append(bufs, h, p)
}
csz = len(h) + ol
}
nbPoolPut(b) // No longer needed as we copied from above.
// Make sure that the compressor no longer holds a reference to
// the bytes.Buffer, so that the underlying memory gets cleaned
// up after flushOutbound/flushAndClose. For this to be safe, we
// always cp.Reset(...) before reusing the compressor again.
cp.Reset(nil)
// Add to pb the compressed data size (including headers), but
// remove the original uncompressed data size that was added
// during the queueing.
Expand All @@ -1355,14 +1360,15 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
if mfs > 0 {
// We are limiting the frame size.
startFrame := func() int {
bufs = append(bufs, nbPoolGet(wsMaxFrameHeaderSize)[:wsMaxFrameHeaderSize])
bufs = append(bufs, nbPoolGet(wsMaxFrameHeaderSize))
return len(bufs) - 1
}
endFrame := func(idx, size int) {
bufs[idx] = bufs[idx][:wsMaxFrameHeaderSize]
n, key := wsFillFrameHeader(bufs[idx], mask, wsFirstFrame, wsFinalFrame, wsUncompressedFrame, wsBinaryMessage, size)
bufs[idx] = bufs[idx][:n]
c.out.pb += int64(n)
c.ws.fs += int64(n + size)
bufs[idx] = bufs[idx][:n]
if mask {
wsMaskBufs(key, bufs[idx+1:])
}
Expand All @@ -1388,10 +1394,8 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
if endStart {
fhIdx = startFrame()
}
new := nbPoolGet(total)
n := copy(new[:cap(new)], b[:total])
bufs = append(bufs, new[:n])
b = b[n:]
bufs = append(bufs, b[:total])
b = b[total:]
}
}
if total > 0 {
Expand Down