Skip to content

Commit

Permalink
Fix monitoring server connz idle time sorting
Browse files Browse the repository at this point in the history
  • Loading branch information
mdawar committed Aug 31, 2023
1 parent 2e1392a commit a7d9cf0
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 59 deletions.
2 changes: 1 addition & 1 deletion server/monitor.go
Expand Up @@ -492,7 +492,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
case ByLast:
sort.Sort(sort.Reverse(byLast{pconns}))
case ByIdle:
sort.Sort(sort.Reverse(byIdle{pconns}))
sort.Sort(sort.Reverse(byIdle{pconns, c.Now}))
case ByUptime:
sort.Sort(byUptime{pconns, time.Now()})
case ByStop:
Expand Down
9 changes: 5 additions & 4 deletions server/monitor_sort_opts.go
Expand Up @@ -92,12 +92,13 @@ func (l byLast) Less(i, j int) bool {
}

// Idle time
type byIdle struct{ ConnInfos }
type byIdle struct {
ConnInfos
now time.Time
}

func (l byIdle) Less(i, j int) bool {
ii := l.ConnInfos[i].LastActivity.Sub(l.ConnInfos[i].Start)
ij := l.ConnInfos[j].LastActivity.Sub(l.ConnInfos[j].Start)
return ii < ij
return l.now.Sub(l.ConnInfos[i].LastActivity) < l.now.Sub(l.ConnInfos[j].LastActivity)
}

// Uptime
Expand Down
86 changes: 86 additions & 0 deletions server/monitor_sort_opts_test.go
@@ -0,0 +1,86 @@
package server

import (
"sort"
"testing"
"time"
)

// TestSortByIdleTime sorts several connection lists with byIdle against a fixed
// reference time and checks that the resulting idle durations (reference time
// minus LastActivity) come out in ascending order. Start times vary between
// cases; the check itself looks only at LastActivity.
func TestSortByIdleTime(t *testing.T) {
	now := time.Now().UTC()

	// at returns the reference time shifted by offset; a negative offset is in the past.
	at := func(offset time.Duration) time.Time { return now.Add(offset) }

	cases := map[string]ConnInfos{
		"zero values": {{}, {}, {}, {}},
		"equal last activity times": {
			{Start: at(-50 * time.Minute), LastActivity: at(-time.Minute)},
			{Start: at(-30 * time.Minute), LastActivity: at(-time.Minute)},
			{Start: at(-10 * time.Second), LastActivity: at(-time.Minute)},
			{Start: at(-2 * time.Hour), LastActivity: at(-time.Minute)},
		},
		"last activity in the future": {
			{Start: at(-50 * time.Minute), LastActivity: at(10 * time.Minute)}, // 10m ahead
			{Start: at(-30 * time.Minute), LastActivity: at(5 * time.Minute)},  // 5m ahead
			{Start: at(-24 * time.Hour), LastActivity: at(2 * time.Second)},    // 2s ahead
			{Start: at(-10 * time.Second), LastActivity: at(15 * time.Minute)}, // 15m ahead
			{Start: at(-2 * time.Hour), LastActivity: at(time.Minute)},         // 1m ahead
		},
		"unsorted": {
			{Start: at(-50 * time.Minute), LastActivity: at(-10 * time.Minute)}, // idle 10m
			{Start: at(-30 * time.Minute), LastActivity: at(-5 * time.Minute)},  // idle 5m
			{Start: at(-24 * time.Hour), LastActivity: at(-2 * time.Second)},    // idle 2s
			{Start: at(-10 * time.Second), LastActivity: at(-15 * time.Minute)}, // idle 15m
			{Start: at(-2 * time.Hour), LastActivity: at(-time.Minute)},         // idle 1m
		},
		"unsorted with zero value start time": {
			{LastActivity: at(-10 * time.Minute)}, // idle 10m
			{LastActivity: at(-5 * time.Minute)},  // idle 5m
			{LastActivity: at(-2 * time.Second)},  // idle 2s
			{LastActivity: at(-15 * time.Minute)}, // idle 15m
			{LastActivity: at(-time.Minute)},      // idle 1m
		},
		"sorted": {
			{Start: at(-24 * time.Hour), LastActivity: at(-2 * time.Second)},    // idle 2s
			{Start: at(-2 * time.Hour), LastActivity: at(-time.Minute)},         // idle 1m
			{Start: at(-30 * time.Minute), LastActivity: at(-5 * time.Minute)},  // idle 5m
			{Start: at(-50 * time.Minute), LastActivity: at(-10 * time.Minute)}, // idle 10m
			{Start: at(-10 * time.Second), LastActivity: at(-15 * time.Minute)}, // idle 15m
		},
		"sorted with zero value start time": {
			{LastActivity: at(-2 * time.Second)},  // idle 2s
			{LastActivity: at(-time.Minute)},      // idle 1m
			{LastActivity: at(-5 * time.Minute)},  // idle 5m
			{LastActivity: at(-10 * time.Minute)}, // idle 10m
			{LastActivity: at(-15 * time.Minute)}, // idle 15m
		},
	}

	for name, conns := range cases {
		t.Run(name, func(t *testing.T) {
			// The list is sorted in place, then inspected.
			sort.Sort(byIdle{conns, now})

			if idle := getIdleDurations(conns, now); !sortedDurationsAsc(idle) {
				t.Errorf("want durations sorted in ascending order, got %v", idle)
			}
		})
	}
}

// getIdleDurations maps each connection in conns to its idle duration, computed
// as now minus the connection's LastActivity. The result has one entry per
// connection, in the same order as conns.
func getIdleDurations(conns ConnInfos, now time.Time) []time.Duration {
	idle := make([]time.Duration, len(conns))
	for i := range conns {
		idle[i] = now.Sub(conns[i].LastActivity)
	}
	return idle
}

