/
admin.go
340 lines (293 loc) · 12.3 KB
/
admin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
/*
Copyright 2016 The Rook Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package object
import (
"context"
"fmt"
"net/http"
"net/http/httputil"
"regexp"
"github.com/ceph/go-ceph/rgw/admin"
"github.com/coreos/pkg/capnslog"
"github.com/pkg/errors"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/clusterd"
cephclient "github.com/rook/rook/pkg/daemon/ceph/client"
"github.com/rook/rook/pkg/util/exec"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)
// Context holds the context for the object store.
//
// It carries the cluster handles needed to run radosgw-admin commands against one object
// store (see NewContext) and, once populated by NewMultisiteContext, the realm, zone group
// and zone the store belongs to.
type Context struct {
// Context is the clusterd context; this file uses its Executor, RemoteExecutor, ConfigDir
// and Client.
Context *clusterd.Context
// clusterInfo describes the Ceph cluster; its Namespace and CephCred.Username are read here.
clusterInfo *cephclient.ClusterInfo
// CephClusterSpec is consulted to decide whether radosgw-admin must be proxied (Multus).
CephClusterSpec cephv1.ClusterSpec
// Name is the object store name.
Name string
// UID is the CephObjectStore's UID as a string (set by NewMultisiteContext).
UID string
// Endpoint is the object store's DNS endpoint, set by UpdateEndpoint.
Endpoint string
// Realm, ZoneGroup and Zone are appended to radosgw-admin commands as --rgw-realm,
// --rgw-zonegroup and --rgw-zone by runAdminCommand.
Realm string
ZoneGroup string
Zone string
}
// AdminOpsContext holds the object store context as well as information for connecting to the admin
// ops API. It is built by NewMultisiteAdminOpsContext.
type AdminOpsContext struct {
// Context is the embedded object store context.
Context
// TlsCert is the certificate returned together with the object store HTTP client (may be
// empty when TLS is not in use — NOTE(review): confirm against genObjectStoreHTTPClientFunc).
TlsCert []byte
// AdminOpsUserAccessKey and AdminOpsUserSecretKey are the admin ops user's credentials.
AdminOpsUserAccessKey string
AdminOpsUserSecretKey string
// AdminOpsClient is the go-ceph admin ops API client built from the fields above.
AdminOpsClient *admin.API
}
// debugHTTPClient wraps an admin.HTTPClient and writes dumps of the HTTP traffic to logger
// at trace level. Construct it with NewDebugHTTPClient.
type debugHTTPClient struct {
// client performs the actual HTTP requests.
client admin.HTTPClient
// logger receives the request/response dumps.
logger *capnslog.PackageLogger
}
// NewDebugHTTPClient wraps client so that the requests it sends and the responses it
// receives are dumped to logger at trace level, to help debug admin ops API calls.
func NewDebugHTTPClient(client admin.HTTPClient, logger *capnslog.PackageLogger) *debugHTTPClient {
	wrapped := debugHTTPClient{
		client: client,
		logger: logger,
	}
	return &wrapped
}
// Do sends req through the wrapped client. When the logger is at TRACE level, the full
// outgoing request and the received response (headers and bodies) are dumped to the log.
//
// WARNING: the dumps can contain credentials and other sensitive data from either side.
func (c *debugHTTPClient) Do(req *http.Request) (*http.Response, error) {
	// The dumps are only emitted via Tracef. This wrapper is installed at DEBUG, which is
	// less verbose than TRACE. Below TRACE, skip buffering both bodies and the extra failure
	// modes that dumping introduces.
	if !c.logger.LevelAt(capnslog.TRACE) {
		return c.client.Do(req)
	}

	reqDump, err := httputil.DumpRequestOut(req, true)
	if err != nil {
		return nil, errors.Wrap(err, "failed to dump http request")
	}
	// this can leak credentials for making requests
	c.logger.Tracef("\n%s\n", string(reqDump))

	resp, err := c.client.Do(req)
	if err != nil {
		return nil, err
	}

	// DumpResponse drains and closes the original body and installs an in-memory copy in
	// resp.Body, so the response returned to the caller is still fully readable.
	respDump, err := httputil.DumpResponse(resp, true)
	if err != nil {
		// The body may be partially consumed; release it instead of returning a broken response.
		if resp.Body != nil {
			_ = resp.Body.Close() // best effort; the dump error is what gets reported
		}
		return nil, errors.Wrap(err, "failed to dump http response")
	}
	// this can leak any sensitive info like credentials in the response
	c.logger.Tracef("\n%s\n", string(respDump))
	return resp, nil
}
const (
// RGWAdminOpsUserSecretName is the secret name of the admin ops user. The same value is
// also used as the admin ops user's ID (see GetAdminOPSUserCredentials).
// #nosec G101 since this is not leaking any hardcoded credentials, it's just the secret name
RGWAdminOpsUserSecretName = "rgw-admin-ops-user"
// rgwAdminOpsUserAccessKey and rgwAdminOpsUserSecretKey are the data keys under which the
// access key and secret key are stored in the RGWAdminOpsUserSecretName secret.
rgwAdminOpsUserAccessKey = "accessKey"
rgwAdminOpsUserSecretKey = "secretKey"
// rgwAdminOpsUserCaps is the RGW capability string for the admin ops user
// (presumably applied when the user is created; it is not referenced in this section).
rgwAdminOpsUserCaps = "buckets=*;users=*;usage=read;metadata=read;zone=read"
)
var (
// rgwAdminOpsUserDisplayName is the display name given to the admin ops user. It is a var
// rather than a const because its address is taken for ObjectUser.DisplayName, which a
// const would not allow.
rgwAdminOpsUserDisplayName = "RGW Admin Ops User"
)
// NewContext returns an object store Context for the store called name in the cluster
// described by clusterInfo. UID, Endpoint and the multisite fields are left empty.
func NewContext(context *clusterd.Context, clusterInfo *cephclient.ClusterInfo, name string) *Context {
	objContext := new(Context)
	objContext.Context = context
	objContext.clusterInfo = clusterInfo
	objContext.Name = name
	return objContext
}
// NewMultisiteContext builds a Context for the given CephObjectStore.
//
// It records the store's UID and computes its endpoint. It then resolves the realm, zone
// group and zone the store belongs to, so that later radosgw-admin commands can be scoped
// to them. Errors from either step are returned, and the second step is skipped if the
// first fails.
func NewMultisiteContext(context *clusterd.Context, clusterInfo *cephclient.ClusterInfo, store *cephv1.CephObjectStore) (*Context, error) {
	storeRef := fmt.Sprintf("%s/%s", store.Namespace, store.Name)

	oc := NewContext(context, clusterInfo, store.Name)
	oc.UID = string(store.UID)

	err := UpdateEndpoint(oc, &store.Spec)
	if err != nil {
		return nil, err
	}

	realm, zoneGroup, zone, err := getMultisiteForObjectStore(context, &store.Spec, store.Namespace, store.Name)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to get realm/zone group/zone for object store %q", storeRef)
	}
	oc.Realm, oc.ZoneGroup, oc.Zone = realm, zoneGroup, zone

	return oc, nil
}
// UpdateEndpoint refreshes objContext.Endpoint from the CephObjectStore spec. It uses the
// store's in-cluster DNS name, the port from the spec and whether TLS is enabled. An error
// is returned, and the endpoint left untouched, when the port cannot be determined.
func UpdateEndpoint(objContext *Context, spec *cephv1.ObjectStoreSpec) error {
	namespace := objContext.clusterInfo.Namespace

	port, err := spec.GetPort()
	if err != nil {
		storeRef := fmt.Sprintf("%s/%s", namespace, objContext.Name)
		return errors.Wrapf(err, "failed to get port for object store %q", storeRef)
	}

	domain := BuildDomainName(objContext.Name, namespace)
	objContext.Endpoint = BuildDNSEndpoint(domain, port, spec.IsTLSEnabled())
	return nil
}
// NewMultisiteAdminOpsContext returns an AdminOpsContext for objContext. It:
//   - obtains the admin ops user's credentials (GetAdminOPSUserCredentials),
//   - builds the HTTP client (and TLS certificate) for the object store described by spec,
//   - creates a go-ceph admin ops API client pointed at objContext.Endpoint.
//
// When the package logger is at DEBUG level or more verbose, the HTTP client is wrapped with
// NewDebugHTTPClient so that the admin ops traffic can be dumped to the log.
func NewMultisiteAdminOpsContext(
	objContext *Context,
	spec *cephv1.ObjectStoreSpec,
) (*AdminOpsContext, error) {
	accessKey, secretKey, err := GetAdminOPSUserCredentials(objContext, spec)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create or retrieve rgw admin ops user")
	}

	httpClient, tlsCert, err := genObjectStoreHTTPClientFunc(objContext, spec)
	if err != nil {
		return nil, err
	}

	// Choose the transport once; only the HTTP client differs between debug and normal mode.
	var opsHTTPClient admin.HTTPClient = httpClient
	if logger.LevelAt(capnslog.DEBUG) {
		opsHTTPClient = NewDebugHTTPClient(httpClient, logger)
	}

	client, err := admin.New(objContext.Endpoint, accessKey, secretKey, opsHTTPClient)
	if err != nil {
		return nil, errors.Wrap(err, "failed to build admin ops API connection")
	}

	return &AdminOpsContext{
		Context:               *objContext,
		TlsCert:               tlsCert,
		AdminOpsUserAccessKey: accessKey,
		AdminOpsUserSecretKey: secretKey,
		AdminOpsClient:        client,
	}, nil
}
// radosgwJSONArrayRegex and radosgwJSONObjectRegex locate a JSON array / object that starts
// at the beginning of a line and ends at the end of a (possibly later) line. They are
// compiled once here rather than on every extractJSON call.
var (
	radosgwJSONArrayRegex  = regexp.MustCompile(`(?ms)^\[.*\]$`)
	radosgwJSONObjectRegex = regexp.MustCompile(`(?ms)^{.*}$`)
)

// extractJSON returns the JSON document embedded in output.
//
// `radosgw-admin` sometimes leaves logs on stderr even if it succeeds, so surrounding
// non-JSON lines are skipped. Valid JSON can be an object (in braces) or an array (in
// brackets). If both are found, the longer match wins, and on a tie the object is returned.
// An error is returned when output contains neither.
func extractJSON(output string) (string, error) {
	// Both patterns need at least two characters, so "" from FindString means "no match".
	arrayMatch := radosgwJSONArrayRegex.FindString(output)
	objMatch := radosgwJSONObjectRegex.FindString(output)

	if arrayMatch == "" && objMatch == "" {
		return "", fmt.Errorf("didn't contain json. %s", output)
	}
	// Covers array-only (objMatch is empty) and "array strictly larger"; everything else is the object.
	if len(arrayMatch) > len(objMatch) {
		return arrayMatch, nil
	}
	return objMatch, nil
}
// RunAdminCommandNoMultisite runs `radosgw-admin args...` without adding any
// realm/zone group/zone flags. Use it before an object store exists, or for commands that
// operate on the realm or zone group themselves (e.g. `radosgw-admin zonegroup get`).
//
// The command is run with a timeout. When Multus networking is enabled it is proxied to the
// mgr sidecar; otherwise it is executed locally with the finalized Ceph arguments. Per the
// original notes, a Kubernetes "NotFound" error is returned when the exec fails because the
// pod does not exist.
//
// On command failure, the returned string is "<stdout>. <stderr>". When expectJSON is set,
// only the JSON portion of the output is returned. If no JSON can be found, the raw output
// is returned together with an error.
func RunAdminCommandNoMultisite(c *Context, expectJSON bool, args ...string) (string, error) {
	var (
		output, stderr string
		err            error
	)

	if c.CephClusterSpec.Network.IsMultus() {
		// Multus is enabled: proxy the command to the mgr sidecar.
		proxiedCmd := append([]string{"radosgw-admin"}, args...)
		output, stderr, err = c.Context.RemoteExecutor.ExecCommandInContainerWithFullOutputWithTimeout(cephclient.ProxyAppLabel, cephclient.CommandProxyInitContainerName, c.clusterInfo.Namespace, proxiedCmd...)
	} else {
		binary, finalArgs := cephclient.FinalizeCephCommandArgs("radosgw-admin", c.clusterInfo, args, c.Context.ConfigDir)
		output, err = c.Context.Executor.ExecuteCommandWithTimeout(exec.CephCommandsTimeout, binary, finalArgs...)
	}
	if err != nil {
		return fmt.Sprintf("%s. %s", output, stderr), err
	}

	if !expectJSON {
		return output, nil
	}
	jsonOutput, parseErr := extractJSON(output)
	if parseErr != nil {
		return output, errors.Wrap(parseErr, "failed to parse as JSON")
	}
	return jsonOutput, nil
}
// runAdminCommand runs a radosgw-admin command against an object store whose Context has
// already been populated with its realm, zone group and zone (see NewMultisiteContext).
//
// The realm/zone group/zone flags are appended unless the cluster appears to be external.
// If the command fails with what looks like a FIFO file I/O error, it is retried with the
// OMAP data log backend. The v16.2.0- flag name is tried first; if that is rejected as an
// invalid flag, the v16.2.1+ name is tried.
func runAdminCommand(c *Context, expectJSON bool, args ...string) (string, error) {
	// Work on a private copy: args may alias a slice the caller passed with `...`, and
	// appending to it in place could write into the caller's backing array.
	cmdArgs := make([]string, 0, len(args)+3)
	cmdArgs = append(cmdArgs, args...)

	// If the objectStoreName is not passed in the storage class
	// This means we are pointing to an external cluster so these commands are not needed
	// simply because the external cluster mode does not support that yet
	//
	// The following conditions tries to determine if the cluster is external
	// When connecting to an external cluster, the Ceph user is different than client.admin
	// This is not perfect though since "client.admin" is somehow supported...
	if c.Name != "" && c.clusterInfo.CephCred.Username == cephclient.AdminUsername {
		cmdArgs = append(cmdArgs,
			fmt.Sprintf("--rgw-realm=%s", c.Realm),
			fmt.Sprintf("--rgw-zonegroup=%s", c.ZoneGroup),
			fmt.Sprintf("--rgw-zone=%s", c.Zone),
		)
	}

	// work around FIFO file I/O issue when radosgw-admin is not compatible between version
	// installed in Rook operator and RGW version in Ceph cluster (#7573)
	result, err := RunAdminCommandNoMultisite(c, expectJSON, cmdArgs...)
	if err == nil || !isFifoFileIOError(err) {
		return result, err
	}

	logger.Debugf("retrying 'radosgw-admin' command with OMAP backend to work around FIFO file I/O issue. %v", result)
	// We can either run 'ceph --version' to determine the Ceph version running in the operator
	// and then pick a flag to use, or we can just try to use both flags and return the one that
	// works. Same number of commands being run.
	argsWith := func(flag string) []string {
		// The full slice expression caps capacity at len, so append always allocates and
		// each retry gets its own independent argument list.
		return append(cmdArgs[:len(cmdArgs):len(cmdArgs)], flag)
	}
	retryResult, retryErr := RunAdminCommandNoMultisite(c, expectJSON, argsWith("--rgw-data-log-backing=omap")...) // v16.2.0- in the operator
	if retryErr != nil && isInvalidFlagError(retryErr) {
		retryResult, retryErr = RunAdminCommandNoMultisite(c, expectJSON, argsWith("--rgw-default-data-log-backing=omap")...) // v16.2.1+ in the operator
	}
	return retryResult, retryErr
}
// isFifoFileIOError reports whether err, returned by a 'radosgw-admin' command, looks like
// the FIFO file I/O failure (exit code 5, EIO). If the exit code cannot be extracted from
// err, it logs the problem and answers true, so that the caller attempts the OMAP retry.
func isFifoFileIOError(err error) bool {
	const eioExitCode = 5

	code, extractErr := exec.ExtractExitCode(err)
	if extractErr == nil {
		// exit code 5 (EIO) is returned when there is a FIFO file I/O issue
		return code == eioExitCode
	}

	logger.Errorf("failed to determine return code of 'radosgw-admin' command. assuming this could be a FIFO file I/O issue. %#v", extractErr)
	return true
}
// isInvalidFlagError reports whether err, returned by a 'radosgw-admin' command, looks like
// the command rejected an unknown flag (exit code 22, EINVAL).
//
// If the exit code cannot be extracted from err, it assumes an invalid-flag error (as the
// log message states) and returns true. This matches isFifoFileIOError and lets the caller
// still try the alternate flag spelling.
func isInvalidFlagError(err error) bool {
	exitCode, extractErr := exec.ExtractExitCode(err)
	if extractErr != nil {
		logger.Errorf("failed to determine return code of 'radosgw-admin' command. assuming this could be an invalid flag error. %#v", extractErr)
		// Honor the stated assumption instead of comparing an exit code that could not be determined.
		return true
	}
	// exit code 22 (EINVAL) is returned when there is an invalid flag
	// it's also returned from some other failures, but this should be rare for Rook
	return exitCode == 22
}
// GetAdminOPSUserCredentials returns the access key and secret key of the RGW admin ops user.
//
// For an external object store (spec.IsExternal()), the keys are read from the
// RGWAdminOpsUserSecretName secret in the cluster namespace. Otherwise the admin ops user is
// created in the object store, or looked up if it already exists. An error is returned if
// the resulting user carries no access or secret key.
func GetAdminOPSUserCredentials(objContext *Context, spec *cephv1.ObjectStoreSpec) (string, string, error) {
	ns := objContext.clusterInfo.Namespace

	if spec.IsExternal() {
		// Fetch the secret for admin ops user
		s := &v1.Secret{}
		err := objContext.Context.Client.Get(context.TODO(), types.NamespacedName{Name: RGWAdminOpsUserSecretName, Namespace: ns}, s)
		if err != nil {
			return "", "", err
		}

		accessKey, ok := s.Data[rgwAdminOpsUserAccessKey]
		if !ok {
			return "", "", errors.Errorf("failed to find accessKey %q for rgw admin ops in secret %q", rgwAdminOpsUserAccessKey, RGWAdminOpsUserSecretName)
		}
		secretKey, ok := s.Data[rgwAdminOpsUserSecretKey]
		if !ok {
			return "", "", errors.Errorf("failed to find secretKey %q for rgw admin ops in secret %q", rgwAdminOpsUserSecretKey, RGWAdminOpsUserSecretName)
		}
		return string(accessKey), string(secretKey), nil
	}

	// Create (or fetch) the admin ops user locally.
	userConfig := ObjectUser{
		UserID:       RGWAdminOpsUserSecretName,
		DisplayName:  &rgwAdminOpsUserDisplayName,
		AdminOpsUser: true,
	}
	logger.Debugf("creating s3 user object %q for object store %q", userConfig.UserID, objContext.Name)
	user, rgwerr, err := CreateUser(objContext, userConfig)
	if err != nil {
		if rgwerr != ErrorCodeFileExists {
			return "", "", errors.Wrapf(err, "failed to create object user %q. error code %d for object store %q", userConfig.UserID, rgwerr, objContext.Name)
		}
		// The user already exists; read back its current keys instead.
		user, _, err = GetUser(objContext, userConfig.UserID)
		if err != nil {
			return "", "", errors.Wrapf(err, "failed to get details from ceph object user %q for object store %q", userConfig.UserID, objContext.Name)
		}
	}

	// Guard the dereferences below: a user record without keys would otherwise panic.
	if user == nil || user.AccessKey == nil || user.SecretKey == nil {
		return "", "", errors.Errorf("ceph object user %q for object store %q has no access key or secret key", userConfig.UserID, objContext.Name)
	}
	return *user.AccessKey, *user.SecretKey, nil
}