Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add profilez server endpoint #3774

Merged
merged 1 commit into from Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions server/events.go
Expand Up @@ -949,6 +949,10 @@ func (s *Server) initEventTracking() {
optz := &HealthzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.healthz(&optz.HealthzOptions), nil })
},
"PROFILEZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &ProfilezEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.profilez(&optz.ProfilezOptions), nil })
},
}
for name, req := range monSrvc {
subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name)
Expand Down Expand Up @@ -1574,6 +1578,12 @@ type HealthzEventOptions struct {
EventFilterOptions
}

// In the context of system events, ProfilezEventOptions are options passed to Profilez
type ProfilezEventOptions struct {
ProfilezOptions
EventFilterOptions
}

// returns true if the request does NOT apply to this server and can be ignored.
// DO NOT hold the server lock when
func (s *Server) filterRequest(fOpts *EventFilterOptions) bool {
Expand Down
2 changes: 1 addition & 1 deletion server/events_test.go
Expand Up @@ -1661,7 +1661,7 @@ func TestSystemAccountWithGateways(t *testing.T) {

// If this tests fails with wrong number after 10 seconds we may have
// added a new inititial subscription for the eventing system.
checkExpectedSubs(t, 50, sa)
checkExpectedSubs(t, 52, sa)

// Create a client on B and see if we receive the event
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
Expand Down
41 changes: 41 additions & 0 deletions server/monitor.go
Expand Up @@ -14,6 +14,7 @@
package server

import (
"bytes"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
Expand All @@ -26,6 +27,7 @@ import (
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -2639,6 +2641,12 @@ type HealthzOptions struct {
JSServerOnly bool `json:"js-server-only,omitempty"`
}

// ProfilezOptions are options passed to Profilez
type ProfilezOptions struct {
Name string `json:"name"`
Debug int `json:"debug"`
}

type StreamDetail struct {
Name string `json:"name"`
Created time.Time `json:"created"`
Expand Down Expand Up @@ -3123,3 +3131,36 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
// Success.
return health
}

type ProfilezStatus struct {
Profile []byte `json:"profile"`
Error string `json:"error"`
}

func (s *Server) profilez(opts *ProfilezOptions) *ProfilezStatus {
if s.profiler == nil {
return &ProfilezStatus{
Error: "Profiling is not enabled",
}
}
if opts.Name == _EMPTY_ {
return &ProfilezStatus{
Error: "Profile name not specified",
}
}
profile := pprof.Lookup(opts.Name)
if profile == nil {
return &ProfilezStatus{
Error: fmt.Sprintf("Profile %q not found", opts.Name),
}
}
var buffer bytes.Buffer
if err := profile.WriteTo(&buffer, opts.Debug); err != nil {
return &ProfilezStatus{
Error: fmt.Sprintf("Profile %q error: %s", opts.Name, err),
}
}
return &ProfilezStatus{
Profile: buffer.Bytes(),
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
}
}
37 changes: 36 additions & 1 deletion server/monitor_test.go
Expand Up @@ -3925,7 +3925,7 @@ func TestMonitorAccountz(t *testing.T) {
body = string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d%s?acc=$SYS", s.MonitorAddr().Port, AccountzPath)))
require_Contains(t, body, `"account_detail": {`)
require_Contains(t, body, `"account_name": "$SYS",`)
require_Contains(t, body, `"subscriptions": 44,`)
require_Contains(t, body, `"subscriptions": 46,`)
require_Contains(t, body, `"is_system": true,`)
require_Contains(t, body, `"system_account": "$SYS"`)

Expand Down Expand Up @@ -4585,3 +4585,38 @@ func TestServerIDZRequest(t *testing.T) {
require_True(t, sid.Name == "TEST22")
require_True(t, strings.HasPrefix(sid.ID, "N"))
}

func TestMonitorProfilez(t *testing.T) {
s := RunServer(DefaultOptions())
defer s.Shutdown()

// First of all, check that the profiles aren't accessible
// when profiling hasn't been started in the usual way.
if ps := s.profilez(&ProfilezOptions{
Name: "allocs", Debug: 0,
}); ps.Error == "" {
t.Fatal("Profile should not be accessible when profiling not started")
}

// Then start profiling.
s.StartProfiler()

// Now check that all of the profiles that we expect are
// returning instead of erroring.
for _, try := range []*ProfilezOptions{
{Name: "allocs", Debug: 0},
{Name: "allocs", Debug: 1},
{Name: "block", Debug: 0},
{Name: "goroutine", Debug: 0},
{Name: "goroutine", Debug: 1},
{Name: "goroutine", Debug: 2},
{Name: "heap", Debug: 0},
{Name: "heap", Debug: 1},
{Name: "mutex", Debug: 0},
{Name: "threadcreate", Debug: 0},
} {
if ps := s.profilez(try); ps.Error != _EMPTY_ {
t.Fatalf("Unexpected error on %v: %s", try, ps.Error)
}
}
}