Skip to content

Commit

Permalink
Add more pprof labels to consumers, sources, mirrors
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Sep 29, 2023
1 parent 720ac60 commit a706bb4
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 10 deletions.
28 changes: 24 additions & 4 deletions server/consumer.go
Expand Up @@ -1255,15 +1255,32 @@ func (o *consumer) setLeader(isLeader bool) {
// Snapshot initial info.
o.infoWithSnap(true)

// These are the labels we will use to annotate our goroutines.
labels := pprofLabels{
"type": "consumer",
"account": mset.accName(),
"stream": mset.name(),
"consumer": o.name,
}

// Now start up Go routine to deliver msgs.
go o.loopAndGatherMsgs(qch)
s.startGoRoutine(func() {
defer s.grWG.Done()
o.loopAndGatherMsgs(qch)
}, labels)

// Now start up Go routine to process acks.
go o.processInboundAcks(qch)
s.startGoRoutine(func() {
defer s.grWG.Done()
o.processInboundAcks(qch)
}, labels)

if pullMode {
// Now start up Go routine to process inbound next message requests.
go o.processInboundNextMsgReqs(qch)
s.startGoRoutine(func() {
defer s.grWG.Done()
o.processInboundNextMsgReqs(qch)
}, labels)
}

// If we are R>1 spin up our proposal loop.
Expand All @@ -1272,7 +1289,10 @@ func (o *consumer) setLeader(isLeader bool) {
// They must be on server versions >= 2.7.1
o.checkAndSetPendingRequestsOk()
o.checkPendingRequests()
go o.loopAndForwardProposals(qch)
s.startGoRoutine(func() {
defer s.grWG.Done()
o.loopAndForwardProposals(qch)
}, labels)
}

} else {
Expand Down
14 changes: 8 additions & 6 deletions server/stream.go
Expand Up @@ -2635,9 +2635,10 @@ func (mset *stream) setupMirrorConsumer() error {
if !mset.srv.startGoRoutine(
func() { mset.processMirrorMsgs(mirror, &ready) },
pprofLabels{
"type": "mirror",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
"type": "mirror",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
"consumer": mirror.cname,
},
) {
ready.Done()
Expand Down Expand Up @@ -2965,9 +2966,10 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T
if !mset.srv.startGoRoutine(
func() { mset.processSourceMsgs(si, &ready) },
pprofLabels{
"type": "source",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
"type": "source",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
"consumer": si.cname,
},
) {
ready.Done()
Expand Down

0 comments on commit a706bb4

Please sign in to comment.