Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚠ logging: align to Kubernetes structured logging, add reconcileID #1827

Merged
merged 1 commit into from Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -21,6 +21,7 @@ require (
k8s.io/apimachinery v0.23.0
k8s.io/client-go v0.23.0
k8s.io/component-base v0.23.0
k8s.io/klog/v2 v2.30.0
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b
sigs.k8s.io/yaml v1.3.0
)
Expand Down Expand Up @@ -59,7 +60,6 @@ require (
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/klog/v2 v2.30.0 // indirect
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.0 // indirect
Expand Down
33 changes: 26 additions & 7 deletions pkg/builder/controller.go
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand Down Expand Up @@ -148,9 +149,9 @@ func (blder *Builder) WithOptions(options controller.Options) *Builder {
return blder
}

// WithLogger overrides the controller options's logger used.
func (blder *Builder) WithLogger(log logr.Logger) *Builder {
blder.ctrlOptions.Log = log
// WithLogConstructor overrides the controller options's LogConstructor.
func (blder *Builder) WithLogConstructor(logConstructor func(*reconcile.Request) logr.Logger) *Builder {
blder.ctrlOptions.LogConstructor = logConstructor
return blder
}

Expand Down Expand Up @@ -304,13 +305,31 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout
}

controllerName := blder.getControllerName(gvk)

// Setup the logger.
if ctrlOptions.Log.GetSink() == nil {
ctrlOptions.Log = blder.mgr.GetLogger()
if ctrlOptions.LogConstructor == nil {
log = blder.mgr.GetLogger().WithValues(
"controller", controllerName,
"controllerGroup", gvk.Group,
"controllerKind", gvk.Kind,
sbueringer marked this conversation as resolved.
Show resolved Hide resolved
)

lowerCamelCaseKind := strings.ToLower(gvk.Kind[:1]) + gvk.Kind[1:]

ctrlOptions.LogConstructor = func(req *reconcile.Request) logr.Logger {
log := log
if req != nil {
log = log.WithValues(
lowerCamelCaseKind, klog.KRef(req.Namespace, req.Name),
"namespace", req.Namespace, "name", req.Name,
)
}
return log
}
}
ctrlOptions.Log = ctrlOptions.Log.WithValues("reconciler group", gvk.Group, "reconciler kind", gvk.Kind)

// Build the controller and return.
blder.ctrl, err = newController(blder.getControllerName(gvk), blder.mgr, ctrlOptions)
blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)
return err
}
8 changes: 5 additions & 3 deletions pkg/builder/controller_test.go
Expand Up @@ -238,10 +238,10 @@ var _ = Describe("application", func() {

logger := &testLogger{}
newController = func(name string, mgr manager.Manager, options controller.Options) (controller.Controller, error) {
if options.Log.GetSink() == logger {
if options.LogConstructor(nil).GetSink() == logger {
return controller.New(name, mgr, options)
}
return nil, fmt.Errorf("logger expected %T but found %T", logger, options.Log)
return nil, fmt.Errorf("logger expected %T but found %T", logger, options.LogConstructor)
}

By("creating a controller manager")
Expand All @@ -251,7 +251,9 @@ var _ = Describe("application", func() {
instance, err := ControllerManagedBy(m).
For(&appsv1.ReplicaSet{}).
Owns(&appsv1.ReplicaSet{}).
WithLogger(logr.New(logger)).
WithLogConstructor(func(request *reconcile.Request) logr.Logger {
return logr.New(logger)
}).
Build(noop)
Expect(err).NotTo(HaveOccurred())
Expect(instance).NotTo(BeNil())
Expand Down
25 changes: 19 additions & 6 deletions pkg/controller/controller.go
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/go-logr/logr"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
Expand All @@ -46,9 +47,9 @@ type Options struct {
// The overall is a token bucket and the per-item is exponential.
RateLimiter ratelimiter.RateLimiter

// Log is the logger used for this controller and passed to each reconciliation
// request via the context field.
Log logr.Logger
// LogConstructor is used to construct a logger used for this controller and passed
// to each reconciliation via the context field.
LogConstructor func(request *reconcile.Request) logr.Logger

// CacheSyncTimeout refers to the time limit set to wait for syncing caches.
// Defaults to 2 minutes if not set.
Expand Down Expand Up @@ -105,8 +106,20 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
return nil, fmt.Errorf("must specify Name for Controller")
}

if options.Log.GetSink() == nil {
options.Log = mgr.GetLogger()
if options.LogConstructor == nil {
log := mgr.GetLogger().WithValues(
"controller", name,
)
options.LogConstructor = func(req *reconcile.Request) logr.Logger {
log := log
if req != nil {
log = log.WithValues(
"object", klog.KRef(req.Namespace, req.Name),
"namespace", req.Namespace, "name", req.Name,
)
}
return log
}
}

if options.MaxConcurrentReconciles <= 0 {
Expand Down Expand Up @@ -136,7 +149,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
CacheSyncTimeout: options.CacheSyncTimeout,
SetFields: mgr.SetFields,
Name: name,
Log: options.Log.WithName("controller").WithName(name).WithValues("controller", name),
LogConstructor: options.LogConstructor,
RecoverPanic: options.RecoverPanic,
}, nil
}
35 changes: 20 additions & 15 deletions pkg/internal/controller/controller.go
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/go-logr/logr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
Expand Down Expand Up @@ -83,8 +84,11 @@ type Controller struct {
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
startWatches []watchDescription

// Log is used to log messages to users during reconciliation, or for example when a watch is started.
Log logr.Logger
// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
// or for example when a watch is started.
// Note: LogConstructor has to be able to handle nil requests as we are also using it
// outside the context of a reconciliation.
LogConstructor func(request *reconcile.Request) logr.Logger
sbueringer marked this conversation as resolved.
Show resolved Hide resolved

// RecoverPanic indicates whether the panic caused by reconcile should be recovered.
RecoverPanic bool
Expand All @@ -99,7 +103,6 @@ type watchDescription struct {

// Reconcile implements reconcile.Reconciler.
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
sbueringer marked this conversation as resolved.
Show resolved Hide resolved
defer func() {
if r := recover(); r != nil {
if c.RecoverPanic {
Expand All @@ -110,11 +113,11 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ re
return
}

log := logf.FromContext(ctx)
sbueringer marked this conversation as resolved.
Show resolved Hide resolved
log.Info(fmt.Sprintf("Observed a panic in reconciler: %v", r))
panic(r)
}
}()
ctx = logf.IntoContext(ctx, log)
return c.Do.Reconcile(ctx, req)
}

Expand Down Expand Up @@ -144,7 +147,7 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
return nil
}

c.Log.Info("Starting EventSource", "source", src)
c.LogConstructor(nil).Info("Starting EventSource", "source", src)
return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

Expand Down Expand Up @@ -179,15 +182,15 @@ func (c *Controller) Start(ctx context.Context) error {
// caches to sync so that they have a chance to register their intendeded
// caches.
for _, watch := range c.startWatches {
c.Log.Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))

if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
}

// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
c.Log.Info("Starting Controller")
c.LogConstructor(nil).Info("Starting Controller")

for _, watch := range c.startWatches {
syncingSource, ok := watch.src.(source.SyncingSource)
Expand All @@ -204,7 +207,7 @@ func (c *Controller) Start(ctx context.Context) error {
// is an error or a timeout
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
c.Log.Error(err, "Could not wait for Cache to sync")
c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")
return err
}

Expand All @@ -221,7 +224,7 @@ func (c *Controller) Start(ctx context.Context) error {
c.startWatches = nil

// Launch workers to process resources
c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
wg.Add(c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go func() {
Expand All @@ -241,9 +244,9 @@ func (c *Controller) Start(ctx context.Context) error {
}

<-ctx.Done()
c.Log.Info("Shutdown signal received, waiting for all workers to finish")
c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")
wg.Wait()
c.Log.Info("All workers finished")
c.LogConstructor(nil).Info("All workers finished")
return nil
}

Expand Down Expand Up @@ -295,19 +298,21 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
c.updateMetrics(time.Since(reconcileStartTS))
}()

// Make sure that the the object is a valid request.
// Make sure that the object is a valid request.
req, ok := obj.(reconcile.Request)
if !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.Queue.Forget(obj)
c.Log.Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
// Return true, don't take a break
return
}

log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
log := c.LogConstructor(&req)

log = log.WithValues("reconcileID", uuid.NewUUID())
ctx = logf.IntoContext(ctx, log)

// RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the
Expand Down Expand Up @@ -340,7 +345,7 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {

// GetLogger returns this controller's logger.
func (c *Controller) GetLogger() logr.Logger {
return c.Log
return c.LogConstructor(nil)
}

// InjectFunc implement SetFields.Injector.
Expand Down
5 changes: 4 additions & 1 deletion pkg/internal/controller/controller_test.go
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"github.com/go-logr/logr"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -70,7 +71,9 @@ var _ = Describe("controller", func() {
MaxConcurrentReconciles: 1,
Do: fakeReconcile,
MakeQueue: func() workqueue.RateLimitingInterface { return queue },
Log: log.RuntimeLog.WithName("controller").WithName("test"),
LogConstructor: func(_ *reconcile.Request) logr.Logger {
return log.RuntimeLog.WithName("controller").WithName("test")
},
}
Expect(ctrl.InjectFunc(func(interface{}) error { return nil })).To(Succeed())
})
Expand Down