Skip to content

Commit

Permalink
Add more pprof labels to consumers, sources, mirrors
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Sep 29, 2023
1 parent 720ac60 commit 212d92c
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 19 deletions.
28 changes: 24 additions & 4 deletions server/consumer.go
Expand Up @@ -1255,15 +1255,32 @@ func (o *consumer) setLeader(isLeader bool) {
// Snapshot initial info.
o.infoWithSnap(true)

// These are the labels we will use to annotate our goroutines.
labels := pprofLabels{
"type": "consumer",
"account": mset.accName(),
"stream": mset.name(),
"consumer": o.name,
}

// Now start up Go routine to deliver msgs.
go o.loopAndGatherMsgs(qch)
go func() {
setGoRoutineLabels(labels)
o.loopAndGatherMsgs(qch)
}()

// Now start up Go routine to process acks.
go o.processInboundAcks(qch)
go func() {
setGoRoutineLabels(labels)
o.processInboundAcks(qch)
}()

if pullMode {
// Now start up Go routine to process inbound next message requests.
go o.processInboundNextMsgReqs(qch)
go func() {
setGoRoutineLabels(labels)
o.processInboundNextMsgReqs(qch)
}()
}

// If we are R>1 spin up our proposal loop.
Expand All @@ -1272,7 +1289,10 @@ func (o *consumer) setLeader(isLeader bool) {
// They must be on server versions >= 2.7.1
o.checkAndSetPendingRequestsOk()
o.checkPendingRequests()
go o.loopAndForwardProposals(qch)
go func() {
setGoRoutineLabels(labels)
o.loopAndForwardProposals(qch)
}()
}

} else {
Expand Down
24 changes: 15 additions & 9 deletions server/server.go
Expand Up @@ -3608,22 +3608,28 @@ func (s *Server) String() string {

type pprofLabels map[string]string

func setGoRoutineLabels(tags ...pprofLabels) {
var labels []string
for _, m := range tags {
for k, v := range m {
labels = append(labels, k, v)
}
}
if len(labels) > 0 {
pprof.SetGoroutineLabels(
pprof.WithLabels(context.Background(), pprof.Labels(labels...)),
)
}
}

func (s *Server) startGoRoutine(f func(), tags ...pprofLabels) bool {
var started bool
s.grMu.Lock()
defer s.grMu.Unlock()
if s.grRunning {
var labels []string
for _, m := range tags {
for k, v := range m {
labels = append(labels, k, v)
}
}
s.grWG.Add(1)
go func() {
pprof.SetGoroutineLabels(
pprof.WithLabels(context.Background(), pprof.Labels(labels...)),
)
setGoRoutineLabels(tags...)
f()
}()
started = true
Expand Down
14 changes: 8 additions & 6 deletions server/stream.go
Expand Up @@ -2635,9 +2635,10 @@ func (mset *stream) setupMirrorConsumer() error {
if !mset.srv.startGoRoutine(
func() { mset.processMirrorMsgs(mirror, &ready) },
pprofLabels{
"type": "mirror",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
"type": "mirror",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
"consumer": mirror.cname,
},
) {
ready.Done()
Expand Down Expand Up @@ -2965,9 +2966,10 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T
if !mset.srv.startGoRoutine(
func() { mset.processSourceMsgs(si, &ready) },
pprofLabels{
"type": "source",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
"type": "source",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
"consumer": si.cname,
},
) {
ready.Done()
Expand Down

0 comments on commit 212d92c

Please sign in to comment.