Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] LeafNode: Support for s2 compression #4167

Merged
merged 4 commits into from May 16, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
70 changes: 51 additions & 19 deletions server/client.go
Expand Up @@ -84,6 +84,12 @@ const (
okProto = "+OK" + _CRLF_
)

// TLS Hanshake client types
const (
tlsHandshakeLeaf = "leafnode"
tlsHandshakeMQTT = "mqtt"
)

func init() {
rand.Seed(time.Now().UnixNano())
}
Expand Down Expand Up @@ -1275,7 +1281,7 @@ func (c *client) readLoop(pre []byte) {
if ws {
masking = c.ws.maskread
}
checkCompress := c.kind == ROUTER
checkCompress := c.kind == ROUTER || c.kind == LEAF
c.mu.Unlock()

defer func() {
Expand Down Expand Up @@ -1303,7 +1309,9 @@ func (c *client) readLoop(pre []byte) {
wsr.init()
}

var decompress *s2.Reader
var decompress bool
var reader io.Reader
reader = nc

for {
var n int
Expand All @@ -1315,19 +1323,15 @@ func (c *client) readLoop(pre []byte) {
n = len(pre)
pre = nil
} else {
if decompress != nil {
n, err = decompress.Read(b)
} else {
n, err = nc.Read(b)
}
n, err = reader.Read(b)
// If we have any data we will try to parse and exit at the end.
if n == 0 && err != nil {
c.closeConnection(closedStateForErr(err))
return
}
}
if ws {
bufs, err = c.wsRead(wsr, nc, b[:n])
bufs, err = c.wsRead(wsr, reader, b[:n])
if bufs == nil && err != nil {
if err != io.EOF {
c.Errorf("read error: %v", err)
Expand Down Expand Up @@ -1386,11 +1390,13 @@ func (c *client) readLoop(pre []byte) {
}
}

// If we are a ROUTER and have processed an INFO, it is possible that
// If we are a ROUTER/LEAF and have processed an INFO, it is possible that
// we are asked to switch to compression now.
if checkCompress && c.in.flags.isSet(switchToCompression) {
c.in.flags.clear(switchToCompression)
decompress = s2.NewReader(nc)
// For now we support only s2 compression...
reader = s2.NewReader(nc)
decompress = true
}

// Updates stats for client and server that were collected
Expand Down Expand Up @@ -1440,8 +1446,11 @@ func (c *client) readLoop(pre []byte) {
// Refresh nc because in some cases, we have upgraded c.nc to TLS.
if nc != c.nc {
nc = c.nc
if decompress != nil {
decompress.Reset(nc)
if decompress && nc != nil {
// For now we support only s2 compression...
reader.(*s2.Reader).Reset(nc)
} else if !decompress {
reader = nc
}
}
c.mu.Unlock()
Expand Down Expand Up @@ -1545,6 +1554,9 @@ func (c *client) flushOutbound() bool {
if cw != nil {
// We will have to adjust once we have compressed, so remove for now.
c.out.pb -= attempted
if c.isWebsocket() {
c.ws.fs -= attempted
}
}

// Do NOT hold lock during actual IO.
Expand Down Expand Up @@ -1604,6 +1616,9 @@ func (c *client) flushOutbound() bool {
// Adjust if we were compressing.
if cw != nil {
c.out.pb += attempted
if c.isWebsocket() {
c.ws.fs += attempted
}
}

// At this point, "wnb" has been mutated by WriteTo and any consumed
Expand Down Expand Up @@ -2389,23 +2404,40 @@ func (c *client) processPong() {
c.rtt = computeRTT(c.rttStart)
srv := c.srv
reorderGWs := c.kind == GATEWAY && c.gw.outbound
// If compression is currently active for a route connection, if the
// If compression is currently active for a route/leaf connection, if the
// compression configuration is s2_auto, check if we should change
// the compression level.
if c.kind == ROUTER && needsCompression(c.route.compression) {
if co := &(srv.getOpts().Cluster.Compression); co.Mode == CompressionS2Auto {
if cm := selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds); cm != c.route.compression {
c.route.compression = cm
c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
}
c.updateS2AutoCompressionLevel(&srv.getOpts().Cluster.Compression, &c.route.compression)
} else if c.kind == LEAF && needsCompression(c.leaf.compression) {
var co *CompressionOpts
if r := c.leaf.remote; r != nil {
co = &r.Compression
} else {
co = &srv.getOpts().LeafNode.Compression
}
c.updateS2AutoCompressionLevel(co, &c.leaf.compression)
}
c.mu.Unlock()
if reorderGWs {
srv.gateway.orderOutboundConnections()
}
}

// Select the s2 compression level based on the client's current RTT and the configured
// RTT thresholds slice. If current level is different than selected one, save the
// new compression level string and create a new s2 writer.
// Lock held on entry.
func (c *client) updateS2AutoCompressionLevel(co *CompressionOpts, compression *string) {
if co.Mode != CompressionS2Auto {
return
}
if cm := selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds); cm != *compression {
*compression = cm
c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
}
}

// Will return the parts from the raw wire msg.
func (c *client) msgParts(data []byte) (hdr []byte, msg []byte) {
if c != nil && c.pa.hdr > 0 {
Expand Down Expand Up @@ -5646,7 +5678,7 @@ func (c *client) setFirstPingTimer() {
// We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s.
addDelay := rand.Int63n(int64(d / 5))
d += time.Duration(addDelay)
// In the case of ROUTER and when compression is configured, it is possible
// In the case of ROUTER/LEAF and when compression is configured, it is possible
// that this timer was already set, but just to detect a stale connection
// since we have to delay the first PING after compression negotiation
// occurred.
Expand Down
132 changes: 132 additions & 0 deletions server/config_check_test.go
Expand Up @@ -1659,6 +1659,138 @@ func TestConfigCheck(t *testing.T) {
errorLine: 6,
errorPos: 7,
},
{
name: "wrong type for leafnodes compression",
config: `
leafnodes {
port: -1
compression: 123
}
`,
err: fmt.Errorf("field %q should be a boolean or a structure, got int64", "compression"),
errorLine: 4,
errorPos: 6,
},
{
name: "wrong type for leafnodes compression mode",
config: `
leafnodes {
port: -1
compression: {
mode: 123
}
}
`,
err: fmt.Errorf("interface conversion: interface {} is int64, not string"),
errorLine: 5,
errorPos: 7,
},
{
name: "wrong type for leafnodes compression rtt thresholds",
config: `
leafnodes {
port: -1
compression: {
mode: "s2_auto"
rtt_thresholds: 123
}
}
`,
err: fmt.Errorf("interface conversion: interface {} is int64, not []interface {}"),
errorLine: 6,
errorPos: 7,
},
{
name: "invalid durations for leafnodes compression rtt thresholds",
config: `
leafnodes {
port: -1
compression: {
mode: "s2_auto"
rtt_thresholds: [abc]
}
}
`,
err: fmt.Errorf("time: invalid duration %q", "abc"),
errorLine: 6,
errorPos: 7,
},
{
name: "wrong type for remote leafnodes compression",
config: `
leafnodes {
port: -1
remotes [
{
url: "nats://127.0.0.1:123"
compression: 123
}
]
}
`,
err: fmt.Errorf("field %q should be a boolean or a structure, got int64", "compression"),
errorLine: 7,
errorPos: 8,
},
{
name: "wrong type for remote leafnodes compression mode",
config: `
leafnodes {
port: -1
remotes [
{
url: "nats://127.0.0.1:123"
compression: {
mode: 123
}
}
]
}
`,
err: fmt.Errorf("interface conversion: interface {} is int64, not string"),
errorLine: 8,
errorPos: 9,
},
{
name: "wrong type for remote leafnodes compression rtt thresholds",
config: `
leafnodes {
port: -1
remotes [
{
url: "nats://127.0.0.1:123"
compression: {
mode: "s2_auto"
rtt_thresholds: 123
}
}
]
}
`,
err: fmt.Errorf("interface conversion: interface {} is int64, not []interface {}"),
errorLine: 9,
errorPos: 9,
},
{
name: "invalid durations for remote leafnodes compression rtt thresholds",
config: `
leafnodes {
port: -1
remotes [
{
url: "nats://127.0.0.1:123"
compression: {
mode: "s2_auto"
rtt_thresholds: [abc]
}
}
]
}
`,
err: fmt.Errorf("time: invalid duration %q", "abc"),
errorLine: 9,
errorPos: 9,
},
}

checkConfig := func(config string) error {
Expand Down