/
ruleclient.py
286 lines (252 loc) · 12.4 KB
/
ruleclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# -*- coding: utf-8 -*-
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from json import dumps, loads
from typing import Any, Optional, Union
from urllib.parse import quote_plus
from requests.status_codes import codes
from rucio.client.baseclient import BaseClient, choice
from rucio.common.utils import build_url
class RuleClient(BaseClient):
    """Client for creating, inspecting and modifying replication rules.

    Every method builds a URL below ``RULE_BASEURL`` on a host selected with
    ``choice(self.list_hosts)``, sends it through ``self._send_request`` and,
    if the response does not carry the expected status code, raises the
    exception class/message pair returned by ``self._get_exception``.
    """

    RULE_BASEURL = 'rules'

    def add_replication_rule(
        self,
        dids: list[str],
        copies: int,
        rse_expression: str,
        priority: int = 3,
        lifetime: Optional[int] = None,
        grouping: str = 'DATASET',
        notify: str = 'N',
        source_replica_expression: Optional[str] = None,
        activity: Optional[str] = None,
        account: Optional[str] = None,
        meta: Optional[str] = None,
        ignore_availability: bool = False,
        purge_replicas: bool = False,
        ask_approval: bool = False,
        asynchronous: bool = False,
        locked: bool = False,
        delay_injection=None,
        comment=None,
        weight=None,
    ) -> Any:
        """
        Create a new replication rule.

        :param dids: The data identifier set.
        :param copies: The number of replicas.
        :param rse_expression: Boolean string expression to give the list of RSEs.
        :param priority: Priority of the transfers.
        :param lifetime: The lifetime of the replication rules (in seconds).
        :param grouping: ALL - All files will be replicated to the same RSE.
                         DATASET - All files in the same dataset will be replicated to the same RSE.
                         NONE - Files will be completely spread over all allowed RSEs without any grouping considerations at all.
        :param notify: Notification setting for the rule (Y, N, C).
        :param source_replica_expression: RSE Expression for RSEs to be considered for source replicas.
        :param activity: Transfer Activity to be passed to FTS.
        :param account: The account owning the rule.
        :param meta: Metadata for the rule.
                     NOTE(review): previously documented as a dictionary but annotated as ``str`` - confirm the expected type.
        :param ignore_availability: Option to ignore the availability of RSEs.
        :param purge_replicas: When the rule gets deleted purge the associated replicas immediately.
        :param ask_approval: Ask for approval of this replication rule.
        :param asynchronous: Create rule asynchronously by judge-injector.
        :param locked: If the rule is locked, it cannot be deleted.
        :param delay_injection: Forwarded unchanged to the server as ``delay_injection``.
        :param comment: Comment about the rule.
        :param weight: If the weighting option of the replication rule is used, the choice of RSEs takes their weight into account.
        :returns: The JSON-decoded body of the server response.
        :raises: The exception derived from the server response when the status code is not 201 (created).
        """
        path = self.RULE_BASEURL + '/'
        url = build_url(choice(self.list_hosts), path=path)
        data = dumps({
            'dids': dids,
            'copies': copies,
            'rse_expression': rse_expression,
            'weight': weight,
            'lifetime': lifetime,
            'grouping': grouping,
            'account': account,
            'locked': locked,
            'source_replica_expression': source_replica_expression,
            'activity': activity,
            'notify': notify,
            'purge_replicas': purge_replicas,
            'ignore_availability': ignore_availability,
            'comment': comment,
            'ask_approval': ask_approval,
            'asynchronous': asynchronous,
            'delay_injection': delay_injection,
            'priority': priority,
            'meta': meta,
        })
        r = self._send_request(url, type_='POST', data=data)
        if r.status_code == codes.created:
            return loads(r.text)
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def delete_replication_rule(
        self, rule_id: str, purge_replicas: Optional[bool] = None
    ) -> bool:
        """
        Deletes a replication rule and all associated locks.

        :param rule_id: The id of the rule to be deleted
        :param purge_replicas: Immediately delete the replicas.
        :returns: True when the server answers with status 200 (ok).
        :raises: RuleNotFound, AccessDenied
        """
        path = self.RULE_BASEURL + '/' + rule_id
        url = build_url(choice(self.list_hosts), path=path)
        data = dumps({'purge_replicas': purge_replicas})
        r = self._send_request(url, type_='DEL', data=data)
        if r.status_code == codes.ok:
            return True
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def get_replication_rule(self, rule_id: str):
        """
        Get a replication rule.

        :param rule_id: The id of the rule to be retrieved.
        :returns: The first item yielded by ``self._load_json_data`` for the response.
        :raises: RuleNotFound
        """
        path = self.RULE_BASEURL + '/' + rule_id
        url = build_url(choice(self.list_hosts), path=path)
        r = self._send_request(url, type_='GET')
        if r.status_code == codes.ok:
            return next(self._load_json_data(r))
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def update_replication_rule(self, rule_id: str, options: dict[str, Any]) -> bool:
        """
        Update the options of a replication rule.

        :param rule_id: The id of the rule to be updated.
        :param options: Options dictionary.
        :returns: True when the server answers with status 200 (ok).
        :raises: RuleNotFound
        """
        path = self.RULE_BASEURL + '/' + rule_id
        url = build_url(choice(self.list_hosts), path=path)
        data = dumps({'options': options})
        r = self._send_request(url, type_='PUT', data=data)
        if r.status_code == codes.ok:
            return True
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def reduce_replication_rule(
        self, rule_id: str, copies: int, exclude_expression=None
    ) -> Any:
        """
        Reduce the number of copies of a replication rule.

        :param rule_id: Rule to be reduced.
        :param copies: Number of copies of the new rule.
        :param exclude_expression: RSE Expression of RSEs to exclude.
        :returns: The JSON-decoded body of the server response.
        :raises: RuleReplaceFailed, RuleNotFound
        """
        path = self.RULE_BASEURL + '/' + rule_id + '/reduce'
        url = build_url(choice(self.list_hosts), path=path)
        data = dumps({'copies': copies, 'exclude_expression': exclude_expression})
        r = self._send_request(url, type_='POST', data=data)
        if r.status_code == codes.ok:
            return loads(r.text)
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def move_replication_rule(
        self, rule_id: str, rse_expression: str, override
    ) -> Any:
        """
        Move a replication rule to another RSE and, once done, delete the original one.

        :param rule_id: Rule to be moved.
        :param rse_expression: RSE expression of the new rule.
        :param override: Configurations to update for the new rule.
        :returns: The JSON-decoded body of the server response.
        :raises: RuleNotFound, RuleReplaceFailed
        """
        path = self.RULE_BASEURL + '/' + rule_id + '/move'
        url = build_url(choice(self.list_hosts), path=path)
        data = dumps({
            'rule_id': rule_id,
            'rse_expression': rse_expression,
            'override': override,
        })
        r = self._send_request(url, type_='POST', data=data)
        # Unlike reduce, a successful move is signalled with 201 (created).
        if r.status_code == codes.created:
            return loads(r.text)
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def approve_replication_rule(self, rule_id: str) -> bool:
        """
        Approve a replication rule.

        :param rule_id: Rule to be approved.
        :returns: True when the server answers with status 200 (ok).
        :raises: RuleNotFound
        """
        path = self.RULE_BASEURL + '/' + rule_id
        url = build_url(choice(self.list_hosts), path=path)
        # Approval is expressed as a rule update with the 'approve' option set.
        data = dumps({'options': {'approve': True}})
        r = self._send_request(url, type_='PUT', data=data)
        if r.status_code == codes.ok:
            return True
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def deny_replication_rule(self, rule_id: str, reason: Optional[str] = None) -> bool:
        """
        Deny a replication rule.

        :param rule_id: Rule to be denied.
        :param reason: Reason for denying the rule.
        :returns: True when the server answers with status 200 (ok).
        :raises: RuleNotFound
        """
        path = self.RULE_BASEURL + '/' + rule_id
        url = build_url(choice(self.list_hosts), path=path)
        options: dict[str, Union[bool, str]] = {'approve': False}
        # The reason is only transmitted (as the 'comment' option) when non-empty.
        if reason:
            options['comment'] = reason
        data = dumps({'options': options})
        r = self._send_request(url, type_='PUT', data=data)
        if r.status_code == codes.ok:
            return True
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def list_replication_rule_full_history(
        self, scope: Union[str, bytes], name: Union[str, bytes]
    ):
        """
        List the rule history of a DID.

        :param scope: The scope of the DID.
        :param name: The name of the DID.
        :returns: The result of ``self._load_json_data`` for the response.
        :raises: The exception derived from the server response when the status code is not 200 (ok).
        """
        # Scope and name are URL-quoted because they become path segments.
        path = '/'.join([self.RULE_BASEURL, quote_plus(scope), quote_plus(name), 'history'])
        url = build_url(choice(self.list_hosts), path=path)
        r = self._send_request(url, type_='GET')
        if r.status_code == codes.ok:
            return self._load_json_data(r)
        # Pass the response body as well so the server-side error details are not lost.
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def examine_replication_rule(self, rule_id: str):
        """
        Examine a replication rule for errors during transfer.

        :param rule_id: Rule to be examined.
        :returns: The first item yielded by ``self._load_json_data`` for the response.
        :raises: RuleNotFound
        """
        path = self.RULE_BASEURL + '/' + rule_id + '/analysis'
        url = build_url(choice(self.list_hosts), path=path)
        r = self._send_request(url, type_='GET')
        if r.status_code == codes.ok:
            return next(self._load_json_data(r))
        # Pass the response body as well so the server-side error details are not lost.
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def list_replica_locks(self, rule_id: str):
        """
        List details of all replica locks for a rule.

        :param rule_id: Rule whose replica locks are listed.
        :returns: The result of ``self._load_json_data`` for the response.
        :raises: RuleNotFound
        """
        path = self.RULE_BASEURL + '/' + rule_id + '/locks'
        url = build_url(choice(self.list_hosts), path=path)
        r = self._send_request(url, type_='GET')
        if r.status_code == codes.ok:
            return self._load_json_data(r)
        # Pass the response body as well so the server-side error details are not lost.
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def list_replication_rules(self, filters=None):
        """
        List all replication rules which match a filter

        :param filters: dictionary of attributes by which the rules should be filtered
        :returns: The result of ``self._load_json_data`` for the response.
        :raises: The exception derived from the server response when the status code is not 200 (ok).
        """
        # A missing (or empty) filter is sent as an empty set of query parameters.
        filters = filters or {}
        path = self.RULE_BASEURL + '/'
        url = build_url(choice(self.list_hosts), path=path)
        r = self._send_request(url, type_='GET', params=filters)
        if r.status_code == codes.ok:
            return self._load_json_data(r)
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)