/*
Copyright 2018 The Rook Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package controller provides Kubernetes controller/pod/container spec items used for many Ceph daemons
package controller
import (
"context"
"fmt"
"os"
"path"
"reflect"
"strings"
"github.com/coreos/pkg/capnslog"
"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/daemon/ceph/client"
"github.com/rook/rook/pkg/operator/ceph/config"
"github.com/rook/rook/pkg/operator/ceph/config/keyring"
"github.com/rook/rook/pkg/operator/k8sutil"
"github.com/rook/rook/pkg/util/display"
v1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const (
// ConfigInitContainerName is the name which is given to the config initialization container
// in all Ceph pods.
ConfigInitContainerName = "config-init"
logVolumeName = "rook-ceph-log"
volumeMountSubPath = "data"
crashVolumeName = "rook-ceph-crash"
daemonSocketDir = "/run/ceph"
initialDelaySecondsNonOSDDaemon int32 = 10
initialDelaySecondsOSDDaemon int32 = 45
logCollector = "log-collector"
DaemonIDLabel = "ceph_daemon_id"
daemonTypeLabel = "ceph_daemon_type"
ExternalMgrAppName = "rook-ceph-mgr-external"
ServiceExternalMetricName = "http-external-metrics"
)
type daemonConfig struct {
daemonType string
daemonID string
}
var logger = capnslog.NewPackageLogger("github.com/rook/rook", "ceph-spec")
var (
cronLogRotate = `
CEPH_CLIENT_ID=%s
PERIODICITY=%s
LOG_ROTATE_CEPH_FILE=/etc/logrotate.d/ceph
if [ -z "$PERIODICITY" ]; then
PERIODICITY=24h
fi
# edit the logrotate file to only rotate this specific daemon's log;
# otherwise we would rotate other daemons' log files without reloading those daemons,
# which can happen when multiple daemons run on the same machine
sed -i "s|*.log|$CEPH_CLIENT_ID.log|" "$LOG_ROTATE_CEPH_FILE"
while true; do
sleep "$PERIODICITY"
echo "starting log rotation"
logrotate --verbose --force "$LOG_ROTATE_CEPH_FILE"
echo "I am going to sleep now, see you in $PERIODICITY"
done
`
)
// return the volume and matching volume mount for mounting the config override ConfigMap into
// containers as "/etc/ceph/ceph.conf".
func configOverrideConfigMapVolumeAndMount() (v1.Volume, v1.VolumeMount) {
secretAndConfigMapVolumeProjections := []v1.VolumeProjection{}
name := k8sutil.ConfigOverrideName // configmap name and name of volume
dir := config.EtcCephDir
file := "ceph.conf"
// TL;DR: mount the ConfigMap's "config" key to a file called "ceph.conf" with 0444 permissions.
// Security: the file is world-readable because Ceph processes now run as the 'ceph' user rather than 'root'.
// Further investigation is needed to copy ceph.conf and change its ownership,
// since setting an owner on a ConfigMap or Secret volume is currently impossible.
// This also works around the following issue: https://tracker.ceph.com/issues/38606
//
// This design choice avoids a crash/restart loop in Rook:
// if ceph.conf is not mode 0444 when the ceph-mgr respawns (with exec), the mgr
// cannot read ceph.conf, the container dies, and the "restart" count increases in Kubernetes,
// misleading users into thinking something is wrong when it is a false positive.
mode := int32(0444)
projectionConfigMap := &v1.ConfigMapProjection{Items: []v1.KeyToPath{{Key: k8sutil.ConfigOverrideVal, Path: file, Mode: &mode}}}
projectionConfigMap.Name = name
configMapProjection := v1.VolumeProjection{
ConfigMap: projectionConfigMap,
}
secretAndConfigMapVolumeProjections = append(secretAndConfigMapVolumeProjections, configMapProjection)
v := v1.Volume{
Name: name,
VolumeSource: v1.VolumeSource{
Projected: &v1.ProjectedVolumeSource{
Sources: secretAndConfigMapVolumeProjections,
},
},
}
// configmap's "config" to "/etc/ceph/ceph.conf"
m := v1.VolumeMount{
Name: name,
ReadOnly: true, // should be no reason to write to the config in pods, so enforce this
MountPath: dir,
}
return v, m
}
// ConfGeneratedInPodVolumeAndMount generates an EmptyDir volume and mount for /etc/ceph.
func ConfGeneratedInPodVolumeAndMount() (v1.Volume, v1.VolumeMount) {
name := "ceph-conf-emptydir"
dir := config.EtcCephDir
v := v1.Volume{Name: name, VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{}}}
// mount the EmptyDir at "/etc/ceph"
m := v1.VolumeMount{
Name: name,
MountPath: dir,
}
return v, m
}
// PodVolumes fills in the volumes parameter with the common list of Kubernetes volumes for use in Ceph pods.
// This function is only used for OSDs.
func PodVolumes(dataPaths *config.DataPathMap, dataDirHostPath string, confGeneratedInPod bool) []v1.Volume {
dataDirSource := v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}
if dataDirHostPath != "" {
dataDirSource = v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: dataDirHostPath}}
}
configVolume, _ := configOverrideConfigMapVolumeAndMount()
if confGeneratedInPod {
configVolume, _ = ConfGeneratedInPodVolumeAndMount()
}
v := []v1.Volume{
{Name: k8sutil.DataDirVolume, VolumeSource: dataDirSource},
configVolume,
}
v = append(v, StoredLogAndCrashVolume(dataPaths.HostLogDir(), dataPaths.HostCrashDir())...)
return v
}
// CephVolumeMounts returns the common list of Kubernetes volume mounts for Ceph containers.
// This function is only used for OSDs.
func CephVolumeMounts(dataPaths *config.DataPathMap, confGeneratedInPod bool) []v1.VolumeMount {
_, configMount := configOverrideConfigMapVolumeAndMount()
if confGeneratedInPod {
_, configMount = ConfGeneratedInPodVolumeAndMount()
}
v := []v1.VolumeMount{
{Name: k8sutil.DataDirVolume, MountPath: k8sutil.DataDir},
configMount,
// Rook doesn't run in ceph containers, so it doesn't need the config override mounted
}
v = append(v, StoredLogAndCrashVolumeMount(dataPaths.ContainerLogDir(), dataPaths.ContainerCrashDir())...)
return v
}
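// Illustrative sketch (not part of the original file): PodVolumes and CephVolumeMounts
// are meant to be called with the same DataPathMap and confGeneratedInPod flag so that
// every mount in the OSD container refers to a volume that exists in the pod spec. The
// container name and image below are placeholders.
func examplePodVolumesAndMounts(dataPaths *config.DataPathMap, dataDirHostPath, image string) v1.PodSpec {
	return v1.PodSpec{
		Volumes: PodVolumes(dataPaths, dataDirHostPath, false),
		Containers: []v1.Container{{
			Name:         "osd",
			Image:        image,
			VolumeMounts: CephVolumeMounts(dataPaths, false),
		}},
	}
}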
// RookVolumeMounts returns the common list of Kubernetes volume mounts for Rook containers.
// This function is only used by OSDs.
func RookVolumeMounts(dataPaths *config.DataPathMap, confGeneratedInPod bool) []v1.VolumeMount {
return CephVolumeMounts(dataPaths, confGeneratedInPod)
}
// DaemonVolumesBase returns the common / static set of volumes.
func DaemonVolumesBase(dataPaths *config.DataPathMap, keyringResourceName string) []v1.Volume {
configOverrideVolume, _ := configOverrideConfigMapVolumeAndMount()
vols := []v1.Volume{
configOverrideVolume,
}
if keyringResourceName != "" {
vols = append(vols, keyring.Volume().Resource(keyringResourceName))
}
if dataPaths.HostLogAndCrashDir != "" {
// logs and crash data are persisted to the host
vols = append(vols, StoredLogAndCrashVolume(dataPaths.HostLogDir(), dataPaths.HostCrashDir())...)
}
return vols
}
// DaemonVolumesDataPVC returns a PVC volume source for daemon container data.
func DaemonVolumesDataPVC(pvcName string) v1.Volume {
return v1.Volume{
Name: "ceph-daemon-data",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvcName,
},
},
}
}
// DaemonVolumesDataHostPath returns HostPath volume source for daemon container
// data.
func DaemonVolumesDataHostPath(dataPaths *config.DataPathMap) []v1.Volume {
vols := []v1.Volume{}
if dataPaths.ContainerDataDir == "" {
// no data is stored in container, and therefore no data can be persisted to host
return vols
}
// when data is not persisted to host, the data may still be shared between init/run containers
src := v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}
if dataPaths.HostDataDir != "" {
// data is persisted to host
src = v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: dataPaths.HostDataDir}}
}
return append(vols, v1.Volume{Name: "ceph-daemon-data", VolumeSource: src})
}
// DaemonVolumesContainsPVC returns true if a volume exists with a volume source
// configured with a persistent volume claim.
func DaemonVolumesContainsPVC(volumes []v1.Volume) bool {
for _, volume := range volumes {
if volume.VolumeSource.PersistentVolumeClaim != nil {
return true
}
}
return false
}
// DaemonVolumes returns the pod volumes used by all Ceph daemons. If keyring resource name is
// empty, there will be no keyring volume created from a secret.
func DaemonVolumes(dataPaths *config.DataPathMap, keyringResourceName string) []v1.Volume {
vols := DaemonVolumesBase(dataPaths, keyringResourceName)
vols = append(vols, DaemonVolumesDataHostPath(dataPaths)...)
return vols
}
// DaemonVolumeMounts returns the volume mounts which correspond to DaemonVolumes. These
// volume mounts are shared by nearly all Ceph daemon containers, both init and standard. If the keyring
// resource name is empty, no keyring will be mounted in the container.
func DaemonVolumeMounts(dataPaths *config.DataPathMap, keyringResourceName string) []v1.VolumeMount {
_, configOverrideMount := configOverrideConfigMapVolumeAndMount()
mounts := []v1.VolumeMount{
configOverrideMount,
}
if keyringResourceName != "" {
mounts = append(mounts, keyring.VolumeMount().Resource(keyringResourceName))
}
if dataPaths.HostLogAndCrashDir != "" {
// logs and crash data are persisted to the host, so mount them into the container
mounts = append(mounts, StoredLogAndCrashVolumeMount(dataPaths.ContainerLogDir(), dataPaths.ContainerCrashDir())...)
}
if dataPaths.ContainerDataDir == "" {
// no data is stored in container, so there are no more mounts
return mounts
}
return append(mounts,
v1.VolumeMount{Name: "ceph-daemon-data", MountPath: dataPaths.ContainerDataDir},
)
}
// see AddVolumeMountSubPath
func addVolumeMountSubPathContainer(c *v1.Container, volumeMountName string) {
for i := range c.VolumeMounts {
v := &c.VolumeMounts[i]
if v.Name == volumeMountName {
v.SubPath = volumeMountSubPath
}
}
}
// AddVolumeMountSubPath updates each init and regular container of the podspec
// such that each volume mount attached to a container is mounted under a
// subpath in the source volume. This is important because some daemons may not
// start if the volume mount directory is non-empty. When the volume is the root
// of an ext4 file system, one may find a "lost+found" directory.
func AddVolumeMountSubPath(podSpec *v1.PodSpec, volumeMountName string) {
for i := range podSpec.InitContainers {
c := &podSpec.InitContainers[i]
addVolumeMountSubPathContainer(c, volumeMountName)
}
for i := range podSpec.Containers {
c := &podSpec.Containers[i]
addVolumeMountSubPathContainer(c, volumeMountName)
}
}
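// Illustrative sketch (not part of the original file): once a daemon pod spec has been
// assembled, AddVolumeMountSubPath can be applied to the daemon data volume so the daemon
// sees an empty "data" subdirectory rather than the root of the backing filesystem.
func exampleAddDataSubPath(podSpec *v1.PodSpec) {
	// "ceph-daemon-data" is the volume name used by DaemonVolumesDataPVC and
	// DaemonVolumesDataHostPath above.
	AddVolumeMountSubPath(podSpec, "ceph-daemon-data")
}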
// DaemonFlags returns the command line flags used by all Ceph daemons.
func DaemonFlags(cluster *client.ClusterInfo, spec *cephv1.ClusterSpec, daemonID string) []string {
flags := append(
config.DefaultFlags(cluster.FSID, keyring.VolumeMount().KeyringFilePath()),
config.NewFlag("id", daemonID),
// Ceph daemons in Rook run as the 'ceph' user instead of 'root'.
// If we run a version of Ceph that does not support these flags, it will simply ignore them.
// run the ceph daemon process under the 'ceph' user
config.NewFlag("setuser", "ceph"),
// run ceph daemon process under the 'ceph' group
config.NewFlag("setgroup", "ceph"),
)
flags = append(flags, NetworkBindingFlags(cluster, spec)...)
return flags
}
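// Illustrative sketch (not part of the original file): DaemonFlags is typically appended
// to a daemon container's arguments. The daemon ID "a" and the extra "--foreground" flag
// below are assumptions about how a caller might use it, not part of this package.
func exampleDaemonArgs(cluster *client.ClusterInfo, spec *cephv1.ClusterSpec) []string {
	return append(DaemonFlags(cluster, spec, "a"), "--foreground")
}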
// AdminFlags returns the command line flags used for Ceph commands requiring admin authentication.
func AdminFlags(cluster *client.ClusterInfo) []string {
return append(
config.DefaultFlags(cluster.FSID, keyring.VolumeMount().AdminKeyringFilePath()),
config.NewFlag("setuser", "ceph"),
config.NewFlag("setgroup", "ceph"),
)
}
func NetworkBindingFlags(cluster *client.ClusterInfo, spec *cephv1.ClusterSpec) []string {
var args []string
// As of Pacific, Ceph supports dual-stack, so setting the IPv6 family without disabling IPv4 binding actually enables dual-stack.
// That is likely not the user's intent, so on Pacific make sure to disable IPv4 when IPv6 is selected.
if !spec.Network.DualStack {
switch spec.Network.IPFamily {
case cephv1.IPv4:
args = append(args, config.NewFlag("ms-bind-ipv4", "true"))
args = append(args, config.NewFlag("ms-bind-ipv6", "false"))
case cephv1.IPv6:
args = append(args, config.NewFlag("ms-bind-ipv4", "false"))
args = append(args, config.NewFlag("ms-bind-ipv6", "true"))
}
} else {
if cluster.CephVersion.IsAtLeastPacific() {
args = append(args, config.NewFlag("ms-bind-ipv4", "true"))
args = append(args, config.NewFlag("ms-bind-ipv6", "true"))
} else {
logger.Info("dual-stack is only supported on ceph pacific")
// Still acknowledge IPv6, nothing to do for IPv4 since it will always be "on"
if spec.Network.IPFamily == cephv1.IPv6 {
args = append(args, config.NewFlag("ms-bind-ipv6", "true"))
}
}
}
return args
}
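// For example (illustrative, derived from the logic above): selecting IPFamily IPv6 without
// DualStack yields roughly "--ms-bind-ipv4=false --ms-bind-ipv6=true", while DualStack on a
// Pacific or newer cluster yields both bind flags set to "true". The exact flag formatting
// comes from config.NewFlag.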
// ContainerEnvVarReference returns a reference to a Kubernetes container env var of the given name
// which can be used in command or argument fields.
func ContainerEnvVarReference(envVarName string) string {
return fmt.Sprintf("$(%s)", envVarName)
}
// DaemonEnvVars returns the container environment variables used by all Ceph daemons.
func DaemonEnvVars(image string) []v1.EnvVar {
return append(
k8sutil.ClusterDaemonEnvVars(image),
config.StoredMonHostEnvVars()...,
)
}
// AppLabels returns labels common for all Rook-Ceph applications which may be useful for admins.
// App name is the name of the application: e.g., 'rook-ceph-mon', 'rook-ceph-mgr', etc.
func AppLabels(appName, namespace string) map[string]string {
return map[string]string{
k8sutil.AppAttr: appName,
k8sutil.ClusterAttr: namespace,
}
}
// CephDaemonAppLabels returns pod labels common to all Rook-Ceph pods which may be useful for admins.
// App name is the name of the application: e.g., 'rook-ceph-mon', 'rook-ceph-mgr', etc.
// Daemon type is the Ceph daemon type: "mon", "mgr", "osd", "mds", "rgw"
// Daemon ID is the ID portion of the Ceph daemon name: "a" for "mon.a"; "c" for "mds.c"
func CephDaemonAppLabels(appName, namespace, daemonType, daemonID string, includeNewLabels bool) map[string]string {
labels := AppLabels(appName, namespace)
// New labels cannot be applied to match selectors during upgrade
if includeNewLabels {
labels[daemonTypeLabel] = daemonType
}
labels[DaemonIDLabel] = daemonID
// Also report the daemon id keyed by its daemon type: "mon: a", "mds: c", etc.
labels[daemonType] = daemonID
return labels
}
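// For example (illustrative): CephDaemonAppLabels("rook-ceph-mon", "rook-ceph", "mon", "a", true)
// produces roughly the following labels, where the app and cluster keys come from the
// k8sutil constants:
//
//	app: rook-ceph-mon
//	rook_cluster: rook-ceph
//	ceph_daemon_type: mon
//	ceph_daemon_id: a
//	mon: a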
// CheckPodMemory verifies that the pod's memory limit and request are valid.
func CheckPodMemory(name string, resources v1.ResourceRequirements, cephPodMinimumMemory uint64) error {
// Ceph related PR: https://github.com/ceph/ceph/pull/26856
podMemoryLimit := resources.Limits.Memory()
podMemoryRequest := resources.Requests.Memory()
// If nothing was provided let's just return
// This means no restrictions on pod's resources
if podMemoryLimit.IsZero() && podMemoryRequest.IsZero() {
return nil
}
if !podMemoryLimit.IsZero() {
// Whether LIMIT and REQUEST are identical or not, use LIMIT as the reference
if uint64(podMemoryLimit.Value()) < display.MbTob(cephPodMinimumMemory) {
// allow the configuration if less than the min, but print a warning
logger.Warningf("running the %q daemon(s) with %dMB of ram, but at least %dMB is recommended", name, display.BToMb(uint64(podMemoryLimit.Value())), cephPodMinimumMemory)
}
// If LIMIT < REQUEST, Kubernetes will refuse to schedule the pod;
// it is still valuable to indicate that the user's input was incorrect
if uint64(podMemoryLimit.Value()) < uint64(podMemoryRequest.Value()) {
extraErrorLine := `\n
User has specified a pod memory limit %dmb below the pod memory request %dmb in the cluster CR.\n
Rook will create pods that are expected to fail to serve as a more apparent error indicator to the user.`
return errors.Errorf(extraErrorLine, display.BToMb(uint64(podMemoryLimit.Value())), display.BToMb(uint64(podMemoryRequest.Value())))
}
}
return nil
}
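// Illustrative sketch (not part of the original file): callers validate a daemon's resource
// requirements before generating its pod spec. The minimum value below is a placeholder, not
// a constant defined in this package.
func exampleCheckDaemonMemory(name string, resources v1.ResourceRequirements) error {
	const minimumMB uint64 = 1024 // placeholder recommended minimum, in MB
	return CheckPodMemory(name, resources, minimumMB)
}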
// ChownCephDataDirsInitContainer returns an init container which `chown`s the given data
// directories as the `ceph:ceph` user in the container. It also `chown`s the Ceph log dir in the
// container automatically.
// Doing a chown in a post start lifecycle hook does not reliably complete before the OSD
// process starts, which can cause the pod to fail without the lifecycle hook's chown command
// completing. It can take an arbitrarily long time for a pod restart to successfully chown the
// directory. This is a race condition for all daemons; therefore, do this in an init container.
// See more discussion here: https://github.com/rook/rook/pull/3594#discussion_r312279176
func ChownCephDataDirsInitContainer(
dpm config.DataPathMap,
containerImage string,
volumeMounts []v1.VolumeMount,
resources v1.ResourceRequirements,
securityContext *v1.SecurityContext,
) v1.Container {
args := make([]string, 0, 5)
args = append(args,
"--verbose",
"--recursive",
"ceph:ceph",
config.VarLogCephDir,
config.VarLibCephCrashDir,
)
if dpm.ContainerDataDir != "" {
args = append(args, dpm.ContainerDataDir)
}
return v1.Container{
Name: "chown-container-data-dir",
Command: []string{"chown"},
Args: args,
Image: containerImage,
VolumeMounts: volumeMounts,
Resources: resources,
SecurityContext: securityContext,
}
}
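// Illustrative sketch (not part of the original file): the chown container is added to the
// daemon pod's init containers so ownership is fixed before the daemon starts. The image,
// mounts, resources, and security context are whatever the daemon pod already uses.
func exampleWithChownInit(podSpec *v1.PodSpec, dpm config.DataPathMap, image string, mounts []v1.VolumeMount, resources v1.ResourceRequirements, sc *v1.SecurityContext) {
	podSpec.InitContainers = append(podSpec.InitContainers,
		ChownCephDataDirsInitContainer(dpm, image, mounts, resources, sc))
}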
// GenerateMinimalCephConfInitContainer returns an init container that will generate the most basic
// Ceph config for connecting non-Ceph daemons to a Ceph cluster (e.g., nfs-ganesha). Effectively
// what this means is that it generates '/etc/ceph/ceph.conf' with 'mon_host' populated and a
// keyring path associated with the user given. 'mon_host' is determined by the 'ROOK_CEPH_MON_HOST'
// env var present in other Ceph daemon pods, and the keyring is expected to be mounted into the
// container with a Kubernetes pod volume+mount.
func GenerateMinimalCephConfInitContainer(
username, keyringPath string,
containerImage string,
volumeMounts []v1.VolumeMount,
resources v1.ResourceRequirements,
securityContext *v1.SecurityContext,
) v1.Container {
cfgPath := client.DefaultConfigFilePath()
// Note that parameters like $(PARAM) will be replaced by Kubernetes with env var content before
// container creation.
confScript := `
set -xEeuo pipefail
cat << EOF > ` + cfgPath + `
[global]
mon_host = $(ROOK_CEPH_MON_HOST)
[` + username + `]
keyring = ` + keyringPath + `
EOF
chmod 444 ` + cfgPath + `
cat ` + cfgPath + `
`
return v1.Container{
Name: "generate-minimal-ceph-conf",
Command: []string{"/bin/bash", "-c", confScript},
Args: []string{},
Image: containerImage,
VolumeMounts: volumeMounts,
Env: config.StoredMonHostEnvVars(),
Resources: resources,
SecurityContext: securityContext,
}
}
// StoredLogAndCrashVolume returns the pod volumes sourced from the host's stored log and crash files.
func StoredLogAndCrashVolume(hostLogDir, hostCrashDir string) []v1.Volume {
return []v1.Volume{
{
Name: logVolumeName,
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{Path: hostLogDir},
},
},
{
Name: crashVolumeName,
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{Path: hostCrashDir},
},
},
}
}
// StoredLogAndCrashVolumeMount returns the volume mounts corresponding to StoredLogAndCrashVolume.
func StoredLogAndCrashVolumeMount(varLogCephDir, varLibCephCrashDir string) []v1.VolumeMount {
return []v1.VolumeMount{
{
Name: logVolumeName,
ReadOnly: false,
MountPath: varLogCephDir,
},
{
Name: crashVolumeName,
ReadOnly: false,
MountPath: varLibCephCrashDir,
},
}
}
// GenerateLivenessProbeExecDaemon returns a liveness probe that verifies the daemon's admin socket exists and that querying it succeeds.
func GenerateLivenessProbeExecDaemon(daemonType, daemonID string) *v1.Probe {
confDaemon := getDaemonConfig(daemonType, daemonID)
initialDelaySeconds := initialDelaySecondsNonOSDDaemon
if daemonType == config.OsdType {
initialDelaySeconds = initialDelaySecondsOSDDaemon
}
return &v1.Probe{
Handler: v1.Handler{
Exec: &v1.ExecAction{
// Run with env -i to clean env variables in the exec context
// This avoids conflict with the CEPH_ARGS env
//
// Example:
// env -i sh -c "ceph --admin-daemon /run/ceph/ceph-osd.0.asok status"
Command: []string{
"env",
"-i",
"sh",
"-c",
fmt.Sprintf("ceph --admin-daemon %s %s", confDaemon.buildSocketPath(), confDaemon.buildAdminSocketCommand()),
},
},
},
InitialDelaySeconds: initialDelaySeconds,
}
}
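// For example (illustrative): GenerateLivenessProbeExecDaemon(config.MonType, "a") yields a
// probe that runs roughly:
//
//	env -i sh -c "ceph --admin-daemon /run/ceph/ceph-mon.a.asok mon_status"
//
// OSDs use the "status" admin socket command instead and the longer initial delay.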
func getDaemonConfig(daemonType, daemonID string) *daemonConfig {
return &daemonConfig{
daemonType: string(daemonType),
daemonID: daemonID,
}
}
func (c *daemonConfig) buildSocketName() string {
return fmt.Sprintf("ceph-%s.%s.asok", c.daemonType, c.daemonID)
}
func (c *daemonConfig) buildSocketPath() string {
return path.Join(daemonSocketDir, c.buildSocketName())
}
func (c *daemonConfig) buildAdminSocketCommand() string {
command := "status"
if c.daemonType == config.MonType {
command = "mon_status"
}
return command
}
// PodSecurityContext returns a security context that is privileged only if ROOK_HOSTPATH_REQUIRES_PRIVILEGED is set to "true".
func PodSecurityContext() *v1.SecurityContext {
privileged := false
if os.Getenv("ROOK_HOSTPATH_REQUIRES_PRIVILEGED") == "true" {
privileged = true
}
return &v1.SecurityContext{
Privileged: &privileged,
}
}
// LogCollectorContainer returns a sidecar container that periodically rotates the daemon's logs.
func LogCollectorContainer(daemonID, ns string, c cephv1.ClusterSpec) *v1.Container {
return &v1.Container{
Name: logCollector,
Command: []string{
"/bin/bash",
"-x", // Print commands and their arguments as they are executed
"-e", // Exit immediately if a command exits with a non-zero status.
"-m", // Terminal job control, allows job to be terminated by SIGTERM
"-c", // Command to run
fmt.Sprintf(cronLogRotate, daemonID, c.LogCollector.Periodicity),
},
Image: c.CephVersion.Image,
VolumeMounts: DaemonVolumeMounts(config.NewDatalessDaemonDataPathMap(ns, c.DataDirHostPath), ""),
SecurityContext: PodSecurityContext(),
Resources: cephv1.GetLogCollectorResources(c.Resources),
// We need a TTY for the bash job control (enabled by -m)
TTY: true,
}
}
// createExternalMetricsEndpoints creates the Endpoints object for the external mgr metrics service.
func createExternalMetricsEndpoints(namespace string, monitoringSpec cephv1.MonitoringSpec, ownerInfo *k8sutil.OwnerInfo) (*v1.Endpoints, error) {
labels := AppLabels("rook-ceph-mgr", namespace)
endpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: ExternalMgrAppName,
Namespace: namespace,
Labels: labels,
},
Subsets: []v1.EndpointSubset{
{
Addresses: monitoringSpec.ExternalMgrEndpoints,
Ports: []v1.EndpointPort{
{
Name: ServiceExternalMetricName,
Port: int32(monitoringSpec.ExternalMgrPrometheusPort),
Protocol: v1.ProtocolTCP,
},
},
},
},
}
err := ownerInfo.SetControllerReference(endpoints)
if err != nil {
return nil, errors.Wrapf(err, "failed to set owner reference to metric endpoints %q", endpoints.Name)
}
return endpoints, nil
}
func ConfigureExternalMetricsEndpoint(ctx *clusterd.Context, monitoringSpec cephv1.MonitoringSpec, clusterInfo *client.ClusterInfo, ownerInfo *k8sutil.OwnerInfo) error {
if len(monitoringSpec.ExternalMgrEndpoints) == 0 {
logger.Debug("no metric endpoint configured, doing nothing")
return nil
}
// Get active mgr
var activeMgrAddr string
// We use mgr dump and not stat because we want the IP address
mgrMap, err := client.CephMgrMap(ctx, clusterInfo)
if err != nil {
logger.Errorf("failed to get mgr map. %v", err)
} else {
activeMgrAddr = extractMgrIP(mgrMap.ActiveAddr)
}
logger.Debugf("active mgr addr %q", activeMgrAddr)
// If the active manager is different from the one in the spec, override it.
// This happens when a standby manager becomes active.
if activeMgrAddr != monitoringSpec.ExternalMgrEndpoints[0].IP {
monitoringSpec.ExternalMgrEndpoints[0].IP = activeMgrAddr
}
// Create external monitoring Endpoints
endpoint, err := createExternalMetricsEndpoints(clusterInfo.Namespace, monitoringSpec, ownerInfo)
if err != nil {
return err
}
// Get the endpoint to see if anything needs to be updated
currentEndpoints, err := ctx.Clientset.CoreV1().Endpoints(clusterInfo.Namespace).Get(context.TODO(), endpoint.Name, metav1.GetOptions{})
if err != nil && !kerrors.IsNotFound(err) {
return errors.Wrap(err, "failed to fetch endpoints")
}
// If endpoints are identical there is nothing to do
if reflect.DeepEqual(currentEndpoints, endpoint) {
return nil
}
logger.Debugf("diff between current endpoint and newly generated one: %v \n", cmp.Diff(currentEndpoints, endpoint, cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 })))
_, err = k8sutil.CreateOrUpdateEndpoint(ctx.Clientset, clusterInfo.Namespace, endpoint)
if err != nil {
return errors.Wrap(err, "failed to create or update mgr endpoint")
}
return nil
}
func extractMgrIP(rawActiveAddr string) string {
return strings.Split(rawActiveAddr, ":")[0]
}