drplacementcontrol_controller.go
/*
Copyright 2021 The RamenDR authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"encoding/json"
"fmt"
"reflect"
"regexp"
"time"
"github.com/ghodss/yaml"
"github.com/go-logr/logr"
ocmworkv1 "github.com/open-cluster-management/api/work/v1"
viewv1beta1 "github.com/open-cluster-management/multicloud-operators-foundation/pkg/apis/view/v1beta1"
plrv1 "github.com/open-cluster-management/multicloud-operators-placementrule/pkg/apis/apps/v1"
errorswrapper "github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
rmn "github.com/ramendr/ramen/api/v1alpha1"
rmnutil "github.com/ramendr/ramen/controllers/util"
"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)
const (
// DRPC CR finalizer
DRPCFinalizer string = "drpc.ramendr.openshift.io/finalizer"
// Ramen scheduler
RamenScheduler string = "ramen"
ClonedPlacementRuleNameFormat string = "clonedprule-%s-%s"
)
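// As an illustration (hypothetical names): a DRPC named "busybox-drpc" in namespace
// "busybox-sample" gets a cloned PlacementRule named
// fmt.Sprintf(ClonedPlacementRuleNameFormat, "busybox-drpc", "busybox-sample"),
// that is "clonedprule-busybox-drpc-busybox-sample".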
// prometheus metrics
type timerState string
const (
timerStart timerState = "start"
timerStop timerState = "stop"
)
type timerWrapper struct {
gauge prometheus.Gauge // used for "last only" fine-grained timer
histogram prometheus.Histogram // used for cumulative data
timer prometheus.Timer // use prometheus.NewTimer to use/reuse this timer across reconciles
reconcileState rmn.DRState // used to track for spurious reconcile avoidance
}
// newTimerWrapper sets default values for a timerWrapper
func newTimerWrapper(gauge prometheus.Gauge, histogram prometheus.Histogram) timerWrapper {
wrapper := timerWrapper{}
wrapper.gauge = gauge
wrapper.timer = prometheus.Timer{}
wrapper.histogram = histogram
wrapper.reconcileState = rmn.Deployed // should never use a timer from Initial state; "reserved"
return wrapper
}
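// Each timer below pairs a gauge (duration of the last event) with a cumulative
// histogram. prometheus.ExponentialBuckets(1.0, 2.0, 12) yields upper bounds of
// 1, 2, 4, ..., 2048 seconds (roughly 34 minutes), after which observations land
// in the implicit +Inf bucket.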
var (
failoverTime = newTimerWrapper(
prometheus.NewGauge(prometheus.GaugeOpts{
Name: "ramen_failover_time",
Help: "Duration of the last failover event",
}),
prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "ramen_failover_histogram",
Help: "Histogram of all failover timers (seconds)",
Buckets: prometheus.ExponentialBuckets(1.0, 2.0, 12), // start=1.0, factor=2.0, buckets=12
}),
)
failbackTime = newTimerWrapper(
prometheus.NewGauge(prometheus.GaugeOpts{
Name: "ramen_failback_time",
Help: "Duration of the last failback event",
}),
prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "ramen_failback_histogram",
Help: "Histogram of all failback timers (seconds)",
Buckets: prometheus.ExponentialBuckets(1.0, 2.0, 12), // start=1.0, factor=2.0, buckets=12
}),
)
relocateTime = newTimerWrapper(
prometheus.NewGauge(prometheus.GaugeOpts{
Name: "ramen_relocate_time",
Help: "Duration of the last relocate event",
}),
prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "ramen_relocate_histogram",
Help: "Histogram of all relocate timers (seconds)",
Buckets: prometheus.ExponentialBuckets(1.0, 2.0, 12), // start=1.0, factor=2.0, buckets=12
}),
)
)
func init() {
// register custom metrics with the global Prometheus registry
metrics.Registry.MustRegister(failbackTime.gauge, failoverTime.gauge, relocateTime.gauge)
}
var WaitForPVRestoreToComplete = errorswrapper.New("Waiting for PV restore to complete")
// ProgressCallback of function type
type ProgressCallback func(string, string)
// DRPlacementControlReconciler reconciles a DRPlacementControl object
type DRPlacementControlReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Callback ProgressCallback
}
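// ManifestWorkPredicateFunc returns a predicate that triggers reconciliation only
// when a ManifestWork's status changes between the old and new objects.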
func ManifestWorkPredicateFunc() predicate.Funcs {
mwPredicate := predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
log := ctrl.Log.WithName("ManifestWork")
oldMW, ok := e.ObjectOld.DeepCopyObject().(*ocmworkv1.ManifestWork)
if !ok {
log.Info("Failed to deep copy older ManifestWork")
return false
}
newMW, ok := e.ObjectNew.DeepCopyObject().(*ocmworkv1.ManifestWork)
if !ok {
log.Info("Failed to deep copy newer ManifestWork")
return false
}
log.Info(fmt.Sprintf("Update event for MW %s/%s", oldMW.Name, oldMW.Namespace))
return !reflect.DeepEqual(oldMW.Status, newMW.Status)
},
}
return mwPredicate
}
func filterMW(mw *ocmworkv1.ManifestWork) []ctrl.Request {
return []ctrl.Request{
reconcile.Request{
NamespacedName: types.NamespacedName{
Name: mw.Annotations[rmnutil.DRPCNameAnnotation],
Namespace: mw.Annotations[rmnutil.DRPCNamespaceAnnotation],
},
},
}
}
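// ManagedClusterViewPredicateFunc returns a predicate that triggers reconciliation
// on ManagedClusterView status changes and ignores delete events.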
func ManagedClusterViewPredicateFunc() predicate.Funcs {
log := ctrl.Log.WithName("MCV")
mcvPredicate := predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
oldMCV, ok := e.ObjectOld.DeepCopyObject().(*viewv1beta1.ManagedClusterView)
if !ok {
log.Info("Failed to deep copy older MCV")
return false
}
newMCV, ok := e.ObjectNew.DeepCopyObject().(*viewv1beta1.ManagedClusterView)
if !ok {
log.Info("Failed to deep copy newer MCV")
return false
}
log.Info(fmt.Sprintf("Update event for MCV %s/%s", oldMCV.Name, oldMCV.Namespace))
return !reflect.DeepEqual(oldMCV.Status, newMCV.Status)
},
DeleteFunc: func(e event.DeleteEvent) bool {
log.Info("Delete event for MCV")
return false
},
}
return mcvPredicate
}
func filterMCV(mcv *viewv1beta1.ManagedClusterView) []ctrl.Request {
return []ctrl.Request{
reconcile.Request{
NamespacedName: types.NamespacedName{
Name: mcv.Annotations[rmnutil.DRPCNameAnnotation],
Namespace: mcv.Annotations[rmnutil.DRPCNamespaceAnnotation],
},
},
}
}
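// GetDRPCCondition returns the index and a pointer to the condition of the given
// type in the DRPC status, or (-1, nil) if no such condition exists.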
func GetDRPCCondition(status *rmn.DRPlacementControlStatus, conditionType string) (int, *metav1.Condition) {
if len(status.Conditions) == 0 {
return -1, nil
}
for i := range status.Conditions {
if status.Conditions[i].Type == conditionType {
return i, &status.Conditions[i]
}
}
return -1, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *DRPlacementControlReconciler) SetupWithManager(mgr ctrl.Manager) error {
mwPred := ManifestWorkPredicateFunc()
mwMapFun := handler.EnqueueRequestsFromMapFunc(handler.MapFunc(func(obj client.Object) []reconcile.Request {
mw, ok := obj.(*ocmworkv1.ManifestWork)
if !ok {
ctrl.Log.Info("ManifestWork map function received non-MW resource")
return []reconcile.Request{}
}
ctrl.Log.Info(fmt.Sprintf("Filtering ManifestWork (%s/%s)", mw.Name, mw.Namespace))
return filterMW(mw)
}))
mcvPred := ManagedClusterViewPredicateFunc()
mcvMapFun := handler.EnqueueRequestsFromMapFunc(handler.MapFunc(func(obj client.Object) []reconcile.Request {
mcv, ok := obj.(*viewv1beta1.ManagedClusterView)
if !ok {
ctrl.Log.Info("ManagedClusterView map function received non-MCV resource")
return []reconcile.Request{}
}
ctrl.Log.Info(fmt.Sprintf("Filtering MCV (%s/%s)", mcv.Name, mcv.Namespace))
return filterMCV(mcv)
}))
return ctrl.NewControllerManagedBy(mgr).
For(&rmn.DRPlacementControl{}).
Watches(&source.Kind{Type: &ocmworkv1.ManifestWork{}}, mwMapFun, builder.WithPredicates(mwPred)).
Watches(&source.Kind{Type: &viewv1beta1.ManagedClusterView{}}, mcvMapFun, builder.WithPredicates(mcvPred)).
Complete(r)
}
//nolint:lll
// +kubebuilder:rbac:groups=ramendr.openshift.io,resources=drplacementcontrols,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=ramendr.openshift.io,resources=drplacementcontrols/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=ramendr.openshift.io,resources=drplacementcontrols/finalizers,verbs=update
// +kubebuilder:rbac:groups=apps.open-cluster-management.io,resources=placementrules,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps.open-cluster-management.io,resources=placementrules/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=cluster.open-cluster-management.io,resources=managedclusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=work.open-cluster-management.io,resources=manifestworks,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=view.open-cluster-management.io,resources=managedclusterviews,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=ramendr.openshift.io,resources=drpolicies,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the DRPlacementControl object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.7.0/pkg/reconcile
func (r *DRPlacementControlReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := r.Log.WithValues("DRPC", req.NamespacedName)
logger.Info("Entering reconcile loop")
defer logger.Info("Exiting reconcile loop")
drpc := &rmn.DRPlacementControl{}
err := r.Client.Get(ctx, req.NamespacedName, drpc)
if err != nil {
if errors.IsNotFound(err) {
logger.Info(fmt.Sprintf("DRPC object not found %v", req.NamespacedName))
// Request object not found, could have been deleted after reconcile request.
return ctrl.Result{}, nil
}
return ctrl.Result{}, errorswrapper.Wrap(err, "failed to get DRPC object")
}
return r.reconcileDRPCInstance(ctx, drpc)
}
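// reconcileDRPCInstance orchestrates DR placement for a DRPC instance, for example
// one created from a manifest like the following sketch (illustrative only; field
// names mirror the rmn.DRPlacementControl spec fields used in this file and the
// exact YAML casing is assumed):
//
//   apiVersion: ramendr.openshift.io/v1alpha1
//   kind: DRPlacementControl
//   metadata:
//     name: busybox-drpc          # hypothetical name
//     namespace: busybox-sample   # hypothetical namespace
//   spec:
//     drPolicyRef:
//       name: dr-policy           # hypothetical DRPolicy
//       namespace: busybox-sample
//     placementRef:
//       kind: PlacementRule
//       name: busybox-placement   # user PlacementRule using the "ramen" scheduler
//     preferredCluster: cluster-east  # optional; a cloned PlacementRule decides when empty
//     failoverCluster: cluster-west   # mandatory for the Failover action
//     action: Failover                # Failover, Failback, or Relocate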
func (r *DRPlacementControlReconciler) reconcileDRPCInstance(ctx context.Context,
drpc *rmn.DRPlacementControl) (ctrl.Result, error) {
drPolicy, err := r.getDRPolicy(ctx, drpc.Spec.DRPolicyRef.Name, drpc.Spec.DRPolicyRef.Namespace)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get DRPolicy %w", err)
}
// Currently validation of schedule in DRPolicy is done here. When
// there is a reconciler for DRPolicy, then probably this validation
// has to be done there and can be removed from here.
err = r.validateSchedule(drPolicy)
if err != nil {
r.Log.Error(err, "failed to validate schedule")
// Should this avoid requeueing? There is no reconcile until the user
// changes the desired spec to a valid value.
return ctrl.Result{}, err
}
// Check if the drpc instance is marked for deletion, which is indicated by the
// deletion timestamp being set.
if drpc.GetDeletionTimestamp() != nil {
return r.processDeletion(ctx, drpc)
}
err = r.addDRPCFinalizer(ctx, drpc)
if err != nil {
return ctrl.Result{}, err
}
drpcPlRule, userPlRule, err := r.getPlacementRules(ctx, drpc, drPolicy)
if err != nil {
r.Log.Error(err, "failed to get PlacementRules")
return ctrl.Result{}, err
}
// Make sure that we give the cloned PlacementRule time to run and produce decisions
if drpcPlRule != nil && len(drpcPlRule.Status.Decisions) == 0 {
const initialWaitTime = 5
return ctrl.Result{RequeueAfter: time.Second * initialWaitTime}, nil
}
d := DRPCInstance{
reconciler: r, ctx: ctx, log: r.Log, instance: drpc, needStatusUpdate: false,
userPlacementRule: userPlRule, drpcPlacementRule: drpcPlRule, drPolicy: drPolicy,
mwu: rmnutil.MWUtil{Client: r.Client, Ctx: ctx, Log: r.Log, InstName: drpc.Name, InstNamespace: drpc.Namespace},
}
return r.processAndHandleResponse(&d)
}
func (r *DRPlacementControlReconciler) processAndHandleResponse(d *DRPCInstance) (ctrl.Result, error) {
requeue := d.startProcessing()
r.Log.Info("Finished processing", "Requeue?", requeue)
if !requeue {
r.Log.Info("Done reconciling")
r.Callback(d.instance.Name, string(d.getLastDRState()))
}
if d.mcvRequestInProgress {
duration := d.getRequeueDuration()
r.Log.Info(fmt.Sprintf("Requeuing after %v", duration))
return reconcile.Result{RequeueAfter: duration}, nil
}
if requeue {
r.Log.Info("Requeuing...")
return ctrl.Result{Requeue: true}, nil
}
const sanityCheckDelay = 10 // for 10 mins
return ctrl.Result{RequeueAfter: time.Minute * sanityCheckDelay}, nil
}
func (r *DRPlacementControlReconciler) getDRPolicy(ctx context.Context,
name, namespace string) (*rmn.DRPolicy, error) {
drPolicy := &rmn.DRPolicy{}
err := r.Client.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, drPolicy)
if err != nil {
r.Log.Error(err, "failed to get DRPolicy")
return nil, fmt.Errorf("%w", err)
}
return drPolicy, nil
}
func (r *DRPlacementControlReconciler) addDRPCFinalizer(ctx context.Context, drpc *rmn.DRPlacementControl) error {
if !controllerutil.ContainsFinalizer(drpc, DRPCFinalizer) {
controllerutil.AddFinalizer(drpc, DRPCFinalizer)
if err := r.Update(ctx, drpc); err != nil {
r.Log.Error(err, "Failed to add finalizer", "finalizer", DRPCFinalizer)
return fmt.Errorf("%w", err)
}
}
return nil
}
func (r *DRPlacementControlReconciler) processDeletion(ctx context.Context,
drpc *rmn.DRPlacementControl) (ctrl.Result, error) {
r.Log.Info("Processing DRPC deletion")
if controllerutil.ContainsFinalizer(drpc, DRPCFinalizer) {
// Run finalization logic for drpc.
// If the finalization logic fails, don't remove the finalizer so
// that we can retry during the next reconciliation.
if err := r.finalizeDRPC(ctx, drpc); err != nil {
return ctrl.Result{}, err
}
// Remove DRPCFinalizer. The object will be deleted once
// the finalizer is removed
controllerutil.RemoveFinalizer(drpc, DRPCFinalizer)
err := r.Update(ctx, drpc)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to update drpc %w", err)
}
r.Callback(drpc.Name, "deleted")
}
return ctrl.Result{}, nil
}
func (r *DRPlacementControlReconciler) finalizeDRPC(ctx context.Context, drpc *rmn.DRPlacementControl) error {
r.Log.Info("Finalizing DRPC")
clonedPlRuleName := fmt.Sprintf(ClonedPlacementRuleNameFormat, drpc.Name, drpc.Namespace)
mwu := rmnutil.MWUtil{Client: r.Client, Ctx: ctx, Log: r.Log, InstName: drpc.Name, InstNamespace: drpc.Namespace}
preferredCluster := drpc.Spec.PreferredCluster
if preferredCluster == "" {
clonedPlRule, err := r.getClonedPlacementRule(ctx, clonedPlRuleName, drpc.Namespace)
if err != nil {
r.Log.Info("Cloned placement rule not found")
return nil
}
if len(clonedPlRule.Status.Decisions) != 0 {
preferredCluster = clonedPlRule.Status.Decisions[0].ClusterName
}
}
clustersToClean := []string{preferredCluster}
if drpc.Spec.FailoverCluster != "" {
clustersToClean = append(clustersToClean, drpc.Spec.FailoverCluster)
}
// delete manifestworks (VRG)
for idx := range clustersToClean {
err := mwu.DeleteManifestWorksForCluster(clustersToClean[idx])
if err != nil {
return fmt.Errorf("%w", err)
}
}
// delete cloned placementrule, if created
if drpc.Spec.PreferredCluster == "" {
return r.deleteClonedPlacementRule(ctx, clonedPlRuleName, drpc.Namespace)
}
return nil
}
func (r *DRPlacementControlReconciler) getPlacementRules(ctx context.Context,
drpc *rmn.DRPlacementControl,
drPolicy *rmn.DRPolicy) (*plrv1.PlacementRule, *plrv1.PlacementRule, error) {
userPlRule, err := r.getUserPlacementRule(ctx, drpc)
if err != nil {
return nil, nil, err
}
if err = r.annotatePlacementRule(ctx, drpc, userPlRule); err != nil {
return nil, nil, err
}
var drpcPlRule *plrv1.PlacementRule
// create the cloned placementrule if and only if the Spec.PreferredCluster is not provided
if drpc.Spec.PreferredCluster == "" {
drpcPlRule, err = r.getOrClonePlacementRule(ctx, drpc, drPolicy, userPlRule)
if err != nil {
return nil, nil, err
}
}
return drpcPlRule, userPlRule, nil
}
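// getUserPlacementRule fetches the PlacementRule referenced by the DRPC and verifies
// that it uses the ramen scheduler. A minimal manifest that would pass these checks
// (names and selector are illustrative; field casing assumed):
//
//   apiVersion: apps.open-cluster-management.io/v1
//   kind: PlacementRule
//   metadata:
//     name: busybox-placement
//     namespace: busybox-sample
//   spec:
//     schedulerName: ramen
//     clusterReplicas: 1
//     clusterSelector:
//       matchLabels:
//         usage: test   # hypothetical label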
func (r *DRPlacementControlReconciler) getUserPlacementRule(ctx context.Context,
drpc *rmn.DRPlacementControl) (*plrv1.PlacementRule, error) {
r.Log.Info("Getting User PlacementRule", "placement", drpc.Spec.PlacementRef)
if drpc.Spec.PlacementRef.Namespace == "" {
drpc.Spec.PlacementRef.Namespace = drpc.Namespace
}
userPlacementRule := &plrv1.PlacementRule{}
err := r.Client.Get(ctx,
types.NamespacedName{Name: drpc.Spec.PlacementRef.Name, Namespace: drpc.Spec.PlacementRef.Namespace},
userPlacementRule)
if err != nil {
return nil, fmt.Errorf("failed to get placementrule error: %w", err)
}
scName := userPlacementRule.Spec.SchedulerName
if scName != RamenScheduler {
return nil, fmt.Errorf("placementRule %s does not have the ramen scheduler. Scheduler used %s",
userPlacementRule.Name, scName)
}
if userPlacementRule.Spec.ClusterReplicas == nil || *userPlacementRule.Spec.ClusterReplicas != 1 {
r.Log.Info("User PlacementRule replica count is not set to 1, reconciliation will only" +
" schedule it to a single cluster")
}
return userPlacementRule, nil
}
func (r *DRPlacementControlReconciler) annotatePlacementRule(ctx context.Context,
drpc *rmn.DRPlacementControl, plRule *plrv1.PlacementRule) error {
if plRule.ObjectMeta.Annotations == nil {
plRule.ObjectMeta.Annotations = map[string]string{}
}
ownerName := plRule.ObjectMeta.Annotations[rmnutil.DRPCNameAnnotation]
ownerNamespace := plRule.ObjectMeta.Annotations[rmnutil.DRPCNamespaceAnnotation]
if ownerName == "" {
plRule.ObjectMeta.Annotations[rmnutil.DRPCNameAnnotation] = drpc.Name
plRule.ObjectMeta.Annotations[rmnutil.DRPCNamespaceAnnotation] = drpc.Namespace
err := r.Update(ctx, plRule)
if err != nil {
r.Log.Error(err, "Failed to update PlacementRule annotation", "PlRuleName", plRule.Name)
return fmt.Errorf("failed to update PlacementRule %s annotation '%s/%s' (%w)",
plRule.Name, rmnutil.DRPCNameAnnotation, drpc.Name, err)
}
return nil
}
if ownerName != drpc.Name || ownerNamespace != drpc.Namespace {
r.Log.Info("PlacementRule not owned by this DRPC", "PlRuleName", plRule.Name)
return fmt.Errorf("PlacementRule %s not owned by this DRPC '%s/%s'",
plRule.Name, drpc.Name, drpc.Namespace)
}
return nil
}
func (r *DRPlacementControlReconciler) getOrClonePlacementRule(ctx context.Context,
drpc *rmn.DRPlacementControl, drPolicy *rmn.DRPolicy,
userPlRule *plrv1.PlacementRule) (*plrv1.PlacementRule, error) {
r.Log.Info("Getting PlacementRule or cloning it", "placement", drpc.Spec.PlacementRef)
clonedPlRuleName := fmt.Sprintf(ClonedPlacementRuleNameFormat, drpc.Name, drpc.Namespace)
clonedPlRule, err := r.getClonedPlacementRule(ctx, clonedPlRuleName, drpc.Namespace)
if err != nil {
if errors.IsNotFound(err) {
clonedPlRule, err = r.clonePlacementRule(ctx, drPolicy, userPlRule, clonedPlRuleName)
if err != nil {
return nil, fmt.Errorf("failed to create cloned placementrule error: %w", err)
}
} else {
r.Log.Error(err, "Failed to get drpc placementRule", "name", clonedPlRuleName)
return nil, err
}
}
return clonedPlRule, nil
}
func (r *DRPlacementControlReconciler) getClonedPlacementRule(ctx context.Context,
clonedPlRuleName, namespace string) (*plrv1.PlacementRule, error) {
r.Log.Info("Getting cloned PlacementRule", "name", clonedPlRuleName)
clonedPlRule := &plrv1.PlacementRule{}
err := r.Client.Get(ctx, types.NamespacedName{Name: clonedPlRuleName, Namespace: namespace}, clonedPlRule)
if err != nil {
return nil, fmt.Errorf("failed to get placementrule error: %w", err)
}
return clonedPlRule, nil
}
func (r *DRPlacementControlReconciler) clonePlacementRule(ctx context.Context,
drPolicy *rmn.DRPolicy, userPlRule *plrv1.PlacementRule,
clonedPlRuleName string) (*plrv1.PlacementRule, error) {
r.Log.Info("Creating a clone placementRule from", "name", userPlRule.Name)
clonedPlRule := &plrv1.PlacementRule{}
userPlRule.DeepCopyInto(clonedPlRule)
clonedPlRule.Name = clonedPlRuleName
clonedPlRule.ResourceVersion = ""
clonedPlRule.Spec.SchedulerName = ""
err := r.addClusterPeersToPlacementRule(drPolicy, clonedPlRule)
if err != nil {
r.Log.Error(err, "Failed to add cluster peers to cloned placementRule", "name", clonedPlRuleName)
return nil, err
}
err = r.Create(ctx, clonedPlRule)
if err != nil {
r.Log.Error(err, "failed to clone placement rule", "name", clonedPlRule.Name)
return nil, errorswrapper.Wrap(err, "failed to create PlacementRule")
}
return clonedPlRule, nil
}
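// validateSchedule checks that the DRPolicy scheduling interval is non-empty and
// matches ^\d+[mhd]$: for example "5m", "1h" and "7d" are accepted, while "1.5h",
// "90s" and "1w" are rejected.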
func (r *DRPlacementControlReconciler) validateSchedule(drPolicy *rmn.DRPolicy) error {
r.Log.Info("Validating schedule from DRPolicy")
if drPolicy.Spec.SchedulingInterval == "" {
return fmt.Errorf("scheduling interval empty for the DRPolicy (%s)", drPolicy.Name)
}
re := regexp.MustCompile(`^\d+[mhd]$`)
if !re.MatchString(drPolicy.Spec.SchedulingInterval) {
return fmt.Errorf("failed to match the scheduling interval string %s", drPolicy.Spec.SchedulingInterval)
}
return nil
}
func (r *DRPlacementControlReconciler) deleteClonedPlacementRule(ctx context.Context,
name, namespace string) error {
plRule, err := r.getClonedPlacementRule(ctx, name, namespace)
if err != nil {
return err
}
err = r.Client.Delete(ctx, plRule)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return fmt.Errorf("failed to delete cloned plRule %w", err)
}
return nil
}
func (r *DRPlacementControlReconciler) addClusterPeersToPlacementRule(
drPolicy *rmn.DRPolicy, plRule *plrv1.PlacementRule) error {
if len(drPolicy.Spec.DRClusterSet) == 0 {
return fmt.Errorf("DRPolicy %s is missing DR clusters", drPolicy.Name)
}
for idx := range drPolicy.Spec.DRClusterSet {
plRule.Spec.Clusters = append(plRule.Spec.Clusters, plrv1.GenericClusterReference{
Name: drPolicy.Spec.DRClusterSet[idx].Name,
})
}
r.Log.Info(fmt.Sprintf("Added clusters %v to placementRule from DRPolicy %s", plRule.Spec.Clusters, drPolicy.Name))
return nil
}
type DRPCInstance struct {
reconciler *DRPlacementControlReconciler
ctx context.Context
log logr.Logger
instance *rmn.DRPlacementControl
drPolicy *rmn.DRPolicy
needStatusUpdate bool
mcvRequestInProgress bool
userPlacementRule *plrv1.PlacementRule
drpcPlacementRule *plrv1.PlacementRule
mwu rmnutil.MWUtil
}
func (d *DRPCInstance) startProcessing() bool {
d.log.Info("Starting to process placement")
requeue := true
done, err := d.processPlacement()
if err != nil {
d.log.Info("Process placement", "error", err.Error())
return requeue
}
if d.needStatusUpdate {
if err := d.updateDRPCStatus(); err != nil {
d.log.Error(err, "failed to update status")
return requeue
}
}
requeue = !done
d.log.Info("Completed processing placement", "requeue", requeue)
return requeue
}
func (d *DRPCInstance) processPlacement() (bool, error) {
d.log.Info("Process DRPC Placement")
switch d.instance.Spec.Action {
case rmn.ActionFailover:
return d.runFailover()
case rmn.ActionFailback:
return d.runFailback()
case rmn.ActionRelocate:
return d.runRelocate()
}
// Not a failover, a failback, or a relocation. Must be an initial deployment.
return d.runInitialDeployment()
}
func (d *DRPCInstance) runInitialDeployment() (bool, error) {
d.log.Info("Running initial deployment")
const done = true
// 1. Check if the user wants to use the preferredCluster
homeCluster := ""
homeClusterNamespace := ""
if d.instance.Spec.PreferredCluster != "" {
homeCluster = d.instance.Spec.PreferredCluster
homeClusterNamespace = homeCluster
}
if homeCluster == "" && d.drpcPlacementRule != nil && len(d.drpcPlacementRule.Status.Decisions) != 0 {
homeCluster = d.drpcPlacementRule.Status.Decisions[0].ClusterName
homeClusterNamespace = d.drpcPlacementRule.Status.Decisions[0].ClusterNamespace
}
if homeCluster == "" {
return !done, fmt.Errorf("PreferredCluster not set and unable to find home cluster in DRPCPlacementRule (%v)",
d.drpcPlacementRule)
}
// We are done if the initial deployment is already complete
if len(d.userPlacementRule.Status.Decisions) > 0 {
d.log.Info(fmt.Sprintf("Already deployed to %s. Last state %s",
d.userPlacementRule.Status.Decisions[0].ClusterName, d.getLastDRState()))
return done, nil
}
// Make sure we record the state that we are deploying
d.setDRState(rmn.Deploying)
// Create VRG first, to leverage user PlacementRule decision to skip placement and move to cleanup
err := d.createVRGManifestWork(homeCluster)
if err != nil {
return !done, err
}
// We have a home cluster
err = d.updateUserPlacementRule(homeCluster, homeClusterNamespace)
if err != nil {
return !done, err
}
// All good, update the preferred decision and state
if len(d.userPlacementRule.Status.Decisions) > 0 {
d.instance.Status.PreferredDecision = d.userPlacementRule.Status.Decisions[0]
}
d.advanceToNextDRState()
d.log.Info(fmt.Sprintf("DRPC (%+v)", d.instance))
return done, nil
}
//nolint:exhaustive
func setMetricsTimerFromDRState(stateDR rmn.DRState, stateTimer timerState) {
switch stateDR {
case rmn.FailingOver:
setMetricsTimer(&failoverTime, stateTimer, stateDR)
case rmn.FailingBack:
setMetricsTimer(&failbackTime, stateTimer, stateDR)
case rmn.Relocating:
setMetricsTimer(&relocateTime, stateTimer, stateDR)
// case rmn.Deploying:
// TODO: setMetricsTimer(&deployTime, stateTimer, stateDR)
case rmn.FailedBack:
fallthrough
case rmn.FailedOver:
fallthrough
case rmn.Deployed:
fallthrough
case rmn.Relocated:
fallthrough
default:
// not supported
}
}
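// setMetricsTimer starts or stops the given timer wrapper. On timerStart a new
// prometheus.Timer (observing into the gauge) is created only when the reconcile
// state changes, so repeated reconciles in the same state do not restart the timer.
// On timerStop the elapsed duration is recorded in the gauge and the histogram, and
// the state is reset to the "reserved" Deployed value.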
func setMetricsTimer(wrapper *timerWrapper, desiredTimerState timerState, reconcileState rmn.DRState) {
switch desiredTimerState {
case timerStart:
if reconcileState != wrapper.reconcileState {
wrapper.reconcileState = reconcileState
wrapper.timer = *prometheus.NewTimer(prometheus.ObserverFunc(wrapper.gauge.Set))
}
case timerStop:
wrapper.timer.ObserveDuration()
wrapper.histogram.Observe(wrapper.timer.ObserveDuration().Seconds()) // add timer to histogram
wrapper.reconcileState = rmn.Deployed // "reserved" value
}
}
//
// runFailover:
// 1. If failoverCluster empty, then fail it and we are done
// 2. If already failed over, then ensure clean up and we are done
// 3. Set VRG for the preferredCluster to secondary
// 4. Restore PV to failoverCluster
// 5. Update UserPlacementRule decision to failoverCluster
// 6. Create VRG for the failoverCluster as Primary
// 7. Update DRPC status
// 8. Delete VRG MW from preferredCluster once the VRG state has changed to Secondary
//
func (d *DRPCInstance) runFailover() (bool, error) {
d.log.Info("Entering runFailover", "state", d.getLastDRState())
const done = true
if d.isFailbackInProgress() || d.isRelocationInProgress() {
return done, fmt.Errorf("invalid state %s for the selected action %v",
d.getLastDRState(), d.instance.Spec.Action)
}
// We are done if empty
if d.instance.Spec.FailoverCluster == "" {
return done, fmt.Errorf("failover cluster not set. FailoverCluster is a mandatory field")
}
// We are done if we have already failed over
if len(d.userPlacementRule.Status.Decisions) > 0 {
if d.instance.Spec.FailoverCluster == d.userPlacementRule.Status.Decisions[0].ClusterName {
d.log.Info(fmt.Sprintf("Already failed over to %s. Last state %s",
d.userPlacementRule.Status.Decisions[0].ClusterName, d.getLastDRState()))
err := d.ensureCleanup(d.instance.Spec.FailoverCluster)
if err != nil {
return !done, err
}
return done, nil
}
}
return d.switchToFailoverCluster()
}
func (d *DRPCInstance) switchToFailoverCluster() (bool, error) {
const done = true
// Make sure we record the state that we are failing over
d.setDRState(rmn.FailingOver)
setMetricsTimerFromDRState(rmn.FailingOver, timerStart)
// Save the current home cluster
curHomeCluster := d.getCurrentHomeClusterName()
if curHomeCluster == "" {
d.log.Info("Invalid Failover request. Current home cluster does not exist")
return done, fmt.Errorf("failover requested on invalid state %v", d.instance.Status)
}
// Set VRG in the failed cluster (preferred cluster) to secondary
err := d.updateVRGStateToSecondary(curHomeCluster)
if err != nil {
d.log.Error(err, "Failed to update existing VRG manifestwork to secondary")
return !done, err
}
newHomeCluster := d.instance.Spec.FailoverCluster
const restorePVs = true
result, err := d.runPlacementTask(newHomeCluster, "", restorePVs)
if err != nil {
return !done, err
}
d.advanceToNextDRState()
d.log.Info("Exiting runFailover", "state", d.getLastDRState())
setMetricsTimerFromDRState(rmn.FailingOver, timerStop)
return result, nil
}
func (d *DRPCInstance) getCurrentHomeClusterName() string {
curHomeCluster := ""
if len(d.userPlacementRule.Status.Decisions) > 0 {
curHomeCluster = d.userPlacementRule.Status.Decisions[0].ClusterName
}
if curHomeCluster == "" {
curHomeCluster = d.instance.Status.PreferredDecision.ClusterName
}
return curHomeCluster
}
//
// runFailback:
// 1. If preferredCluster not set, get it from DRPC status
// 2. If still empty, fail it
// 3. If the preferredCluster is the failoverCluster, fail it
// 4. If preferredCluster is the same as the userPlacementRule decision, do nothing
// 5. Clear the user PlacementRule decision
// 6. Update VRG.Spec.ReplicationState to secondary for all DR clusters
// 7. Ensure that the VRG status reflects the previous step. If not, then wait.
// 8. Restore PV to preferredCluster
// 9. Update UserPlacementRule decision to preferredCluster
// 10. Create VRG for the preferredCluster as Primary
// 11. Update DRPC status
// 12. Delete VRG MW from failoverCluster once the VRG state has changed to Secondary
//
func (d *DRPCInstance) runFailback() (bool, error) {
d.log.Info("Entering runFailback", "state", d.getLastDRState())
const done = true
if d.isFailoverInProgress() || d.isRelocationInProgress() {
return done, fmt.Errorf("invalid state %s for the selected action %v",
d.getLastDRState(), d.instance.Spec.Action)
}
return d.switchToPreferredCluster(rmn.FailingBack)
}
func (d *DRPCInstance) runRelocate() (bool, error) {
d.log.Info("Entering runRelocate", "state", d.getLastDRState())
const done = true
if d.isFailbackInProgress() || d.isFailoverInProgress() {
return done, fmt.Errorf("invalid state %s for the selected action %v",
d.getLastDRState(), d.instance.Spec.Action)
}
return d.switchToPreferredCluster(rmn.Relocating)
}
func (d *DRPCInstance) switchToPreferredCluster(drState rmn.DRState) (bool, error) {
const done = true