[ADDED] Support for route S2 compression #4115

Merged · 6 commits · Apr 28, 2023
6 changes: 3 additions & 3 deletions server/accounts.go
@@ -1338,19 +1338,19 @@ func (a *Account) sendTrackingLatency(si *serviceImport, responder *client) bool
// FIXME(dlc) - We need to clean these up but this should happen
// already with the auto-expire logic.
if responder != nil && responder.kind != CLIENT {
- a.mu.Lock()
+ si.acc.mu.Lock()
if si.m1 != nil {
m1, m2 := sl, si.m1
m1.merge(m2)
- a.mu.Unlock()
+ si.acc.mu.Unlock()
a.srv.sendInternalAccountMsg(a, si.latency.subject, m1)
a.mu.Lock()
si.rc = nil
a.mu.Unlock()
return true
}
si.m1 = sl
- a.mu.Unlock()
+ si.acc.mu.Unlock()
return false
} else {
a.srv.sendInternalAccountMsg(a, si.latency.subject, sl)
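The substance of this hunk: sendTrackingLatency runs on account `a`, but the service import may be owned by a different account, so `si.m1` has to be guarded by `si.acc.mu` rather than `a.mu`. A minimal sketch of that invariant, with simplified, hypothetical stand-in types (not the server's real ones):

package main

import (
	"fmt"
	"sync"
)

type account struct {
	mu sync.Mutex
}

type latencySample struct{}

type serviceImport struct {
	acc *account // the account that owns this import
	m1  *latencySample
}

// recordFirstSample stores the first latency sample under the lock of
// the account that owns the import, which may differ from `a`.
func recordFirstSample(a *account, si *serviceImport, sl *latencySample) bool {
	si.acc.mu.Lock() // not a.mu: si may belong to another account
	defer si.acc.mu.Unlock()
	if si.m1 != nil {
		return true // already have a first sample to merge with
	}
	si.m1 = sl
	return false
}

func main() {
	owner, other := &account{}, &account{}
	si := &serviceImport{acc: owner}
	fmt.Println(recordFirstSample(other, si, &latencySample{})) // false
	fmt.Println(recordFirstSample(other, si, &latencySample{})) // true
}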
140 changes: 111 additions & 29 deletions server/client.go
@@ -32,6 +32,7 @@ import (
"sync/atomic"
"time"

"github.com/klauspost/compress/s2"
"github.com/nats-io/jwt/v2"
)

@@ -136,6 +137,7 @@ const (
skipFlushOnClose // Marks that flushOutbound() should not be called on connection close.
expectConnect // Marks if this connection is expected to send a CONNECT
connectProcessFinished // Marks if this connection has finished the connect process.
+ compressionNegotiated // Marks if this connection has negotiated compression level with remote.
)

// set the flag (would be equivalent to set the boolean to true)
@@ -301,6 +303,7 @@ type outbound struct {
mp int64 // Snapshot of max pending for client.
lft time.Duration // Last flush time for Write.
stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in.
+ cw *s2.Writer // S2 compression writer, set once compression is negotiated.
}

const nbPoolSizeSmall = 512 // Underlying array size of small buffer
@@ -408,10 +411,12 @@ const (
type readCacheFlag uint16

const (
- hasMappings readCacheFlag = 1 << iota // For account subject mappings.
- sysGroup = "_sys_"
+ hasMappings readCacheFlag = 1 << iota // For account subject mappings.
+ switchToCompression readCacheFlag = 1 << 1
)

+ const sysGroup = "_sys_"
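For context on the new switchToCompression flag: readCacheFlag is a bit-flag set, and the read loop below tests and clears it with c.in.flags.isSet(...) and c.in.flags.clear(...). A self-contained sketch of that pattern; the helper shapes here are assumed to mirror the server's existing set/clear/isSet:

package main

import "fmt"

type readCacheFlag uint16

const (
	hasMappings         readCacheFlag = 1 << iota // bit 0
	switchToCompression readCacheFlag = 1 << 1    // bit 1
)

// Assumed shapes of the flag helpers used in this diff.
func (f *readCacheFlag) set(c readCacheFlag)       { *f |= c }
func (f *readCacheFlag) clear(c readCacheFlag)     { *f &^= c }
func (f readCacheFlag) isSet(c readCacheFlag) bool { return f&c != 0 }

func main() {
	var flags readCacheFlag
	flags.set(switchToCompression)
	fmt.Println(flags.isSet(switchToCompression)) // true
	flags.clear(switchToCompression)
	fmt.Println(flags.isSet(switchToCompression)) // false
}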

// Used in readloop to cache hot subject lookups and group statistics.
type readCache struct {
// These are for clients who are bound to a single account.
@@ -1270,6 +1275,7 @@ func (c *client) readLoop(pre []byte) {
if ws {
masking = c.ws.maskread
}
+ checkCompress := c.kind == ROUTER
c.mu.Unlock()

defer func() {
@@ -1297,6 +1303,8 @@
wsr.init()
}

+ var decompress *s2.Reader

for {
var n int
var err error
@@ -1307,7 +1315,11 @@
n = len(pre)
pre = nil
} else {
- n, err = nc.Read(b)
+ if decompress != nil {
+ n, err = decompress.Read(b)
+ } else {
+ n, err = nc.Read(b)
+ }
// If we have any data we will try to parse and exit at the end.
if n == 0 && err != nil {
c.closeConnection(closedStateForErr(err))
@@ -1374,6 +1386,13 @@
}
}

+ // If we are a ROUTER and have processed an INFO, it is possible that
+ // we are asked to switch to compression now.
+ if checkCompress && c.in.flags.isSet(switchToCompression) {
+ c.in.flags.clear(switchToCompression)
+ decompress = s2.NewReader(nc)
+ }

// Updates stats for client and server that were collected
// from parsing through the buffer.
if c.in.msgs > 0 {
@@ -1419,7 +1438,12 @@
// re-snapshot the account since it can change during reload, etc.
acc = c.acc
// Refresh nc because in some cases, we have upgraded c.nc to TLS.
- nc = c.nc
+ if nc != c.nc {
+ nc = c.nc
+ if decompress != nil {
+ decompress.Reset(nc)
+ }
+ }
c.mu.Unlock()

// Connection was closed
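The pattern above — swapping raw connection reads for an s2.Reader once compression has been negotiated, and Reset-ing it if the underlying conn is later upgraded (e.g. to TLS) — can be seen in isolation with klauspost/compress/s2 and an in-memory pipe. A minimal sketch; the PING payload is just an example:

package main

import (
	"fmt"
	"net"

	"github.com/klauspost/compress/s2"
)

func main() {
	a, b := net.Pipe() // in-memory stand-in for a route connection

	go func() {
		cw := s2.NewWriter(a) // everything written is S2-framed
		cw.Write([]byte("PING\r\n"))
		cw.Close() // flush the final frame to the conn
		a.Close()
	}()

	// After negotiation, the read loop swaps the raw conn for an
	// s2.Reader; decompress.Reset(nc) re-targets it on conn upgrade.
	decompress := s2.NewReader(b)
	buf := make([]byte, 512)
	n, _ := decompress.Read(buf)
	fmt.Printf("%q\n", buf[:n]) // "PING\r\n"
}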
@@ -1506,27 +1530,61 @@ func (c *client) flushOutbound() bool {
// previous write, and in the case of WebSockets, that data may already
// be framed, so we are careful not to re-frame "wnb" here. Instead we
// will just frame up "nb" and append it onto whatever is left on "wnb".
// "nb" will be reset back to its starting position so it can be modified
// safely by queueOutbound calls.
c.out.wnb = append(c.out.wnb, collapsed...)
var _orig [1024][]byte
orig := append(_orig[:0], c.out.wnb...)
c.out.nb = c.out.nb[:0]

// Since WriteTo is lopping things off the beginning, we need to remember
// the start position of the underlying array so that we can get back to it.
// Otherwise we'll always "slide forward" and that will result in reallocs.
startOfWnb := c.out.wnb[0:]
// "nb" will be set to nil so that we can manipulate "collapsed" outside
// of the client's lock, which is interesting in case of compression.
c.out.nb = nil

// In case it goes away after releasing the lock.
nc := c.nc

// Capture this (we change the value in some tests)
wdl := c.out.wdl

+ // Check for compression
+ cw := c.out.cw
+ if cw != nil {
+ // We will have to adjust once we have compressed, so remove for now.
+ c.out.pb -= attempted
+ }

// Do NOT hold lock during actual IO.
c.mu.Unlock()

+ // Compress outside of the lock
+ if cw != nil {
+ var err error
+ bb := bytes.Buffer{}
+
+ cw.Reset(&bb)
+ for _, buf := range collapsed {
+ if _, err = cw.Write(buf); err != nil {
+ break
+ }
+ }
+ if err == nil {
+ err = cw.Close()
+ }
+ if err != nil {
+ c.Errorf("Error compressing data: %v", err)
+ c.markConnAsClosed(WriteError)
+ return false
+ }
+ collapsed = append(net.Buffers(nil), bb.Bytes())
+ attempted = int64(len(collapsed[0]))
+ }
+
+ // This is safe to do outside of the lock since "collapsed" is no longer
+ // referenced in c.out.nb (which can be modified in queueOutbound() while
+ // the lock is released).
+ c.out.wnb = append(c.out.wnb, collapsed...)
+ var _orig [1024][]byte
+ orig := append(_orig[:0], c.out.wnb...)
+
+ // Since WriteTo is lopping things off the beginning, we need to remember
+ // the start position of the underlying array so that we can get back to it.
+ // Otherwise we'll always "slide forward" and that will result in reallocs.
+ startOfWnb := c.out.wnb[0:]

// flush here
start := time.Now()
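On the write side the same idea appears here in flushOutbound: when a compression writer is set, the collapsed buffers are compressed into a scratch bytes.Buffer outside the client lock, and the single compressed slice replaces "collapsed" before WriteTo; c.out.pb is adjusted around the IO because the attempted byte count changes under compression. A self-contained sketch of just the compress step (the helper name is ours, not the server's):

package main

import (
	"bytes"
	"fmt"
	"net"

	"github.com/klauspost/compress/s2"
)

// compressBuffers mirrors the flushOutbound step above: run every
// pending buffer through the S2 writer into a scratch bytes.Buffer,
// then carry on with a single compressed buffer.
func compressBuffers(cw *s2.Writer, collapsed net.Buffers) (net.Buffers, int64, error) {
	var bb bytes.Buffer
	cw.Reset(&bb) // reuse the writer; output goes to the scratch buffer
	for _, buf := range collapsed {
		if _, err := cw.Write(buf); err != nil {
			return nil, 0, err
		}
	}
	if err := cw.Close(); err != nil { // flush the final frame
		return nil, 0, err
	}
	out := append(net.Buffers(nil), bb.Bytes())
	return out, int64(bb.Len()), nil
}

func main() {
	cw := s2.NewWriter(nil) // same pattern as the diff: Reset before use
	msgs := net.Buffers{[]byte("MSG foo 1 5\r\nhello\r\n"), []byte("PING\r\n")}
	out, attempted, err := compressBuffers(cw, msgs)
	fmt.Println(len(out), attempted, err) // 1 <compressed size> <nil>
}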

@@ -1543,6 +1601,11 @@
// Re-acquire client lock.
c.mu.Lock()

+ // Adjust if we were compressing.
+ if cw != nil {
+ c.out.pb += attempted
+ }

// At this point, "wnb" has been mutated by WriteTo and any consumed
// buffers have been lopped off the beginning, so in order to return
// them to the pool, we need to look at the difference between "orig"
@@ -2322,6 +2385,17 @@ func (c *client) processPong() {
c.rtt = computeRTT(c.rttStart)
srv := c.srv
reorderGWs := c.kind == GATEWAY && c.gw.outbound
+ // For routes, check if we have compression auto and if we should change
+ // the compression level. However, exclude the route if compression is
+ // "not supported", which indicates that this is a route to an older server.
+ if c.kind == ROUTER && c.route.compression != CompressionNotSupported {
+ if opts := srv.getOpts(); opts.Cluster.Compression.Mode == CompressionS2Auto {
+ if cm := selectS2AutoModeBasedOnRTT(c.rtt, opts.Cluster.Compression.RTTThresholds); cm != c.route.compression {
+ c.route.compression = cm
+ c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
+ }
+ }
+ }
c.mu.Unlock()
if reorderGWs {
srv.gateway.orderOutboundConnections()
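selectS2AutoModeBasedOnRTT and s2WriterOptions are defined elsewhere in this PR, so only their use is visible in this hunk. The intent of CompressionS2Auto is to pick a cheaper S2 level on fast links and a stronger one on slow links, driven by the configured RTT thresholds. A hedged sketch of what such a mapping can look like — the threshold semantics are assumed, and the mode constants are declared locally just to keep the sketch runnable:

package main

import (
	"fmt"
	"time"
)

// Mode names as used in this PR; declared here only so the sketch is
// self-contained (the server defines them elsewhere).
const (
	CompressionS2Uncompressed = "s2_uncompressed"
	CompressionS2Fast         = "s2_fast"
	CompressionS2Better       = "s2_better"
	CompressionS2Best         = "s2_best"
)

// Assumed shape of the RTT -> mode mapping: each threshold bounds the
// RTT for the corresponding mode; past the last threshold, use "best".
func selectS2AutoModeBasedOnRTT(rtt time.Duration, thresholds []time.Duration) string {
	modes := []string{CompressionS2Uncompressed, CompressionS2Fast, CompressionS2Better, CompressionS2Best}
	for i, t := range thresholds {
		if rtt <= t && i < len(modes) {
			return modes[i]
		}
	}
	return CompressionS2Best
}

func main() {
	thresholds := []time.Duration{10 * time.Millisecond, 50 * time.Millisecond, 100 * time.Millisecond}
	fmt.Println(selectS2AutoModeBasedOnRTT(5*time.Millisecond, thresholds))   // s2_uncompressed
	fmt.Println(selectS2AutoModeBasedOnRTT(200*time.Millisecond, thresholds)) // s2_best
}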
@@ -4629,9 +4703,7 @@ func (c *client) processPingTimer() {
var sendPing bool

pingInterval := c.srv.getOpts().PingInterval
- if c.kind == GATEWAY {
- pingInterval = adjustPingIntervalForGateway(pingInterval)
- }
+ pingInterval = adjustPingInterval(c.kind, pingInterval)
now := time.Now()
needRTT := c.rtt == 0 || now.Sub(c.rttStart) > DEFAULT_RTT_MEASUREMENT_INTERVAL

@@ -4666,11 +4738,18 @@
c.mu.Unlock()
}

- // Returns the smallest value between the given `d` and `gatewayMaxPingInterval` durations.
- // Invoked for connections known to be of GATEWAY type.
- func adjustPingIntervalForGateway(d time.Duration) time.Duration {
- if d > gatewayMaxPingInterval {
- return gatewayMaxPingInterval
+ // Returns the smallest value between the given `d` and some max value
+ // based on the connection kind.
+ func adjustPingInterval(kind int, d time.Duration) time.Duration {
+ switch kind {
+ case ROUTER:
+ if d > routeMaxPingInterval {
+ return routeMaxPingInterval
+ }
+ case GATEWAY:
+ if d > gatewayMaxPingInterval {
+ return gatewayMaxPingInterval
+ }
}
return d
}
@@ -4681,9 +4760,7 @@ func (c *client) setPingTimer() {
return
}
d := c.srv.getOpts().PingInterval
- if c.kind == GATEWAY {
- d = adjustPingIntervalForGateway(d)
- }
+ d = adjustPingInterval(c.kind, d)
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
}

@@ -5549,15 +5626,20 @@ func (c *client) setFirstPingTimer() {
if d > firstPingInterval {
d = firstPingInterval
}
- if c.kind == GATEWAY {
- d = adjustPingIntervalForGateway(d)
- }
+ d = adjustPingInterval(c.kind, d)
} else if d > firstClientPingInterval {
d = firstClientPingInterval
}
}
// We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s.
addDelay := rand.Int63n(int64(d / 5))
d += time.Duration(addDelay)
+ // In the case of ROUTER and when compression is configured, it is possible
+ // that this timer was already set, but just to detect a stale connection
+ // since we have to delay the first PING after compression negotiation
+ // occurred.
+ if c.ping.tmr != nil {
+ c.ping.tmr.Stop()
+ }
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
}
65 changes: 0 additions & 65 deletions server/client_test.go
@@ -1543,71 +1543,6 @@ func TestClientOutboundQueueCoalesce(t *testing.T) {
}
}

- // This test ensures that outbound queues don't cause a run on
- // memory when sending something to lots of clients.
- func TestClientOutboundQueueMemory(t *testing.T) {
- opts := DefaultOptions()
- s := RunServer(opts)
- defer s.Shutdown()
-
- var before runtime.MemStats
- var after runtime.MemStats
-
- var err error
- clients := make([]*nats.Conn, 50000)
- wait := &sync.WaitGroup{}
- wait.Add(len(clients))
-
- for i := 0; i < len(clients); i++ {
- clients[i], err = nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s))
- if err != nil {
- t.Fatalf("Error on connect: %v", err)
- }
- defer clients[i].Close()
-
- clients[i].Subscribe("test", func(m *nats.Msg) {
- wait.Done()
- })
- }
-
- runtime.GC()
- runtime.ReadMemStats(&before)
-
- nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s))
- if err != nil {
- t.Fatalf("Error on connect: %v", err)
- }
- defer nc.Close()
-
- var m [48000]byte
- if err = nc.Publish("test", m[:]); err != nil {
- t.Fatal(err)
- }
-
- wait.Wait()
-
- runtime.GC()
- runtime.ReadMemStats(&after)
-
- hb, ha := float64(before.HeapAlloc), float64(after.HeapAlloc)
- ms := float64(len(m))
- diff := float64(ha) - float64(hb)
- inc := (diff / float64(hb)) * 100
-
- fmt.Printf("Message size: %.1fKB\n", ms/1024)
- fmt.Printf("Subscribed clients: %d\n", len(clients))
- fmt.Printf("Heap allocs before: %.1fMB\n", hb/1024/1024)
- fmt.Printf("Heap allocs after: %.1fMB\n", ha/1024/1024)
- fmt.Printf("Heap allocs delta: %.1f%%\n", inc)
-
- // TODO: What threshold makes sense here for a failure?
- /*
- if inc > 10 {
- t.Fatalf("memory increase was %.1f%% (should be <= 10%%)", inc)
- }
- */
- }

func TestClientTraceRace(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)