Skip to content

Commit

Permalink
Track slow consumers per connection type
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs committed Jul 24, 2023
1 parent 8ee2e2f commit 956abd6
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 4 deletions.
15 changes: 14 additions & 1 deletion server/client.go
Expand Up @@ -1722,8 +1722,18 @@ func (c *client) handleWriteTimeout(written, attempted int64, numChunks int) boo
return true
}

// Slow consumer here..
// Aggregate slow consumers.
atomic.AddInt64(&c.srv.slowConsumers, 1)
switch c.kind {
case CLIENT:
c.srv.scStats.clients.Add(1)
case ROUTER:
c.srv.scStats.routes.Add(1)
case GATEWAY:
c.srv.scStats.gateways.Add(1)
case LEAF:
c.srv.scStats.leafs.Add(1)
}
if c.acc != nil {
atomic.AddInt64(&c.acc.slowConsumers, 1)
}
Expand Down Expand Up @@ -2223,7 +2233,10 @@ func (c *client) queueOutbound(data []byte) {
// Perf wise, it looks like it is faster to optimistically add than
// checking current pb+len(data) and then add to pb.
c.out.pb -= int64(len(data))

// Increment the total and client's slow consumer counters.
atomic.AddInt64(&c.srv.slowConsumers, 1)
c.srv.scStats.clients.Add(1)
if c.acc != nil {
atomic.AddInt64(&c.acc.slowConsumers, 1)
}
Expand Down
10 changes: 10 additions & 0 deletions server/client_test.go
Expand Up @@ -2027,8 +2027,18 @@ func TestClientSlowConsumerWithoutConnect(t *testing.T) {
if n := atomic.LoadInt64(&s.slowConsumers); n != 1 {
return fmt.Errorf("Expected 1 slow consumer, got: %v", n)
}
if n := s.scStats.clients.Load(); n != 1 {
return fmt.Errorf("Expected 1 slow consumer, got: %v", n)
}
return nil
})
varz, err := s.Varz(nil)
if err != nil {
t.Fatal(err)
}
if varz.SlowConsumersStats.Clients != 1 {
t.Error("Expected a slow consumer client in varz")
}
}

func TestClientNoSlowConsumerIfConnectExpected(t *testing.T) {
Expand Down
8 changes: 5 additions & 3 deletions server/events.go
Expand Up @@ -2002,7 +2002,7 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) {
a.mu.Unlock()
}

// Lock shoulc be held on entry
// Lock should be held on entry.
func (a *Account) statz() *AccountStat {
localConns := a.numLocalConnections()
leafConns := a.numLocalLeafNodes()
Expand All @@ -2014,10 +2014,12 @@ func (a *Account) statz() *AccountStat {
NumSubs: a.sl.Count(),
Received: DataStats{
Msgs: atomic.LoadInt64(&a.inMsgs),
Bytes: atomic.LoadInt64(&a.inBytes)},
Bytes: atomic.LoadInt64(&a.inBytes),
},
Sent: DataStats{
Msgs: atomic.LoadInt64(&a.outMsgs),
Bytes: atomic.LoadInt64(&a.outBytes)},
Bytes: atomic.LoadInt64(&a.outBytes),
},
SlowConsumers: atomic.LoadInt64(&a.slowConsumers),
}
}
Expand Down
35 changes: 35 additions & 0 deletions server/gateway_test.go
Expand Up @@ -6871,3 +6871,38 @@ func TestGatewaySwitchToInterestOnlyModeImmediately(t *testing.T) {
natsFlush(t, nc)
checkCount(t, gwcb, 1)
}

