Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Introduce pprof server to manager #1943

Merged
merged 1 commit into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 29 additions & 0 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"net"
"net/http"
"net/http/pprof"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -106,6 +107,9 @@ type controllerManager struct {
// Healthz probe handler
healthzHandler *healthz.Handler

// pprofListener is used to serve pprof
pprofListener net.Listener

// controllerConfig are the global controller options.
controllerConfig config.Controller

Expand Down Expand Up @@ -343,6 +347,24 @@ func (cm *controllerManager) serveHealthProbes() {
go cm.httpServe("health probe", cm.logger, server, cm.healthProbeListener)
}

func (cm *controllerManager) addPprofServer() error {
mux := http.NewServeMux()
srv := httpserver.New(mux)

mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
zqzten marked this conversation as resolved.
Show resolved Hide resolved

return cm.add(&server{
Kind: "pprof",
Log: cm.logger,
Server: srv,
Listener: cm.pprofListener,
})
}

func (cm *controllerManager) httpServe(kind string, log logr.Logger, server *http.Server, ln net.Listener) {
log = log.WithValues("kind", kind, "addr", ln.Addr())

Expand Down Expand Up @@ -440,6 +462,13 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
cm.serveHealthProbes()
}

// Add pprof server
if cm.pprofListener != nil {
if err := cm.addPprofServer(); err != nil {
return fmt.Errorf("failed to add pprof server: %w", err)
}
}

// First start any webhook servers, which includes conversion, validation, and defaulting
// webhooks that are registered.
//
Expand Down
33 changes: 33 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,13 @@ type Options struct {
// Liveness probe endpoint name, defaults to "healthz"
LivenessEndpointName string

// PprofBindAddress is the TCP address that the controller should bind to
// for serving pprof.
// It can be set to "" or "0" to disable pprof serving.
// Since pprof may expose sensitive information, make sure to protect it
// before exposing it to the public.
PprofBindAddress string
zqzten marked this conversation as resolved.
Show resolved Hide resolved

// Port is the port that the webhook server serves at.
// It is used to set webhook.Server.Port if WebhookServer is not set.
Port int
Expand Down Expand Up @@ -308,6 +315,7 @@ type Options struct {
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
newMetricsListener func(addr string) (net.Listener, error)
newHealthProbeListener func(addr string) (net.Listener, error)
newPprofListener func(addr string) (net.Listener, error)
}

// BaseContextFunc is a function used to provide a base Context to Runnables
Expand Down Expand Up @@ -417,6 +425,13 @@ func New(config *rest.Config, options Options) (Manager, error) {
return nil, err
}

// Create pprof listener. This will throw an error if the bind
// address is invalid or already in use.
pprofListener, err := options.newPprofListener(options.PprofBindAddress)
if err != nil {
return nil, fmt.Errorf("failed to new pprof listener: %w", err)
}

errChan := make(chan error)
runnables := newRunnables(options.BaseContext, errChan)

Expand Down Expand Up @@ -444,6 +459,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
healthProbeListener: healthProbeListener,
readinessEndpointName: options.ReadinessEndpointName,
livenessEndpointName: options.LivenessEndpointName,
pprofListener: pprofListener,
gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
internalProceduresStop: make(chan struct{}),
leaderElectionStopped: make(chan struct{}),
Expand Down Expand Up @@ -574,6 +590,19 @@ func defaultHealthProbeListener(addr string) (net.Listener, error) {
return ln, nil
}

// defaultPprofListener returns a TCP listener for the pprof server on addr.
// An addr of "" or "0" means pprof serving is disabled; in that case both the
// returned listener and the returned error are nil.
func defaultPprofListener(addr string) (net.Listener, error) {
	switch addr {
	case "", "0":
		// Disabled: callers treat a nil listener as "do not serve pprof".
		return nil, nil
	}

	listener, listenErr := net.Listen("tcp", addr)
	if listenErr == nil {
		return listener, nil
	}
	return nil, fmt.Errorf("error listening on %s: %w", addr, listenErr)
}

// defaultBaseContext is used as the BaseContext value in Options if one
// has not already been set.
func defaultBaseContext() context.Context {
Expand Down Expand Up @@ -634,6 +663,10 @@ func setOptionsDefaults(options Options) Options {
options.newHealthProbeListener = defaultHealthProbeListener
}

if options.newPprofListener == nil {
options.newPprofListener = defaultPprofListener
}

if options.GracefulShutdownTimeout == nil {
gracefulShutdownTimeout := defaultGracefulShutdownPeriod
options.GracefulShutdownTimeout = &gracefulShutdownTimeout
Expand Down
97 changes: 97 additions & 0 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ var _ = Describe("manger.Manager", func() {
LeaderElectionID: "test-leader-election-id-2",
HealthProbeBindAddress: "0",
MetricsBindAddress: "0",
PprofBindAddress: "0",
})
Expect(err).To(BeNil())

Expand Down Expand Up @@ -193,6 +194,7 @@ var _ = Describe("manger.Manager", func() {
LeaderElectionID: "test-leader-election-id-3",
HealthProbeBindAddress: "0",
MetricsBindAddress: "0",
PprofBindAddress: "0",
})
Expect(err).To(BeNil())

Expand Down Expand Up @@ -227,6 +229,7 @@ var _ = Describe("manger.Manager", func() {
},
HealthProbeBindAddress: "0",
MetricsBindAddress: "0",
PprofBindAddress: "0",
})
Expect(err).ToNot(HaveOccurred())
Expect(m1).ToNot(BeNil())
Expand All @@ -247,6 +250,7 @@ var _ = Describe("manger.Manager", func() {
},
HealthProbeBindAddress: "0",
MetricsBindAddress: "0",
PprofBindAddress: "0",
})
Expect(err).ToNot(HaveOccurred())
Expect(m2).ToNot(BeNil())
Expand Down Expand Up @@ -1280,6 +1284,99 @@ var _ = Describe("manger.Manager", func() {
})
})

