Skip to content

Commit 919f75b

Browse files
author
Alex Vest
committedOct 4, 2022
Shutdown on leader election loss
1 parent 16079bd commit 919f75b

File tree

5 files changed

+37
-43
lines changed

5 files changed

+37
-43
lines changed
 

‎internal/pkg/cmd/reloader.go

+30-18
Original file line numberDiff line numberDiff line change
@@ -148,13 +148,12 @@ func startReloader(cmd *cobra.Command, args []string) {
148148
logrus.Fatalf("%s", err)
149149
}
150150

151-
// If HA is enabled then we need to run leadership election
152-
if options.EnableHA {
153-
c.SetLeader(false)
154-
}
155-
156151
controllers = append(controllers, c)
157152

153+
// If HA is enabled we only run the controller when
154+
if options.EnableHA {
155+
continue
156+
}
158157
// Now let's start the controller
159158
stop := make(chan struct{})
160159
defer close(stop)
@@ -164,11 +163,17 @@ func startReloader(cmd *cobra.Command, args []string) {
164163

165164
// Run the leadership election
166165
if options.EnableHA {
166+
var stopChannels []chan struct{}
167+
for i := 0; i < len(controllers); i++ {
168+
stop := make(chan struct{})
169+
stopChannels = append(stopChannels, stop)
170+
}
167171
podName, podNamespace := getHAEnvs()
168172
lock := getNewLock(clientset, constants.LockName, podName, podNamespace)
169173
ctx, cancel := context.WithCancel(context.Background())
170174
defer cancel()
171-
runLeaderElection(lock, ctx, podName, controllers)
175+
runLeaderElection(lock, ctx, cancel, podName, controllers, stopChannels)
176+
return
172177
}
173178

174179
select {}
@@ -187,22 +192,23 @@ func getNewLock(clientset *kubernetes.Clientset, lockName, podname, namespace st
187192
}
188193
}
189194