// TestGatewaySlowConsumer checks that a slow consumer detected on the
// outbound gateway connection from server A to server B is accounted for in
// A's gateways slow consumer counter, and that B's counter stays at zero.
func TestGatewaySlowConsumer(t *testing.T) {
	// Ping more frequently than the default, presumably so that A has data
	// to write on the gateway connection during the wait below.
	gatewayMaxPingInterval = 50 * time.Millisecond
	defer func() { gatewayMaxPingInterval = gwMaxPingInterval }()

	ob := testDefaultOptionsForGateway("B")
	sb := RunServer(ob)
	defer sb.Shutdown()

	oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
	sa := RunServer(oa)
	defer sa.Shutdown()

	// Both servers must see the gateway fully established in each direction.
	waitForInboundGateways(t, sa, 1, 2*time.Second)
	waitForOutboundGateways(t, sa, 1, 2*time.Second)
	waitForInboundGateways(t, sb, 1, 2*time.Second)
	waitForOutboundGateways(t, sb, 1, 2*time.Second)

	c := sa.getOutboundGatewayConnection("B")
	if c == nil {
		t.Fatal("expected an outbound gateway connection from A to B")
	}
	// Shrink the write deadline to the minimum so that the next write on
	// this connection is expected to time out.
	c.mu.Lock()
	c.out.wdl = time.Nanosecond
	c.mu.Unlock()

	// Leave time for at least one ping interval to elapse.
	time.Sleep(250 * time.Millisecond)

	got := sa.NumSlowConsumersGateways()
	expected := uint64(1)
	if got != expected {
		t.Errorf("got: %d, expected: %d", got, expected)
	}
	got = sb.NumSlowConsumersGateways()
	expected = 0
	if got != expected {
		t.Errorf("got: %d, expected: %d", got, expected)
	}
}
45 changes: 45 additions & 0 deletions server/leafnode_test.go
Expand Up @@ -7024,3 +7024,48 @@ func TestLeafNodeSameLocalAccountToMultipleHubs(t *testing.T) {
natsPub(t, nch2, "C", []byte("msgC2"))
checkNoMsg(subc)
}

// TestLeafNodeSlowConsumer connects a raw TCP client to the leafnode port,
// forces a tiny write deadline on the server side of that connection and
// expects the leafs slow consumer counter to report exactly one.
func TestLeafNodeSlowConsumer(t *testing.T) {
	ao := DefaultOptions()
	ao.LeafNode.Host = "127.0.0.1"
	ao.LeafNode.Port = -1
	a := RunServer(ao)
	defer a.Shutdown()

	c, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", ao.LeafNode.Port))
	if err != nil {
		t.Fatalf("Error connecting: %v", err)
	}
	// Register the close immediately so the socket is released even when one
	// of the writes below aborts the test.
	defer c.Close()

	// Only leafnode slow consumers that made it past connect are tracked
	// in the slow consumers counter.
	if _, err := c.Write([]byte("CONNECT {}\r\n")); err != nil {
		t.Fatalf("Error writing connect: %v", err)
	}
	if _, err := c.Write([]byte("PING\r\n")); err != nil {
		t.Fatalf("Unexpected error writing PING: %v", err)
	}
	// Read info (best effort, the content is not inspected).
	br := bufio.NewReader(c)
	br.ReadLine()

	a.mu.Lock()
	checkFor(t, time.Second, 15*time.Millisecond, func() error {
		a.grMu.Lock()
		defer a.grMu.Unlock()
		// Shrink the write deadline of the first temporary client found.
		// NOTE(review): wdl is written without holding the client lock,
		// unlike TestGatewaySlowConsumer - confirm this is race free.
		// NOTE(review): nil is returned even when no temporary client is
		// found, so this presumably never retries - confirm intent.
		for _, cli := range a.grTmpClients {
			cli.out.wdl = time.Nanosecond
			return nil
		}
		return nil
	})
	a.mu.Unlock()

	// Leave time for the server to attempt a write on the connection.
	time.Sleep(250 * time.Millisecond)
	var (
		got             = a.NumSlowConsumersLeafs()
		expected uint64 = 1
	)
	if got != expected {
		t.Errorf("got: %d, expected: %d", got, expected)
	}
}
15 changes: 15 additions & 0 deletions server/monitor.go
Expand Up @@ -1212,6 +1212,7 @@ type Varz struct {
TrustedOperatorsClaim []*jwt.OperatorClaims `json:"trusted_operators_claim,omitempty"`
SystemAccount string `json:"system_account,omitempty"`
PinnedAccountFail uint64 `json:"pinned_account_fails,omitempty"`
SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"`
}

