mds: change init sequence
The MDS core team suggested deploying the MDS daemon first and then doing
the filesystem creation and configuration. Reversing the sequence lets us
avoid spurious FS_DOWN warnings when creating the filesystem.

Closes: #8745
Signed-off-by: Sébastien Han <seb@redhat.com>
leseb committed Sep 20, 2021
1 parent acb64e9 commit 3f4a47e
Showing 6 changed files with 111 additions and 147 deletions.
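Taken together, the change moves the MDS daemon start ahead of the filesystem creation in createFilesystem. Below is a minimal, self-contained sketch of the reordered flow; the types and helpers (fsSpec, mdsCluster, cephClient) are illustrative stand-ins for the rook APIs shown in the diffs that follow, not the actual implementation.

package main

import (
	"fmt"
	"log"
)

// fsSpec is a hypothetical, trimmed-down stand-in for cephv1.CephFilesystem.
type fsSpec struct {
	Name          string
	DataPools     int
	ActiveCount   int
	ActiveStandby bool
}

// mdsCluster stands in for mds.Cluster; after this commit it no longer needs the
// filesystem details, so it can be started before the filesystem exists.
type mdsCluster struct{ fs fsSpec }

func (c *mdsCluster) Start() error {
	log.Printf("starting MDS daemons for filesystem %q", c.fs.Name)
	return nil
}

// cephClient stands in for the cephclient package calls used by the operator.
type cephClient struct{}

func (cc *cephClient) CreateFilesystem(fs fsSpec) error {
	log.Printf("creating filesystem %q with %d data pool(s)", fs.Name, fs.DataPools)
	return nil
}

func (cc *cephClient) AllowStandbyReplay(name string, enabled bool) error { return nil }

func (cc *cephClient) SetNumMDSRanks(name string, ranks int) error { return nil }

// createFilesystem mirrors the post-commit ordering: start the MDS daemons first so
// the new filesystem has daemons available as soon as it exists, which avoids the
// spurious FS_DOWN warning the commit message describes.
func createFilesystem(cc *cephClient, fs fsSpec) error {
	c := &mdsCluster{fs: fs}
	if err := c.Start(); err != nil {
		return fmt.Errorf("failed to start mds daemons: %w", err)
	}

	if fs.DataPools != 0 {
		if err := cc.CreateFilesystem(fs); err != nil {
			return fmt.Errorf("failed to create filesystem %q: %w", fs.Name, err)
		}
	}

	if fs.ActiveStandby {
		if err := cc.AllowStandbyReplay(fs.Name, true); err != nil {
			return fmt.Errorf("failed to set allow_standby_replay on filesystem %q: %w", fs.Name, err)
		}
	}

	if fs.ActiveCount > 1 {
		if err := cc.SetNumMDSRanks(fs.Name, fs.ActiveCount); err != nil {
			log.Printf("failed setting active mds count to %d: %v", fs.ActiveCount, err)
		}
	}
	return nil
}

func main() {
	cc := &cephClient{}
	spec := fsSpec{Name: "myfs", DataPools: 1, ActiveCount: 2, ActiveStandby: true}
	if err := createFilesystem(cc, spec); err != nil {
		log.Fatal(err)
	}
}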
36 changes: 10 additions & 26 deletions pkg/operator/ceph/file/filesystem.go
@@ -18,10 +18,8 @@ package file

import (
"fmt"
"syscall"

"github.com/rook/rook/pkg/operator/k8sutil"
"github.com/rook/rook/pkg/util/exec"

"github.com/pkg/errors"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
@@ -51,37 +49,31 @@ func createFilesystem(
ownerInfo *k8sutil.OwnerInfo,
dataDirHostPath string,
) error {
logger.Infof("start running mdses for filesystem %q", fs.Name)
c := mds.NewCluster(clusterInfo, context, clusterSpec, fs, ownerInfo, dataDirHostPath)
if err := c.Start(); err != nil {
return err
}

if len(fs.Spec.DataPools) != 0 {
f := newFS(fs.Name, fs.Namespace)
if err := f.doFilesystemCreate(context, clusterInfo, clusterSpec, fs.Spec); err != nil {
return errors.Wrapf(err, "failed to create filesystem %q", fs.Name)
}
}

filesystem, err := cephclient.GetFilesystem(context, clusterInfo, fs.Name)
if err != nil {
return errors.Wrapf(err, "failed to get filesystem %q", fs.Name)
}

if fs.Spec.MetadataServer.ActiveStandby {
if err = cephclient.AllowStandbyReplay(context, clusterInfo, fs.Name, fs.Spec.MetadataServer.ActiveStandby); err != nil {
if err := cephclient.AllowStandbyReplay(context, clusterInfo, fs.Name, fs.Spec.MetadataServer.ActiveStandby); err != nil {
return errors.Wrapf(err, "failed to set allow_standby_replay to filesystem %q", fs.Name)
}
}

// set the number of active mds instances
if fs.Spec.MetadataServer.ActiveCount > 1 {
if err = cephclient.SetNumMDSRanks(context, clusterInfo, fs.Name, fs.Spec.MetadataServer.ActiveCount); err != nil {
if err := cephclient.SetNumMDSRanks(context, clusterInfo, fs.Name, fs.Spec.MetadataServer.ActiveCount); err != nil {
logger.Warningf("failed setting active mds count to %d. %v", fs.Spec.MetadataServer.ActiveCount, err)
}
}

logger.Infof("start running mdses for filesystem %q", fs.Name)
c := mds.NewCluster(clusterInfo, context, clusterSpec, fs, filesystem, ownerInfo, dataDirHostPath)
if err := c.Start(); err != nil {
return err
}

return nil
}

@@ -94,23 +86,15 @@ func deleteFilesystem(
ownerInfo *k8sutil.OwnerInfo,
dataDirHostPath string,
) error {
filesystem, err := cephclient.GetFilesystem(context, clusterInfo, fs.Name)
if err != nil {
if code, ok := exec.ExitStatus(err); ok && code == int(syscall.ENOENT) {
// If we're deleting the filesystem anyway, ignore the error that the filesystem doesn't exist
return nil
}
return errors.Wrapf(err, "failed to get filesystem %q", fs.Name)
}
c := mds.NewCluster(clusterInfo, context, clusterSpec, fs, filesystem, ownerInfo, dataDirHostPath)
c := mds.NewCluster(clusterInfo, context, clusterSpec, fs, ownerInfo, dataDirHostPath)

// Delete mds CephX keys and configuration in centralized mon database
replicas := fs.Spec.MetadataServer.ActiveCount * 2
for i := 0; i < int(replicas); i++ {
daemonLetterID := k8sutil.IndexToName(i)
daemonName := fmt.Sprintf("%s-%s", fs.Name, daemonLetterID)

err = c.DeleteMdsCephObjects(daemonName)
err := c.DeleteMdsCephObjects(daemonName)
if err != nil {
return errors.Wrapf(err, "failed to delete mds ceph objects for filesystem %q", fs.Name)
}
188 changes: 76 additions & 112 deletions pkg/operator/ceph/file/filesystem_test.go
@@ -95,7 +95,7 @@ func isBasePoolOperation(fsName, command string, args []string) bool {
return false
}

func fsExecutor(t *testing.T, fsName, configDir string, multiFS bool) *exectest.MockExecutor {
func fsExecutor(t *testing.T, fsName, configDir string, multiFS bool, createDataOnePoolCount, addDataOnePoolCount *int) *exectest.MockExecutor {
mdsmap := cephclient.CephFilesystemDetails{
ID: 0,
MDSMap: cephclient.MDSMap{
@@ -160,6 +160,16 @@ func fsExecutor(t *testing.T, fsName, configDir string, multiFS bool) *exectest.
return "", nil
} else if contains(args, "flag") && contains(args, "enable_multiple") {
return "", nil
} else if reflect.DeepEqual(args[0:5], []string{"osd", "crush", "rule", "create-replicated", fsName + "-data1"}) {
return "", nil
} else if reflect.DeepEqual(args[0:4], []string{"osd", "pool", "create", fsName + "-data1"}) {
*createDataOnePoolCount++
return "", nil
} else if reflect.DeepEqual(args[0:6], []string{"osd", "pool", "set", fsName + "-data1", "size", "1"}) {
return "", nil
} else if reflect.DeepEqual(args[0:4], []string{"fs", "add_data_pool", fsName, fsName + "-data1"}) {
*addDataOnePoolCount++
return "", nil
} else if contains(args, "versions") {
versionStr, _ := json.Marshal(
map[string]map[string]int{
@@ -213,6 +223,16 @@ func fsExecutor(t *testing.T, fsName, configDir string, multiFS bool) *exectest.
return "", nil
} else if contains(args, "config") && contains(args, "get") {
return "{}", nil
} else if reflect.DeepEqual(args[0:5], []string{"osd", "crush", "rule", "create-replicated", fsName + "-data1"}) {
return "", nil
} else if reflect.DeepEqual(args[0:4], []string{"osd", "pool", "create", fsName + "-data1"}) {
*createDataOnePoolCount++
return "", nil
} else if reflect.DeepEqual(args[0:6], []string{"osd", "pool", "set", fsName + "-data1", "size", "1"}) {
return "", nil
} else if reflect.DeepEqual(args[0:4], []string{"fs", "add_data_pool", fsName, fsName + "-data1"}) {
*addDataOnePoolCount++
return "", nil
} else if contains(args, "versions") {
versionStr, _ := json.Marshal(
map[string]map[string]int{
@@ -257,9 +277,10 @@ func TestCreateFilesystem(t *testing.T) {
var deploymentsUpdated *[]*apps.Deployment
mds.UpdateDeploymentAndWait, deploymentsUpdated = testopk8s.UpdateDeploymentAndWaitStub()
configDir, _ := ioutil.TempDir("", "")

fsName := "myfs"
executor := fsExecutor(t, fsName, configDir, false)
addDataOnePoolCount := 0
createDataOnePoolCount := 0
executor := fsExecutor(t, fsName, configDir, false, &createDataOnePoolCount, &addDataOnePoolCount)
defer os.RemoveAll(configDir)
clientset := testop.New(t, 1)
context := &clusterd.Context{
@@ -268,117 +289,58 @@
Clientset: clientset}
fs := fsTest(fsName)
clusterInfo := &cephclient.ClusterInfo{FSID: "myfsid", CephVersion: version.Octopus, Context: ctx}

// start a basic cluster
ownerInfo := cephclient.NewMinimumOwnerInfoWithOwnerRef()
err := createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, ownerInfo, "/var/lib/rook/")
assert.Nil(t, err)
validateStart(ctx, t, context, fs)
assert.ElementsMatch(t, []string{}, testopk8s.DeploymentNamesUpdated(deploymentsUpdated))
testopk8s.ClearDeploymentsUpdated(deploymentsUpdated)

// starting again should be a no-op
err = createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, ownerInfo, "/var/lib/rook/")
assert.Nil(t, err)
validateStart(ctx, t, context, fs)
assert.ElementsMatch(t, []string{fmt.Sprintf("rook-ceph-mds-%s-a", fsName), fmt.Sprintf("rook-ceph-mds-%s-b", fsName)}, testopk8s.DeploymentNamesUpdated(deploymentsUpdated))
testopk8s.ClearDeploymentsUpdated(deploymentsUpdated)

// Increasing the number of data pools should be successful.
createDataOnePoolCount := 0
addDataOnePoolCount := 0
createdFsResponse := fmt.Sprintf(`{"fs_name": "%s", "metadata_pool": 2, "data_pools":[3]}`, fsName)
executor = &exectest.MockExecutor{
MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
if contains(args, "fs") && contains(args, "get") {
return createdFsResponse, nil
} else if isBasePoolOperation(fsName, command, args) {
return "", nil
} else if reflect.DeepEqual(args[0:4], []string{"osd", "pool", "create", fsName + "-data1"}) {
createDataOnePoolCount++
return "", nil
} else if reflect.DeepEqual(args[0:4], []string{"fs", "add_data_pool", fsName, fsName + "-data1"}) {
addDataOnePoolCount++
return "", nil
} else if contains(args, "set") && contains(args, "max_mds") {
return "", nil
} else if contains(args, "auth") && contains(args, "get-or-create-key") {
return "{\"key\":\"mysecurekey\"}", nil
} else if reflect.DeepEqual(args[0:5], []string{"osd", "crush", "rule", "create-replicated", fsName + "-data1"}) {
return "", nil
} else if reflect.DeepEqual(args[0:6], []string{"osd", "pool", "set", fsName + "-data1", "size", "1"}) {
return "", nil
} else if args[0] == "config" && args[1] == "set" {
return "", nil
} else if contains(args, "versions") {
versionStr, _ := json.Marshal(
map[string]map[string]int{
"mds": {
"ceph version 16.0.0-4-g2f728b9 (2f728b952cf293dd7f809ad8a0f5b5d040c43010) pacific (stable)": 2,
},
})
return string(versionStr), nil
}
assert.Fail(t, fmt.Sprintf("Unexpected command: %v", args))
return "", nil
},
}
context = &clusterd.Context{
Executor: executor,
ConfigDir: configDir,
Clientset: clientset}
fs.Spec.DataPools = append(fs.Spec.DataPools, cephv1.PoolSpec{Replicated: cephv1.ReplicatedSpec{Size: 1, RequireSafeReplicaSize: false}})

err = createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, ownerInfo, "/var/lib/rook/")
assert.Nil(t, err)
validateStart(ctx, t, context, fs)
assert.ElementsMatch(t, []string{fmt.Sprintf("rook-ceph-mds-%s-a", fsName), fmt.Sprintf("rook-ceph-mds-%s-b", fsName)}, testopk8s.DeploymentNamesUpdated(deploymentsUpdated))
assert.Equal(t, 1, createDataOnePoolCount)
assert.Equal(t, 1, addDataOnePoolCount)
testopk8s.ClearDeploymentsUpdated(deploymentsUpdated)

// Test multiple filesystem creation
// Output to check multiple filesystem creation
fses := `[{"name":"myfs","metadata_pool":"myfs-metadata","metadata_pool_id":4,"data_pool_ids":[5],"data_pools":["myfs-data0"]},{"name":"myfs2","metadata_pool":"myfs2-metadata","metadata_pool_id":6,"data_pool_ids":[7],"data_pools":["myfs2-data0"]},{"name":"leseb","metadata_pool":"cephfs.leseb.meta","metadata_pool_id":8,"data_pool_ids":[9],"data_pools":["cephfs.leseb.data"]}]`
executorMultiFS := &exectest.MockExecutor{
MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
if contains(args, "ls") {
return fses, nil
} else if contains(args, "versions") {
versionStr, _ := json.Marshal(
map[string]map[string]int{
"mds": {
"ceph version 16.0.0-4-g2f728b9 (2f728b952cf293dd7f809ad8a0f5b5d040c43010) pacific (stable)": 2,
},
})
return string(versionStr), nil
}
return "{\"key\":\"mysecurekey\"}", errors.New("multiple fs")
},
}
context = &clusterd.Context{
Executor: executorMultiFS,
ConfigDir: configDir,
Clientset: clientset,
}

// Create another filesystem which should fail
err = createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, &k8sutil.OwnerInfo{}, "/var/lib/rook/")
assert.Error(t, err)
assert.Equal(t, fmt.Sprintf("failed to create filesystem %q: multiple filesystems are only supported as of ceph pacific", fsName), err.Error())
t.Run("start basic filesystem", func(t *testing.T) {
// start a basic cluster
err := createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, ownerInfo, "/var/lib/rook/")
assert.Nil(t, err)
validateStart(ctx, t, context, fs)
assert.ElementsMatch(t, []string{}, testopk8s.DeploymentNamesUpdated(deploymentsUpdated))
testopk8s.ClearDeploymentsUpdated(deploymentsUpdated)
})

t.Run("start again should no-op", func(t *testing.T) {
err := createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, ownerInfo, "/var/lib/rook/")
assert.Nil(t, err)
validateStart(ctx, t, context, fs)
assert.ElementsMatch(t, []string{fmt.Sprintf("rook-ceph-mds-%s-a", fsName), fmt.Sprintf("rook-ceph-mds-%s-b", fsName)}, testopk8s.DeploymentNamesUpdated(deploymentsUpdated))
testopk8s.ClearDeploymentsUpdated(deploymentsUpdated)
})

t.Run("increasing the number of data pools should be successful.", func(t *testing.T) {
context = &clusterd.Context{
Executor: executor,
ConfigDir: configDir,
Clientset: clientset}
fs.Spec.DataPools = append(fs.Spec.DataPools, cephv1.PoolSpec{Replicated: cephv1.ReplicatedSpec{Size: 1, RequireSafeReplicaSize: false}})
err := createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, ownerInfo, "/var/lib/rook/")
assert.Nil(t, err)
validateStart(ctx, t, context, fs)
assert.ElementsMatch(t, []string{fmt.Sprintf("rook-ceph-mds-%s-a", fsName), fmt.Sprintf("rook-ceph-mds-%s-b", fsName)}, testopk8s.DeploymentNamesUpdated(deploymentsUpdated))
assert.Equal(t, 1, createDataOnePoolCount)
assert.Equal(t, 1, addDataOnePoolCount)
testopk8s.ClearDeploymentsUpdated(deploymentsUpdated)
})

t.Run("multiple filesystem creation", func(t *testing.T) {
context = &clusterd.Context{
Executor: fsExecutor(t, fsName, configDir, true, &createDataOnePoolCount, &addDataOnePoolCount),
ConfigDir: configDir,
Clientset: clientset,
}

// It works since the Ceph version is Pacific
fsName = "myfs3"
fs = fsTest(fsName)
executor = fsExecutor(t, fsName, configDir, true)
clusterInfo.CephVersion = version.Pacific
context = &clusterd.Context{
Executor: executor,
ConfigDir: configDir,
Clientset: clientset,
}
err = createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, ownerInfo, "/var/lib/rook/")
assert.NoError(t, err)
// Create another filesystem which should fail
err := createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, &k8sutil.OwnerInfo{}, "/var/lib/rook/")
assert.Error(t, err)
assert.Equal(t, fmt.Sprintf("failed to create filesystem %q: multiple filesystems are only supported as of ceph pacific", fsName), err.Error())
})

t.Run("multi filesystem creation now works since ceph version is pacific", func(t *testing.T) {
clusterInfo.CephVersion = version.Pacific
err := createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, ownerInfo, "/var/lib/rook/")
assert.NoError(t, err)
})
}

func TestUpgradeFilesystem(t *testing.T) {
@@ -388,7 +350,9 @@ func TestUpgradeFilesystem(t *testing.T) {
configDir, _ := ioutil.TempDir("", "")

fsName := "myfs"
executor := fsExecutor(t, fsName, configDir, false)
addDataOnePoolCount := 0
createDataOnePoolCount := 0
executor := fsExecutor(t, fsName, configDir, false, &createDataOnePoolCount, &addDataOnePoolCount)
defer os.RemoveAll(configDir)
clientset := testop.New(t, 1)
context := &clusterd.Context{
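The test refactor above threads *int counters into fsExecutor and splits TestCreateFilesystem into t.Run subtests that share them. A stripped-down sketch of that counter-in-mock pattern follows; mockExecutor and newFSExecutor are hypothetical names standing in for the exectest mock, not the real API.

package file

import (
	"strings"
	"testing"
)

// mockExecutor is a hypothetical stand-in for exectest.MockExecutor: every command
// is routed through a single callback supplied by the test.
type mockExecutor struct {
	run func(command string, args ...string) (string, error)
}

func (m *mockExecutor) ExecuteCommandWithOutput(command string, args ...string) (string, error) {
	return m.run(command, args...)
}

// newFSExecutor mirrors the new fsExecutor signature: the caller passes pointers to
// counters, and the mock increments them whenever the matching Ceph command is seen.
func newFSExecutor(fsName string, createDataOnePoolCount, addDataOnePoolCount *int) *mockExecutor {
	return &mockExecutor{
		run: func(command string, args ...string) (string, error) {
			joined := strings.Join(args, " ")
			switch {
			case strings.HasPrefix(joined, "osd pool create "+fsName+"-data1"):
				*createDataOnePoolCount++
			case strings.HasPrefix(joined, "fs add_data_pool "+fsName+" "+fsName+"-data1"):
				*addDataOnePoolCount++
			}
			return "", nil
		},
	}
}

func TestDataPoolCounters(t *testing.T) {
	createCount, addCount := 0, 0
	e := newFSExecutor("myfs", &createCount, &addCount)

	// Simulate the operator adding a second data pool to an existing filesystem.
	_, _ = e.ExecuteCommandWithOutput("ceph", "osd", "pool", "create", "myfs-data1")
	_, _ = e.ExecuteCommandWithOutput("ceph", "fs", "add_data_pool", "myfs", "myfs-data1")

	if createCount != 1 || addCount != 1 {
		t.Fatalf("unexpected counts: create=%d add=%d", createCount, addCount)
	}
}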
10 changes: 4 additions & 6 deletions pkg/operator/ceph/file/mds/mds.go
@@ -20,7 +20,6 @@ package mds
import (
"context"
"fmt"
"strconv"
"strings"
"syscall"
"time"
@@ -58,7 +57,6 @@ type Cluster struct {
context *clusterd.Context
clusterSpec *cephv1.ClusterSpec
fs cephv1.CephFilesystem
fsID string
ownerInfo *k8sutil.OwnerInfo
dataDirHostPath string
}
@@ -75,7 +73,6 @@ func NewCluster(
context *clusterd.Context,
clusterSpec *cephv1.ClusterSpec,
fs cephv1.CephFilesystem,
fsdetails *cephclient.CephFilesystemDetails,
ownerInfo *k8sutil.OwnerInfo,
dataDirHostPath string,
) *Cluster {
@@ -84,7 +81,6 @@
context: context,
clusterSpec: clusterSpec,
fs: fs,
fsID: strconv.Itoa(fsdetails.ID),
ownerInfo: ownerInfo,
dataDirHostPath: dataDirHostPath,
}
@@ -232,7 +228,7 @@ func (c *Cluster) isCephUpgrade() (bool, error) {
return false, err
}
if cephver.IsSuperior(c.clusterInfo.CephVersion, *currentVersion) {
logger.Debugf("ceph version for MDS %q is %q and target version is %q", key, currentVersion, c.clusterInfo.CephVersion)
logger.Debugf("ceph version for MDS %q is %q and target version is %q", key, currentVersion.String(), c.clusterInfo.CephVersion.String())
return true, err
}
}
@@ -249,7 +245,9 @@ func (c *Cluster) upgradeMDS() error {
return errors.Wrap(err, "failed to setting allow_standby_replay to false")
}

// In Pacific, standby-replay daemons are stopped automatically. Older versions of Ceph require us to stop these daemons manually.
// In Pacific, standby-replay daemons are stopped automatically. Older versions of Ceph require
// us to stop these daemons manually.
// TODO: so why don't we have a version check?
if err := cephclient.FailAllStandbyReplayMDS(c.context, c.clusterInfo, c.fs.Name); err != nil {
return errors.Wrap(err, "failed to fail mds agent in up:standby-replay state")
}
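The TODO added in upgradeMDS asks why the manual standby-replay shutdown is not gated on the Ceph version. A hedged sketch of what such a gate could look like; cephVersion and isAtLeastPacific here are illustrative assumptions, not the operator's cephver package.

package main

import "fmt"

// cephVersion is an illustrative stand-in for the operator's version type.
type cephVersion struct{ Major, Minor, Extra int }

// isAtLeastPacific reports whether the cluster runs Ceph Pacific (16.x) or newer.
func isAtLeastPacific(v cephVersion) bool { return v.Major >= 16 }

// stopStandbyReplayIfNeeded only fails standby-replay daemons manually on pre-Pacific
// clusters; Pacific and later stop them automatically during an upgrade.
func stopStandbyReplayIfNeeded(v cephVersion, failStandbyReplay func() error) error {
	if isAtLeastPacific(v) {
		return nil
	}
	if err := failStandbyReplay(); err != nil {
		return fmt.Errorf("failed to fail mds in up:standby-replay state: %w", err)
	}
	return nil
}

func main() {
	octopus := cephVersion{Major: 15, Minor: 2, Extra: 13}
	err := stopStandbyReplayIfNeeded(octopus, func() error {
		fmt.Println("manually failing standby-replay MDS daemons")
		return nil
	})
	fmt.Println("err:", err)
}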
1 change: 0 additions & 1 deletion pkg/operator/ceph/file/mds/spec_test.go
@@ -72,7 +72,6 @@ func testDeploymentObject(t *testing.T, network cephv1.NetworkSpec) (*apps.Deplo
Network: network,
},
fs,
&cephclient.CephFilesystemDetails{ID: 15},
&k8sutil.OwnerInfo{},
"/var/lib/rook/",
)
4 changes: 4 additions & 0 deletions pkg/operator/ceph/object/user.go
@@ -136,6 +136,10 @@ func CreateUser(c *Context, user ObjectUser) (*ObjectUser, int, error) {

result, err := runAdminCommand(c, true, args...)
if err != nil {
if code, err := exec.ExtractExitCode(err); err == nil && code == int(syscall.EEXIST) {
return nil, ErrorCodeFileExists, errors.New("s3 user already exists")
}

if strings.Contains(result, "could not create user: unable to create user, user: ") {
return nil, ErrorCodeFileExists, errors.New("s3 user already exists")
}
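The user.go change classifies radosgw-admin failures by exit status (EEXIST) before falling back to matching the error string. A standalone sketch of that pattern using only the standard library; exitCode below is an assumption standing in for rook's exec.ExtractExitCode helper.

package main

import (
	"errors"
	"fmt"
	"os/exec"
	"syscall"
)

// exitCode extracts the process exit status from an error returned by os/exec.
func exitCode(err error) (int, bool) {
	var ee *exec.ExitError
	if errors.As(err, &ee) {
		return ee.ExitCode(), true
	}
	return 0, false
}

func main() {
	// "sh -c 'exit 17'" exits with EEXIST (17), the status a tool like radosgw-admin
	// can return when asked to create an S3 user that already exists.
	err := exec.Command("sh", "-c", "exit 17").Run()
	if code, ok := exitCode(err); ok && code == int(syscall.EEXIST) {
		fmt.Println("s3 user already exists")
		return
	}
	fmt.Println("other failure:", err)
}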
