Skip to content

Commit 11ae057

Browse files
author
Alex Vest
committedOct 4, 2022
Add tests for leadership election
Pull liveness into leadership to ease testing, logically the liveness probe is directly affected by leadership so it makes sense here. Moved some of the components of the controller tests into the testutil package for reuse in my own tests.
1 parent d34c99b commit 11ae057

File tree

6 files changed

+263
-24
lines changed

6 files changed

+263
-24
lines changed
 

‎internal/pkg/cmd/reloader.go

+3-19
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"net/http"
87
"os"
98
"strings"
109

@@ -21,11 +20,6 @@ import (
2120
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2221
)
2322

24-
var (
25-
// Used for liveness probe
26-
healthy bool = true
27-
)
28-
2923
// NewReloaderCommand starts the reloader controller
3024
func NewReloaderCommand() *cobra.Command {
3125
cmd := &cobra.Command{
@@ -167,14 +161,13 @@ func startReloader(cmd *cobra.Command, args []string) {
167161
// Run leadership election
168162
if options.EnableHA {
169163
podName, podNamespace := getHAEnvs()
170-
lock := leadership.GetNewLock(clientset, constants.LockName, podName, podNamespace)
164+
lock := leadership.GetNewLock(clientset.CoordinationV1(), constants.LockName, podName, podNamespace)
171165
ctx, cancel := context.WithCancel(context.Background())
172166
defer cancel()
173-
leadership.RunLeaderElection(lock, ctx, cancel, podName, controllers, healthy)
167+
leadership.RunLeaderElection(lock, ctx, cancel, podName, controllers)
174168
}
175169

176-
http.HandleFunc("/live", healthz)
177-
logrus.Fatal(http.ListenAndServe(":8080", nil))
170+
logrus.Fatal(leadership.Healthz())
178171
}
179172

180173
func getIgnoredNamespacesList(cmd *cobra.Command) (util.List, error) {
@@ -209,12 +202,3 @@ func getIgnoredResourcesList(cmd *cobra.Command) (util.List, error) {
209202

210203
return ignoredResourcesList, nil
211204
}
212-
213-
func healthz(w http.ResponseWriter, req *http.Request) {
214-
if healthy {
215-
w.WriteHeader(http.StatusOK)
216-
return
217-
}
218-
219-
w.WriteHeader(http.StatusInternalServerError)
220-
}

‎internal/pkg/handler/create.go

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ func (r ResourceCreatedHandler) Handle() error {
1919
logrus.Errorf("Resource creation handler received nil resource")
2020
} else {
2121
config, _ := r.GetConfig()
22+
// process resource based on its type
2223
return doRollingUpgrade(config, r.Collectors)
2324
}
2425
return nil

‎internal/pkg/handler/update.go

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ func (r ResourceUpdatedHandler) Handle() error {
2121
} else {
2222
config, oldSHAData := r.GetConfig()
2323
if config.SHAValue != oldSHAData {
24+
// process resource based on its type
2425
return doRollingUpgrade(config, r.Collectors)
2526
}
2627
}

‎internal/pkg/leadership/leadership.go

+31-5
Original file line numberDiff line numberDiff line change
@@ -2,31 +2,40 @@ package leadership
22

33
import (
44
"context"
5+
"net/http"
56
"time"
67

78
"github.com/sirupsen/logrus"
89
"github.com/stakater/Reloader/internal/pkg/controller"
910
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10-
"k8s.io/client-go/kubernetes"
1111
"k8s.io/client-go/tools/leaderelection"
1212
"k8s.io/client-go/tools/leaderelection/resourcelock"
13+
14+
coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
1315
)
1416

15-
func GetNewLock(clientset *kubernetes.Clientset, lockName, podname, namespace string) *resourcelock.LeaseLock {
17+
const healthPort string = ":9091"
18+
19+
var (
20+
// Used for liveness probe
21+
healthy bool = true
22+
)
23+
24+
func GetNewLock(client coordinationv1.CoordinationV1Interface, lockName, podname, namespace string) *resourcelock.LeaseLock {
1625
return &resourcelock.LeaseLock{
1726
LeaseMeta: v1.ObjectMeta{
1827
Name: lockName,
1928
Namespace: namespace,
2029
},
21-
Client: clientset.CoordinationV1(),
30+
Client: client,
2231
LockConfig: resourcelock.ResourceLockConfig{
2332
Identity: podname,
2433
},
2534
}
2635
}
2736

2837
// runLeaderElection runs leadership election. If an instance of the controller is the leader and stops leading it will shutdown.
29-
func RunLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, cancel context.CancelFunc, id string, controllers []*controller.Controller, health bool) {
38+
func RunLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, cancel context.CancelFunc, id string, controllers []*controller.Controller) {
3039
// Construct channels for the controllers to use
3140
var stopChannels []chan struct{}
3241
for i := 0; i < len(controllers); i++ {
@@ -49,7 +58,7 @@ func RunLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, cancel
4958
logrus.Info("no longer leader, shutting down")
5059
stopControllers(stopChannels)
5160
cancel()
52-
health = false
61+
healthy = false
5362
},
5463
OnNewLeader: func(current_id string) {
5564
if current_id == id {
@@ -74,3 +83,20 @@ func stopControllers(stopChannels []chan struct{}) {
7483
close(c)
7584
}
7685
}
86+
87+
// Healthz serves the liveness probe endpoint. If leadership election is
88+
// enabled and a replica stops leading the liveness probe will fail and the
89+
// kubelet will restart the container.
90+
func Healthz() error {
91+
http.HandleFunc("/live", healthz)
92+
return http.ListenAndServe(healthPort, nil)
93+
}
94+
95+
func healthz(w http.ResponseWriter, req *http.Request) {
96+
if healthy {
97+
w.Write([]byte("alive"))
98+
return
99+
}
100+
101+
w.WriteHeader(http.StatusInternalServerError)
102+
}
+213
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
package leadership
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"net/http/httptest"
8+
"os"
9+
"testing"
10+
"time"
11+
12+
"github.com/sirupsen/logrus"
13+
"github.com/stakater/Reloader/internal/pkg/constants"
14+
"github.com/stakater/Reloader/internal/pkg/controller"
15+
"github.com/stakater/Reloader/internal/pkg/handler"
16+
"github.com/stakater/Reloader/internal/pkg/metrics"
17+
"github.com/stakater/Reloader/internal/pkg/options"
18+
"github.com/stakater/Reloader/internal/pkg/testutil"
19+
"github.com/stakater/Reloader/internal/pkg/util"
20+
"github.com/stakater/Reloader/pkg/kube"
21+
)
22+
23+
func TestMain(m *testing.M) {
24+
25+
testutil.CreateNamespace(testutil.Namespace, testutil.Clients.KubernetesClient)
26+
27+
logrus.Infof("Running Testcases")
28+
retCode := m.Run()
29+
30+
testutil.DeleteNamespace(testutil.Namespace, testutil.Clients.KubernetesClient)
31+
32+
os.Exit(retCode)
33+
}
34+
35+
func TestHealthz(t *testing.T) {
36+
request, err := http.NewRequest(http.MethodGet, "/live", nil)
37+
if err != nil {
38+
t.Fatalf(("failed to create request"))
39+
}
40+
41+
response := httptest.NewRecorder()
42+
43+
healthz(response, request)
44+
got := response.Code
45+
want := 200
46+
47+
if got != want {
48+
t.Fatalf("got: %q, want: %q", got, want)
49+
}
50+
51+
// Have the liveness probe serve a 500
52+
healthy = false
53+
54+
request, err = http.NewRequest(http.MethodGet, "/live", nil)
55+
if err != nil {
56+
t.Fatalf(("failed to create request"))
57+
}
58+
59+
response = httptest.NewRecorder()
60+
61+
healthz(response, request)
62+
got = response.Code
63+
want = 500
64+
65+
if got != want {
66+
t.Fatalf("got: %q, want: %q", got, want)
67+
}
68+
}
69+
70+
// TestRunLeaderElection validates that the liveness endpoint serves 500 when
71+
// leadership election fails
72+
func TestRunLeaderElection(t *testing.T) {
73+
ctx, cancel := context.WithCancel(context.TODO())
74+
75+
lock := GetNewLock(testutil.Clients.KubernetesClient.CoordinationV1(), constants.LockName, testutil.Pod, testutil.Namespace)
76+
77+
go RunLeaderElection(lock, ctx, cancel, testutil.Pod, []*controller.Controller{})
78+
79+
// Liveness probe should be serving OK
80+
request, err := http.NewRequest(http.MethodGet, "/live", nil)
81+
if err != nil {
82+
t.Fatalf(("failed to create request"))
83+
}
84+
85+
response := httptest.NewRecorder()
86+
87+
healthz(response, request)
88+
got := response.Code
89+
want := 500
90+
91+
if got != want {
92+
t.Fatalf("got: %q, want: %q", got, want)
93+
}
94+
95+
// Cancel the leader election context, so leadership is released and
96+
// live endpoint serves 500
97+
cancel()
98+
99+
request, err = http.NewRequest(http.MethodGet, "/live", nil)
100+
if err != nil {
101+
t.Fatalf(("failed to create request"))
102+
}
103+
104+
response = httptest.NewRecorder()
105+
106+
healthz(response, request)
107+
got = response.Code
108+
want = 500
109+
110+
if got != want {
111+
t.Fatalf("got: %q, want: %q", got, want)
112+
}
113+
}
114+
115+
// TestRunLeaderElectionWithControllers tests that leadership election works
116+
// wiht real controllers and that on context cancellation the controllers stop
117+
// running.
118+
func TestRunLeaderElectionWithControllers(t *testing.T) {
119+
t.Logf("Creating controller")
120+
var controllers []*controller.Controller
121+
for k := range kube.ResourceMap {
122+
c, err := controller.NewController(testutil.Clients.KubernetesClient, k, testutil.Namespace, []string{}, metrics.NewCollectors())
123+
if err != nil {
124+
logrus.Fatalf("%s", err)
125+
}
126+
127+
controllers = append(controllers, c)
128+
}
129+
time.Sleep(3 * time.Second)
130+
131+
lock := GetNewLock(testutil.Clients.KubernetesClient.CoordinationV1(), fmt.Sprintf("%s-%d", constants.LockName, 1), testutil.Pod, testutil.Namespace)
132+
133+
ctx, cancel := context.WithCancel(context.TODO())
134+
135+
// Start running leadership election, this also starts the controllers
136+
go RunLeaderElection(lock, ctx, cancel, testutil.Pod, controllers)
137+
time.Sleep(3 * time.Second)
138+
139+
// Create some stuff and do a thing
140+
configmapName := testutil.ConfigmapNamePrefix + "-update-" + testutil.RandSeq(5)
141+
configmapClient, err := testutil.CreateConfigMap(testutil.Clients.KubernetesClient, testutil.Namespace, configmapName, "www.google.com")
142+
if err != nil {
143+
t.Fatalf("Error while creating the configmap %v", err)
144+
}
145+
146+
// Creating deployment
147+
_, err = testutil.CreateDeployment(testutil.Clients.KubernetesClient, configmapName, testutil.Namespace, true)
148+
if err != nil {
149+
t.Fatalf("Error in deployment creation: %v", err)
150+
}
151+
152+
// Updating configmap for first time
153+
updateErr := testutil.UpdateConfigMap(configmapClient, testutil.Namespace, configmapName, "", "www.stakater.com")
154+
if updateErr != nil {
155+
t.Fatalf("Configmap was not updated")
156+
}
157+
time.Sleep(3 * time.Second)
158+
159+
// Verifying deployment update
160+
logrus.Infof("Verifying pod envvars has been created")
161+
shaData := testutil.ConvertResourceToSHA(testutil.ConfigmapResourceType, testutil.Namespace, configmapName, "www.stakater.com")
162+
config := util.Config{
163+
Namespace: testutil.Namespace,
164+
ResourceName: configmapName,
165+
SHAValue: shaData,
166+
Annotation: options.ConfigmapUpdateOnChangeAnnotation,
167+
}
168+
deploymentFuncs := handler.GetDeploymentRollingUpgradeFuncs()
169+
updated := testutil.VerifyResourceEnvVarUpdate(testutil.Clients, config, constants.ConfigmapEnvVarPostfix, deploymentFuncs)
170+
if !updated {
171+
t.Fatalf("Deployment was not updated")
172+
}
173+
time.Sleep(testutil.SleepDuration)
174+
175+
// Cancel the leader election context, so leadership is released
176+
logrus.Info("shutting down controller from test")
177+
cancel()
178+
time.Sleep(5 * time.Second)
179+
180+
// Updating configmap again
181+
updateErr = testutil.UpdateConfigMap(configmapClient, testutil.Namespace, configmapName, "", "www.stakater.com/new")
182+
if updateErr != nil {
183+
t.Fatalf("Configmap was not updated")
184+
}
185+
186+
// Verifying that the deployment was not updated as leadership has been lost
187+
logrus.Infof("Verifying pod envvars has not been updated")
188+
shaData = testutil.ConvertResourceToSHA(testutil.ConfigmapResourceType, testutil.Namespace, configmapName, "www.stakater.com/new")
189+
config = util.Config{
190+
Namespace: testutil.Namespace,
191+
ResourceName: configmapName,
192+
SHAValue: shaData,
193+
Annotation: options.ConfigmapUpdateOnChangeAnnotation,
194+
}
195+
deploymentFuncs = handler.GetDeploymentRollingUpgradeFuncs()
196+
updated = testutil.VerifyResourceEnvVarUpdate(testutil.Clients, config, constants.ConfigmapEnvVarPostfix, deploymentFuncs)
197+
if updated {
198+
t.Fatalf("Deployment was updated")
199+
}
200+
201+
// Deleting deployment
202+
err = testutil.DeleteDeployment(testutil.Clients.KubernetesClient, testutil.Namespace, configmapName)
203+
if err != nil {
204+
logrus.Errorf("Error while deleting the deployment %v", err)
205+
}
206+
207+
// Deleting configmap
208+
err = testutil.DeleteConfigMap(testutil.Clients.KubernetesClient, testutil.Namespace, configmapName)
209+
if err != nil {
210+
logrus.Errorf("Error while deleting the configmap %v", err)
211+
}
212+
time.Sleep(testutil.SleepDuration)
213+
}

‎internal/pkg/testutil/kube.go

+14
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/stakater/Reloader/internal/pkg/callbacks"
1717
"github.com/stakater/Reloader/internal/pkg/constants"
1818
"github.com/stakater/Reloader/internal/pkg/crypto"
19+
"github.com/stakater/Reloader/internal/pkg/metrics"
1920
"github.com/stakater/Reloader/internal/pkg/options"
2021
"github.com/stakater/Reloader/internal/pkg/util"
2122
"github.com/stakater/Reloader/pkg/kube"
@@ -34,6 +35,19 @@ var (
3435
SecretResourceType = "secrets"
3536
)
3637

38+
var (
39+
Clients = kube.GetClients()
40+
Pod = "test-reloader-" + RandSeq(5)
41+
Namespace = "test-reloader-" + RandSeq(5)
42+
ConfigmapNamePrefix = "testconfigmap-reloader"
43+
SecretNamePrefix = "testsecret-reloader"
44+
Data = "dGVzdFNlY3JldEVuY29kaW5nRm9yUmVsb2FkZXI="
45+
NewData = "dGVzdE5ld1NlY3JldEVuY29kaW5nRm9yUmVsb2FkZXI="
46+
UpdatedData = "dGVzdFVwZGF0ZWRTZWNyZXRFbmNvZGluZ0ZvclJlbG9hZGVy"
47+
Collectors = metrics.NewCollectors()
48+
SleepDuration = 3 * time.Second
49+
)
50+
3751
// CreateNamespace creates namespace for testing
3852
func CreateNamespace(namespace string, client kubernetes.Interface) {
3953
_, err := client.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})

0 commit comments

Comments
 (0)
Please sign in to comment.