@@ -148,13 +148,12 @@ func startReloader(cmd *cobra.Command, args []string) {
148
148
logrus .Fatalf ("%s" , err )
149
149
}
150
150
151
- // If HA is enabled then we need to run leadership election
152
- if options .EnableHA {
153
- c .SetLeader (false )
154
- }
155
-
156
151
controllers = append (controllers , c )
157
152
153
+ // If HA is enabled we only run the controller when
154
+ if options .EnableHA {
155
+ continue
156
+ }
158
157
// Now let's start the controller
159
158
stop := make (chan struct {})
160
159
defer close (stop )
@@ -164,11 +163,17 @@ func startReloader(cmd *cobra.Command, args []string) {
164
163
165
164
// Run the leadership election
166
165
if options .EnableHA {
166
+ var stopChannels []chan struct {}
167
+ for i := 0 ; i < len (controllers ); i ++ {
168
+ stop := make (chan struct {})
169
+ stopChannels = append (stopChannels , stop )
170
+ }
167
171
podName , podNamespace := getHAEnvs ()
168
172
lock := getNewLock (clientset , constants .LockName , podName , podNamespace )
169
173
ctx , cancel := context .WithCancel (context .Background ())
170
174
defer cancel ()
171
- runLeaderElection (lock , ctx , podName , controllers )
175
+ runLeaderElection (lock , ctx , cancel , podName , controllers , stopChannels )
176
+ return
172
177
}
173
178
174
179
select {}
@@ -187,22 +192,23 @@ func getNewLock(clientset *kubernetes.Clientset, lockName, podname, namespace st
187
192
}
188
193
}
189
194
190
- func runLeaderElection (lock * resourcelock.LeaseLock , ctx context.Context , id string , controllers []* controller.Controller ) {
195
+ // runLeaderElection runs leadership election. If an instance of the controller is the leader and stops leading it will shutdown.
196
+ func runLeaderElection (lock * resourcelock.LeaseLock , ctx context.Context , cancel context.CancelFunc , id string , controllers []* controller.Controller , stopChannels []chan struct {}) {
191
197
leaderelection .RunOrDie (ctx , leaderelection.LeaderElectionConfig {
192
198
Lock : lock ,
193
199
ReleaseOnCancel : true ,
194
- // TODO Validate that keys persist in the cache for at least one leadership election cycle
195
- LeaseDuration : 15 * time .Second ,
196
- RenewDeadline : 10 * time .Second ,
197
- RetryPeriod : 2 * time .Second ,
200
+ LeaseDuration : 15 * time .Second ,
201
+ RenewDeadline : 10 * time .Second ,
202
+ RetryPeriod : 2 * time .Second ,
198
203
Callbacks : leaderelection.LeaderCallbacks {
199
204
OnStartedLeading : func (c context.Context ) {
200
- setLeader ( controllers , true )
201
- logrus . Info ( "became leader" )
205
+ logrus . Info ( "became leader, starting controllers" )
206
+ runControllers ( controllers , stopChannels )
202
207
},
203
208
OnStoppedLeading : func () {
204
- setLeader (controllers , false )
205
- logrus .Info ("no longer leader" )
209
+ logrus .Info ("no longer leader, shutting down" )
210
+ stopControllers (stopChannels )
211
+ cancel ()
206
212
},
207
213
OnNewLeader : func (current_id string ) {
208
214
if current_id == id {
@@ -215,10 +221,16 @@ func runLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, id str
215
221
})
216
222
}
217
223
218
- func setLeader (controllers []* controller.Controller , isLeader bool ) {
219
- for _ , c := range controllers {
224
+ func runControllers (controllers []* controller.Controller , stopChannels [] chan struct {} ) {
225
+ for i , c := range controllers {
220
226
c := c
221
- c .SetLeader (isLeader )
227
+ go c .Run (1 , stopChannels [i ])
228
+ }
229
+ }
230
+
231
+ func stopControllers (stopChannels []chan struct {}) {
232
+ for _ , c := range stopChannels {
233
+ close (c )
222
234
}
223
235
}
224
236
0 commit comments