/
filesystem_mirror.go
235 lines (187 loc) · 10 KB
/
filesystem_mirror.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
/*
Copyright 2021 The Rook Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"encoding/json"
"fmt"
"strings"
"syscall"
"github.com/pkg/errors"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/util/exec"
)
// BootstrapPeerToken is the JSON document decoded from the output of the
// "fs snapshot mirror peer_bootstrap create" command (see
// CreateFSMirrorBootstrapPeer).
type BootstrapPeerToken struct {
// Token is read from the "token" key of the JSON output.
Token string `json:"token"`
}
// RemoveFilesystemMirrorPeer removes the mirror peer identified by peerUUID
// from the cephfs-mirror configuration by running
// "fs snapshot mirror peer_remove <peerUUID>".
//
// It returns nil on success. On failure the command error is returned, wrapped
// with the peer UUID and the command output.
//
// NOTE(review): only the peer UUID is passed to peer_remove. The upstream Ceph
// documentation describes the command as also taking the filesystem name;
// confirm the argument list against the targeted Ceph release. Fixing it would
// require a signature change, so it is left as is here.
func RemoveFilesystemMirrorPeer(context *clusterd.Context, clusterInfo *ClusterInfo, peerUUID string) error {
	logger.Infof("removing cephfs-mirror peer %q", peerUUID)

	// Build command
	args := []string{"fs", "snapshot", "mirror", "peer_remove", peerUUID}
	cmd := NewCephCommand(context, clusterInfo, args)

	// Run command
	output, err := cmd.Run()
	if err != nil {
		// The quoted value is a peer UUID, so the message must not call it a filesystem.
		return errors.Wrapf(err, "failed to remove cephfs-mirror peer %q. %s", peerUUID, output)
	}

	logger.Infof("successfully removed cephfs-mirror peer %q", peerUUID)
	return nil
}
// EnableFilesystemSnapshotMirror turns on snapshot mirroring for the given
// filesystem by running "fs snapshot mirror enable <filesystem>".
//
// It returns nil on success, or the command error wrapped with the filesystem
// name and the command output.
func EnableFilesystemSnapshotMirror(context *clusterd.Context, clusterInfo *ClusterInfo, filesystem string) error {
	logger.Infof("enabling ceph filesystem snapshot mirror for filesystem %q", filesystem)

	enableCmd := NewCephCommand(context, clusterInfo, []string{"fs", "snapshot", "mirror", "enable", filesystem})
	if out, runErr := enableCmd.Run(); runErr != nil {
		return errors.Wrapf(runErr, "failed to enable ceph filesystem snapshot mirror for filesystem %q. %s", filesystem, out)
	}

	logger.Infof("successfully enabled ceph filesystem snapshot mirror for filesystem %q", filesystem)
	return nil
}
// DisableFilesystemSnapshotMirror turns off snapshot mirroring for the given
// filesystem by running "fs snapshot mirror disable <filesystem>".
//
// A command failure whose exit code equals ENOTSUP is treated as "mirroring is
// not enabled" and reported as success. Any other failure is returned wrapped
// with the filesystem name and the command output.
func DisableFilesystemSnapshotMirror(context *clusterd.Context, clusterInfo *ClusterInfo, filesystem string) error {
	logger.Infof("disabling ceph filesystem snapshot mirror for filesystem %q", filesystem)

	disableCmd := NewCephCommand(context, clusterInfo, []string{"fs", "snapshot", "mirror", "disable", filesystem})
	out, runErr := disableCmd.Run()
	if runErr == nil {
		logger.Infof("successfully disabled ceph filesystem snapshot mirror for filesystem %q", filesystem)
		return nil
	}

	// Nothing to undo when the command reports ENOTSUP.
	exitCode, extractErr := exec.ExtractExitCode(runErr)
	if extractErr == nil && exitCode == int(syscall.ENOTSUP) {
		logger.Debug("filesystem mirroring is not enabled, nothing to disable")
		return nil
	}

	return errors.Wrapf(runErr, "failed to disable ceph filesystem snapshot mirror for filesystem %q. %s", filesystem, out)
}
// AddSnapshotSchedule adds a snapshot schedule on path for the given
// filesystem, repeating every interval.
//
// Example command: "ceph fs snap-schedule add / 4d fs=myfs2"
//
// startTime is optional: when non-empty it is appended verbatim after the
// interval. NOTE(review): presumably a timestamp such as
// "2014-01-09T21:48:00" is expected; confirm the accepted format.
//
// An EEXIST exit code means the schedule is already present and is not
// treated as an error. Every other failure, including one whose exit status
// cannot be determined, is returned wrapped with the command output.
func AddSnapshotSchedule(context *clusterd.Context, clusterInfo *ClusterInfo, path, interval, startTime, filesystem string) error {
	logger.Infof("adding snapshot schedule every %q to ceph filesystem %q on path %q", interval, filesystem, path)

	args := []string{"fs", "snap-schedule", "add", path, interval}
	if startTime != "" {
		args = append(args, startTime)
	}
	args = append(args, fmt.Sprintf("fs=%s", filesystem))
	cmd := NewCephCommand(context, clusterInfo, args)
	cmd.JsonOutput = false

	// Run command
	output, err := cmd.Run()
	if err != nil {
		// Only a confirmed EEXIST is benign. If the exit status cannot be
		// extracted, the failure must be surfaced rather than swallowed.
		code, ok := exec.ExitStatus(err)
		if !ok || code != int(syscall.EEXIST) {
			return errors.Wrapf(err, "failed to add snapshot schedule every %q to ceph filesystem %q on path %q. %s", interval, filesystem, path, output)
		}
		logger.Debugf("snapshot schedule every %q already exists for ceph filesystem %q on path %q", interval, filesystem, path)
	}

	logger.Infof("successfully added snapshot schedule every %q to ceph filesystem %q on path %q", interval, filesystem, path)
	return nil
}
// AddSnapshotScheduleRetention adds a retention policy (duration) to the
// snapshot schedule on path for the given filesystem.
//
// Example command: "ceph fs snap-schedule retention add / d 1 fs=myfs2"
//
// A failure with exit code ENOENT is logged as "already exists" and treated as
// success. Any other failure is returned wrapped with the command output.
// NOTE(review): ENOENT is assumed to be what Ceph returns for an existing
// retention; confirm against the snap-schedule module.
func AddSnapshotScheduleRetention(context *clusterd.Context, clusterInfo *ClusterInfo, path, duration, filesystem string) error {
	logger.Infof("adding snapshot schedule retention %s to ceph filesystem %q on path %q", duration, filesystem, path)

	fsArg := fmt.Sprintf("fs=%s", filesystem)
	retentionCmd := NewCephCommand(context, clusterInfo, []string{"fs", "snap-schedule", "retention", "add", path, duration, fsArg})
	retentionCmd.JsonOutput = false

	out, runErr := retentionCmd.Run()
	if runErr != nil {
		exitCode, known := exec.ExitStatus(runErr)
		if !known || exitCode != int(syscall.ENOENT) {
			return errors.Wrapf(runErr, "failed to add snapshot schedule retention %s to ceph filesystem %q on path %q. %s", duration, filesystem, path, out)
		}
		logger.Warningf("snapshot schedule retention %s already exists for filesystem %q on path %q. %s", duration, filesystem, path, out)
	}

	logger.Infof("successfully added snapshot schedule retention %s to ceph filesystem %q on path %q", duration, filesystem, path)
	return nil
}
// GetSnapshotScheduleStatus returns the snapshot schedules reported by
// "fs snap-schedule status / recursive=true --fs=<filesystem>".
//
// The command output is decoded as a JSON array into
// []cephv1.FilesystemSnapshotSchedulesSpec. An error is returned when the
// command fails or when the output cannot be decoded.
func GetSnapshotScheduleStatus(context *clusterd.Context, clusterInfo *ClusterInfo, filesystem string) ([]cephv1.FilesystemSnapshotSchedulesSpec, error) {
	logger.Infof("retrieving snapshot schedule status for ceph filesystem %q", filesystem)

	fsFlag := fmt.Sprintf("--fs=%s", filesystem)
	statusCmd := NewCephCommand(context, clusterInfo, []string{"fs", "snap-schedule", "status", "/", "recursive=true", fsFlag})
	raw, runErr := statusCmd.Run()
	if runErr != nil {
		return nil, errors.Wrapf(runErr, "failed to retrieve snapshot schedule status for ceph filesystem %q. %s", filesystem, raw)
	}

	// The command prints a newline before the JSON array, e.g.
	//
	//   [{"fs": "myfs", "subvol": null, "path": "/", "rel_path": "/", "schedule": "24h", "retention": {"h": 24}, "start": "2021-07-01T00:00:00", "created": "2021-07-01T12:19:12", "first": null, "last": null, "last_pruned": null, "created_count": 0, "pruned_count": 0, "active": true}]
	//
	// so every newline is stripped before decoding.
	flattened := strings.ReplaceAll(string(raw), "\n", "")

	var schedules []cephv1.FilesystemSnapshotSchedulesSpec
	if decodeErr := json.Unmarshal([]byte(flattened), &schedules); decodeErr != nil {
		return nil, errors.Wrap(decodeErr, "failed to unmarshal filesystem mirror snapshot schedule status response")
	}

	logger.Infof("successfully retrieved snapshot schedule status for ceph filesystem %q", filesystem)
	return schedules, nil
}
// ImportFSMirrorBootstrapPeer imports a bootstrap peer token for filesystem
// fsName by running "fs snapshot mirror peer_bootstrap import <fsName> <token>".
//
// Leading and trailing whitespace is trimmed from token before it is passed to
// the command. JSON output is disabled and combined output is requested on the
// command. On failure the error is returned wrapped with the command output.
func ImportFSMirrorBootstrapPeer(context *clusterd.Context, clusterInfo *ClusterInfo, fsName, token string) error {
	logger.Infof("importing cephfs bootstrap peer token for filesystem %q", fsName)

	trimmedToken := strings.TrimSpace(token)
	importCmd := NewCephCommand(context, clusterInfo, []string{"fs", "snapshot", "mirror", "peer_bootstrap", "import", fsName, trimmedToken})
	importCmd.JsonOutput = false
	importCmd.combinedOutput = true

	if out, runErr := importCmd.Run(); runErr != nil {
		return errors.Wrapf(runErr, "failed to import cephfs-mirror peer token for filesystem %q. %s", fsName, out)
	}

	logger.Infof("successfully imported cephfs-mirror peer for filesystem %q", fsName)
	return nil
}
// CreateFSMirrorBootstrapPeer creates a bootstrap peer token for filesystem
// fsName by running
// "fs snapshot mirror peer_bootstrap create <fsName> client.mirror <FSID>"
// and returns the value of the "token" key from the JSON output as bytes.
//
// clusterInfo.FSID is passed as the last argument (presumably the site name;
// confirm against the Ceph documentation). An error is returned when the
// command fails or when its output cannot be decoded.
func CreateFSMirrorBootstrapPeer(context *clusterd.Context, clusterInfo *ClusterInfo, fsName string) ([]byte, error) {
	logger.Infof("create cephfs-mirror bootstrap peer token for filesystem %q", fsName)

	createCmd := NewCephCommand(context, clusterInfo, []string{"fs", "snapshot", "mirror", "peer_bootstrap", "create", fsName, "client.mirror", clusterInfo.FSID})
	raw, runErr := createCmd.Run()
	if runErr != nil {
		return nil, errors.Wrapf(runErr, "failed to create cephfs-mirror peer token for filesystem %q. %s", fsName, raw)
	}

	// Decode the JSON reply to extract the token.
	var parsed BootstrapPeerToken
	if decodeErr := json.Unmarshal(raw, &parsed); decodeErr != nil {
		return nil, errors.Wrapf(decodeErr, "failed to unmarshal cephfs-mirror peer token create response. %s", raw)
	}

	logger.Infof("successfully created cephfs-mirror bootstrap peer token for filesystem %q", fsName)
	return []byte(parsed.Token), nil
}
// GetFSMirrorDaemonStatus returns the mirroring status of filesystem fsName as
// reported by "fs snapshot mirror daemon status <fsName>".
//
// The command output is decoded as a JSON array into
// []cephv1.FilesystemMirroringInfo. An error is returned when the command
// fails or when the output cannot be decoded.
func GetFSMirrorDaemonStatus(context *clusterd.Context, clusterInfo *ClusterInfo, fsName string) ([]cephv1.FilesystemMirroringInfo, error) {
	// Logged at Debug level because this is called from a recurring goroutine.
	logger.Debugf("retrieving filesystem mirror status for filesystem %q", fsName)

	statusCmd := NewCephCommand(context, clusterInfo, []string{"fs", "snapshot", "mirror", "daemon", "status", fsName})
	raw, runErr := statusCmd.Run()
	if runErr != nil {
		return nil, errors.Wrapf(runErr, "failed to retrieve filesystem mirror status for filesystem %q. %s", fsName, raw)
	}

	var mirroringInfo []cephv1.FilesystemMirroringInfo
	if decodeErr := json.Unmarshal(raw, &mirroringInfo); decodeErr != nil {
		return nil, errors.Wrapf(decodeErr, "failed to unmarshal filesystem mirror status response. %q.", string(raw))
	}

	logger.Debugf("successfully retrieved filesystem mirror status for filesystem %q", fsName)
	return mirroringInfo, nil
}