Skip to content

Commit

Permalink
Annotate CPU and goroutine profiles with account/stream/consumer info
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Jun 20, 2023
1 parent 165c41f commit d2615b7
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 9 deletions.
36 changes: 32 additions & 4 deletions server/jetstream_cluster.go
Expand Up @@ -833,7 +833,13 @@ func (js *jetStream) setupMetaGroup() error {
atomic.StoreInt32(&js.clustered, 1)
c.registerWithAccount(sacc)

js.srv.startGoRoutine(js.monitorCluster)
js.srv.startGoRoutine(
js.monitorCluster,
pprofLabels{
"type": "metaleader",
"account": sacc.Name,
},
)
return nil
}

Expand Down Expand Up @@ -3315,7 +3321,14 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
}
mset.monitorWg.Add(1)
// Start monitoring..
s.startGoRoutine(func() { js.monitorStream(mset, sa, needsNode) })
s.startGoRoutine(
func() { js.monitorStream(mset, sa, needsNode) },
pprofLabels{
"type": "stream",
"account": mset.accName(),
"stream": mset.name(),
},
)
} else if numReplicas == 1 && alreadyRunning {
// We downgraded to R1. Make sure we cleanup the raft node and the stream monitor.
mset.removeNode()
Expand Down Expand Up @@ -3538,7 +3551,14 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
if mset != nil {
mset.monitorWg.Add(1)
}
s.startGoRoutine(func() { js.monitorStream(mset, sa, false) })
s.startGoRoutine(
func() { js.monitorStream(mset, sa, false) },
pprofLabels{
"type": "stream",
"account": mset.accName(),
"stream": mset.name(),
},
)
}
} else {
// Single replica stream, process manually here.
Expand Down Expand Up @@ -4150,7 +4170,15 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
// Start our monitoring routine if needed.
if !alreadyRunning && !o.isMonitorRunning() {
o.monitorWg.Add(1)
s.startGoRoutine(func() { js.monitorConsumer(o, ca) })
s.startGoRoutine(
func() { js.monitorConsumer(o, ca) },
pprofLabels{
"type": "consumer",
"account": mset.accName(),
"stream": mset.name(),
"consumer": ca.Name,
},
)
}
// For existing consumer, only send response if not recovering.
if wasExisting && !js.isMetaRecovering() {
Expand Down
20 changes: 17 additions & 3 deletions server/server.go
Expand Up @@ -28,6 +28,7 @@ import (
"net"
"net/http"
"regexp"
"runtime/pprof"

// Allow dynamic profiling.
_ "net/http/pprof"
Expand Down Expand Up @@ -3501,15 +3502,28 @@ func (s *Server) String() string {
return s.info.Name
}

func (s *Server) startGoRoutine(f func()) bool {
type pprofLabels map[string]string

func (s *Server) startGoRoutine(f func(), tags ...pprofLabels) bool {
var started bool
s.grMu.Lock()
defer s.grMu.Unlock()
if s.grRunning {
var labels []string
for _, m := range tags {
for k, v := range m {
labels = append(labels, k, v)
}
}
s.grWG.Add(1)
go f()
go func() {
pprof.SetGoroutineLabels(
pprof.WithLabels(context.Background(), pprof.Labels(labels...)),
)
f()
}()
started = true
}
s.grMu.Unlock()
return started
}

Expand Down
18 changes: 16 additions & 2 deletions server/stream.go
Expand Up @@ -2373,7 +2373,14 @@ func (mset *stream) setupMirrorConsumer() error {
mirror.qch = make(chan struct{})
mirror.wg.Add(1)
ready.Add(1)
if !mset.srv.startGoRoutine(func() { mset.processMirrorMsgs(mirror, &ready) }) {
if !mset.srv.startGoRoutine(
func() { mset.processMirrorMsgs(mirror, &ready) },
pprofLabels{
"type": "mirror",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
},
) {
ready.Done()
}
}
Expand Down Expand Up @@ -2671,7 +2678,14 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T
si.qch = make(chan struct{})
si.wg.Add(1)
ready.Add(1)
if !mset.srv.startGoRoutine(func() { mset.processSourceMsgs(si, &ready) }) {
if !mset.srv.startGoRoutine(
func() { mset.processSourceMsgs(si, &ready) },
pprofLabels{
"type": "source",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
},
) {
ready.Done()
}
}
Expand Down

0 comments on commit d2615b7

Please sign in to comment.