Skip to content

Commit

Permalink
[FIXED] Allow sorting by rtt for connz. (#4157)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>

Resolves #4150
  • Loading branch information
derekcollison committed May 13, 2023
2 parents c31e710 + 421775a commit a982bbc
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 8 deletions.
17 changes: 12 additions & 5 deletions server/monitor.go
Expand Up @@ -137,6 +137,9 @@ type ConnInfo struct {
NameTag string `json:"name_tag,omitempty"`
Tags jwt.TagList `json:"tags,omitempty"`
MQTTClient string `json:"mqtt_client,omitempty"` // This is the MQTT client id

// Internal
rtt int64 // For fast sorting
}

// TLSPeerCert contains basic information about a TLS peer certificate
Expand Down Expand Up @@ -190,9 +193,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {

if opts != nil {
// If no sort option given or sort is by uptime, then sort by cid
if opts.Sort == _EMPTY_ {
sortOpt = ByCid
} else {
if opts.Sort != _EMPTY_ {
sortOpt = opts.Sort
if !sortOpt.IsValid() {
return nil, fmt.Errorf("invalid sorting option: %s", sortOpt)
Expand Down Expand Up @@ -498,6 +499,8 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
sort.Sort(sort.Reverse(byStop{pconns}))
case ByReason:
sort.Sort(byReason{pconns})
case ByRTT:
sort.Sort(sort.Reverse(byRTT{pconns}))
}

minoff := c.Offset
Expand Down Expand Up @@ -527,6 +530,10 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
// Fills in the ConnInfo from the client.
// client should be locked.
func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time, auth bool) {
// For fast sort if required.
rtt := client.getRTT()
ci.rtt = int64(rtt)

ci.Cid = client.cid
ci.MQTTClient = client.getMQTTClientID()
ci.Kind = client.kindString()
Expand All @@ -535,7 +542,7 @@ func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time, auth bool)
ci.LastActivity = client.last
ci.Uptime = myUptime(now.Sub(client.start))
ci.Idle = myUptime(now.Sub(client.last))
ci.RTT = client.getRTT().String()
ci.RTT = rtt.String()
ci.OutMsgs = client.outMsgs
ci.OutBytes = client.outBytes
ci.NumSubs = uint32(len(client.subs))
Expand Down Expand Up @@ -584,7 +591,7 @@ func (c *client) getRTT() time.Duration {
if c.rtt == 0 {
// If a real client, go ahead and send ping now to get a value
// for RTT. For tests and telnet, or if client is closing, etc skip.
if c.opts.Lang != "" {
if c.opts.Lang != _EMPTY_ {
c.sendRTTPingLocked()
}
return 0
Expand Down
9 changes: 7 additions & 2 deletions server/monitor_sort_opts.go
Expand Up @@ -45,7 +45,7 @@ const (
ByUptime SortOpt = "uptime" // By the amount of time connections exist
ByStop SortOpt = "stop" // By the stop time for a closed connection
ByReason SortOpt = "reason" // By the reason for a closed connection

ByRTT SortOpt = "rtt" // By the round trip time
)

// Individual sort options provide the Less for sort.Interface. Len and Swap are on cList.
Expand Down Expand Up @@ -139,10 +139,15 @@ func (l byReason) Less(i, j int) bool {
return l.ConnInfos[i].Reason < l.ConnInfos[j].Reason
}

// RTT - Default is descending
type byRTT struct{ ConnInfos }

func (l byRTT) Less(i, j int) bool { return l.ConnInfos[i].rtt < l.ConnInfos[j].rtt }

// IsValid determines if a sort option is valid
func (s SortOpt) IsValid() bool {
switch s {
case "", ByCid, ByStart, BySubs, ByPending, ByOutMsgs, ByInMsgs, ByOutBytes, ByInBytes, ByLast, ByIdle, ByUptime, ByStop, ByReason:
case _EMPTY_, ByCid, ByStart, BySubs, ByPending, ByOutMsgs, ByInMsgs, ByOutBytes, ByInBytes, ByLast, ByIdle, ByUptime, ByStop, ByReason, ByRTT:
return true
default:
return false
Expand Down
50 changes: 49 additions & 1 deletion server/monitor_test.go
@@ -1,4 +1,4 @@
// Copyright 2013-2022 The NATS Authors
// Copyright 2013-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -4686,3 +4686,51 @@ func TestMonitorConnzOperatorModeFilterByUser(t *testing.T) {
require_True(t, ci.AuthorizedUser == aUser)
}
}

func TestMonitorConnzSortByRTT(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()

for i := 0; i < 10; i++ {
nc, err := nats.Connect(s.ClientURL())
require_NoError(t, err)
defer nc.Close()
}

connz := pollConz(t, s, 1, _EMPTY_, &ConnzOptions{Sort: ByRTT})
require_True(t, connz.NumConns == 10)

var rtt int64
for _, ci := range connz.Conns {
if rtt == 0 {
rtt = ci.rtt
} else {
if ci.rtt > rtt {
t.Fatalf("RTT not in descending order: %v vs %v",
time.Duration(rtt), time.Duration(ci.rtt))
}
rtt = ci.rtt
}
}

// Make sure url works as well.
url := fmt.Sprintf("http://127.0.0.1:%d/connz?sort=rtt", s.MonitorAddr().Port)
connz = pollConz(t, s, 0, url, nil)
require_True(t, connz.NumConns == 10)

rtt = 0
for _, ci := range connz.Conns {
crttd, err := time.ParseDuration(ci.RTT)
require_NoError(t, err)
crtt := int64(crttd)
if rtt == 0 {
rtt = crtt
} else {
if crtt > rtt {
t.Fatalf("RTT not in descending order: %v vs %v",
time.Duration(rtt), time.Duration(crtt))
}
rtt = ci.rtt
}
}
}

0 comments on commit a982bbc

Please sign in to comment.