Merge pull request #8379 from leseb/manual-peer-bootstrap
ceph: add an rbd-mirror bootstrap token on cluster creation
leseb committed Aug 2, 2021
2 parents ff7535a + 630c2f6 commit 1c8caa2
Showing 12 changed files with 270 additions and 37 deletions.
54 changes: 45 additions & 9 deletions .github/workflows/canary-integration-test.yml
@@ -755,41 +755,77 @@ jobs:
yq w -i pool-test.yaml spec.mirroring.enabled true
yq w -i pool-test.yaml spec.mirroring.mode image
kubectl create -f pool-test.yaml
timeout 60 sh -c 'until [ "$(kubectl -n rook-ceph get cephblockpool replicapool -o jsonpath='{.status.phase}'|grep -c "Ready")" -eq 1 ]; do echo "waiting for pool to be created" && sleep 1; done'
timeout 60 sh -c 'until [ "$(kubectl -n rook-ceph get cephblockpool replicapool -o jsonpath='{.status.phase}'|grep -c "Ready")" -eq 1 ]; do echo "waiting for pool replicapool to be created on cluster 1" && sleep 1; done'
- name: create replicated mirrored pool 2 on cluster 1
run: |
cd cluster/examples/kubernetes/ceph/
yq w -i pool-test.yaml metadata.name replicapool2
kubectl create -f pool-test.yaml
timeout 60 sh -c 'until [ "$(kubectl -n rook-ceph get cephblockpool replicapool2 -o jsonpath='{.status.phase}'|grep -c "Ready")" -eq 1 ]; do echo "waiting for pool replicapool2 to be created on cluster 1" && sleep 1; done'
yq w -i pool-test.yaml metadata.name replicapool
- name: create replicated mirrored pool on cluster 2
run: |
cd cluster/examples/kubernetes/ceph/
yq w -i pool-test.yaml metadata.namespace rook-ceph-secondary
kubectl create -f pool-test.yaml
timeout 60 sh -c 'until [ "$(kubectl -n rook-ceph-secondary get cephblockpool replicapool -o jsonpath='{.status.phase}'|grep -c "Ready")" -eq 1 ]; do echo "waiting for pool to be created" && sleep 1; done'
timeout 60 sh -c 'until [ "$(kubectl -n rook-ceph-secondary get cephblockpool replicapool -o jsonpath='{.status.phase}'|grep -c "Ready")" -eq 1 ]; do echo "waiting for pool replicapool to be created on cluster 2" && sleep 1; done'
- name: create replicated mirrored pool 2 on cluster 2
run: |
cd cluster/examples/kubernetes/ceph/
yq w -i pool-test.yaml metadata.name replicapool2
kubectl create -f pool-test.yaml
timeout 60 sh -c 'until [ "$(kubectl -n rook-ceph-secondary get cephblockpool replicapool2 -o jsonpath='{.status.phase}'|grep -c "Ready")" -eq 1 ]; do echo "waiting for pool replicapool2 to be created on cluster 2" && sleep 1; done'
- name: create image in the pool
- name: create images in the pools
run: |
kubectl exec -n rook-ceph deploy/rook-ceph-tools -ti -- rbd -p replicapool create test -s 1G
kubectl exec -n rook-ceph deploy/rook-ceph-tools -t -- rbd mirror image enable replicapool/test snapshot
kubectl exec -n rook-ceph deploy/rook-ceph-tools -t -- rbd -p replicapool info test
kubectl exec -n rook-ceph deploy/rook-ceph-tools -ti -- rbd -p replicapool2 create test -s 1G
kubectl exec -n rook-ceph deploy/rook-ceph-tools -t -- rbd mirror image enable replicapool2/test snapshot
kubectl exec -n rook-ceph deploy/rook-ceph-tools -t -- rbd -p replicapool2 info test
- name: copy block mirror peer secret into the other cluster
- name: copy block mirror peer secret into the other cluster for replicapool
run: |
kubectl -n rook-ceph get secret pool-peer-token-replicapool -o yaml |\
sed 's/namespace: rook-ceph/namespace: rook-ceph-secondary/g; s/name: pool-peer-token-replicapool/name: pool-peer-token-replicapool-config/g' |\
kubectl create --namespace=rook-ceph-secondary -f -
- name: add block mirror peer secret to the other cluster
- name: copy block mirror peer secret into the other cluster for replicapool2 (using cluster global peer)
run: |
kubectl -n rook-ceph get secret cluster-peer-token-my-cluster -o yaml |\
sed 's/namespace: rook-ceph/namespace: rook-ceph-secondary/g; s/name: cluster-peer-token-my-cluster/name: cluster-peer-token-my-cluster-config/g' |\
kubectl create --namespace=rook-ceph-secondary -f -
- name: add block mirror peer secret to the other cluster for replicapool
run: |
kubectl -n rook-ceph-secondary patch cephblockpool replicapool --type merge -p '{"spec":{"mirroring":{"peers": {"secretNames": ["pool-peer-token-replicapool-config"]}}}}'
- name: verify image has been mirrored
- name: add block mirror peer secret to the other cluster for replicapool2 (using cluster global peer)
run: |
kubectl -n rook-ceph-secondary patch cephblockpool replicapool2 --type merge -p '{"spec":{"mirroring":{"peers": {"secretNames": ["cluster-peer-token-my-cluster-config"]}}}}'
- name: verify image has been mirrored for replicapool
run: |
# let's wait a bit for the image to be present
timeout 120 sh -c 'until [ "$(kubectl exec -n rook-ceph-secondary deploy/rook-ceph-tools -t -- rbd -p replicapool ls|grep -c test)" -eq 1 ]; do echo "waiting for image to be mirrored in pool replicapool" && sleep 1; done'
- name: verify image has been mirrored for replicapool2
run: |
# let's wait a bit for the image to be present
timeout 120 sh -c 'until [ "$(kubectl exec -n rook-ceph-secondary deploy/rook-ceph-tools -t -- rbd -p replicapool ls|grep -c test)" -eq 1 ]; do echo "waiting for image to be mirrored" && sleep 1; done'
timeout 120 sh -c 'until [ "$(kubectl exec -n rook-ceph-secondary deploy/rook-ceph-tools -t -- rbd -p replicapool2 ls|grep -c test)" -eq 1 ]; do echo "waiting for image to be mirrored in pool replicapool2" && sleep 1; done'
- name: display cephblockpool and image status
run: |
timeout 80 sh -c 'until [ "$(kubectl -n rook-ceph-secondary get cephblockpool replicapool -o jsonpath='{.status.mirroringStatus.summary.daemon_health}'|grep -c OK)" -eq 1 ]; do echo "waiting for mirroring status to be updated" && sleep 1; done'
kubectl -n rook-ceph-secondary get cephblockpool -o yaml
timeout 80 sh -c 'until [ "$(kubectl -n rook-ceph-secondary get cephblockpool replicapool -o jsonpath='{.status.mirroringStatus.summary.daemon_health}'|grep -c OK)" -eq 1 ]; do echo "waiting for mirroring status to be updated in replicapool" && sleep 1; done'
timeout 80 sh -c 'until [ "$(kubectl -n rook-ceph-secondary get cephblockpool replicapool2 -o jsonpath='{.status.mirroringStatus.summary.daemon_health}'|grep -c OK)" -eq 1 ]; do echo "waiting for mirroring status to be updated in replicapool2" && sleep 1; done'
kubectl -n rook-ceph-secondary get cephblockpool replicapool -o yaml
kubectl -n rook-ceph-secondary get cephblockpool replicapool2 -o yaml
kubectl exec -n rook-ceph deploy/rook-ceph-tools -t -- rbd -p replicapool info test
kubectl exec -n rook-ceph deploy/rook-ceph-tools -t -- rbd -p replicapool2 info test
- name: create replicated mirrored filesystem on cluster 1
run: |
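The workflow above connects the two clusters with kubectl and sed: it copies the pool-level and cluster-level peer token secrets into the secondary cluster's namespace, then patches the CephBlockPool so spec.mirroring.peers.secretNames references the copied secret. Below is a rough Go sketch (not part of this commit) of the same two steps using client-go and the dynamic client; the kubeconfig path, secret names, and namespaces are taken from the workflow and are otherwise assumptions.

// Rough sketch of the secret copy + pool patch done above with kubectl and sed.
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") // assumed kubeconfig path
	if err != nil {
		panic(err)
	}
	kube := kubernetes.NewForConfigOrDie(cfg)
	dyn := dynamic.NewForConfigOrDie(cfg)
	ctx := context.TODO()

	// 1. Copy the cluster-wide peer token secret from the primary namespace to the
	//    secondary namespace, renaming it the same way the sed expression does.
	src, err := kube.CoreV1().Secrets("rook-ceph").Get(ctx, "cluster-peer-token-my-cluster", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	dst := &corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "cluster-peer-token-my-cluster-config",
			Namespace: "rook-ceph-secondary",
		},
		Data: src.Data,
		Type: src.Type,
	}
	if _, err := kube.CoreV1().Secrets("rook-ceph-secondary").Create(ctx, dst, metav1.CreateOptions{}); err != nil {
		panic(err)
	}

	// 2. Merge-patch the CephBlockPool so spec.mirroring.peers.secretNames points at the copy,
	//    equivalent to the kubectl patch step in the workflow.
	gvr := schema.GroupVersionResource{Group: "ceph.rook.io", Version: "v1", Resource: "cephblockpools"}
	patch := []byte(`{"spec":{"mirroring":{"peers":{"secretNames":["cluster-peer-token-my-cluster-config"]}}}}`)
	if _, err := dyn.Resource(gvr).Namespace("rook-ceph-secondary").Patch(ctx, "replicapool2", types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
		panic(err)
	}
	fmt.Println("peer secret copied and replicapool2 patched")
}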
75 changes: 75 additions & 0 deletions pkg/daemon/ceph/client/mirror.go
@@ -17,21 +17,41 @@ limitations under the License.
package client

import (
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"strings"

"github.com/pkg/errors"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/util"
)

// PeerToken is the content of the peer token
type PeerToken struct {
ClusterFSID string `json:"fsid"`
ClientID string `json:"client_id"`
Key string `json:"key"`
MonHost string `json:"mon_host"`
// These fields are added by Rook and NOT part of the output of client.CreateRBDMirrorBootstrapPeer()
PoolID int `json:"pool_id"`
Namespace string `json:"namespace"`
}

var (
rbdMirrorPeerCaps = []string{"mon", "profile rbd-mirror-peer", "osd", "profile rbd"}
rbdMirrorPeerKeyringID = "rbd-mirror-peer"
)

// ImportRBDMirrorBootstrapPeer adds a mirror peer to the rbd-mirror configuration
func ImportRBDMirrorBootstrapPeer(context *clusterd.Context, clusterInfo *ClusterInfo, poolName, direction string, token []byte) error {
logger.Infof("add rbd-mirror bootstrap peer token for pool %q", poolName)

// Token file
// TODO: use mktemp?
tokenFilePath := fmt.Sprintf("/tmp/rbd-mirror-token-%s", poolName)

// Write token into a file
@@ -307,3 +327,58 @@ func ListSnapshotSchedulesRecursively(context *clusterd.Context, clusterInfo *Cl
logger.Debugf("successfully recursively listed snapshot schedules for pool %q", poolName)
return snapshotSchedulesRecursive, nil
}

/* CreateRBDMirrorBootstrapPeerWithoutPool creates a bootstrap peer for the current cluster.
It creates the cephx user, with all the necessary details, for the remote cluster to use.
This function is handy in scenarios where no pools have been created yet but the peers already need to be connected.
It essentially sits above CreateRBDMirrorBootstrapPeer()
and is a cluster-wide option for the scenario where all the pools will be mirrored to the same remote cluster.
The scenario looks like:
1) Create the cephx ID on the source cluster
2) Enable a source pool for mirroring - at any time, we just don't know when
rbd --cluster site-a mirror pool enable image-pool image
3) Copy the key details over to the other cluster (non-Ceph workflow)
4) Enable the destination pool for mirroring
rbd --cluster site-b mirror pool enable image-pool image
5) Add the peer details to the destination pool
6) Repeat the steps, flipping source and destination, to enable
bi-directional mirroring
*/
func CreateRBDMirrorBootstrapPeerWithoutPool(context *clusterd.Context, clusterInfo *ClusterInfo) ([]byte, error) {
fullClientName := getQualifiedUser(rbdMirrorPeerKeyringID)
logger.Infof("create rbd-mirror bootstrap peer token %q", fullClientName)
key, err := AuthGetOrCreateKey(context, clusterInfo, fullClientName, rbdMirrorPeerCaps)
if err != nil {
return nil, errors.Wrapf(err, "failed to create rbd-mirror peer key %q", fullClientName)
}
logger.Infof("successfully created rbd-mirror bootstrap peer token for cluster %q", clusterInfo.NamespacedName().Name)

mons := util.NewSet()
for _, mon := range clusterInfo.Monitors {
mons.Add(mon.Endpoint)
}

peerToken := PeerToken{
ClusterFSID: clusterInfo.FSID,
ClientID: rbdMirrorPeerKeyringID,
Key: key,
MonHost: strings.Join(mons.ToSlice(), ","),
Namespace: clusterInfo.Namespace,
}

// Marshal the Go type back to JSON
decodedTokenBackToJSON, err := json.Marshal(peerToken)
if err != nil {
return nil, errors.Wrap(err, "failed to encode peer token to json")
}

// Return the base64 encoded token
return []byte(base64.StdEncoding.EncodeToString(decodedTokenBackToJSON)), nil
}
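For reference, here is a minimal sketch (not part of this commit) of how a consumer might decode the token returned by CreateRBDMirrorBootstrapPeerWithoutPool: the function base64-encodes marshaled JSON, so the reverse is a base64 decode followed by json.Unmarshal into the PeerToken fields declared earlier in this file. The sample values in main are made up for illustration.

// Decoding sketch for the bootstrap peer token produced above.
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// PeerToken mirrors the struct added in mirror.go.
type PeerToken struct {
	ClusterFSID string `json:"fsid"`
	ClientID    string `json:"client_id"`
	Key         string `json:"key"`
	MonHost     string `json:"mon_host"`
	PoolID      int    `json:"pool_id"`
	Namespace   string `json:"namespace"`
}

// decodePeerToken reverses the encoding done by CreateRBDMirrorBootstrapPeerWithoutPool.
func decodePeerToken(raw []byte) (*PeerToken, error) {
	jsonBytes, err := base64.StdEncoding.DecodeString(string(raw))
	if err != nil {
		return nil, fmt.Errorf("failed to base64-decode peer token: %w", err)
	}
	var t PeerToken
	if err := json.Unmarshal(jsonBytes, &t); err != nil {
		return nil, fmt.Errorf("failed to unmarshal peer token: %w", err)
	}
	return &t, nil
}

func main() {
	// Build a sample token the same way the function above does (values are made up).
	sample, _ := json.Marshal(PeerToken{
		ClusterFSID: "11111111-2222-3333-4444-555555555555",
		ClientID:    "rbd-mirror-peer",
		Key:         "AQD...",
		MonHost:     "10.0.0.1:6789,10.0.0.2:6789",
		Namespace:   "rook-ceph",
	})
	token := []byte(base64.StdEncoding.EncodeToString(sample))

	decoded, err := decodePeerToken(token)
	if err != nil {
		panic(err)
	}
	fmt.Printf("peer fsid=%s client=%s mons=%s\n", decoded.ClusterFSID, decoded.ClientID, decoded.MonHost)
}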
1 change: 1 addition & 0 deletions pkg/daemon/ceph/client/test/info.go
@@ -62,5 +62,6 @@ func CreateTestClusterInfo(monCount int) *client.ClusterInfo {
Endpoint: fmt.Sprintf("1.2.3.%d:6789", (i + 1)),
}
}
c.SetName(c.Namespace)
return c
}
7 changes: 7 additions & 0 deletions pkg/operator/ceph/cluster/cluster.go
@@ -214,6 +214,7 @@ func (c *ClusterController) initializeCluster(cluster *cluster) error {

// Populate ClusterInfo with the last value
cluster.mons.ClusterInfo = cluster.ClusterInfo
cluster.mons.ClusterInfo.SetName(c.namespacedName.Name)

// Start the monitoring if not already started
c.configureCephMonitoring(cluster, cluster.ClusterInfo)
@@ -572,5 +573,11 @@ func (c *cluster) postMonStartupActions() error {
}
}

// Create cluster-wide RBD bootstrap peer token
_, err = controller.CreateBootstrapPeerSecret(c.context, c.ClusterInfo, &cephv1.CephCluster{ObjectMeta: metav1.ObjectMeta{Name: c.namespacedName.Name, Namespace: c.Namespace}}, c.ownerInfo)
if err != nil {
return errors.Wrap(err, "failed to create cluster rbd bootstrap peer token")
}

return nil
}
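The controller.CreateBootstrapPeerSecret call above publishes the cluster-wide token as a Kubernetes Secret, which the workflow in this PR consumes as cluster-peer-token-my-cluster. A rough sketch of the kind of object that implies is below; the "token" data key and Opaque type are assumptions for illustration, not Rook's actual implementation.

// Sketch of the Secret shape implied by the bootstrap peer token publication above.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// bootstrapPeerSecret wraps a bootstrap peer token (already base64-encoded JSON)
// in a Secret that a peer cluster can import.
func bootstrapPeerSecret(clusterName, namespace string, token []byte) *corev1.Secret {
	return &corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "cluster-peer-token-" + clusterName, // naming matches the workflow above
			Namespace: namespace,
		},
		Data: map[string][]byte{
			"token": token, // data key is an assumption
		},
		Type: corev1.SecretTypeOpaque,
	}
}

func main() {
	s := bootstrapPeerSecret("my-cluster", "rook-ceph", []byte("ZXhhbXBsZQ=="))
	fmt.Printf("would create secret %s/%s\n", s.Namespace, s.Name)
}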
1 change: 1 addition & 0 deletions pkg/operator/ceph/cluster/controller.go
@@ -367,6 +367,7 @@ func (c *ClusterController) reconcileCephCluster(clusterObj *cephv1.CephCluster,
// It's a new cluster so let's populate the struct
cluster = newCluster(clusterObj, c.context, c.csiConfigMutex, ownerInfo)
}
cluster.namespacedName = c.namespacedName

// Pass down the client to interact with Kubernetes objects
// This will be used later down by spec code to create objects like deployment, services etc
7 changes: 7 additions & 0 deletions pkg/operator/ceph/cluster/mon/health.go
@@ -23,6 +23,7 @@ import (
"time"

"github.com/pkg/errors"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
cephclient "github.com/rook/rook/pkg/daemon/ceph/client"
cephutil "github.com/rook/rook/pkg/daemon/ceph/util"
"github.com/rook/rook/pkg/operator/ceph/controller"
@@ -497,6 +498,12 @@ func (c *Cluster) removeMon(daemonName string) error {
return errors.Wrapf(err, "failed to save mon config after failing over mon %s", daemonName)
}

// Update cluster-wide RBD bootstrap peer token since Monitors have changed
_, err := controller.CreateBootstrapPeerSecret(c.context, c.ClusterInfo, &cephv1.CephCluster{ObjectMeta: metav1.ObjectMeta{Name: c.ClusterInfo.NamespacedName().Name, Namespace: c.Namespace}}, c.ownerInfo)
if err != nil {
return errors.Wrap(err, "failed to update cluster rbd bootstrap peer token")
}

return nil
}

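removeMon regenerates the cluster-wide bootstrap peer token because the token embeds mon_host: a token minted before a monitor failover would point the peer at an endpoint that no longer exists. A small self-contained sketch of that dependency follows; the MonInfo type here is a stand-in for illustration, not Rook's actual type.

// Why the peer token is refreshed when monitors change: mon_host is baked into it.
package main

import (
	"fmt"
	"sort"
	"strings"
)

type MonInfo struct {
	Name     string
	Endpoint string
}

// monHost joins the current monitor endpoints into the comma-separated
// mon_host string that ends up inside the bootstrap peer token.
func monHost(mons map[string]*MonInfo) string {
	endpoints := make([]string, 0, len(mons))
	for _, m := range mons {
		endpoints = append(endpoints, m.Endpoint)
	}
	sort.Strings(endpoints) // deterministic output
	return strings.Join(endpoints, ",")
}

func main() {
	mons := map[string]*MonInfo{
		"a": {Name: "a", Endpoint: "10.0.0.1:6789"},
		"b": {Name: "b", Endpoint: "10.0.0.2:6789"},
	}
	before := monHost(mons)

	// Fail over mon "b", as removeMon does, and recompute mon_host.
	delete(mons, "b")
	mons["c"] = &MonInfo{Name: "c", Endpoint: "10.0.0.3:6789"}
	after := monHost(mons)

	fmt.Printf("mon_host before failover: %s\nmon_host after failover:  %s\n", before, after)
}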
20 changes: 19 additions & 1 deletion pkg/operator/ceph/cluster/mon/health_test.go
@@ -49,6 +49,10 @@ func TestCheckHealth(t *testing.T) {

executor := &exectest.MockExecutor{
MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
logger.Infof("executing command: %s %+v", command, args)
if args[0] == "auth" && args[1] == "get-or-create-key" {
return "{\"key\":\"mysecurekey\"}", nil
}
return clienttest.MonInQuorumResponse(), nil
},
}
@@ -154,7 +158,13 @@ func TestEvictMonOnSameNode(t *testing.T) {
clientset := test.New(t, 1)
configDir, _ := ioutil.TempDir("", "")
defer os.RemoveAll(configDir)
context := &clusterd.Context{Clientset: clientset, ConfigDir: configDir, Executor: &exectest.MockExecutor{}, RequestCancelOrchestration: abool.New()}
executor := &exectest.MockExecutor{
MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
logger.Infof("executing command: %s %+v", command, args)
return "{\"key\":\"mysecurekey\"}", nil
},
}
context := &clusterd.Context{Clientset: clientset, ConfigDir: configDir, Executor: executor, RequestCancelOrchestration: abool.New()}
ownerInfo := cephclient.NewMinimumOwnerInfoWithOwnerRef()
c := New(context, "ns", cephv1.ClusterSpec{}, ownerInfo, &sync.Mutex{})
setCommonMonProperties(c, 1, cephv1.MonSpec{Count: 0}, "myversion")
@@ -245,6 +255,10 @@ func TestCheckHealthNotFound(t *testing.T) {

executor := &exectest.MockExecutor{
MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
logger.Infof("executing command: %s %+v", command, args)
if args[0] == "auth" && args[1] == "get-or-create-key" {
return "{\"key\":\"mysecurekey\"}", nil
}
return clienttest.MonInQuorumResponse(), nil
},
}
@@ -304,6 +318,10 @@ func TestAddRemoveMons(t *testing.T) {
monQuorumResponse := clienttest.MonInQuorumResponse()
executor := &exectest.MockExecutor{
MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
logger.Infof("executing command: %s %+v", command, args)
if args[0] == "auth" && args[1] == "get-or-create-key" {
return "{\"key\":\"mysecurekey\"}", nil
}
return monQuorumResponse, nil
},
}