190-
func runLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, id string, controllers []*controller.Controller) {
195+
// runLeaderElection runs leadership election. If an instance of the controller is the leader and stops leading it will shutdown.
196+
func runLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, cancel context.CancelFunc, id string, controllers []*controller.Controller, stopChannels []chan struct{}) {
191197
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
192198
Lock: lock,
193199
ReleaseOnCancel: true,
194-
// TODO Validate that keys persist in the cache for at least one leadership election cycle
195-
LeaseDuration: 15 * time.Second,
196-
RenewDeadline: 10 * time.Second,
197-
RetryPeriod: 2 * time.Second,
200+
LeaseDuration: 15 * time.Second,
201+
RenewDeadline: 10 * time.Second,
202+
RetryPeriod: 2 * time.Second,
198203
Callbacks: leaderelection.LeaderCallbacks{
199204
OnStartedLeading: func(c context.Context) {
200-
setLeader(controllers, true)
201-
logrus.Info("became leader")
205+
logrus.Info("became leader, starting controllers")
206+
runControllers(controllers, stopChannels)
202207
},
203208
OnStoppedLeading: func() {
204-
setLeader(controllers, false)
205-
logrus.Info("no longer leader")
209+
logrus.Info("no longer leader, shutting down")
210+
stopControllers(stopChannels)
211+
cancel()
206212
},
207213
OnNewLeader: func(current_id string) {
208214
if current_id == id {
@@ -215,10 +221,16 @@ func runLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, id str
215221
})
216222
}
217223

218-
func setLeader(controllers []*controller.Controller, isLeader bool) {
219-
for _, c := range controllers {
224+
func runControllers(controllers []*controller.Controller, stopChannels []chan struct{}) {
225+
for i, c := range controllers {
220226
c := c
221-
c.SetLeader(isLeader)
227+
go c.Run(1, stopChannels[i])
228+
}
229+
}
230+
231+
func stopControllers(stopChannels []chan struct{}) {
232+
for _, c := range stopChannels {
233+
close(c)
222234
}
223235
}
224236

‎internal/pkg/controller/controller.go

+2-8
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ type Controller struct {
3030
namespace string
3131
ignoredNamespaces util.List
3232
collectors metrics.Collectors
33-
isLeader bool
3433
}
3534

3635
// controllerInitialized flag determines whether controlled is being initialized
@@ -58,7 +57,7 @@ func NewController(
5857
c.informer = informer
5958
c.queue = queue
6059
c.collectors = collectors
61-
c.isLeader = true
60+
logrus.Infof("created controller for: %s", resource)
6261
return &c, nil
6362
}
6463

@@ -143,7 +142,7 @@ func (c *Controller) processNextItem() bool {
143142
defer c.queue.Done(resourceHandler)
144143

145144
// Invoke the method containing the business logic
146-
err := resourceHandler.(handler.ResourceHandler).Handle(c.isLeader)
145+
err := resourceHandler.(handler.ResourceHandler).Handle()
147146
// Handle the error if something went wrong during the execution of the business logic
148147
c.handleErr(err, resourceHandler)
149148
return true
@@ -174,8 +173,3 @@ func (c *Controller) handleErr(err error, key interface{}) {
174173
runtime.HandleError(err)
175174
logrus.Infof("Dropping the key %q out of the queue: %v", key, err)
176175
}
177-
178-
func (c *Controller) SetLeader(isLeader bool) {
179-
c.isLeader = isLeader
180-
logrus.Info("controller active")
181-
}

‎internal/pkg/handler/create.go

+2-8
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package handler
22

33
import (
4-
"fmt"
5-
64
"github.com/sirupsen/logrus"
75
"github.com/stakater/Reloader/internal/pkg/metrics"
86
"github.com/stakater/Reloader/internal/pkg/util"
@@ -16,16 +14,12 @@ type ResourceCreatedHandler struct {
1614
}
1715

1816
// Handle processes the newly created resource
19-
func (r ResourceCreatedHandler) Handle(isLeader bool) error {
17+
func (r ResourceCreatedHandler) Handle() error {
2018
if r.Resource == nil {
2119
logrus.Errorf("Resource creation handler received nil resource")
2220
} else {
2321
config, _ := r.GetConfig()
24-
// process resource based on its type
25-
if isLeader {
26-
return doRollingUpgrade(config, r.Collectors)
27-
}
28-
return fmt.Errorf("instance is not leader, will not perform rolling upgrade on %s %s/%s", config.Type, config.ResourceName, config.Namespace)
22+
return doRollingUpgrade(config, r.Collectors)
2923
}
3024
return nil
3125
}

‎internal/pkg/handler/handler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ import (
66

77
// ResourceHandler handles the creation and update of resources
88
type ResourceHandler interface {
9-
Handle(isLeader bool) error
9+
Handle() error
1010
GetConfig() (util.Config, string)
1111
}

‎internal/pkg/handler/update.go

+2-8
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package handler
22

33
import (
4-
"fmt"
5-
64
"github.com/sirupsen/logrus"
75
"github.com/stakater/Reloader/internal/pkg/metrics"
86
"github.com/stakater/Reloader/internal/pkg/util"
@@ -17,17 +15,13 @@ type ResourceUpdatedHandler struct {
1715
}
1816

1917
// Handle processes the updated resource
20-
func (r ResourceUpdatedHandler) Handle(isLeader bool) error {
18+
func (r ResourceUpdatedHandler) Handle() error {
2119
if r.Resource == nil || r.OldResource == nil {
2220
logrus.Errorf("Resource update handler received nil resource")
2321
} else {
2422
config, oldSHAData := r.GetConfig()
2523
if config.SHAValue != oldSHAData {
26-
// process resource based on its type
27-
if isLeader {
28-
return doRollingUpgrade(config, r.Collectors)
29-
}
30-
return fmt.Errorf("instance is not leader, will not perform rolling upgrade on %s %s/%s", config.Type, config.ResourceName, config.Namespace)
24+
return doRollingUpgrade(config, r.Collectors)
3125
}
3226
}
3327
return nil

0 commit comments

Comments
 (0)
Please sign in to comment.