From db74ae7c3984d3f65439814b7b6ff4493b1ee23b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Han?=
Date: Fri, 17 Sep 2021 16:46:02 +0200
Subject: [PATCH] mds: change init sequence
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The MDS core team suggested deploying the MDS daemon first and then
doing the filesystem creation and configuration. Reversing the sequence
lets us avoid spurious FS_DOWN warnings when creating the filesystem.

Closes: https://github.com/rook/rook/issues/8745
Signed-off-by: Sébastien Han
---
 pkg/operator/ceph/file/filesystem.go      |  36 ++---
 pkg/operator/ceph/file/filesystem_test.go | 188 +++++++++-------
 pkg/operator/ceph/file/mds/mds.go         |  10 +-
 pkg/operator/ceph/file/mds/spec_test.go   |   1 -
 pkg/operator/ceph/object/user.go          |   4 +
 tests/integration/ceph_base_file_test.go  |  19 ++-
 6 files changed, 111 insertions(+), 147 deletions(-)

diff --git a/pkg/operator/ceph/file/filesystem.go b/pkg/operator/ceph/file/filesystem.go
index c6cba903b17e7..2430c9b08f323 100644
--- a/pkg/operator/ceph/file/filesystem.go
+++ b/pkg/operator/ceph/file/filesystem.go
@@ -18,10 +18,8 @@ package file
 
 import (
 	"fmt"
-	"syscall"
 
 	"github.com/rook/rook/pkg/operator/k8sutil"
-	"github.com/rook/rook/pkg/util/exec"
 
 	"github.com/pkg/errors"
 	cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
@@ -51,37 +49,31 @@ func createFilesystem(
 	ownerInfo *k8sutil.OwnerInfo,
 	dataDirHostPath string,
 ) error {
+	logger.Infof("start running mdses for filesystem %q", fs.Name)
+	c := mds.NewCluster(clusterInfo, context, clusterSpec, fs, ownerInfo, dataDirHostPath)
+	if err := c.Start(); err != nil {
+		return err
+	}
+
 	if len(fs.Spec.DataPools) != 0 {
 		f := newFS(fs.Name, fs.Namespace)
 		if err := f.doFilesystemCreate(context, clusterInfo, clusterSpec, fs.Spec); err != nil {
 			return errors.Wrapf(err, "failed to create filesystem %q", fs.Name)
 		}
 	}
-
-	filesystem, err := cephclient.GetFilesystem(context, clusterInfo, fs.Name)
-	if err != nil {
-		return errors.Wrapf(err, "failed to get filesystem %q", fs.Name)
-	}
-
 	if fs.Spec.MetadataServer.ActiveStandby {
-		if err = cephclient.AllowStandbyReplay(context, clusterInfo, fs.Name, fs.Spec.MetadataServer.ActiveStandby); err != nil {
+		if err := cephclient.AllowStandbyReplay(context, clusterInfo, fs.Name, fs.Spec.MetadataServer.ActiveStandby); err != nil {
 			return errors.Wrapf(err, "failed to set allow_standby_replay to filesystem %q", fs.Name)
 		}
 	}
 
 	// set the number of active mds instances
 	if fs.Spec.MetadataServer.ActiveCount > 1 {
-		if err = cephclient.SetNumMDSRanks(context, clusterInfo, fs.Name, fs.Spec.MetadataServer.ActiveCount); err != nil {
+		if err := cephclient.SetNumMDSRanks(context, clusterInfo, fs.Name, fs.Spec.MetadataServer.ActiveCount); err != nil {
 			logger.Warningf("failed setting active mds count to %d. 
%v", fs.Spec.MetadataServer.ActiveCount, err) } } - logger.Infof("start running mdses for filesystem %q", fs.Name) - c := mds.NewCluster(clusterInfo, context, clusterSpec, fs, filesystem, ownerInfo, dataDirHostPath) - if err := c.Start(); err != nil { - return err - } - return nil } @@ -94,15 +86,7 @@ func deleteFilesystem( ownerInfo *k8sutil.OwnerInfo, dataDirHostPath string, ) error { - filesystem, err := cephclient.GetFilesystem(context, clusterInfo, fs.Name) - if err != nil { - if code, ok := exec.ExitStatus(err); ok && code == int(syscall.ENOENT) { - // If we're deleting the filesystem anyway, ignore the error that the filesystem doesn't exist - return nil - } - return errors.Wrapf(err, "failed to get filesystem %q", fs.Name) - } - c := mds.NewCluster(clusterInfo, context, clusterSpec, fs, filesystem, ownerInfo, dataDirHostPath) + c := mds.NewCluster(clusterInfo, context, clusterSpec, fs, ownerInfo, dataDirHostPath) // Delete mds CephX keys and configuration in centralized mon database replicas := fs.Spec.MetadataServer.ActiveCount * 2 @@ -110,7 +94,7 @@ func deleteFilesystem( daemonLetterID := k8sutil.IndexToName(i) daemonName := fmt.Sprintf("%s-%s", fs.Name, daemonLetterID) - err = c.DeleteMdsCephObjects(daemonName) + err := c.DeleteMdsCephObjects(daemonName) if err != nil { return errors.Wrapf(err, "failed to delete mds ceph objects for filesystem %q", fs.Name) } diff --git a/pkg/operator/ceph/file/filesystem_test.go b/pkg/operator/ceph/file/filesystem_test.go index 263781038a526..b3a48845532f7 100644 --- a/pkg/operator/ceph/file/filesystem_test.go +++ b/pkg/operator/ceph/file/filesystem_test.go @@ -95,7 +95,7 @@ func isBasePoolOperation(fsName, command string, args []string) bool { return false } -func fsExecutor(t *testing.T, fsName, configDir string, multiFS bool) *exectest.MockExecutor { +func fsExecutor(t *testing.T, fsName, configDir string, multiFS bool, createDataOnePoolCount, addDataOnePoolCount *int) *exectest.MockExecutor { mdsmap := cephclient.CephFilesystemDetails{ ID: 0, MDSMap: cephclient.MDSMap{ @@ -160,6 +160,16 @@ func fsExecutor(t *testing.T, fsName, configDir string, multiFS bool) *exectest. return "", nil } else if contains(args, "flag") && contains(args, "enable_multiple") { return "", nil + } else if reflect.DeepEqual(args[0:5], []string{"osd", "crush", "rule", "create-replicated", fsName + "-data1"}) { + return "", nil + } else if reflect.DeepEqual(args[0:4], []string{"osd", "pool", "create", fsName + "-data1"}) { + *createDataOnePoolCount++ + return "", nil + } else if reflect.DeepEqual(args[0:6], []string{"osd", "pool", "set", fsName + "-data1", "size", "1"}) { + return "", nil + } else if reflect.DeepEqual(args[0:4], []string{"fs", "add_data_pool", fsName, fsName + "-data1"}) { + *addDataOnePoolCount++ + return "", nil } else if contains(args, "versions") { versionStr, _ := json.Marshal( map[string]map[string]int{ @@ -213,6 +223,16 @@ func fsExecutor(t *testing.T, fsName, configDir string, multiFS bool) *exectest. 
return "", nil } else if contains(args, "config") && contains(args, "get") { return "{}", nil + } else if reflect.DeepEqual(args[0:5], []string{"osd", "crush", "rule", "create-replicated", fsName + "-data1"}) { + return "", nil + } else if reflect.DeepEqual(args[0:4], []string{"osd", "pool", "create", fsName + "-data1"}) { + *createDataOnePoolCount++ + return "", nil + } else if reflect.DeepEqual(args[0:6], []string{"osd", "pool", "set", fsName + "-data1", "size", "1"}) { + return "", nil + } else if reflect.DeepEqual(args[0:4], []string{"fs", "add_data_pool", fsName, fsName + "-data1"}) { + *addDataOnePoolCount++ + return "", nil } else if contains(args, "versions") { versionStr, _ := json.Marshal( map[string]map[string]int{ @@ -257,9 +277,10 @@ func TestCreateFilesystem(t *testing.T) { var deploymentsUpdated *[]*apps.Deployment mds.UpdateDeploymentAndWait, deploymentsUpdated = testopk8s.UpdateDeploymentAndWaitStub() configDir, _ := ioutil.TempDir("", "") - fsName := "myfs" - executor := fsExecutor(t, fsName, configDir, false) + addDataOnePoolCount := 0 + createDataOnePoolCount := 0 + executor := fsExecutor(t, fsName, configDir, false, &createDataOnePoolCount, &addDataOnePoolCount) defer os.RemoveAll(configDir) clientset := testop.New(t, 1) context := &clusterd.Context{ @@ -268,117 +289,58 @@ func TestCreateFilesystem(t *testing.T) { Clientset: clientset} fs := fsTest(fsName) clusterInfo := &cephclient.ClusterInfo{FSID: "myfsid", CephVersion: version.Octopus, Context: ctx} - - // start a basic cluster ownerInfo := cephclient.NewMinimumOwnerInfoWithOwnerRef() - err := createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, ownerInfo, "/var/lib/rook/") - assert.Nil(t, err) - validateStart(ctx, t, context, fs) - assert.ElementsMatch(t, []string{}, testopk8s.DeploymentNamesUpdated(deploymentsUpdated)) - testopk8s.ClearDeploymentsUpdated(deploymentsUpdated) - // starting again should be a no-op - err = createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, ownerInfo, "/var/lib/rook/") - assert.Nil(t, err) - validateStart(ctx, t, context, fs) - assert.ElementsMatch(t, []string{fmt.Sprintf("rook-ceph-mds-%s-a", fsName), fmt.Sprintf("rook-ceph-mds-%s-b", fsName)}, testopk8s.DeploymentNamesUpdated(deploymentsUpdated)) - testopk8s.ClearDeploymentsUpdated(deploymentsUpdated) - - // Increasing the number of data pools should be successful. 
- createDataOnePoolCount := 0 - addDataOnePoolCount := 0 - createdFsResponse := fmt.Sprintf(`{"fs_name": "%s", "metadata_pool": 2, "data_pools":[3]}`, fsName) - executor = &exectest.MockExecutor{ - MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { - if contains(args, "fs") && contains(args, "get") { - return createdFsResponse, nil - } else if isBasePoolOperation(fsName, command, args) { - return "", nil - } else if reflect.DeepEqual(args[0:4], []string{"osd", "pool", "create", fsName + "-data1"}) { - createDataOnePoolCount++ - return "", nil - } else if reflect.DeepEqual(args[0:4], []string{"fs", "add_data_pool", fsName, fsName + "-data1"}) { - addDataOnePoolCount++ - return "", nil - } else if contains(args, "set") && contains(args, "max_mds") { - return "", nil - } else if contains(args, "auth") && contains(args, "get-or-create-key") { - return "{\"key\":\"mysecurekey\"}", nil - } else if reflect.DeepEqual(args[0:5], []string{"osd", "crush", "rule", "create-replicated", fsName + "-data1"}) { - return "", nil - } else if reflect.DeepEqual(args[0:6], []string{"osd", "pool", "set", fsName + "-data1", "size", "1"}) { - return "", nil - } else if args[0] == "config" && args[1] == "set" { - return "", nil - } else if contains(args, "versions") { - versionStr, _ := json.Marshal( - map[string]map[string]int{ - "mds": { - "ceph version 16.0.0-4-g2f728b9 (2f728b952cf293dd7f809ad8a0f5b5d040c43010) pacific (stable)": 2, - }, - }) - return string(versionStr), nil - } - assert.Fail(t, fmt.Sprintf("Unexpected command: %v", args)) - return "", nil - }, - } - context = &clusterd.Context{ - Executor: executor, - ConfigDir: configDir, - Clientset: clientset} - fs.Spec.DataPools = append(fs.Spec.DataPools, cephv1.PoolSpec{Replicated: cephv1.ReplicatedSpec{Size: 1, RequireSafeReplicaSize: false}}) - - err = createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, ownerInfo, "/var/lib/rook/") - assert.Nil(t, err) - validateStart(ctx, t, context, fs) - assert.ElementsMatch(t, []string{fmt.Sprintf("rook-ceph-mds-%s-a", fsName), fmt.Sprintf("rook-ceph-mds-%s-b", fsName)}, testopk8s.DeploymentNamesUpdated(deploymentsUpdated)) - assert.Equal(t, 1, createDataOnePoolCount) - assert.Equal(t, 1, addDataOnePoolCount) - testopk8s.ClearDeploymentsUpdated(deploymentsUpdated) - - // Test multiple filesystem creation - // Output to check multiple filesystem creation - fses := `[{"name":"myfs","metadata_pool":"myfs-metadata","metadata_pool_id":4,"data_pool_ids":[5],"data_pools":["myfs-data0"]},{"name":"myfs2","metadata_pool":"myfs2-metadata","metadata_pool_id":6,"data_pool_ids":[7],"data_pools":["myfs2-data0"]},{"name":"leseb","metadata_pool":"cephfs.leseb.meta","metadata_pool_id":8,"data_pool_ids":[9],"data_pools":["cephfs.leseb.data"]}]` - executorMultiFS := &exectest.MockExecutor{ - MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { - if contains(args, "ls") { - return fses, nil - } else if contains(args, "versions") { - versionStr, _ := json.Marshal( - map[string]map[string]int{ - "mds": { - "ceph version 16.0.0-4-g2f728b9 (2f728b952cf293dd7f809ad8a0f5b5d040c43010) pacific (stable)": 2, - }, - }) - return string(versionStr), nil - } - return "{\"key\":\"mysecurekey\"}", errors.New("multiple fs") - }, - } - context = &clusterd.Context{ - Executor: executorMultiFS, - ConfigDir: configDir, - Clientset: clientset, - } - - // Create another filesystem which should fail - err = createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, 
&k8sutil.OwnerInfo{}, "/var/lib/rook/") - assert.Error(t, err) - assert.Equal(t, fmt.Sprintf("failed to create filesystem %q: multiple filesystems are only supported as of ceph pacific", fsName), err.Error()) + t.Run("start basic filesystem", func(t *testing.T) { + // start a basic cluster + err := createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, ownerInfo, "/var/lib/rook/") + assert.Nil(t, err) + validateStart(ctx, t, context, fs) + assert.ElementsMatch(t, []string{}, testopk8s.DeploymentNamesUpdated(deploymentsUpdated)) + testopk8s.ClearDeploymentsUpdated(deploymentsUpdated) + }) + + t.Run("start again should no-op", func(t *testing.T) { + err := createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, ownerInfo, "/var/lib/rook/") + assert.Nil(t, err) + validateStart(ctx, t, context, fs) + assert.ElementsMatch(t, []string{fmt.Sprintf("rook-ceph-mds-%s-a", fsName), fmt.Sprintf("rook-ceph-mds-%s-b", fsName)}, testopk8s.DeploymentNamesUpdated(deploymentsUpdated)) + testopk8s.ClearDeploymentsUpdated(deploymentsUpdated) + }) + + t.Run("increasing the number of data pools should be successful.", func(t *testing.T) { + context = &clusterd.Context{ + Executor: executor, + ConfigDir: configDir, + Clientset: clientset} + fs.Spec.DataPools = append(fs.Spec.DataPools, cephv1.PoolSpec{Replicated: cephv1.ReplicatedSpec{Size: 1, RequireSafeReplicaSize: false}}) + err := createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, ownerInfo, "/var/lib/rook/") + assert.Nil(t, err) + validateStart(ctx, t, context, fs) + assert.ElementsMatch(t, []string{fmt.Sprintf("rook-ceph-mds-%s-a", fsName), fmt.Sprintf("rook-ceph-mds-%s-b", fsName)}, testopk8s.DeploymentNamesUpdated(deploymentsUpdated)) + assert.Equal(t, 1, createDataOnePoolCount) + assert.Equal(t, 1, addDataOnePoolCount) + testopk8s.ClearDeploymentsUpdated(deploymentsUpdated) + }) + + t.Run("multiple filesystem creation", func(t *testing.T) { + context = &clusterd.Context{ + Executor: fsExecutor(t, fsName, configDir, true, &createDataOnePoolCount, &addDataOnePoolCount), + ConfigDir: configDir, + Clientset: clientset, + } - // It works since the Ceph version is Pacific - fsName = "myfs3" - fs = fsTest(fsName) - executor = fsExecutor(t, fsName, configDir, true) - clusterInfo.CephVersion = version.Pacific - context = &clusterd.Context{ - Executor: executor, - ConfigDir: configDir, - Clientset: clientset, - } - err = createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, ownerInfo, "/var/lib/rook/") - assert.NoError(t, err) + // Create another filesystem which should fail + err := createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, &k8sutil.OwnerInfo{}, "/var/lib/rook/") + assert.Error(t, err) + assert.Equal(t, fmt.Sprintf("failed to create filesystem %q: multiple filesystems are only supported as of ceph pacific", fsName), err.Error()) + }) + + t.Run("multi filesystem creation now works since ceph version is pacific", func(t *testing.T) { + clusterInfo.CephVersion = version.Pacific + err := createFilesystem(context, clusterInfo, fs, &cephv1.ClusterSpec{}, ownerInfo, "/var/lib/rook/") + assert.NoError(t, err) + }) } func TestUpgradeFilesystem(t *testing.T) { @@ -388,7 +350,9 @@ func TestUpgradeFilesystem(t *testing.T) { configDir, _ := ioutil.TempDir("", "") fsName := "myfs" - executor := fsExecutor(t, fsName, configDir, false) + addDataOnePoolCount := 0 + createDataOnePoolCount := 0 + executor := fsExecutor(t, fsName, configDir, false, &createDataOnePoolCount, &addDataOnePoolCount) defer 
os.RemoveAll(configDir) clientset := testop.New(t, 1) context := &clusterd.Context{ diff --git a/pkg/operator/ceph/file/mds/mds.go b/pkg/operator/ceph/file/mds/mds.go index d43d144b2883e..58661cd1bfbb4 100644 --- a/pkg/operator/ceph/file/mds/mds.go +++ b/pkg/operator/ceph/file/mds/mds.go @@ -20,7 +20,6 @@ package mds import ( "context" "fmt" - "strconv" "strings" "syscall" "time" @@ -58,7 +57,6 @@ type Cluster struct { context *clusterd.Context clusterSpec *cephv1.ClusterSpec fs cephv1.CephFilesystem - fsID string ownerInfo *k8sutil.OwnerInfo dataDirHostPath string } @@ -75,7 +73,6 @@ func NewCluster( context *clusterd.Context, clusterSpec *cephv1.ClusterSpec, fs cephv1.CephFilesystem, - fsdetails *cephclient.CephFilesystemDetails, ownerInfo *k8sutil.OwnerInfo, dataDirHostPath string, ) *Cluster { @@ -84,7 +81,6 @@ func NewCluster( context: context, clusterSpec: clusterSpec, fs: fs, - fsID: strconv.Itoa(fsdetails.ID), ownerInfo: ownerInfo, dataDirHostPath: dataDirHostPath, } @@ -232,7 +228,7 @@ func (c *Cluster) isCephUpgrade() (bool, error) { return false, err } if cephver.IsSuperior(c.clusterInfo.CephVersion, *currentVersion) { - logger.Debugf("ceph version for MDS %q is %q and target version is %q", key, currentVersion, c.clusterInfo.CephVersion) + logger.Debugf("ceph version for MDS %q is %q and target version is %q", key, currentVersion.String(), c.clusterInfo.CephVersion.String()) return true, err } } @@ -249,7 +245,9 @@ func (c *Cluster) upgradeMDS() error { return errors.Wrap(err, "failed to setting allow_standby_replay to false") } - // In Pacific, standby-replay daemons are stopped automatically. Older versions of Ceph require us to stop these daemons manually. + // In Pacific, standby-replay daemons are stopped automatically. Older versions of Ceph require + // us to stop these daemons manually. + // TODO: so why don't we have a version check? if err := cephclient.FailAllStandbyReplayMDS(c.context, c.clusterInfo, c.fs.Name); err != nil { return errors.Wrap(err, "failed to fail mds agent in up:standby-replay state") } diff --git a/pkg/operator/ceph/file/mds/spec_test.go b/pkg/operator/ceph/file/mds/spec_test.go index d6c9d53e4ae11..864145a92bce4 100644 --- a/pkg/operator/ceph/file/mds/spec_test.go +++ b/pkg/operator/ceph/file/mds/spec_test.go @@ -72,7 +72,6 @@ func testDeploymentObject(t *testing.T, network cephv1.NetworkSpec) (*apps.Deplo Network: network, }, fs, - &cephclient.CephFilesystemDetails{ID: 15}, &k8sutil.OwnerInfo{}, "/var/lib/rook/", ) diff --git a/pkg/operator/ceph/object/user.go b/pkg/operator/ceph/object/user.go index 9eda431c82b04..ce9c18dd38dbf 100644 --- a/pkg/operator/ceph/object/user.go +++ b/pkg/operator/ceph/object/user.go @@ -136,6 +136,10 @@ func CreateUser(c *Context, user ObjectUser) (*ObjectUser, int, error) { result, err := runAdminCommand(c, true, args...) 
 	if err != nil {
+		if code, err := exec.ExtractExitCode(err); err == nil && code == int(syscall.EEXIST) {
+			return nil, ErrorCodeFileExists, errors.New("s3 user already exists")
+		}
+
 		if strings.Contains(result, "could not create user: unable to create user, user: ") {
 			return nil, ErrorCodeFileExists, errors.New("s3 user already exists")
 		}
diff --git a/tests/integration/ceph_base_file_test.go b/tests/integration/ceph_base_file_test.go
index f2acba483cb3a..52704dfdcef87 100644
--- a/tests/integration/ceph_base_file_test.go
+++ b/tests/integration/ceph_base_file_test.go
@@ -385,9 +385,24 @@ func createFilesystem(helper *clients.TestClient, k8sh *utils.K8sHelper, s suite
 	logger.Infof("Create file System")
 	fscErr := helper.FSClient.Create(filesystemName, settings.Namespace, activeCount)
 	require.Nil(s.T(), fscErr)
-	logger.Infof("File system %s created", filesystemName)
+
+	var err error
-	filesystemList, _ := helper.FSClient.List(settings.Namespace)
+	var filesystemList []cephclient.CephFilesystem
+	for i := 0; i < 10; i++ {
+		filesystemList, err = helper.FSClient.List(settings.Namespace)
+		if err != nil {
+			logger.Errorf("failed to list fs. trying again. %v", err)
+			continue
+		}
+		logger.Debugf("filesystemList is %+v", filesystemList)
+		if len(filesystemList) == 1 {
+			logger.Infof("File system %s created", filesystemList[0].Name)
+			break
+		}
+		logger.Infof("Waiting for file system %s to be created", filesystemName)
+		time.Sleep(time.Second * 5)
+	}
+
+	logger.Debugf("filesystemList is %+v", filesystemList)
 	require.Equal(s.T(), 1, len(filesystemList), "There should be one shared file system present")
 }
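
A possible follow-up for the TODO added in upgradeMDS(): guard the manual standby-replay
failover with a Ceph version check so it only runs on releases that still need it. This is
a minimal sketch, not part of the patch; it assumes the CephVersion type exposes an
IsAtLeastPacific() helper (the exact helper name in pkg/operator/ceph/version may differ):

	// Sketch: Pacific and newer stop standby-replay daemons automatically during
	// an upgrade, so the manual failover is only needed on older releases.
	// Assumes c.clusterInfo.CephVersion.IsAtLeastPacific() exists; adjust to the
	// helper actually provided by pkg/operator/ceph/version.
	if !c.clusterInfo.CephVersion.IsAtLeastPacific() {
		if err := cephclient.FailAllStandbyReplayMDS(c.context, c.clusterInfo, c.fs.Name); err != nil {
			return errors.Wrap(err, "failed to fail mds agent in up:standby-replay state")
		}
	}

With a guard like this, newer clusters skip the manual step entirely and rely on Ceph's
automatic handling, while pre-Pacific clusters keep the existing behavior.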