// JetStreamVarz contains basic runtime information about jetstream
Expand Down Expand Up @@ -1316,6 +1317,14 @@ type WebsocketOptsVarz struct {
// Currently, there are no options defined.
type VarzOptions struct{}

// SlowConsumersStats reports the number of slow consumers seen by the
// server, split by the kind of connection on which they were detected.
type SlowConsumersStats struct {
	// Clients counts slow consumers on client connections.
	Clients uint64 `json:"clients"`
	// Routes counts slow consumers on route connections.
	Routes uint64 `json:"routes"`
	// Gateways counts slow consumers on gateway connections.
	Gateways uint64 `json:"gateways"`
	// Leafs counts slow consumers on leafnode connections.
	Leafs uint64 `json:"leafs"`
}

func myUptime(d time.Duration) string {
// Just use total seconds for uptime, and display days / years
tsecs := d / time.Second
Expand Down Expand Up @@ -1661,6 +1670,12 @@ func (s *Server) updateVarzRuntimeFields(v *Varz, forceUpdate bool, pcpu float64
v.OutMsgs = atomic.LoadInt64(&s.outMsgs)
v.OutBytes = atomic.LoadInt64(&s.outBytes)
v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
v.SlowConsumersStats = &SlowConsumersStats{
Clients: s.NumSlowConsumersClients(),
Routes: s.NumSlowConsumersRoutes(),
Gateways: s.NumSlowConsumersGateways(),
Leafs: s.NumSlowConsumersLeafs(),
}
v.PinnedAccountFail = atomic.LoadUint64(&s.pinnedAccFail)

// Make sure to reset in case we are re-using.
Expand Down
25 changes: 25 additions & 0 deletions server/routes_test.go
Expand Up @@ -3800,6 +3800,31 @@ func TestRouteNoLeakOnSlowConsumer(t *testing.T) {
}
return nil
})
var got, expected int64
got = s1.NumSlowConsumers()
expected = 1
if got != expected {
t.Errorf("got: %d, expected: %d", got, expected)
}
got = int64(s1.NumSlowConsumersRoutes())
if got != expected {
t.Errorf("got: %d, expected: %d", got, expected)
}
got = int64(s1.NumSlowConsumersClients())
expected = 0
if got != expected {
t.Errorf("got: %d, expected: %d", got, expected)
}
varz, err := s1.Varz(nil)
if err != nil {
t.Fatal(err)
}
if varz.SlowConsumersStats.Clients != 0 {
t.Error("Expected no slow consumer clients")
}
if varz.SlowConsumersStats.Routes != 1 {
t.Error("Expected a slow consumer route")
}
}

func TestRouteNoLeakOnAuthTimeout(t *testing.T) {
Expand Down
29 changes: 29 additions & 0 deletions server/server.go
Expand Up @@ -122,6 +122,7 @@ type Server struct {
// How often user logon fails due to the issuer account not being pinned.
pinnedAccFail uint64
stats
scStats
mu sync.RWMutex
kp nkeys.KeyPair
xkp nkeys.KeyPair
Expand Down Expand Up @@ -332,6 +333,14 @@ type stats struct {
slowConsumers int64
}

// scStats holds the slow consumer counters, one per kind of connection
// (clients, routes, leafnodes and gateways). Each counter is an atomic
// value, so it can be updated and read without additional locking.
type scStats struct {
	clients, routes, leafs, gateways atomic.Uint64
}

// This is used by tests so we can run all server tests with a default route
// or leafnode compression mode. For instance:
// go test -race -v ./server -cluster_compression=fast
Expand Down Expand Up @@ -3387,6 +3396,26 @@ func (s *Server) NumSlowConsumers() int64 {
return atomic.LoadInt64(&s.slowConsumers)
}

// NumSlowConsumersClients returns the current value of the slow consumer
// counter kept for client connections.
func (s *Server) NumSlowConsumersClients() uint64 {
	counter := &s.scStats.clients
	return counter.Load()
}

// NumSlowConsumersRoutes returns the current value of the slow consumer
// counter kept for route connections.
func (s *Server) NumSlowConsumersRoutes() uint64 {
	counter := &s.scStats.routes
	return counter.Load()
}

// NumSlowConsumersGateways will report the number of slow consumers gateways.
func (s *Server) NumSlowConsumersGateways() uint64 {
	return s.scStats.gateways.Load()
}

// NumSlowConsumersLeafs returns the current value of the slow consumer
// counter kept for leafnode connections.
func (s *Server) NumSlowConsumersLeafs() uint64 {
	counter := &s.scStats.leafs
	return counter.Load()
}

// ConfigTime will report the last time the server configuration was loaded.
func (s *Server) ConfigTime() time.Time {
s.mu.RLock()
Expand Down

0 comments on commit 956abd6

Please sign in to comment.