// sortedDurationsAsc reports whether durations is in ascending order.
// Equal neighbours are accepted; empty and single-element slices are sorted.
func sortedDurationsAsc(durations []time.Duration) bool {
	for i := 1; i < len(durations); i++ {
		// A value smaller than its predecessor breaks the ascending order.
		if durations[i] < durations[i-1] {
			return false
		}
	}
	return true
}
119 changes: 65 additions & 54 deletions server/monitor_test.go
Expand Up @@ -1186,77 +1186,88 @@ func TestConnzSortedByIdle(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()

url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
url := fmt.Sprintf("http://%s/connz?sort=idle", s.MonitorAddr())
now := time.Now()

testIdle := func(mode int) {
firstClient := createClientConnSubscribeAndPublish(t, s)
defer firstClient.Close()
firstClient.Subscribe("client.1", func(m *nats.Msg) {})
firstClient.Flush()

secondClient := createClientConnSubscribeAndPublish(t, s)
defer secondClient.Close()

// Make it such that the second client started 10 secs ago. 10 is important since bug
// was strcmp, e.g. 1s vs 11s
var cid uint64
switch mode {
case 0:
cid = uint64(2)
case 1:
cid = uint64(4)
}
client := s.getClient(cid)
if client == nil {
t.Fatalf("Error looking up client %v\n", 2)
}
clients := []struct {
start time.Time // Client start time.
last time.Time // Client last activity time.
}{
{start: now.Add(-10 * time.Second), last: now.Add(-5 * time.Second)},
{start: now.Add(-20 * time.Second), last: now.Add(-10 * time.Second)},
{start: now.Add(-3 * time.Second), last: now.Add(-2 * time.Second)},
{start: now.Add(-30 * time.Second), last: now.Add(-20 * time.Second)},
}

// We want to make sure that we set start/last after the server has finished
// updating this client's last activity. Doing another Flush() now (even though
// one is done in createClientConnSubscribeAndPublish) ensures that server has
// finished updating the client's last activity, since for that last flush there
// should be no new message/sub/unsub activity.
secondClient.Flush()
testIdle := func(mode int) {
// Connect the specified number of clients.
for _, clientTime := range clients {
clientConn := createClientConnSubscribeAndPublish(t, s)
defer clientConn.Close()

client.mu.Lock()
client.start = client.start.Add(-10 * time.Second)
client.last = client.start
client.mu.Unlock()
cid, err := clientConn.GetClientID()
if err != nil {
t.Fatalf("error getting the client CID: %v", err)
}

// The Idle granularity is a whole second
time.Sleep(time.Second)
firstClient.Publish("client.1", []byte("new message"))
client := s.getClient(cid)
if client == nil {
t.Fatalf("error looking up client %d", cid)
}

c := pollConz(t, s, mode, url+"connz?sort=idle", &ConnzOptions{Sort: ByIdle})
// Make sure we are returned 2 connections...
if len(c.Conns) != 2 {
t.Fatalf("Expected to get two connections, got %v", len(c.Conns))
// Change the client's start and last activity times.
client.mu.Lock()
client.start = clientTime.start
client.last = clientTime.last
client.mu.Unlock()
}

// And that the Idle time is valid (even if equal to "0s")
if c.Conns[0].Idle == "" || c.Conns[1].Idle == "" {
t.Fatal("Expected Idle value to be valid")
}
c := pollConz(t, s, mode, url, &ConnzOptions{Sort: ByIdle})

idle1, err := time.ParseDuration(c.Conns[0].Idle)
if err != nil {
t.Fatalf("Unable to parse duration %v, err=%v", c.Conns[0].Idle, err)
}
idle2, err := time.ParseDuration(c.Conns[1].Idle)
if err != nil {
t.Fatalf("Unable to parse duration %v, err=%v", c.Conns[0].Idle, err)
wantConns := len(clients)
gotConns := len(c.Conns)

if gotConns != wantConns {
t.Fatalf("want %d connections, got %d", wantConns, gotConns)
}

if idle2 < idle1 {
t.Fatalf("Expected conns sorted in descending order by Idle, got %v < %v\n",
idle2, idle1)
idleDurations := getConnsIdleDurations(t, c.Conns)

if !sortedDurationsDesc(idleDurations) {
t.Errorf("want durations sorted in descending order, got %v", idleDurations)
}
}

for mode := 0; mode < 2; mode++ {
testIdle(mode)
}
}

// getConnsIdleDurations parses the Idle string of every connection in conns
// with time.ParseDuration and returns the parsed values in the same order.
// The test is failed immediately if any Idle value does not parse.
func getConnsIdleDurations(t *testing.T, conns []*ConnInfo) []time.Duration {
	t.Helper()

	parsed := make([]time.Duration, len(conns))
	for i := range conns {
		d, err := time.ParseDuration(conns[i].Idle)
		if err != nil {
			t.Fatalf("error parsing duration %q: %v", conns[i].Idle, err)
		}
		parsed[i] = d
	}
	return parsed
}

// sortedDurationsDesc reports whether durations is in descending order.
// Equal neighbours are accepted; empty and single-element slices are sorted.
func sortedDurationsDesc(durations []time.Duration) bool {
	for i := 1; i < len(durations); i++ {
		// A value larger than its predecessor breaks the descending order.
		if durations[i] > durations[i-1] {
			return false
		}
	}
	return true
}

func TestConnzSortBadRequest(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()
Expand Down

0 comments on commit a7d9cf0

Please sign in to comment.