Skip to content

Commit

Permalink
Merge pull request #7995 from cybozu-go/ceph-make-timeout-of-radosgw-…
Browse files Browse the repository at this point in the history
…admin-configurable

ceph: make the timeout of ceph commands configurable
  • Loading branch information
travisn committed Jul 27, 2021
2 parents 38c4d52 + 30e4fbb commit b8ad33b
Show file tree
Hide file tree
Showing 20 changed files with 77 additions and 14 deletions.
2 changes: 2 additions & 0 deletions cluster/charts/rook-ceph/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ spec:
value: "{{ .Values.enableFlexDriver }}"
- name: ROOK_ENABLE_DISCOVERY_DAEMON
value: "{{ .Values.enableDiscoveryDaemon }}"
- name: ROOK_CEPH_COMMANDS_TIMEOUT_SECONDS
value: "{{ .Values.cephCommandsTimeoutSeconds }}"
- name: ROOK_OBC_WATCH_OPERATOR_NAMESPACE
value: "{{ .Values.enableOBCWatchOperatorNamespace }}"

Expand Down
1 change: 1 addition & 0 deletions cluster/charts/rook-ceph/templates/resources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9867,5 +9867,6 @@ spec:
version: v1
subresources:
status: {}

{{- end }}
{{- end }}
1 change: 1 addition & 0 deletions cluster/charts/rook-ceph/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ csi:

enableFlexDriver: false
enableDiscoveryDaemon: false
cephCommandsTimeoutSeconds: "15"

# enable the ability to have multiple Ceph filesystems in the same cluster
# WARNING: Experimental feature in Ceph Releases Octopus (v15) and Nautilus (v14)
Expand Down
2 changes: 2 additions & 0 deletions cluster/examples/kubernetes/ceph/operator-openshift.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ data:
ROOK_ENABLE_DISCOVERY_DAEMON: "false"
# Enable volume replication controller
CSI_ENABLE_VOLUME_REPLICATION: "false"
# The timeout value (in seconds) of Ceph commands. It should be >= 1. If this variable is not set or is an invalid value, it defaults to 15.
ROOK_CEPH_COMMANDS_TIMEOUT_SECONDS: "15"
# CSI_VOLUME_REPLICATION_IMAGE: "quay.io/csiaddons/volumereplication-operator:v0.1.0"

