Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Annotate CPU and goroutine profiles #4204

Merged
merged 1 commit into from Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 32 additions & 4 deletions server/jetstream_cluster.go
Expand Up @@ -833,7 +833,13 @@ func (js *jetStream) setupMetaGroup() error {
atomic.StoreInt32(&js.clustered, 1)
c.registerWithAccount(sacc)

js.srv.startGoRoutine(js.monitorCluster)
js.srv.startGoRoutine(
js.monitorCluster,
pprofLabels{
"type": "metaleader",
"account": sacc.Name,
},
)
return nil
}

Expand Down Expand Up @@ -3315,7 +3321,14 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
}
mset.monitorWg.Add(1)
// Start monitoring..
s.startGoRoutine(func() { js.monitorStream(mset, sa, needsNode) })
s.startGoRoutine(
func() { js.monitorStream(mset, sa, needsNode) },
pprofLabels{
"type": "stream",
"account": mset.accName(),
"stream": mset.name(),
},
)
} else if numReplicas == 1 && alreadyRunning {
// We downgraded to R1. Make sure we cleanup the raft node and the stream monitor.
mset.removeNode()
Expand Down Expand Up @@ -3538,7 +3551,14 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
if mset != nil {
mset.monitorWg.Add(1)
}
s.startGoRoutine(func() { js.monitorStream(mset, sa, false) })
s.startGoRoutine(
func() { js.monitorStream(mset, sa, false) },
pprofLabels{
"type": "stream",
"account": mset.accName(),
"stream": mset.name(),
},
)
}
} else {
// Single replica stream, process manually here.
Expand Down Expand Up @@ -4150,7 +4170,15 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
// Start our monitoring routine if needed.
if !alreadyRunning && !o.isMonitorRunning() {
o.monitorWg.Add(1)
s.startGoRoutine(func() { js.monitorConsumer(o, ca) })
s.startGoRoutine(
func() { js.monitorConsumer(o, ca) },
pprofLabels{
"type": "consumer",
"account": mset.accName(),
"stream": mset.name(),
"consumer": ca.Name,
},
)
}
// For existing consumer, only send response if not recovering.
if wasExisting && !js.isMetaRecovering() {
Expand Down
20 changes: 17 additions & 3 deletions server/server.go
Expand Up @@ -28,6 +28,7 @@ import (
"net"
"net/http"
"regexp"
"runtime/pprof"

// Allow dynamic profiling.
_ "net/http/pprof"
Expand Down Expand Up @@ -3501,15 +3502,28 @@ func (s *Server) String() string {
return s.info.Name
}

func (s *Server) startGoRoutine(f func()) bool {
type pprofLabels map[string]string

func (s *Server) startGoRoutine(f func(), tags ...pprofLabels) bool {
var started bool
s.grMu.Lock()
defer s.grMu.Unlock()
if s.grRunning {
var labels []string
for _, m := range tags {
for k, v := range m {
labels = append(labels, k, v)
}
}
s.grWG.Add(1)
go f()
go func() {
pprof.SetGoroutineLabels(
pprof.WithLabels(context.Background(), pprof.Labels(labels...)),
)
f()
}()
started = true
}
s.grMu.Unlock()
return started
}

Expand Down
18 changes: 16 additions & 2 deletions server/stream.go
Expand Up @@ -2373,7 +2373,14 @@ func (mset *stream) setupMirrorConsumer() error {
mirror.qch = make(chan struct{})
mirror.wg.Add(1)
ready.Add(1)
if !mset.srv.startGoRoutine(func() { mset.processMirrorMsgs(mirror, &ready) }) {
if !mset.srv.startGoRoutine(
func() { mset.processMirrorMsgs(mirror, &ready) },
pprofLabels{
"type": "mirror",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
},
) {
ready.Done()
}
}
Expand Down Expand Up @@ -2671,7 +2678,14 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T
si.qch = make(chan struct{})
si.wg.Add(1)
ready.Add(1)
if !mset.srv.startGoRoutine(func() { mset.processSourceMsgs(si, &ready) }) {
if !mset.srv.startGoRoutine(
func() { mset.processSourceMsgs(si, &ready) },
pprofLabels{
"type": "source",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
},
) {
ready.Done()
}
}
Expand Down