Skip to content

Commit

Permalink
object: add support for user s3 key for cephobjectstoreuser
Browse files Browse the repository at this point in the history
CephObjectStoreUser should optionally be able to reference
a secret where S3 key is defined. This enables us to
specify the accesskey and accesssecret rather than those
values being randomly generated.

Closes: rook#11563

Signed-off-by: parth-gr <partharora1010@gmail.com>
  • Loading branch information
parth-gr committed Apr 19, 2024
1 parent a9fded2 commit 4a834ab
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 39 deletions.
43 changes: 43 additions & 0 deletions Documentation/CRDs/Object-Storage/ceph-object-store-user-crd.md
Expand Up @@ -60,3 +60,46 @@ spec:
* `user-policy`
* `odic-provider`
* `ratelimit`

### CephObjectStoreUser Reference Secret

If a specific user key and secret is desired instead of randomly generated credentials, a specific user key and secret can be specified for an object store user.

Create or update the Kubernetes secret with name, `rook-ceph-object-user-<store-name>-<user-name>` in the same namespace of cephobjectUser, where:

* `store-name`: The object store name in which the user will be created. This matches the name of the objectstore CRD.
* `user-name`: The metadata name of the cephObjectStoreUser

Secret details:

It should have the annotations as `rook.io/source-of-truth: secret` specified.
It should also has the array of `SecretKeys` which contains all the access and secret keys, and also the latest updated key should be at top.

```console
kubectl create -f
apiVersion: v1
kind: Secret
metadata:
name: rook-ceph-object-user-my-store-my-user
namespace: rook-ceph
annotations:
rook.io/source-of-truth: secret
data:
AccessKey: ***
SecretKey: ***
Endpoint: ***
SecretKeys: |
[
{
"user": "my-user",
"access_key": "***",
"secret_key": "***"
},
{
"user": "my-user",
"access_key": "***",
"secret_key": "***"
},
],
type: "kubernetes.io/rook"
```
25 changes: 17 additions & 8 deletions pkg/operator/ceph/object/user.go
Expand Up @@ -241,17 +241,23 @@ func GenerateCephUserSecretName(store, username string) string {
return fmt.Sprintf("rook-ceph-object-user-%s-%s", store, username)
}

func generateCephUserSecret(userConfig *admin.User, endpoint, namespace, storeName, tlsSecretName string) *corev1.Secret {
func generateCephUserSecret(userConfig *admin.User, endpoint, namespace, storeName, tlsSecretName string) (*corev1.Secret, error) {
secretName := GenerateCephUserSecretName(storeName, userConfig.ID)
secretKeys, err := json.Marshal(userConfig.Keys)
if err != nil {
return &corev1.Secret{}, err
}
// Store the keys in a secret
secrets := map[string]string{
"AccessKey": userConfig.Keys[0].AccessKey,
"SecretKey": userConfig.Keys[0].SecretKey,
"Endpoint": endpoint,
"AccessKey": userConfig.Keys[0].AccessKey,
"SecretKey": userConfig.Keys[0].SecretKey,
"SecretKeys": string(secretKeys),
"Endpoint": endpoint,
}
if tlsSecretName != "" {
secrets["SSLCertSecretName"] = tlsSecretName
}

secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secretName,
Expand All @@ -266,15 +272,18 @@ func generateCephUserSecret(userConfig *admin.User, endpoint, namespace, storeNa
StringData: secrets,
Type: k8sutil.RookType,
}
return secret

return secret, nil
}

func ReconcileCephUserSecret(ctx context.Context, k8sclient client.Client, scheme *runtime.Scheme, ownerRef metav1.Object, userConfig *admin.User, endpoint, namespace, storeName, tlsSecretName string) (reconcile.Result, error) {
// Generate Kubernetes Secret
secret := generateCephUserSecret(userConfig, endpoint, namespace, storeName, tlsSecretName)

secret, err := generateCephUserSecret(userConfig, endpoint, namespace, storeName, tlsSecretName)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to get the ceph object user secret %q", GenerateCephUserSecretName(storeName, userConfig.ID))
}
// Set owner ref to the object store user object
err := controllerutil.SetControllerReference(ownerRef, secret, scheme)
err = controllerutil.SetControllerReference(ownerRef, secret, scheme)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to set owner reference of ceph object user secret %q", secret.Name)
}
Expand Down
115 changes: 94 additions & 21 deletions pkg/operator/ceph/object/user/controller.go
Expand Up @@ -19,8 +19,10 @@ package objectuser

