Skip to content

Commit

Permalink
export manager.Server to provide an official HTTP server manager runn…
Browse files Browse the repository at this point in the history
…able

Signed-off-by: Joe Lanford <joe.lanford@gmail.com>
  • Loading branch information
joelanford committed Apr 3, 2024
1 parent 5615941 commit 84f4ebd
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 25 deletions.
19 changes: 9 additions & 10 deletions pkg/manager/internal.go
Expand Up @@ -284,9 +284,8 @@ func (cm *controllerManager) addHealthProbeServer() error {
mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
}

return cm.add(&server{
Kind: "health probe",
Log: cm.logger,
return cm.add(&Server{
Name: "health probe",
Server: srv,
Listener: cm.healthProbeListener,
})
Expand All @@ -302,9 +301,8 @@ func (cm *controllerManager) addPprofServer() error {
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

return cm.add(&server{
Kind: "pprof",
Log: cm.logger,
return cm.add(&Server{
Name: "pprof",
Server: srv,
Listener: cm.pprofListener,
})
Expand Down Expand Up @@ -384,11 +382,12 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
}
}

// First start any internal HTTP servers, which includes health probes, metrics and profiling if enabled.
// First start any HTTP servers, which includes health probes and profiling, if enabled.
//
// WARNING: Internal HTTP servers MUST start before any cache is populated, otherwise it would block
// conversion webhooks to be ready for serving which make the cache never get ready.
if err := cm.runnables.HTTPServers.Start(cm.internalCtx); err != nil {
// WARNING: HTTPServers includes the health probes, which MUST start before any cache is populated, otherwise
// it would block conversion webhooks to be ready for serving which make the cache never get ready.
logCtx := logr.NewContext(cm.internalCtx, cm.logger)
if err := cm.runnables.HTTPServers.Start(logCtx); err != nil {
if err != nil {
return fmt.Errorf("failed to start HTTP servers: %w", err)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/manager/runnable_group.go
Expand Up @@ -54,7 +54,10 @@ func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
// The runnables added after Start are started directly.
func (r *runnables) Add(fn Runnable) error {
switch runnable := fn.(type) {
case *server:
case *Server:
if runnable.NeedLeaderElection() {
return r.LeaderElection.Add(fn, nil)
}
return r.HTTPServers.Add(fn, nil)
case hasCache:
return r.Caches.Add(fn, func(ctx context.Context) bool {
Expand Down
3 changes: 2 additions & 1 deletion pkg/manager/runnable_group_test.go
Expand Up @@ -10,6 +10,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/utils/ptr"

"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)
Expand All @@ -22,7 +23,7 @@ var _ = Describe("runnables", func() {
})

It("should add HTTP servers to the appropriate group", func() {
server := &server{}
server := &Server{}
r := newRunnables(defaultBaseContext, errCh)
Expect(r.Add(server)).To(Succeed())
Expect(r.HTTPServers.startQueue).To(HaveLen(1))
Expand Down
74 changes: 61 additions & 13 deletions pkg/manager/server.go
Expand Up @@ -21,41 +21,89 @@ import (
"errors"
"net"
"net/http"
"time"

"github.com/go-logr/logr"
crlog "sigs.k8s.io/controller-runtime/pkg/log"
)

// server is a general purpose HTTP server Runnable for a manager
// to serve some internal handlers such as health probes, metrics and profiling.
type server struct {
Kind string
Log logr.Logger
Server *http.Server
var (
_ Runnable = (*Server)(nil)
_ LeaderElectionRunnable = (*Server)(nil)
)

// Server is a general purpose HTTP server Runnable for a manager.
// It is used to serve some internal handlers for health probes and profiling,
// but it can also be used to run custom servers.
type Server struct {
// Name is an optional string that describes the purpose of the server. It is used in logs to distinguish
// among multiple servers.
Name string

// Server is the HTTP server to run. It is required.
Server *http.Server

// Listener is an optional listener to use. If not set, the server start a listener using the server.Addr.
// Using a listener is useful when the port reservation needs to happen in advance of this runnable starting.
Listener net.Listener

// OnlyServeWhenLeader is an optional bool that indicates that the server should only be started when the manager is the leader.
OnlyServeWhenLeader bool

// ShutdownTimeout is an optional duration that indicates how long to wait for the server to shutdown gracefully. If not set,
// the server will wait indefinitely for all connections to close.
ShutdownTimeout *time.Duration
}

func (s *server) Start(ctx context.Context) error {
log := s.Log.WithValues("kind", s.Kind, "addr", s.Listener.Addr())
// Start starts the server. It will block until the server is stopped or an error occurs.
func (s *Server) Start(ctx context.Context) error {
log := crlog.FromContext(ctx)
if s.Name != "" {
log = log.WithValues("name", s.Name)
}
log = log.WithValues("addr", s.addr())

serverShutdown := make(chan struct{})
go func() {
<-ctx.Done()
log.Info("shutting down server")
if err := s.Server.Shutdown(context.Background()); err != nil {

shutdownCtx := context.Background()
if s.ShutdownTimeout != nil {
var shutdownCancel context.CancelFunc
shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), *s.ShutdownTimeout)
defer shutdownCancel()
}

if err := s.Server.Shutdown(shutdownCtx); err != nil {
log.Error(err, "error shutting down server")
}
close(serverShutdown)
}()

log.Info("starting server")
if err := s.Server.Serve(s.Listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
if err := s.serve(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}

<-serverShutdown
return nil
}

func (s *server) NeedLeaderElection() bool {
return false
// NeedLeaderElection returns true if the server should only be started when the manager is the leader.
func (s *Server) NeedLeaderElection() bool {
return s.OnlyServeWhenLeader
}

func (s *Server) addr() string {
if s.Listener != nil {
return s.Listener.Addr().String()
}
return s.Server.Addr
}

func (s *Server) serve() error {
if s.Listener != nil {
return s.Server.Serve(s.Listener)
}
return s.Server.ListenAndServe()
}

0 comments on commit 84f4ebd

Please sign in to comment.