Skip to content

Commit

Permalink
[ADDED] LeafNode: Support for s2 compression (#4167)
Browse files Browse the repository at this point in the history
This is similar to PR #4115 but for LeafNodes.
Compression mode can be set on both side (the accept and in remotes).
```
leafnodes {
   port: 7422
   compression: s2_best
   remotes [
       {
         url: "nats://host2:74222"
         compression: s2_better
       }
   ]
}
```
Possible modes are similar than for routes (described in PR #4115),
except that when not defined we default to `s2_auto`.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed May 16, 2023
2 parents 4220502 + e9e334d commit 06bc0fe
Show file tree
Hide file tree
Showing 17 changed files with 1,870 additions and 204 deletions.
1 change: 1 addition & 0 deletions server/accounts_test.go
Expand Up @@ -3532,6 +3532,7 @@ func TestAccountImportDuplicateResponseDeliveryWithLeafnodes(t *testing.T) {
m.Respond([]byte("bar"))
})
lnc.Flush()
checkSubInterest(t, s, "A", "foo", time.Second)

// Make sure it works, but request only wants one, so need second test to show failure, but
// want to make sure we are wired up correctly.
Expand Down
70 changes: 51 additions & 19 deletions server/client.go
Expand Up @@ -84,6 +84,12 @@ const (
okProto = "+OK" + _CRLF_
)

// TLS Hanshake client types
const (
tlsHandshakeLeaf = "leafnode"
tlsHandshakeMQTT = "mqtt"
)

func init() {
rand.Seed(time.Now().UnixNano())
}
Expand Down Expand Up @@ -1275,7 +1281,7 @@ func (c *client) readLoop(pre []byte) {
if ws {
masking = c.ws.maskread
}
checkCompress := c.kind == ROUTER
checkCompress := c.kind == ROUTER || c.kind == LEAF
c.mu.Unlock()

defer func() {
Expand Down Expand Up @@ -1303,7 +1309,9 @@ func (c *client) readLoop(pre []byte) {
wsr.init()
}

var decompress *s2.Reader
var decompress bool
var reader io.Reader
reader = nc

for {
var n int
Expand All @@ -1315,19 +1323,15 @@ func (c *client) readLoop(pre []byte) {
n = len(pre)
pre = nil
} else {
if decompress != nil {
n, err = decompress.Read(b)
} else {
n, err = nc.Read(b)
}
n, err = reader.Read(b)
// If we have any data we will try to parse and exit at the end.
if n == 0 && err != nil {
c.closeConnection(closedStateForErr(err))
return
}
}
if ws {
bufs, err = c.wsRead(wsr, nc, b[:n])
bufs, err = c.wsRead(wsr, reader, b[:n])
if bufs == nil && err != nil {
if err != io.EOF {
c.Errorf("read error: %v", err)
Expand Down Expand Up @@ -1386,11 +1390,13 @@ func (c *client) readLoop(pre []byte) {
}
}

// If we are a ROUTER and have processed an INFO, it is possible that
// If we are a ROUTER/LEAF and have processed an INFO, it is possible that
// we are asked to switch to compression now.
if checkCompress && c.in.flags.isSet(switchToCompression) {
c.in.flags.clear(switchToCompression)
decompress = s2.NewReader(nc)
// For now we support only s2 compression...
reader = s2.NewReader(nc)
decompress = true
}

// Updates stats for client and server that were collected
Expand Down Expand Up @@ -1440,8 +1446,11 @@ func (c *client) readLoop(pre []byte) {
// Refresh nc because in some cases, we have upgraded c.nc to TLS.
if nc != c.nc {
nc = c.nc
if decompress != nil {
decompress.Reset(nc)
if decompress && nc != nil {
// For now we support only s2 compression...
reader.(*s2.Reader).Reset(nc)
} else if !decompress {
reader = nc
}
}
c.mu.Unlock()
Expand Down Expand Up @@ -1545,6 +1554,9 @@ func (c *client) flushOutbound() bool {
if cw != nil {
// We will have to adjust once we have compressed, so remove for now.
c.out.pb -= attempted
if c.isWebsocket() {
c.ws.fs -= attempted
}
}

// Do NOT hold lock during actual IO.
Expand Down Expand Up @@ -1604,6 +1616,9 @@ func (c *client) flushOutbound() bool {
// Adjust if we were compressing.
if cw != nil {
c.out.pb += attempted
if c.isWebsocket() {
c.ws.fs += attempted
}
}

// At this point, "wnb" has been mutated by WriteTo and any consumed
Expand Down Expand Up @@ -2389,23 +2404,40 @@ func (c *client) processPong() {
c.rtt = computeRTT(c.rttStart)
srv := c.srv
reorderGWs := c.kind == GATEWAY && c.gw.outbound
// If compression is currently active for a route connection, if the
// If compression is currently active for a route/leaf connection, if the
// compression configuration is s2_auto, check if we should change
// the compression level.
if c.kind == ROUTER && needsCompression(c.route.compression) {
if co := &(srv.getOpts().Cluster.Compression); co.Mode == CompressionS2Auto {
if cm := selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds); cm != c.route.compression {
c.route.compression = cm
c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
}
c.updateS2AutoCompressionLevel(&srv.getOpts().Cluster.Compression, &c.route.compression)
} else if c.kind == LEAF && needsCompression(c.leaf.compression) {
var co *CompressionOpts
if r := c.leaf.remote; r != nil {
co = &r.Compression
} else {
co = &srv.getOpts().LeafNode.Compression
}
c.updateS2AutoCompressionLevel(co, &c.leaf.compression)
}
c.mu.Unlock()
if reorderGWs {
srv.gateway.orderOutboundConnections()
}
}

// Select the s2 compression level based on the client's current RTT and the configured
// RTT thresholds slice. If current level is different than selected one, save the
// new compression level string and create a new s2 writer.
// Lock held on entry.
func (c *client) updateS2AutoCompressionLevel(co *CompressionOpts, compression *string) {
if co.Mode != CompressionS2Auto {
return
}
if cm := selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds); cm != *compression {
*compression = cm
c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
}
}

// Will return the parts from the raw wire msg.
func (c *client) msgParts(data []byte) (hdr []byte, msg []byte) {
if c != nil && c.pa.hdr > 0 {
Expand Down Expand Up @@ -5646,7 +5678,7 @@ func (c *client) setFirstPingTimer() {
// We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s.
addDelay := rand.Int63n(int64(d / 5))
d += time.Duration(addDelay)
// In the case of ROUTER and when compression is configured, it is possible
// In the case of ROUTER/LEAF and when compression is configured, it is possible
// that this timer was already set, but just to detect a stale connection
// since we have to delay the first PING after compression negotiation
// occurred.
Expand Down
132 changes: 132 additions & 0 deletions server/config_check_test.go
Expand Up @@ -1659,6 +1659,138 @@ func TestConfigCheck(t *testing.T) {
errorLine: 6,
errorPos: 7,
},
{
name: "wrong type for leafnodes compression",
config: `
leafnodes {
port: -1
compression: 123
}
`,
err: fmt.Errorf("field %q should be a boolean or a structure, got int64", "compression"),
errorLine: 4,
errorPos: 6,
},
{
name: "wrong type for leafnodes compression mode",
config: `
leafnodes {
port: -1
compression: {
mode: 123
}
}
`,
err: fmt.Errorf("interface conversion: interface {} is int64, not string"),
errorLine: 5,
errorPos: 7,
},
{
name: "wrong type for leafnodes compression rtt thresholds",
config: `
leafnodes {
port: -1
compression: {
mode: "s2_auto"
rtt_thresholds: 123
}
}
`,
err: fmt.Errorf("interface conversion: interface {} is int64, not []interface {}"),
errorLine: 6,
errorPos: 7,
},
{
name: "invalid durations for leafnodes compression rtt thresholds",
config: `
leafnodes {
port: -1
compression: {
mode: "s2_auto"
rtt_thresholds: [abc]
}
}
`,
err: fmt.Errorf("time: invalid duration %q", "abc"),
errorLine: 6,
errorPos: 7,
},
{
name: "wrong type for remote leafnodes compression",
config: `
leafnodes {
port: -1
remotes [
{
url: "nats://127.0.0.1:123"
compression: 123
}
]
}
`,
err: fmt.Errorf("field %q should be a boolean or a structure, got int64", "compression"),
errorLine: 7,
errorPos: 8,
},
{
name: "wrong type for remote leafnodes compression mode",
config: `
leafnodes {
port: -1
remotes [
{
url: "nats://127.0.0.1:123"
compression: {
mode: 123
}
}
]
}
`,
err: fmt.Errorf("interface conversion: interface {} is int64, not string"),
errorLine: 8,
errorPos: 9,
},
{
name: "wrong type for remote leafnodes compression rtt thresholds",
config: `
leafnodes {
port: -1
remotes [
{
url: "nats://127.0.0.1:123"
compression: {
mode: "s2_auto"
rtt_thresholds: 123
}
}
]
}
`,
err: fmt.Errorf("interface conversion: interface {} is int64, not []interface {}"),
errorLine: 9,
errorPos: 9,
},
{
name: "invalid durations for remote leafnodes compression rtt thresholds",
config: `
leafnodes {
port: -1
remotes [
{
url: "nats://127.0.0.1:123"
compression: {
mode: "s2_auto"
rtt_thresholds: [abc]
}
}
]
}
`,
err: fmt.Errorf("time: invalid duration %q", "abc"),
errorLine: 9,
errorPos: 9,
},
}

checkConfig := func(config string) error {
Expand Down

0 comments on commit 06bc0fe

Please sign in to comment.