[ADDED] Support for route S2 compression (#4115)
The new field `compression` in the `cluster{}` block allows specifying
which compression mode to use between servers.

It can be given simply as a boolean or a string for the
simple modes, or as an object for the "s2_auto" mode, where
a list of RTT thresholds can be specified.

By default, if no compression field is specified, the server
will default to "accept", which means that it will accept
compression from a remote and switch to that same compression
mode, but will not otherwise initiate compression. That is,
if two servers are configured with "accept", then compression
will effectively be "off"; if one of the servers had, say,
"s2_fast", then both would use that mode.
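
For example, with the behavior just described, the relevant cluster
blocks on the two servers could look like this (sketch only; unrelated
settings omitted):
```
# Server A accepts compression but does not initiate it.
cluster {
..
  compression: accept
}

# Server B initiates "s2_fast". The route between A and B will use
# "s2_fast" in both directions; two "accept" servers would stay uncompressed.
cluster {
..
  compression: s2_fast
}
```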

Here is the way to specify compression with a simple string:
```
cluster {
..
  # Possible values are "disabled", "off", "enabled", "on",
  # "accept", "s2_fast", "s2_better", "s2_best" or "s2_auto"
  compression: s2_fast
}
```
If the compression field is simply set to "s2_auto", then
the server will use the default RTT thresholds of 10ms, 50ms
and 100ms to switch between the "uncompressed", "fast", "better"
and "best" modes.

To specify a different list of thresholds for "s2_auto",
it would look like this:
```
cluster {
..
  compression: {
    mode: s2_auto
    # This means that for RTT up to 5ms (included), then
    # the compression level will be "uncompressed", then
    # from 5ms+ to 15ms, the mode will switch to "s2_fast",
    # then from 15ms+ to 50ms, the level will switch to
    # "s2_better", and anything above 50ms will result
    # in the "s2_best" compression mode.
    rtt_thresholds: [5ms, 15ms, 50ms]
  }
}
```
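
The same bucketing can be expressed in a few lines of Go. This is only a
sketch of the threshold semantics described in the comments above, not the
server's actual `selectS2AutoModeBasedOnRTT` implementation, and the mode
strings are illustrative:
```
package example

import "time"

// pickS2Mode maps a measured RTT onto a compression mode, given an ordered
// list of thresholds: [limit for "uncompressed", limit for "s2_fast",
// limit for "s2_better"]. Anything above the last threshold gets "s2_best".
func pickS2Mode(rtt time.Duration, thresholds []time.Duration) string {
	modes := []string{"s2_uncompressed", "s2_fast", "s2_better"}
	for i, limit := range thresholds {
		if i < len(modes) && rtt <= limit {
			return modes[i]
		}
	}
	return "s2_best"
}
```
With the thresholds from the example above, an RTT of 10ms falls in the
"s2_fast" bucket, while 200ms selects "s2_best".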

If a server has a compression mode set (other than "off") but
connects to an older server, there will be no compression on the
route between those two servers.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
derekcollison committed Apr 28, 2023
2 parents d573b78 + 349f01e commit 0ba93ce
Showing 23 changed files with 1,599 additions and 123 deletions.
6 changes: 3 additions & 3 deletions server/accounts.go
@@ -1338,19 +1338,19 @@ func (a *Account) sendTrackingLatency(si *serviceImport, responder *client) bool
// FIXME(dlc) - We need to clean these up but this should happen
// already with the auto-expire logic.
if responder != nil && responder.kind != CLIENT {
a.mu.Lock()
si.acc.mu.Lock()
if si.m1 != nil {
m1, m2 := sl, si.m1
m1.merge(m2)
a.mu.Unlock()
si.acc.mu.Unlock()
a.srv.sendInternalAccountMsg(a, si.latency.subject, m1)
a.mu.Lock()
si.rc = nil
a.mu.Unlock()
return true
}
si.m1 = sl
a.mu.Unlock()
si.acc.mu.Unlock()
return false
} else {
a.srv.sendInternalAccountMsg(a, si.latency.subject, sl)
140 changes: 111 additions & 29 deletions server/client.go
@@ -32,6 +32,7 @@ import (
"sync/atomic"
"time"

"github.com/klauspost/compress/s2"
"github.com/nats-io/jwt/v2"
)

@@ -136,6 +137,7 @@ const (
skipFlushOnClose // Marks that flushOutbound() should not be called on connection close.
expectConnect // Marks if this connection is expected to send a CONNECT
connectProcessFinished // Marks if this connection has finished the connect process.
compressionNegotiated // Marks if this connection has negotiated compression level with remote.
)

// set the flag (would be equivalent to set the boolean to true)
@@ -301,6 +303,7 @@ type outbound struct {
mp int64 // Snapshot of max pending for client.
lft time.Duration // Last flush time for Write.
stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in.
cw *s2.Writer
}

const nbPoolSizeSmall = 512 // Underlying array size of small buffer
@@ -408,10 +411,12 @@ const (
type readCacheFlag uint16

const (
hasMappings readCacheFlag = 1 << iota // For account subject mappings.
sysGroup = "_sys_"
hasMappings readCacheFlag = 1 << iota // For account subject mappings.
switchToCompression readCacheFlag = 1 << 1
)

const sysGroup = "_sys_"

// Used in readloop to cache hot subject lookups and group statistics.
type readCache struct {
// These are for clients who are bound to a single account.
@@ -1270,6 +1275,7 @@ func (c *client) readLoop(pre []byte) {
if ws {
masking = c.ws.maskread
}
checkCompress := c.kind == ROUTER
c.mu.Unlock()

defer func() {
@@ -1297,6 +1303,8 @@
wsr.init()
}

var decompress *s2.Reader

for {
var n int
var err error
@@ -1307,7 +1315,11 @@
n = len(pre)
pre = nil
} else {
n, err = nc.Read(b)
if decompress != nil {
n, err = decompress.Read(b)
} else {
n, err = nc.Read(b)
}
// If we have any data we will try to parse and exit at the end.
if n == 0 && err != nil {
c.closeConnection(closedStateForErr(err))
@@ -1374,6 +1386,13 @@
}
}

// If we are a ROUTER and have processed an INFO, it is possible that
// we are asked to switch to compression now.
if checkCompress && c.in.flags.isSet(switchToCompression) {
c.in.flags.clear(switchToCompression)
decompress = s2.NewReader(nc)
}

// Updates stats for client and server that were collected
// from parsing through the buffer.
if c.in.msgs > 0 {
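
The readLoop changes above switch the route's read path to an s2 decompressor
once the INFO exchange asks for compression, and re-point it if the underlying
connection is later swapped (e.g. upgraded to TLS). Here is a minimal
standalone sketch of that pattern; the type and method names are hypothetical,
and only the `s2.Reader` calls mirror what the diff uses:
```
package example

import (
	"net"

	"github.com/klauspost/compress/s2"
)

type routeReader struct {
	nc         net.Conn
	decompress *s2.Reader // nil until compression has been negotiated
}

// enableCompression is called once the route agreed to compress.
func (r *routeReader) enableCompression() {
	r.decompress = s2.NewReader(r.nc)
}

// Read pulls from the decompressor when active, otherwise from the socket.
func (r *routeReader) Read(b []byte) (int, error) {
	if r.decompress != nil {
		return r.decompress.Read(b)
	}
	return r.nc.Read(b)
}

// resetConn mirrors the readLoop refresh: if the connection was replaced,
// the decompressor must be reset to read from the new one.
func (r *routeReader) resetConn(nc net.Conn) {
	r.nc = nc
	if r.decompress != nil {
		r.decompress.Reset(nc)
	}
}
```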
@@ -1419,7 +1438,12 @@
// re-snapshot the account since it can change during reload, etc.
acc = c.acc
// Refresh nc because in some cases, we have upgraded c.nc to TLS.
nc = c.nc
if nc != c.nc {
nc = c.nc
if decompress != nil {
decompress.Reset(nc)
}
}
c.mu.Unlock()

// Connection was closed
@@ -1506,27 +1530,61 @@ func (c *client) flushOutbound() bool {
// previous write, and in the case of WebSockets, that data may already
// be framed, so we are careful not to re-frame "wnb" here. Instead we
// will just frame up "nb" and append it onto whatever is left on "wnb".
// "nb" will be reset back to its starting position so it can be modified
// safely by queueOutbound calls.
c.out.wnb = append(c.out.wnb, collapsed...)
var _orig [1024][]byte
orig := append(_orig[:0], c.out.wnb...)
c.out.nb = c.out.nb[:0]

// Since WriteTo is lopping things off the beginning, we need to remember
// the start position of the underlying array so that we can get back to it.
// Otherwise we'll always "slide forward" and that will result in reallocs.
startOfWnb := c.out.wnb[0:]
// "nb" will be set to nil so that we can manipulate "collapsed" outside
// of the client's lock, which is interesting in case of compression.
c.out.nb = nil

// In case it goes away after releasing the lock.
nc := c.nc

// Capture this (we change the value in some tests)
wdl := c.out.wdl

// Check for compression
cw := c.out.cw
if cw != nil {
// We will have to adjust once we have compressed, so remove for now.
c.out.pb -= attempted
}

// Do NOT hold lock during actual IO.
c.mu.Unlock()

// Compress outside of the lock
if cw != nil {
var err error
bb := bytes.Buffer{}

cw.Reset(&bb)
for _, buf := range collapsed {
if _, err = cw.Write(buf); err != nil {
break
}
}
if err == nil {
err = cw.Close()
}
if err != nil {
c.Errorf("Error compressing data: %v", err)
c.markConnAsClosed(WriteError)
return false
}
collapsed = append(net.Buffers(nil), bb.Bytes())
attempted = int64(len(collapsed[0]))
}

// This is safe to do outside of the lock since "collapsed" is no longer
// referenced in c.out.nb (which can be modified in queueOutbound() while
// the lock is released).
c.out.wnb = append(c.out.wnb, collapsed...)
var _orig [1024][]byte
orig := append(_orig[:0], c.out.wnb...)

// Since WriteTo is lopping things off the beginning, we need to remember
// the start position of the underlying array so that we can get back to it.
// Otherwise we'll always "slide forward" and that will result in reallocs.
startOfWnb := c.out.wnb[0:]

// flush here
start := time.Now()

@@ -1543,6 +1601,11 @@
// Re-acquire client lock.
c.mu.Lock()

// Adjust if we were compressing.
if cw != nil {
c.out.pb += attempted
}

// At this point, "wnb" has been mutated by WriteTo and any consumed
// buffers have been lopped off the beginning, so in order to return
// them to the pool, we need to look at the difference between "orig"
@@ -2322,6 +2385,17 @@ func (c *client) processPong() {
c.rtt = computeRTT(c.rttStart)
srv := c.srv
reorderGWs := c.kind == GATEWAY && c.gw.outbound
// For routes, check if we have compression auto and if we should change
// the compression level. However, exclude the route if compression is
// "not supported", which indicates that this is a route to an older server.
if c.kind == ROUTER && c.route.compression != CompressionNotSupported {
if opts := srv.getOpts(); opts.Cluster.Compression.Mode == CompressionS2Auto {
if cm := selectS2AutoModeBasedOnRTT(c.rtt, opts.Cluster.Compression.RTTThresholds); cm != c.route.compression {
c.route.compression = cm
c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
}
}
}
c.mu.Unlock()
if reorderGWs {
srv.gateway.orderOutboundConnections()
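
The processPong change above rebuilds the route's s2 writer whenever the auto
mode changes with the measured RTT; it relies on `s2WriterOptions(cm)`, which
is not part of this excerpt. Below is an assumed sketch of such a mapping,
using options the klauspost/compress/s2 package does provide; the real
implementation may differ:
```
package example

import "github.com/klauspost/compress/s2"

// s2WriterOptions maps a compression mode string to s2 writer options.
// Assumed sketch; mode names match the configuration values above.
func s2WriterOptions(mode string) []s2.WriterOption {
	switch mode {
	case "s2_uncompressed":
		// Keep the s2 framing but skip actual compression.
		return []s2.WriterOption{s2.WriterUncompressed()}
	case "s2_better":
		return []s2.WriterOption{s2.WriterBetterCompression()}
	case "s2_best":
		return []s2.WriterOption{s2.WriterBestCompression()}
	default:
		// "s2_fast" is the s2 writer's default; no extra options needed.
		return nil
	}
}
```
As in the diff, a writer created with `s2.NewWriter(nil, s2WriterOptions(cm)...)`
is later `Reset` onto the actual destination buffer before writing.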
@@ -4629,9 +4703,7 @@ func (c *client) processPingTimer() {
var sendPing bool

pingInterval := c.srv.getOpts().PingInterval
if c.kind == GATEWAY {
pingInterval = adjustPingIntervalForGateway(pingInterval)
}
pingInterval = adjustPingInterval(c.kind, pingInterval)
now := time.Now()
needRTT := c.rtt == 0 || now.Sub(c.rttStart) > DEFAULT_RTT_MEASUREMENT_INTERVAL

@@ -4666,11 +4738,18 @@
c.mu.Unlock()
}

// Returns the smallest value between the given `d` and `gatewayMaxPingInterval` durations.
// Invoked for connections known to be of GATEWAY type.
func adjustPingIntervalForGateway(d time.Duration) time.Duration {
if d > gatewayMaxPingInterval {
return gatewayMaxPingInterval
// Returns the smallest value between the given `d` and some max value
// based on the connection kind.
func adjustPingInterval(kind int, d time.Duration) time.Duration {
switch kind {
case ROUTER:
if d > routeMaxPingInterval {
return routeMaxPingInterval
}
case GATEWAY:
if d > gatewayMaxPingInterval {
return gatewayMaxPingInterval
}
}
return d
}
@@ -4681,9 +4760,7 @@ func (c *client) setPingTimer() {
return
}
d := c.srv.getOpts().PingInterval
if c.kind == GATEWAY {
d = adjustPingIntervalForGateway(d)
}
d = adjustPingInterval(c.kind, d)
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
}

@@ -5549,15 +5626,20 @@ func (c *client) setFirstPingTimer() {
if d > firstPingInterval {
d = firstPingInterval
}
if c.kind == GATEWAY {
d = adjustPingIntervalForGateway(d)
}
d = adjustPingInterval(c.kind, d)
} else if d > firstClientPingInterval {
d = firstClientPingInterval
}
}
// We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s.
addDelay := rand.Int63n(int64(d / 5))
d += time.Duration(addDelay)
// In the case of ROUTER and when compression is configured, it is possible
// that this timer was already set, but just to detect a stale connection
// since we have to delay the first PING after compression negotiation
// occurred.
if c.ping.tmr != nil {
c.ping.tmr.Stop()
}
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
}
65 changes: 0 additions & 65 deletions server/client_test.go
@@ -1543,71 +1543,6 @@ func TestClientOutboundQueueCoalesce(t *testing.T) {
}
}

// This test ensures that outbound queues don't cause a run on
// memory when sending something to lots of clients.
func TestClientOutboundQueueMemory(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()

var before runtime.MemStats
var after runtime.MemStats

var err error
clients := make([]*nats.Conn, 50000)
wait := &sync.WaitGroup{}
wait.Add(len(clients))

for i := 0; i < len(clients); i++ {
clients[i], err = nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer clients[i].Close()

clients[i].Subscribe("test", func(m *nats.Msg) {
wait.Done()
})
}

runtime.GC()
runtime.ReadMemStats(&before)

nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()

var m [48000]byte
if err = nc.Publish("test", m[:]); err != nil {
t.Fatal(err)
}

wait.Wait()

runtime.GC()
runtime.ReadMemStats(&after)

hb, ha := float64(before.HeapAlloc), float64(after.HeapAlloc)
ms := float64(len(m))
diff := float64(ha) - float64(hb)
inc := (diff / float64(hb)) * 100

fmt.Printf("Message size: %.1fKB\n", ms/1024)
fmt.Printf("Subscribed clients: %d\n", len(clients))
fmt.Printf("Heap allocs before: %.1fMB\n", hb/1024/1024)
fmt.Printf("Heap allocs after: %.1fMB\n", ha/1024/1024)
fmt.Printf("Heap allocs delta: %.1f%%\n", inc)

// TODO: What threshold makes sense here for a failure?
/*
if inc > 10 {
t.Fatalf("memory increase was %.1f%% (should be <= 10%%)", inc)
}
*/
}

func TestClientTraceRace(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
