Skip to content

Commit

Permalink
Annotate CPU and goroutine profiles (#4204)
Browse files Browse the repository at this point in the history
This allows the CPU and goroutine profiles to be annotated with
information that allows us to break down load based on accounts, streams
and consumers. We could probably add more labels elsewhere for other
purposes too. It makes it easier to spot whether there are certain
assets that are responsible for heavy CPU usage, i.e. snapshotting
certain stream states.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
derekcollison committed Jun 20, 2023
2 parents 165c41f + d2615b7 commit eba833e
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 9 deletions.
36 changes: 32 additions & 4 deletions server/jetstream_cluster.go
Expand Up @@ -833,7 +833,13 @@ func (js *jetStream) setupMetaGroup() error {
atomic.StoreInt32(&js.clustered, 1)
c.registerWithAccount(sacc)

js.srv.startGoRoutine(js.monitorCluster)
js.srv.startGoRoutine(
js.monitorCluster,
pprofLabels{
"type": "metaleader",
"account": sacc.Name,
},
)
return nil
}

Expand Down Expand Up @@ -3315,7 +3321,14 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
}
mset.monitorWg.Add(1)
// Start monitoring..
s.startGoRoutine(func() { js.monitorStream(mset, sa, needsNode) })
s.startGoRoutine(
func() { js.monitorStream(mset, sa, needsNode) },
pprofLabels{
"type": "stream",
"account": mset.accName(),
"stream": mset.name(),
},
)
} else if numReplicas == 1 && alreadyRunning {
// We downgraded to R1. Make sure we cleanup the raft node and the stream monitor.
mset.removeNode()
Expand Down Expand Up @@ -3538,7 +3551,14 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
if mset != nil {
mset.monitorWg.Add(1)
}
s.startGoRoutine(func() { js.monitorStream(mset, sa, false) })
s.startGoRoutine(
func() { js.monitorStream(mset, sa, false) },
pprofLabels{
"type": "stream",
"account": mset.accName(),
"stream": mset.name(),
},
)
}
} else {
// Single replica stream, process manually here.
Expand Down Expand Up @@ -4150,7 +4170,15 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
// Start our monitoring routine if needed.
if !alreadyRunning && !o.isMonitorRunning() {
o.monitorWg.Add(1)
s.startGoRoutine(func() { js.monitorConsumer(o, ca) })
s.startGoRoutine(
func() { js.monitorConsumer(o, ca) },
pprofLabels{
"type": "consumer",
"account": mset.accName(),
"stream": mset.name(),
"consumer": ca.Name,
},
)
}
// For existing consumer, only send response if not recovering.
if wasExisting && !js.isMetaRecovering() {
Expand Down
20 changes: 17 additions & 3 deletions server/server.go
Expand Up @@ -28,6 +28,7 @@ import (
"net"
"net/http"
"regexp"
"runtime/pprof"

// Allow dynamic profiling.
_ "net/http/pprof"
Expand Down Expand Up @@ -3501,15 +3502,28 @@ func (s *Server) String() string {
return s.info.Name
}

func (s *Server) startGoRoutine(f func()) bool {
type pprofLabels map[string]string

func (s *Server) startGoRoutine(f func(), tags ...pprofLabels) bool {
var started bool
s.grMu.Lock()
defer s.grMu.Unlock()
if s.grRunning {
var labels []string
for _, m := range tags {
for k, v := range m {
labels = append(labels, k, v)
}
}
s.grWG.Add(1)
go f()
go func() {
pprof.SetGoroutineLabels(
pprof.WithLabels(context.Background(), pprof.Labels(labels...)),
)
f()
}()
started = true
}
s.grMu.Unlock()
return started
}

Expand Down
18 changes: 16 additions & 2 deletions server/stream.go
Expand Up @@ -2373,7 +2373,14 @@ func (mset *stream) setupMirrorConsumer() error {
mirror.qch = make(chan struct{})
mirror.wg.Add(1)
ready.Add(1)
if !mset.srv.startGoRoutine(func() { mset.processMirrorMsgs(mirror, &ready) }) {
if !mset.srv.startGoRoutine(
func() { mset.processMirrorMsgs(mirror, &ready) },
pprofLabels{
"type": "mirror",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
},
) {
ready.Done()
}
}
Expand Down Expand Up @@ -2671,7 +2678,14 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T
si.qch = make(chan struct{})
si.wg.Add(1)
ready.Add(1)
if !mset.srv.startGoRoutine(func() { mset.processSourceMsgs(si, &ready) }) {
if !mset.srv.startGoRoutine(
func() { mset.processSourceMsgs(si, &ready) },
pprofLabels{
"type": "source",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
},
) {
ready.Done()
}
}
Expand Down

0 comments on commit eba833e

Please sign in to comment.