Context("should start serving pprof", func() {
var listener net.Listener
var opts Options

BeforeEach(func() {
listener = nil
opts = Options{
newPprofListener: func(addr string) (net.Listener, error) {
var err error
listener, err = defaultPprofListener(addr)
return listener, err
},
}
})

AfterEach(func() {
	if listener == nil {
		return
	}
	// Best-effort cleanup: the error from Close is deliberately discarded.
	_ = listener.Close()
})

It("should stop serving pprof when stop is called", func() {
	opts.PprofBindAddress = ":0"
	m, err := New(cfg, opts)
	Expect(err).NotTo(HaveOccurred())

	ctx, cancel := context.WithCancel(context.Background())
	// Guarantee the manager is stopped even if an assertion below fails
	// before the explicit cancel; calling cancel twice is harmless.
	defer cancel()
	go func() {
		defer GinkgoRecover()
		Expect(m.Start(ctx)).NotTo(HaveOccurred())
	}()
	<-m.Elected()

	// Check the pprof started
	endpoint := fmt.Sprintf("http://%s", listener.Addr().String())
	resp, err := http.Get(endpoint)
	Expect(err).NotTo(HaveOccurred())
	// Only reachability matters here; close the body so the connection is
	// released instead of leaking.
	_ = resp.Body.Close()

	// Shutdown the server
	cancel()

	// Expect the pprof server to shutdown
	Eventually(func() error {
		resp, err := http.Get(endpoint)
		if err != nil {
			return err
		}
		// Still reachable: release the response and report success so
		// Eventually keeps polling.
		_ = resp.Body.Close()
		return nil
	}, 10*time.Second).ShouldNot(Succeed())
})

It("should serve pprof endpoints", func() {
	opts.PprofBindAddress = ":0"
	m, err := New(cfg, opts)
	Expect(err).NotTo(HaveOccurred())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		defer GinkgoRecover()
		Expect(m.Start(ctx)).NotTo(HaveOccurred())
	}()
	<-m.Elected()

	baseURL := fmt.Sprintf("http://%s", listener.Addr().String())
	endpoints := []string{
		"/debug/pprof/",
		"/debug/pprof/cmdline",
		// Without an explicit duration the CPU profile handler records for
		// 30 seconds; one second is enough to prove the route is wired up.
		"/debug/pprof/profile?seconds=1",
		"/debug/pprof/symbol",
		"/debug/pprof/trace",
	}
	for _, path := range endpoints {
		resp, err := http.Get(baseURL + path)
		Expect(err).NotTo(HaveOccurred(), path)
		statusCode := resp.StatusCode
		// Close right away rather than deferring inside the loop, so
		// bodies do not stay open until the spec returns. The close error
		// is irrelevant to what is under test.
		_ = resp.Body.Close()
		Expect(statusCode).To(Equal(http.StatusOK), path)
	}
})
})

Describe("Add", func() {
It("should immediately start the Component if the Manager has already Started another Component",
func() {
Expand Down
61 changes: 61 additions & 0 deletions pkg/manager/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package manager

import (
"context"
"errors"
"net"
"net/http"

"github.com/go-logr/logr"
)

// server is a general purpose HTTP server Runnable for a manager
// to serve some internal handlers such as health probes, metrics and profiling.
type server struct {
// Kind is a short label for what is being served (e.g. "pprof"); Start
// attaches it to every log line under the "kind" key.
Kind string
// Log is the base logger; Start derives its own logger from it.
Log logr.Logger
// Server is the HTTP server that Start runs and later shuts down.
Server *http.Server
// Listener is the listener that Server serves on. Start calls
// Listener.Addr() immediately, so it must be non-nil.
Listener net.Listener
}

func (s *server) Start(ctx context.Context) error {
log := s.Log.WithValues("kind", s.Kind, "addr", s.Listener.Addr())

serverShutdown := make(chan struct{})
go func() {
<-ctx.Done()
log.Info("shutting down server")
if err := s.Server.Shutdown(context.Background()); err != nil {
log.Error(err, "error shutting down server")
}
close(serverShutdown)
}()

log.Info("starting server")
if err := s.Server.Serve(s.Listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}

<-serverShutdown
zqzten marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// NeedLeaderElection always reports false.
// NOTE(review): presumably this lets the manager run the server whether or
// not this instance holds the leader lease — confirm against the manager's
// runnable handling.
func (s *server) NeedLeaderElection() bool {
return false
}