# (Optional) Admission controller NodeAffinity.
Expand Down
2 changes: 2 additions & 0 deletions cluster/examples/kubernetes/ceph/operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ data:
# Whether to start the discovery daemon to watch for raw storage devices on nodes in the cluster.
# This daemon does not need to run if you are only going to create your OSDs based on StorageClassDeviceSets with PVCs.
ROOK_ENABLE_DISCOVERY_DAEMON: "false"
# The timeout value (in seconds) of Ceph commands. It should be >= 1. If this variable is not set or is an invalid value, it defaults to 15.
ROOK_CEPH_COMMANDS_TIMEOUT_SECONDS: "15"
# Enable volume replication controller
CSI_ENABLE_VOLUME_REPLICATION: "false"
# CSI_VOLUME_REPLICATION_IMAGE: "quay.io/csiaddons/volumereplication-operator:v0.1.0"
Expand Down
2 changes: 1 addition & 1 deletion cluster/examples/kubernetes/ceph/pre-k8s-1.16/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -773,4 +773,4 @@ spec:
scope: Namespaced
version: v1
subresources:
status: {}
status: {}
4 changes: 2 additions & 2 deletions pkg/daemon/ceph/client/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func FinalizeCephCommandArgs(command string, clusterInfo *ClusterInfo, args []st

// we could use a slice and iterate over it but since we have only 3 elements
// I don't think this is worth a loop
timeout := strconv.Itoa(int(exec.CephCommandTimeout.Seconds()))
timeout := strconv.Itoa(int(exec.CephCommandsTimeout.Seconds()))
if command != "rbd" && command != "crushtool" && command != "radosgw-admin" {
args = append(args, "--connect-timeout="+timeout)
}
Expand Down Expand Up @@ -174,7 +174,7 @@ func (c *CephToolCommand) RunWithTimeout(timeout time.Duration) ([]byte, error)
// configured its arguments. It is future work to integrate this case into the
// generalization.
func ExecuteRBDCommandWithTimeout(context *clusterd.Context, args []string) (string, error) {
// Run RBDTool with the caller's args through the context's Executor, bounded by
// the package-level Ceph command timeout from the exec package; the output and
// error are handed back to the caller unchanged.
output, err := context.Executor.ExecuteCommandWithTimeout(exec.CephCommandTimeout, RBDTool, args...)
output, err := context.Executor.ExecuteCommandWithTimeout(exec.CephCommandsTimeout, RBDTool, args...)
return output, err
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/daemon/ceph/client/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package client
import (
"strconv"
"testing"
"time"

"github.com/pkg/errors"
"github.com/rook/rook/pkg/clusterd"
Expand All @@ -35,7 +36,7 @@ func TestFinalizeCephCommandArgs(t *testing.T) {
args := []string{"quorum_status"}
expectedArgs := []string{
"quorum_status",
"--connect-timeout=" + strconv.Itoa(int(exec.CephCommandTimeout.Seconds())),
"--connect-timeout=" + strconv.Itoa(int(exec.CephCommandsTimeout.Seconds())),
"--cluster=rook",
"--conf=/var/lib/rook/rook-ceph/rook/rook.config",
"--name=client.admin",
Expand Down Expand Up @@ -98,6 +99,7 @@ func TestFinalizeCephCommandArgsToolBox(t *testing.T) {
}

clusterInfo := AdminClusterInfo("rook")
exec.CephCommandsTimeout = 15 * time.Second
cmd, args := FinalizeCephCommandArgs(expectedCommand, clusterInfo, args, configDir)
assert.Exactly(t, "kubectl", cmd)
assert.Exactly(t, expectedArgs, args)
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/ceph/client/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (c *ClusterInfo) NamespacedName() types.NamespacedName {
}

// AdminClusterInfo() creates a ClusterInfo with the basic info to access the cluster
// as an admin. Only the namespace and the ceph username fields are set in the struct,
// as an admin. Only a few fields are set in the struct,
// so this clusterInfo cannot be used to generate the mon config or request the
// namespacedName. A full cluster info must be populated for those operations.
func AdminClusterInfo(namespace string) *ClusterInfo {
Expand Down
3 changes: 3 additions & 0 deletions pkg/daemon/ceph/client/mon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package client
import (
"fmt"
"testing"
"time"

"github.com/pkg/errors"
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/util/exec"
exectest "github.com/rook/rook/pkg/util/exec/test"
"github.com/stretchr/testify/assert"
)
Expand All @@ -29,6 +31,7 @@ func TestCephArgs(t *testing.T) {
// cluster a under /etc
args := []string{}
clusterInfo := AdminClusterInfo("a")
exec.CephCommandsTimeout = 15 * time.Second
command, args := FinalizeCephCommandArgs(CephTool, clusterInfo, args, "/etc")
assert.Equal(t, CephTool, command)
assert.Equal(t, 5, len(args))
Expand Down
4 changes: 2 additions & 2 deletions pkg/operator/ceph/cluster/mgr/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (c *Cluster) createSelfSignedCert() (bool, error) {

// retry a few times in the case that the mgr module is not ready to accept commands
for i := 0; i < 5; i++ {
_, err := client.NewCephCommand(c.context, c.clusterInfo, args).RunWithTimeout(exec.CephCommandTimeout)
_, err := client.NewCephCommand(c.context, c.clusterInfo, args).RunWithTimeout(exec.CephCommandsTimeout)
if err == context.DeadlineExceeded {
logger.Warning("cert creation timed out. trying again")
continue
Expand Down Expand Up @@ -253,7 +253,7 @@ func (c *Cluster) setLoginCredentials(password string) error {
}

_, err := client.ExecuteCephCommandWithRetry(func() (string, []byte, error) {
output, err := client.NewCephCommand(c.context, c.clusterInfo, args).RunWithTimeout(exec.CephCommandTimeout)
output, err := client.NewCephCommand(c.context, c.clusterInfo, args).RunWithTimeout(exec.CephCommandsTimeout)
return "set dashboard creds", output, err
}, c.exitCode, 5, invalidArgErrorCode, dashboardInitWaitTime)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/mgr/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (c *Cluster) setRookOrchestratorBackend() error {
// retry a few times in the case that the mgr module is not ready to accept commands
_, err := client.ExecuteCephCommandWithRetry(func() (string, []byte, error) {
args := []string{orchestratorCLIName, "set", "backend", "rook"}
output, err := client.NewCephCommand(c.context, c.clusterInfo, args).RunWithTimeout(exec.CephCommandTimeout)
output, err := client.NewCephCommand(c.context, c.clusterInfo, args).RunWithTimeout(exec.CephCommandsTimeout)
return "set rook backend", output, err
}, c.exitCode, 5, invalidArgErrorCode, orchestratorInitWaitTime)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/operator/ceph/cluster/mgr/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/rook/rook/pkg/clusterd"
cephclient "github.com/rook/rook/pkg/daemon/ceph/client"
cephver "github.com/rook/rook/pkg/operator/ceph/version"
"github.com/rook/rook/pkg/util/exec"
exectest "github.com/rook/rook/pkg/util/exec/test"
"github.com/stretchr/testify/assert"
)
Expand All @@ -33,6 +34,7 @@ func TestOrchestratorModules(t *testing.T) {
rookModuleEnabled := false
rookBackendSet := false
backendErrorCount := 0
exec.CephCommandsTimeout = 15 * time.Second
executor.MockExecuteCommandWithOutput = func(command string, args ...string) (string, error) {
logger.Infof("Command: %s %v", command, args)
if args[0] == "mgr" && args[1] == "module" && args[2] == "enable" {
Expand Down
13 changes: 13 additions & 0 deletions pkg/operator/ceph/controller/controller_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"errors"
"fmt"
"reflect"
"strconv"
"strings"
"time"

cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/operator/k8sutil"
"github.com/rook/rook/pkg/util/exec"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -81,6 +83,17 @@ func DiscoveryDaemonEnabled(context *clusterd.Context) bool {
return value == "true"
}

// SetCephCommandsTimeout sets the timeout value of Ceph commands which are executed from Rook.
//
// The value is read, in seconds, from the ROOK_CEPH_COMMANDS_TIMEOUT_SECONDS
// operator setting and stored in exec.CephCommandsTimeout. If the setting
// cannot be parsed as an integer or is smaller than 1, a warning is logged and
// the default of 15 seconds is used instead.
func SetCephCommandsTimeout(context *clusterd.Context) {
	const (
		settingName           = "ROOK_CEPH_COMMANDS_TIMEOUT_SECONDS"
		defaultTimeoutSeconds = 15
	)

	strTimeoutSeconds, err := k8sutil.GetOperatorSetting(context.Clientset, OperatorSettingConfigMapName, settingName, strconv.Itoa(defaultTimeoutSeconds))
	if err != nil {
		// Best effort: surface the lookup failure instead of hiding it, then
		// carry on with whatever value was returned so that the validation
		// below still decides the final timeout.
		logger.Warningf("failed to get operator setting %s. %v", settingName, err)
	}

	timeoutSeconds, err := strconv.Atoi(strTimeoutSeconds)
	if err != nil || timeoutSeconds < 1 {
		logger.Warningf("%s is %q but it should be an integer >= 1, using the default value %d", settingName, strTimeoutSeconds, defaultTimeoutSeconds)
		timeoutSeconds = defaultTimeoutSeconds
	}
	exec.CephCommandsTimeout = time.Duration(timeoutSeconds) * time.Second
}

// CheckForCancelledOrchestration checks whether a cancellation has been requested
func CheckForCancelledOrchestration(context *clusterd.Context) error {
defer context.RequestCancelOrchestration.UnSet()
Expand Down
34 changes: 34 additions & 0 deletions pkg/operator/ceph/controller/controller_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,17 @@ limitations under the License.
package controller

import (
"context"
"testing"
"time"

cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/util/exec"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
)

func CreateTestClusterFromStatusDetails(details map[string]cephv1.CephHealthMessage) cephv1.CephCluster {
Expand Down Expand Up @@ -70,3 +77,30 @@ func TestCanIgnoreHealthErrStatusInReconcile(t *testing.T) {
})
assert.False(t, canIgnoreHealthErrStatusInReconcile(cluster, "controller"))
}

func TestSetCephCommandsTimeout(t *testing.T) {
clientset := fake.NewSimpleClientset()
ctx := context.TODO()
cm := &v1.ConfigMap{}
cm.Name = "rook-ceph-operator-config"
_, err := clientset.CoreV1().ConfigMaps("").Create(ctx, cm, metav1.CreateOptions{})
assert.NoError(t, err)
context := &clusterd.Context{Clientset: clientset}

SetCephCommandsTimeout(context)
assert.Equal(t, 15*time.Second, exec.CephCommandsTimeout)

exec.CephCommandsTimeout = 0
cm.Data = map[string]string{"ROOK_CEPH_COMMANDS_TIMEOUT_SECONDS": "0"}
_, err = clientset.CoreV1().ConfigMaps("").Update(ctx, cm, metav1.UpdateOptions{})
assert.NoError(t, err)
SetCephCommandsTimeout(context)
assert.Equal(t, 15*time.Second, exec.CephCommandsTimeout)

exec.CephCommandsTimeout = 0
cm.Data = map[string]string{"ROOK_CEPH_COMMANDS_TIMEOUT_SECONDS": "1"}
_, err = clientset.CoreV1().ConfigMaps("").Update(ctx, cm, metav1.UpdateOptions{})
assert.NoError(t, err)
SetCephCommandsTimeout(context)
assert.Equal(t, 1*time.Second, exec.CephCommandsTimeout)
}
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func RunAdminCommandNoMultisite(c *Context, expectJSON bool, args ...string) (st
output, stderr, err = c.Context.RemoteExecutor.ExecCommandInContainerWithFullOutputWithTimeout(cephclient.ProxyAppLabel, cephclient.CommandProxyInitContainerName, c.clusterInfo.Namespace, append([]string{"radosgw-admin"}, args...)...)
} else {
command, args := cephclient.FinalizeCephCommandArgs("radosgw-admin", c.clusterInfo, args, c.Context.ConfigDir)
output, err = c.Context.Executor.ExecuteCommandWithTimeout(exec.CephCommandTimeout, command, args...)
output, err = c.Context.Executor.ExecuteCommandWithTimeout(exec.CephCommandsTimeout, command, args...)
}

if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/operator/ceph/object/objectstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,7 @@ func enableRGWDashboard(context *Context) error {
// starting in ceph v15.2.8. We run it in a goroutine until the fix
// is found. We expect the ceph command to timeout so at least the goroutine exits.
logger.Info("setting the dashboard api secret key")
_, err = cephCmd.RunWithTimeout(exec.CephCommandTimeout)
_, err = cephCmd.RunWithTimeout(exec.CephCommandsTimeout)
if err != nil {
logger.Errorf("failed to set user %q secretkey. %v", DashboardUser, err)
}
Expand Down Expand Up @@ -943,14 +943,14 @@ func disableRGWDashboard(context *Context) {

args := []string{"dashboard", "reset-rgw-api-access-key"}
cephCmd := cephclient.NewCephCommand(context.Context, context.clusterInfo, args)
_, err = cephCmd.RunWithTimeout(exec.CephCommandTimeout)
_, err = cephCmd.RunWithTimeout(exec.CephCommandsTimeout)
if err != nil {
logger.Warningf("failed to reset user accesskey for user %q. %v", DashboardUser, err)
}

args = []string{"dashboard", "reset-rgw-api-secret-key"}
cephCmd = cephclient.NewCephCommand(context.Context, context.clusterInfo, args)
_, err = cephCmd.RunWithTimeout(exec.CephCommandTimeout)
_, err = cephCmd.RunWithTimeout(exec.CephCommandsTimeout)
if err != nil {
logger.Warningf("failed to reset user secretkey for user %q. %v", DashboardUser, err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/operator/ceph/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (o *Operator) Run() error {
return errors.Errorf("rook operator namespace is not provided. expose it via downward API in the rook operator manifest file using environment variable %q", k8sutil.PodNamespaceEnvVar)
}

opcontroller.SetCephCommandsTimeout(o.context)
// creating a context
stopContext, stopFunc := context.WithCancel(context.Background())
defer stopFunc()
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
)

var (
CephCommandTimeout = 15 * time.Second
// CephCommandsTimeout is the time limit passed to Ceph command invocations.
// It starts at 15 seconds and is exported so it can be overridden at runtime.
CephCommandsTimeout = 15 * time.Second
)

// Executor is the main interface for all the exec commands
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/exec/exec_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,5 +131,5 @@ func execute(method string, url *url.URL, config *rest.Config, stdin io.Reader,
}

// ExecCommandInContainerWithFullOutputWithTimeout runs cmd in the given
// container with "timeout <seconds>" prepended, where <seconds> is the
// package-level Ceph command timeout truncated to whole seconds. The results of
// ExecCommandInContainerWithFullOutput are returned unchanged.
func (e *RemotePodCommandExecutor) ExecCommandInContainerWithFullOutputWithTimeout(appLabel, containerName, namespace string, cmd ...string) (string, string, error) {
return e.ExecCommandInContainerWithFullOutput(appLabel, containerName, namespace, append([]string{"timeout", strconv.Itoa(int(CephCommandTimeout.Seconds()))}, cmd...)...)
return e.ExecCommandInContainerWithFullOutput(appLabel, containerName, namespace, append([]string{"timeout", strconv.Itoa(int(CephCommandsTimeout.Seconds()))}, cmd...)...)
}

0 comments on commit b8ad33b

Please sign in to comment.