import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"

"github.com/ceph/go-ceph/rgw/admin"
opcontroller "github.com/rook/rook/pkg/operator/ceph/controller"
Expand Down Expand Up @@ -50,6 +52,9 @@ import (
const (
appName = object.AppName
controllerName = "ceph-object-store-user-controller"
//#nosec G101 -- This is only an env var name
updateObjectUserSecretAnnotation = "rook.io/source-of-truth"
sourceOfTruthSecret = "secret"
)

// newMultisiteAdminOpsCtxFunc help us mocking the admin ops API client in unit test
Expand Down Expand Up @@ -250,7 +255,7 @@ func (r *ReconcileObjectStoreUser) reconcile(request reconcile.Request) (reconci
}

// CREATE/UPDATE CEPH USER
reconcileResponse, err = r.reconcileCephUser(cephObjectStoreUser)
reconcileResponse, userKeysSource, err := r.reconcileCephUser(cephObjectStoreUser)
if err != nil {
r.updateStatus(k8sutil.ObservedGenerationNotAvailable, request.NamespacedName, k8sutil.ReconcileFailedStatus)
return reconcileResponse, *cephObjectStoreUser, err
Expand All @@ -263,12 +268,13 @@ func (r *ReconcileObjectStoreUser) reconcile(request reconcile.Request) (reconci
}

tlsSecretName := store.Spec.Gateway.SSLCertificateRef
reconcileResponse, err = object.ReconcileCephUserSecret(r.opManagerContext, r.client, r.scheme, cephObjectStoreUser, r.userConfig, r.objContext.Endpoint, cephObjectStoreUser.Namespace, cephObjectStoreUser.Spec.Store, tlsSecretName)
if err != nil {
r.updateStatus(k8sutil.ObservedGenerationNotAvailable, request.NamespacedName, k8sutil.ReconcileFailedStatus)
return reconcileResponse, *cephObjectStoreUser, err
if !userKeysSource {
reconcileResponse, err = object.ReconcileCephUserSecret(r.opManagerContext, r.client, r.scheme, cephObjectStoreUser, r.userConfig, r.objContext.Endpoint, cephObjectStoreUser.Namespace, cephObjectStoreUser.Spec.Store, tlsSecretName)
if err != nil {
r.updateStatus(k8sutil.ObservedGenerationNotAvailable, request.NamespacedName, k8sutil.ReconcileFailedStatus)
return reconcileResponse, *cephObjectStoreUser, err
}
}

// update ObservedGeneration in status at the end of reconcile
// Set Ready status, we are done reconciling
r.updateStatus(observedGeneration, request.NamespacedName, k8sutil.ReadyStatus)
Expand All @@ -278,41 +284,107 @@ func (r *ReconcileObjectStoreUser) reconcile(request reconcile.Request) (reconci
return reconcile.Result{}, *cephObjectStoreUser, nil
}

func (r *ReconcileObjectStoreUser) reconcileCephUser(cephObjectStoreUser *cephv1.CephObjectStoreUser) (reconcile.Result, error) {
err := r.createOrUpdateCephUser(cephObjectStoreUser)
func (r *ReconcileObjectStoreUser) reconcileCephUser(cephObjectStoreUser *cephv1.CephObjectStoreUser) (reconcile.Result, bool, error) {
userKeysSource, err := r.createOrUpdateCephUser(cephObjectStoreUser)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to create/update object store user %q", cephObjectStoreUser.Name)
return reconcile.Result{}, userKeysSource, errors.Wrapf(err, "failed to create/update object store user %q", cephObjectStoreUser.Name)
}

return reconcile.Result{}, userKeysSource, nil
}

func validateSecretKeys(accessKey, secretKey string, secretKeys []admin.UserKeySpec) ([]admin.UserKeySpec, error) {
if len(secretKeys) == 0 {
secretKeys = []admin.UserKeySpec{
{AccessKey: accessKey,
SecretKey: secretKey}}
return secretKeys, nil
}
if (secretKeys)[0].AccessKey == accessKey && (secretKeys)[0].SecretKey == secretKey {
return secretKeys, nil
}

return reconcile.Result{}, nil
return secretKeys, errors.Errorf("secret keys data is invalid, please update the secret with cvalid format")
}

func (r *ReconcileObjectStoreUser) createOrUpdateCephUser(u *cephv1.CephObjectStoreUser) error {
func forceUpdateObjectUserSecret(annotations map[string]string) bool {
if value, found := annotations[updateObjectUserSecretAnnotation]; found {
if strings.EqualFold(value, sourceOfTruthSecret) {
return true
}
}
return false
}

func getSecretKeysJsonStruct(secretKeys []byte) ([]admin.UserKeySpec, error) {
var userKeys []admin.UserKeySpec
err := json.Unmarshal(secretKeys, &userKeys)
if err != nil {
return userKeys, errors.Wrapf(err, "unable to unmarshal secretKeys from the object secret")
}
return userKeys, nil
}

func (r *ReconcileObjectStoreUser) createOrUpdateCephUser(u *cephv1.CephObjectStoreUser) (bool, error) {
logger.Infof("creating ceph object user %q in namespace %q", u.Name, u.Namespace)

logCreateOrUpdate := fmt.Sprintf("retrieved existing ceph object user %q", u.Name)
var user admin.User
var secretKeysStruct []admin.UserKeySpec
userKeysSource := false
var err error

// get the user defined k8s s3 keys if exists
secretName := object.GenerateCephUserSecretName(u.Spec.Store, u.Name)
namspacedName := types.NamespacedName{Namespace: u.Namespace, Name: secretName}
cephObjectStoreUserSecret := &corev1.Secret{}
err = r.client.Get(r.clusterInfo.Context, namspacedName, cephObjectStoreUserSecret)
if err != nil {
if !kerrors.IsNotFound(err) {
return userKeysSource, errors.Wrapf(err, "failed to get user cephobjectuser secret %q", secretName)
}
logger.Debugf("no user secret %q provided for cephobjectuser", secretName)

} else {
if forceUpdateObjectUserSecret(cephObjectStoreUserSecret.GetAnnotations()) {
userKeysSource = true
secretKeysStruct, err = getSecretKeysJsonStruct(cephObjectStoreUserSecret.Data["SecretKeys"])
if err != nil {
return userKeysSource, errors.Wrapf(err, "invalid cephobjectuser secret %q", secretName)
}
secretKeysStruct, err = validateSecretKeys(string(cephObjectStoreUserSecret.Data["AccessKey"]), string(cephObjectStoreUserSecret.Data["SecretKey"]), secretKeysStruct)
if err != nil {
return userKeysSource, errors.Wrapf(err, "invalid cephobjectuser secret %q", secretName)
}
}
}

if userKeysSource {
// if secret exists and source of truth is secret then use the user specified keys
r.userConfig.Keys = secretKeysStruct
}
logger.Debugf("updated1 object user values are %v %v %t", r.userConfig.Keys, user.Keys, userKeysSource)

user, err = r.objContext.AdminOpsClient.GetUser(r.opManagerContext, *r.userConfig)
if err != nil {
if errors.Is(err, admin.ErrNoSuchUser) {
user, err = r.objContext.AdminOpsClient.CreateUser(r.opManagerContext, *r.userConfig)
if err != nil {
return errors.Wrapf(err, "failed to create ceph object user %v", &r.userConfig.ID)
return userKeysSource, errors.Wrapf(err, "failed to create ceph object user %v", &r.userConfig.ID)
}
logCreateOrUpdate = fmt.Sprintf("created ceph object user %q", u.Name)
} else {
return errors.Wrapf(err, "failed to get details from ceph object user %q", u.Name)
return userKeysSource, errors.Wrapf(err, "failed to get details from ceph object user %q", u.Name)
}
}

// Update max bucket if necessary
logger.Tracef("user capabilities(id: %s, caps: %#v, user caps: %s, op mask: %s)",
user.ID, user.Caps, user.UserCaps, user.OpMask)
if *user.MaxBuckets != *r.userConfig.MaxBuckets {
if *user.MaxBuckets != *r.userConfig.MaxBuckets || userKeysSource {
user, err = r.objContext.AdminOpsClient.ModifyUser(r.opManagerContext, *r.userConfig)
if err != nil {
return errors.Wrapf(err, "failed to update ceph object user %q max buckets", r.userConfig.ID)
return userKeysSource, errors.Wrapf(err, "failed to update ceph object user %q max buckets", r.userConfig.ID)
}
logCreateOrUpdate = fmt.Sprintf("updated ceph object user %q", u.Name)
}
Expand All @@ -325,14 +397,14 @@ func (r *ReconcileObjectStoreUser) createOrUpdateCephUser(u *cephv1.CephObjectSt
logger.Tracef("remove capabilities %s from user %s", user.UserCaps, r.userConfig.ID)
_, err = r.objContext.AdminOpsClient.RemoveUserCap(r.opManagerContext, r.userConfig.ID, user.UserCaps)
if err != nil {
return errors.Wrapf(err, "failed to remove current ceph object user %q capabilities", r.userConfig.ID)
return userKeysSource, errors.Wrapf(err, "failed to remove current ceph object user %q capabilities", r.userConfig.ID)
}
}
if r.userConfig.UserCaps != "" {
logger.Tracef("set capabilities %s for user %s", r.userConfig.UserCaps, r.userConfig.ID)
_, err = r.objContext.AdminOpsClient.AddUserCap(r.opManagerContext, r.userConfig.ID, r.userConfig.UserCaps)
if err != nil {
return errors.Wrapf(err, "failed to update ceph object user %q capabilities", r.userConfig.ID)
return userKeysSource, errors.Wrapf(err, "failed to update ceph object user %q capabilities", r.userConfig.ID)
}
}
logCreateOrUpdate = fmt.Sprintf("updated ceph object user %q", u.Name)
Expand All @@ -359,18 +431,19 @@ func (r *ReconcileObjectStoreUser) createOrUpdateCephUser(u *cephv1.CephObjectSt
}
err = r.objContext.AdminOpsClient.SetUserQuota(r.opManagerContext, userQuota)
if err != nil {
return errors.Wrapf(err, "failed to set quotas for user %q", u.Name)
return userKeysSource, errors.Wrapf(err, "failed to set quotas for user %q", u.Name)
}

// Set access and secret key
if r.userConfig.Keys == nil {
r.userConfig.Keys = make([]admin.UserKeySpec, 1)
r.userConfig.Keys[0].AccessKey = user.Keys[0].AccessKey
r.userConfig.Keys[0].SecretKey = user.Keys[0].SecretKey
}
r.userConfig.Keys[0].AccessKey = user.Keys[0].AccessKey
r.userConfig.Keys[0].SecretKey = user.Keys[0].SecretKey
logger.Debugf("updated object user values are %v %v %t", r.userConfig.Keys, user.Keys, userKeysSource)
logger.Info(logCreateOrUpdate)

return nil
return userKeysSource, nil
}

func (r *ReconcileObjectStoreUser) initializeObjectStoreContext(u *cephv1.CephObjectStoreUser) error {
Expand Down

0 comments on commit 4a834ab

Please sign in to comment.