Skip to content

Commit

Permalink
🌱 Propagate context from controller.Start to handler
Browse files Browse the repository at this point in the history
Signed-off-by: Vince Prignano <vincepri@vmware.com>
  • Loading branch information
vincepri committed Aug 5, 2020
1 parent 97cfffd commit 0e459aa
Showing 1 changed file with 14 additions and 13 deletions.
27 changes: 14 additions & 13 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ func (c *Controller) Start(stop <-chan struct{}) error {
c.Queue = c.MakeQueue()
defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed

// TODO: Propagate context from the Runnable interface, when we're ready to change the signature.
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

err := func() error {
defer c.mu.Unlock()

Expand Down Expand Up @@ -170,8 +174,12 @@ func (c *Controller) Start(stop <-chan struct{}) error {
// Launch workers to process resources
c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
// Process work items
go wait.Until(c.worker, c.JitterPeriod, stop)
// Run a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
go wait.Until(func() {
for c.processNextWorkItem(ctx) {
}
}, c.JitterPeriod, stop)
}

c.Started = true
Expand All @@ -186,16 +194,9 @@ func (c *Controller) Start(stop <-chan struct{}) error {
return nil
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
func (c *Controller) worker() {
for c.processNextWorkItem() {
}
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the reconcileHandler.
func (c *Controller) processNextWorkItem() bool {
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.Queue.Get()
if shutdown {
// Stop working
Expand All @@ -210,10 +211,10 @@ func (c *Controller) processNextWorkItem() bool {
// period.
defer c.Queue.Done(obj)

return c.reconcileHandler(obj)
return c.reconcileHandler(ctx, obj)
}

func (c *Controller) reconcileHandler(obj interface{}) bool {
func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) bool {
// Update metrics after processing each item
reconcileStartTS := time.Now()
defer func() {
Expand All @@ -233,7 +234,7 @@ func (c *Controller) reconcileHandler(obj interface{}) bool {
}

log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
ctx := logf.IntoContext(context.Background(), log)
ctx = logf.IntoContext(ctx, log)

// RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the
// resource to be synced.
Expand Down

0 comments on commit 0e459aa

Please sign in to comment.