Skip to content

Commit

Permalink
logging: align to Kubernetes structured logging, add reconcileID
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan Büringer buringerst@vmware.com
  • Loading branch information
sbueringer committed Mar 15, 2022
1 parent 9ee63fc commit 598978c
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 33 deletions.
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -21,6 +21,7 @@ require (
k8s.io/apimachinery v0.23.0
k8s.io/client-go v0.23.0
k8s.io/component-base v0.23.0
k8s.io/klog/v2 v2.30.0
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b
sigs.k8s.io/yaml v1.3.0
)
Expand Down Expand Up @@ -59,7 +60,6 @@ require (
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/klog/v2 v2.30.0 // indirect
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.0 // indirect
Expand Down
33 changes: 26 additions & 7 deletions pkg/builder/controller.go
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand Down Expand Up @@ -148,9 +149,9 @@ func (blder *Builder) WithOptions(options controller.Options) *Builder {
return blder
}

// WithLogger overrides the controller options's logger used.
func (blder *Builder) WithLogger(log logr.Logger) *Builder {
blder.ctrlOptions.Log = log
// WithLogConstructor overrides the controller options's LogConstructor.
func (blder *Builder) WithLogConstructor(logConstructor func(*reconcile.Request) logr.Logger) *Builder {
blder.ctrlOptions.LogConstructor = logConstructor
return blder
}

Expand Down Expand Up @@ -304,13 +305,31 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout
}

controllerName := blder.getControllerName(gvk)

// Setup the logger.
if ctrlOptions.Log.GetSink() == nil {
ctrlOptions.Log = blder.mgr.GetLogger()
if ctrlOptions.LogConstructor == nil {
log = blder.mgr.GetLogger().WithValues(
"controller", controllerName,
"controllerGroup", gvk.Group,
"controllerKind", gvk.Kind,
)

lowerCamelCaseKind := strings.ToLower(gvk.Kind[:1]) + gvk.Kind[1:]

ctrlOptions.LogConstructor = func(req *reconcile.Request) logr.Logger {
log := log
if req != nil {
log = log.WithValues(
lowerCamelCaseKind, klog.KRef(req.Namespace, req.Name),
"namespace", req.Namespace, "name", req.Name,
)
}
return log
}
}
ctrlOptions.Log = ctrlOptions.Log.WithValues("reconciler group", gvk.Group, "reconciler kind", gvk.Kind)

// Build the controller and return.
blder.ctrl, err = newController(blder.getControllerName(gvk), blder.mgr, ctrlOptions)
blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)
return err
}
8 changes: 5 additions & 3 deletions pkg/builder/controller_test.go
Expand Up @@ -238,10 +238,10 @@ var _ = Describe("application", func() {

logger := &testLogger{}
newController = func(name string, mgr manager.Manager, options controller.Options) (controller.Controller, error) {
if options.Log.GetSink() == logger {
if options.LogConstructor(nil).GetSink() == logger {
return controller.New(name, mgr, options)
}
return nil, fmt.Errorf("logger expected %T but found %T", logger, options.Log)
return nil, fmt.Errorf("logger expected %T but found %T", logger, options.LogConstructor)
}

By("creating a controller manager")
Expand All @@ -251,7 +251,9 @@ var _ = Describe("application", func() {
instance, err := ControllerManagedBy(m).
For(&appsv1.ReplicaSet{}).
Owns(&appsv1.ReplicaSet{}).
WithLogger(logr.New(logger)).
WithLogConstructor(func(request *reconcile.Request) logr.Logger {
return logr.New(logger)
}).
Build(noop)
Expect(err).NotTo(HaveOccurred())
Expect(instance).NotTo(BeNil())
Expand Down
25 changes: 19 additions & 6 deletions pkg/controller/controller.go
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/go-logr/logr"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
Expand All @@ -46,9 +47,9 @@ type Options struct {
// The overall is a token bucket and the per-item is exponential.
RateLimiter ratelimiter.RateLimiter

// Log is the logger used for this controller and passed to each reconciliation
// request via the context field.
Log logr.Logger
// LogConstructor is used to construct a logger used for this controller and passed
// to each reconciliation via the context field.
LogConstructor func(request *reconcile.Request) logr.Logger

// CacheSyncTimeout refers to the time limit set to wait for syncing caches.
// Defaults to 2 minutes if not set.
Expand Down Expand Up @@ -105,8 +106,20 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
return nil, fmt.Errorf("must specify Name for Controller")
}

if options.Log.GetSink() == nil {
options.Log = mgr.GetLogger()
if options.LogConstructor == nil {
log := mgr.GetLogger().WithValues(
"controller", name,
)
options.LogConstructor = func(req *reconcile.Request) logr.Logger {
log := log
if req != nil {
log = log.WithValues(
"object", klog.KRef(req.Namespace, req.Name),
"namespace", req.Namespace, "name", req.Name,
)
}
return log
}
}

if options.MaxConcurrentReconciles <= 0 {
Expand Down Expand Up @@ -136,7 +149,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
CacheSyncTimeout: options.CacheSyncTimeout,
SetFields: mgr.SetFields,
Name: name,
Log: options.Log.WithName("controller").WithName(name).WithValues("controller", name),
LogConstructor: options.LogConstructor,
RecoverPanic: options.RecoverPanic,
}, nil
}
35 changes: 20 additions & 15 deletions pkg/internal/controller/controller.go
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/go-logr/logr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
Expand Down Expand Up @@ -83,8 +84,11 @@ type Controller struct {
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
startWatches []watchDescription

// Log is used to log messages to users during reconciliation, or for example when a watch is started.
Log logr.Logger
// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
// or for example when a watch is started.
// Note: LogConstructor has to be able to handle nil requests as we are also using it
// outside the context of a reconciliation.
LogConstructor func(request *reconcile.Request) logr.Logger

// RecoverPanic indicates whether the panic caused by reconcile should be recovered.
RecoverPanic bool
Expand All @@ -99,7 +103,6 @@ type watchDescription struct {

// Reconcile implements reconcile.Reconciler.
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
defer func() {
if r := recover(); r != nil {
if c.RecoverPanic {
Expand All @@ -110,11 +113,11 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ re
return
}

log := logf.FromContext(ctx)
log.Info(fmt.Sprintf("Observed a panic in reconciler: %v", r))
panic(r)
}
}()
ctx = logf.IntoContext(ctx, log)
return c.Do.Reconcile(ctx, req)
}

Expand Down Expand Up @@ -144,7 +147,7 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
return nil
}

c.Log.Info("Starting EventSource", "source", src)
c.LogConstructor(nil).Info("Starting EventSource", "source", src)
return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

Expand Down Expand Up @@ -179,15 +182,15 @@ func (c *Controller) Start(ctx context.Context) error {
// caches to sync so that they have a chance to register their intendeded
// caches.
for _, watch := range c.startWatches {
c.Log.Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))

if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
}

// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
c.Log.Info("Starting Controller")
c.LogConstructor(nil).Info("Starting Controller")

for _, watch := range c.startWatches {
syncingSource, ok := watch.src.(source.SyncingSource)
Expand All @@ -204,7 +207,7 @@ func (c *Controller) Start(ctx context.Context) error {
// is an error or a timeout
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
c.Log.Error(err, "Could not wait for Cache to sync")
c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")
return err
}

Expand All @@ -221,7 +224,7 @@ func (c *Controller) Start(ctx context.Context) error {
c.startWatches = nil

// Launch workers to process resources
c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
wg.Add(c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go func() {
Expand All @@ -241,9 +244,9 @@ func (c *Controller) Start(ctx context.Context) error {
}

<-ctx.Done()
c.Log.Info("Shutdown signal received, waiting for all workers to finish")
c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")
wg.Wait()
c.Log.Info("All workers finished")
c.LogConstructor(nil).Info("All workers finished")
return nil
}

Expand Down Expand Up @@ -295,19 +298,21 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
c.updateMetrics(time.Since(reconcileStartTS))
}()

// Make sure that the the object is a valid request.
// Make sure that the object is a valid request.
req, ok := obj.(reconcile.Request)
if !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.Queue.Forget(obj)
c.Log.Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
// Return true, don't take a break
return
}

log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
log := c.LogConstructor(&req)

log = log.WithValues("reconcileID", uuid.NewUUID())
ctx = logf.IntoContext(ctx, log)

// RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the
Expand Down Expand Up @@ -340,7 +345,7 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {

// GetLogger returns this controller's logger.
func (c *Controller) GetLogger() logr.Logger {
return c.Log
return c.LogConstructor(nil)
}

// InjectFunc implement SetFields.Injector.
Expand Down
5 changes: 4 additions & 1 deletion pkg/internal/controller/controller_test.go
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"github.com/go-logr/logr"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -70,7 +71,9 @@ var _ = Describe("controller", func() {
MaxConcurrentReconciles: 1,
Do: fakeReconcile,
MakeQueue: func() workqueue.RateLimitingInterface { return queue },
Log: log.RuntimeLog.WithName("controller").WithName("test"),
LogConstructor: func(_ *reconcile.Request) logr.Logger {
return log.RuntimeLog.WithName("controller").WithName("test")
},
}
Expect(ctrl.InjectFunc(func(interface{}) error { return nil })).To(Succeed())
})
Expand Down

0 comments on commit 598978c

Please sign in to comment.