Skip to content

Commit

Permalink
Add prof_tags for annotating CPU and goroutine profiles with accoun…
Browse files Browse the repository at this point in the history
…t/stream/consumer info

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Jun 5, 2023
1 parent af318be commit b5ed7d9
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 9 deletions.
28 changes: 24 additions & 4 deletions server/jetstream_cluster.go
Expand Up @@ -833,7 +833,11 @@ func (js *jetStream) setupMetaGroup() error {
atomic.StoreInt32(&js.clustered, 1)
c.registerWithAccount(sacc)

js.srv.startGoRoutine(js.monitorCluster)
js.srv.startGoRoutine(
js.monitorCluster,
"type", "metaleader",
"account", sacc.Name,
)
return nil
}

Expand Down Expand Up @@ -3309,7 +3313,12 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
}
mset.monitorWg.Add(1)
// Start monitoring..
s.startGoRoutine(func() { js.monitorStream(mset, sa, needsNode) })
s.startGoRoutine(
func() { js.monitorStream(mset, sa, needsNode) },
"type", "stream",
"account", mset.accName(),
"stream", mset.name(),
)
} else if numReplicas == 1 && alreadyRunning {
// We downgraded to R1. Make sure we cleanup the raft node and the stream monitor.
mset.removeNode()
Expand Down Expand Up @@ -3532,7 +3541,12 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
if mset != nil {
mset.monitorWg.Add(1)
}
s.startGoRoutine(func() { js.monitorStream(mset, sa, false) })
s.startGoRoutine(
func() { js.monitorStream(mset, sa, false) },
"type", "stream",
"account", mset.accName(),
"stream", mset.name(),
)
}
} else {
// Single replica stream, process manually here.
Expand Down Expand Up @@ -4138,7 +4152,13 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
// Start our monitoring routine if needed.
if !alreadyRunning && !o.isMonitorRunning() {
o.monitorWg.Add(1)
s.startGoRoutine(func() { js.monitorConsumer(o, ca) })
s.startGoRoutine(
func() { js.monitorConsumer(o, ca) },
"type", "consumer",
"account", mset.accName(),
"stream", mset.name(),
"consumer", ca.Name,
)
}
// For existing consumer, only send response if not recovering.
if wasExisting && !js.isMetaRecovering() {
Expand Down
3 changes: 3 additions & 0 deletions server/opts.go
Expand Up @@ -303,6 +303,7 @@ type Options struct {
Websocket WebsocketOpts `json:"-"`
MQTT MQTTOpts `json:"-"`
ProfPort int `json:"-"`
ProfTags bool `json:"-"`
PidFile string `json:"-"`
PortsFileDir string `json:"-"`
LogFile string `json:"-"`
Expand Down Expand Up @@ -986,6 +987,8 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
o.PortsFileDir = v.(string)
case "prof_port":
o.ProfPort = int(v.(int64))
case "prof_tags":
o.ProfTags = v.(bool)
case "max_control_line":
if v.(int64) > 1<<31-1 {
err := &configErr{tk, fmt.Sprintf("%s value is too big", k)}
Expand Down
16 changes: 13 additions & 3 deletions server/server.go
Expand Up @@ -28,6 +28,7 @@ import (
"net"
"net/http"
"regexp"
"runtime/pprof"

// Allow dynamic profiling.
_ "net/http/pprof"
Expand Down Expand Up @@ -190,6 +191,7 @@ type Server struct {
grTmpClients map[uint64]*client
grRunning bool
grWG sync.WaitGroup // to wait on various go routines
grTags bool

cproto int64 // number of clients supporting async INFO
configTime time.Time // last time config was loaded
Expand Down Expand Up @@ -2050,6 +2052,7 @@ func (s *Server) Start() {

s.grMu.Lock()
s.grRunning = true
s.grTags = opts.ProfTags
s.grMu.Unlock()

s.startRateLimitLogExpiration()
Expand Down Expand Up @@ -3501,15 +3504,22 @@ func (s *Server) String() string {
return s.info.Name
}

func (s *Server) startGoRoutine(f func()) bool {
func (s *Server) startGoRoutine(f func(), labels ...string) bool {
var started bool
s.grMu.Lock()
defer s.grMu.Unlock()
if s.grRunning {
s.grWG.Add(1)
go f()
go func(applyLabels bool) {
if applyLabels {
pprof.SetGoroutineLabels(
pprof.WithLabels(context.Background(), pprof.Labels(labels...)),
)
}
f()
}(s.grTags && len(labels) > 0)
started = true
}
s.grMu.Unlock()
return started
}

Expand Down
14 changes: 12 additions & 2 deletions server/stream.go
Expand Up @@ -2354,7 +2354,12 @@ func (mset *stream) setupMirrorConsumer() error {
mirror.qch = make(chan struct{})
mirror.wg.Add(1)
ready.Add(1)
if !mset.srv.startGoRoutine(func() { mset.processMirrorMsgs(mirror, &ready) }) {
if !mset.srv.startGoRoutine(
func() { mset.processMirrorMsgs(mirror, &ready) },
"type", "mirror",
"account", mset.acc.Name,
"stream", mset.cfg.Name,
) {
ready.Done()
}
}
Expand Down Expand Up @@ -2652,7 +2657,12 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T
si.qch = make(chan struct{})
si.wg.Add(1)
ready.Add(1)
if !mset.srv.startGoRoutine(func() { mset.processSourceMsgs(si, &ready) }) {
if !mset.srv.startGoRoutine(
func() { mset.processSourceMsgs(si, &ready) },
"type", "source",
"account", mset.acc.Name,
"stream", mset.cfg.Name,
) {
ready.Done()
}
}
Expand Down

0 comments on commit b5ed7d9

Please sign in to comment.