-
Notifications
You must be signed in to change notification settings - Fork 0
/
controller_utils.go
182 lines (152 loc) · 8.15 KB
/
controller_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
/*
Copyright 2020 The Rook Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"reflect"
"strconv"
"strings"
"time"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/operator/k8sutil"
"github.com/rook/rook/pkg/util/exec"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// OperatorConfig represents the configuration of the operator
type OperatorConfig struct {
	// OperatorNamespace is the namespace the operator pod runs in.
	OperatorNamespace string
	// Image is the container image used by the operator for daemons it deploys.
	// NOTE(review): presumably the rook/ceph operator image — confirm against callers.
	Image string
	// ServiceAccount is the service account name used for deployed components.
	ServiceAccount string
	// NamespaceToWatch is the namespace the operator watches for CRs.
	NamespaceToWatch string
	// Parameters holds raw key/value operator settings (see OperatorSettingConfigMapName).
	Parameters map[string]string
}
const (
	// OperatorSettingConfigMapName refers to ConfigMap that configures rook ceph operator
	OperatorSettingConfigMapName string = "rook-ceph-operator-config"

	// UninitializedCephConfigError refers to the error message printed by the Ceph CLI when there is no ceph configuration file
	// This typically is raised when the operator has not finished initializing
	UninitializedCephConfigError = "error calling conf_read_file"

	// OperatorNotInitializedMessage is the message we print when the Operator is not ready to reconcile, typically the ceph.conf has not been generated yet
	OperatorNotInitializedMessage = "skipping reconcile since operator is still initializing"
)
var (
	// ImmediateRetryResult Return this for a immediate retry of the reconciliation loop with the same request object.
	ImmediateRetryResult = reconcile.Result{Requeue: true}

	// ImmediateRetryResultNoBackoff Return this for a immediate retry of the reconciliation loop with the same request object.
	// Override the exponential backoff behavior by setting the RequeueAfter time explicitly.
	ImmediateRetryResultNoBackoff = reconcile.Result{Requeue: true, RequeueAfter: time.Second}

	// WaitForRequeueIfCephClusterNotReady waits for the CephCluster to be ready
	WaitForRequeueIfCephClusterNotReady = reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second}

	// WaitForRequeueIfCephClusterIsUpgrading waits until the upgrade is complete
	WaitForRequeueIfCephClusterIsUpgrading = reconcile.Result{Requeue: true, RequeueAfter: time.Minute}

	// WaitForRequeueIfFinalizerBlocked waits for resources to be cleaned up before the finalizer can be removed
	WaitForRequeueIfFinalizerBlocked = reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second}

	// WaitForRequeueIfOperatorNotInitialized waits for the operator to finish its initial reconcile before retrying
	WaitForRequeueIfOperatorNotInitialized = reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second}

	// OperatorCephBaseImageVersion is the ceph version in the operator image
	// NOTE(review): set elsewhere — value is assigned outside this view.
	OperatorCephBaseImageVersion string
)
// DiscoveryDaemonEnabled reports whether the ROOK_ENABLE_DISCOVERY_DAEMON
// operator setting is explicitly set to "true" in the given data map.
// The setting defaults to "false" when absent.
func DiscoveryDaemonEnabled(data map[string]string) bool {
	value := k8sutil.GetValue(data, "ROOK_ENABLE_DISCOVERY_DAEMON", "false")
	return value == "true"
}
// SetCephCommandsTimeout sets the timeout value of Ceph commands which are executed from Rook.
// The value is read from the ROOK_CEPH_COMMANDS_TIMEOUT_SECONDS operator setting; values that
// are not positive integers fall back to the 15-second default.
func SetCephCommandsTimeout(data map[string]string) {
	const defaultTimeoutSeconds = 15
	strTimeoutSeconds := k8sutil.GetValue(data, "ROOK_CEPH_COMMANDS_TIMEOUT_SECONDS", strconv.Itoa(defaultTimeoutSeconds))
	timeoutSeconds, err := strconv.Atoi(strTimeoutSeconds)
	if err != nil || timeoutSeconds < 1 {
		// Fix: the warning previously referenced "ROOK_CEPH_COMMANDS_TIMEOUT", which is not
		// the name of the setting actually read above — log the real setting name.
		logger.Warningf("ROOK_CEPH_COMMANDS_TIMEOUT_SECONDS is %q but it should be >= 1, set the default value 15", strTimeoutSeconds)
		timeoutSeconds = defaultTimeoutSeconds
	}
	exec.CephCommandsTimeout = time.Duration(timeoutSeconds) * time.Second
}
// canIgnoreHealthErrStatusInReconcile determines whether a status of HEALTH_ERR in the
// CephCluster can be ignored safely: true only when exactly one detail key is at
// HEALTH_ERR severity and that key is on the allow-list of ignorable errors.
func canIgnoreHealthErrStatusInReconcile(cephCluster cephv1.CephCluster, controllerName string) bool {
	// Collect every status detail key reported at HEALTH_ERR severity.
	healthErrKeys := []string{}
	for key, health := range cephCluster.Status.CephStatus.Details {
		if health.Severity == "HEALTH_ERR" {
			healthErrKeys = append(healthErrKeys, key)
		}
	}

	// The error is only ignorable when it is the sole cause and is allow-listed.
	allowedErrStatus := []string{"MDS_ALL_DOWN"}
	if len(healthErrKeys) != 1 || !contains(allowedErrStatus, healthErrKeys[0]) {
		return false
	}

	logger.Debugf("%q: ignoring ceph status %q because only cause is %q (full status is %+v)", controllerName, cephCluster.Status.CephStatus.Health, healthErrKeys[0], cephCluster.Status.CephStatus)
	return true
}
// IsReadyToReconcile determines if a controller is ready to reconcile or not.
// It returns: the CephCluster found in the request's namespace (zero value if none),
// whether the operator is ready to run ceph commands, whether a CephCluster resource
// exists at all, and the reconcile.Result the caller should return when not ready.
func IsReadyToReconcile(ctx context.Context, c client.Client, clustercontext *clusterd.Context, namespacedName types.NamespacedName, controllerName string) (cephv1.CephCluster, bool, bool, reconcile.Result) {
	cephClusterExists := false

	// Make sure a CephCluster exists before doing anything.
	// Running ceph commands won't work and the controller will keep re-queuing so I believe it's fine not to check
	var cephCluster cephv1.CephCluster
	clusterList := &cephv1.CephClusterList{}
	err := c.List(ctx, clusterList, client.InNamespace(namespacedName.Namespace))
	if err != nil {
		logger.Errorf("%q: failed to fetch CephCluster %v", controllerName, err)
		return cephCluster, false, cephClusterExists, ImmediateRetryResult
	}
	if len(clusterList.Items) == 0 {
		logger.Debugf("%q: no CephCluster resource found in namespace %q", controllerName, namespacedName.Namespace)
		return cephCluster, false, cephClusterExists, WaitForRequeueIfCephClusterNotReady
	}
	cephClusterExists = true
	cephCluster = clusterList.Items[0]
	logger.Debugf("%q: CephCluster resource %q found in namespace %q", controllerName, cephCluster.Name, namespacedName.Namespace)

	// read the CR status of the cluster
	if cephCluster.Status.CephStatus != nil {
		// HEALTH_OK and HEALTH_WARN are both good enough to run ceph commands against the cluster.
		operatorDeploymentOk := cephCluster.Status.CephStatus.Health == "HEALTH_OK" || cephCluster.Status.CephStatus.Health == "HEALTH_WARN"
		if operatorDeploymentOk || canIgnoreHealthErrStatusInReconcile(cephCluster, controllerName) {
			logger.Debugf("%q: ceph status is %q, operator is ready to run ceph command, reconciling", controllerName, cephCluster.Status.CephStatus.Health)
			return cephCluster, true, cephClusterExists, WaitForRequeueIfCephClusterNotReady
		}

		details := cephCluster.Status.CephStatus.Details
		message, ok := details["error"]
		if ok && len(details) == 1 && strings.Contains(message.Message, "Error initializing cluster client") {
			// Consistency fix: use the shared OperatorNotInitializedMessage constant instead of
			// duplicating its text inline (the logged output is byte-identical).
			logger.Infof("%s: %s", controllerName, OperatorNotInitializedMessage)
		} else {
			logger.Infof("%s: CephCluster %q found but skipping reconcile since ceph health is %+v", controllerName, cephCluster.Name, cephCluster.Status.CephStatus)
		}
	}

	logger.Debugf("%q: CephCluster %q initial reconcile is not complete yet...", controllerName, namespacedName.Namespace)
	return cephCluster, false, cephClusterExists, WaitForRequeueIfCephClusterNotReady
}
// ClusterOwnerRef represents the owner reference of the CephCluster CR,
// built from the cluster's name and UID so owned resources are garbage
// collected with the cluster.
func ClusterOwnerRef(clusterName, clusterID string) metav1.OwnerReference {
	isController, blockOwnerDeletion := true, true
	apiVersion := fmt.Sprintf("%s/%s", ClusterResource.Group, ClusterResource.Version)
	return metav1.OwnerReference{
		APIVersion:         apiVersion,
		Kind:               ClusterResource.Kind,
		Name:               clusterName,
		UID:                types.UID(clusterID),
		BlockOwnerDeletion: &blockOwnerDeletion,
		Controller:         &isController,
	}
}
// ClusterResource operator-kit Custom Resource Definition
// Describes the CephCluster CRD (group/version/kind/plural) used when
// building owner references and interacting with the API server.
var ClusterResource = k8sutil.CustomResource{
	Name:   "cephcluster",
	Plural: "cephclusters",
	Group:  cephv1.CustomResourceGroup,
	Version: cephv1.Version,
	// Kind is derived via reflection so it always matches the Go type name.
	Kind:       reflect.TypeOf(cephv1.CephCluster{}).Name(),
	APIVersion: fmt.Sprintf("%s/%s", cephv1.CustomResourceGroup, cephv1.Version),
}