Skip to content

Commit

Permalink
:bug Controller: Wait for all reconciliations before shutting down
Browse files Browse the repository at this point in the history
Currently, the controller will instantly shutdown and return when its
context gets cancelled, leaving active reconciliations be. This change
makes it wait for those before finishing shutdown.
  • Loading branch information
alvaroaleman committed Mar 14, 2021
1 parent b125a18 commit 8f4937b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 20 deletions.
36 changes: 34 additions & 2 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,21 @@ package controller_test
import (
"context"
"fmt"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"go.uber.org/goleak"
corev1 "k8s.io/api/core/v1"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/source"
)

var _ = Describe("controller.Controller", func() {
Expand Down Expand Up @@ -88,15 +93,42 @@ var _ = Describe("controller.Controller", func() {
It("should not leak goroutines when stopped", func() {
currentGRs := goleak.IgnoreCurrent()

watchChan := make(chan event.GenericEvent, 1)
watch := &source.Channel{Source: watchChan}
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}

reconcileStarted := make(chan struct{})
controllerFinished := make(chan struct{})
rec := reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
defer GinkgoRecover()
close(reconcileStarted)
// Make sure reconciliation takes a moment and is not quicker than the controllers
// shutdown.
time.Sleep(50 * time.Millisecond)
// Explicitly test this on top of the leakdetection, as the latter uses Eventually
// so might succeed even when the controller does not wait for all reconciliations
// to finish.
Expect(controllerFinished).NotTo(BeClosed())
return reconcile.Result{}, nil
})

m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

_, err = controller.New("new-controller", m, controller.Options{Reconciler: rec})
c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec})
Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed())
Expect(err).NotTo(HaveOccurred())

ctx, cancel := context.WithCancel(context.Background())
go func() {
defer GinkgoRecover()
Expect(m.Start(ctx)).To(Succeed())
close(controllerFinished)
}()

<-reconcileStarted
cancel()
Expect(m.Start(ctx)).To(Succeed())
<-controllerFinished

// force-close keep-alive connections. These'll time anyway (after
// like 30s or so) but force it to speed up the tests.
Expand Down
24 changes: 12 additions & 12 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/go-logr/logr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
Expand Down Expand Up @@ -67,9 +66,6 @@ type Controller struct {
// mu is used to synchronize Controller setup
mu sync.Mutex

// JitterPeriod allows tests to reduce the JitterPeriod so they complete faster
JitterPeriod time.Duration

// Started is true if the Controller has been Started
Started bool

Expand Down Expand Up @@ -150,8 +146,12 @@ func (c *Controller) Start(ctx context.Context) error {
c.ctx = ctx

c.Queue = c.MakeQueue()
defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed
go func() {
<-ctx.Done()
c.Queue.ShutDown()
}()

wg := &sync.WaitGroup{}
err := func() error {
defer c.mu.Unlock()

Expand Down Expand Up @@ -203,19 +203,17 @@ func (c *Controller) Start(ctx context.Context) error {
// which won't be garbage collected if we hold a reference to it.
c.startWatches = nil

if c.JitterPeriod == 0 {
c.JitterPeriod = 1 * time.Second
}

// Launch workers to process resources
c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
wg.Add(c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go wait.UntilWithContext(ctx, func(ctx context.Context) {
go func() {
defer wg.Done()
// Run a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
for c.processNextWorkItem(ctx) {
}
}, c.JitterPeriod)
}()
}

c.Started = true
Expand All @@ -226,7 +224,9 @@ func (c *Controller) Start(ctx context.Context) error {
}

<-ctx.Done()
c.Log.Info("Stopping workers")
c.Log.Info("Shutdown signal received, waiting for all workers to finish")
wg.Wait()
c.Log.Info("All workers finished")
return nil
}

Expand Down
6 changes: 0 additions & 6 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,6 @@ var _ = Describe("controller", func() {
})

It("should requeue a Request if there is an error and continue processing items", func(done Done) {
// Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun.
ctrl.JitterPeriod = time.Millisecond

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -597,7 +595,6 @@ var _ = Describe("controller", func() {
It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() {
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
ctrl.JitterPeriod = time.Millisecond

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -790,9 +787,6 @@ var _ = Describe("controller", func() {
}()
queue.Add(request)

// Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun.
ctrl.JitterPeriod = time.Millisecond

By("Invoking Reconciler which will give an error")
fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile"))
Expect(<-reconciled).To(Equal(request))
Expand Down

0 comments on commit 8f4937b

Please sign in to comment.