/
replicaclient.py
443 lines (363 loc) · 18.9 KB
/
replicaclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
# -*- coding: utf-8 -*-
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from json import dumps, loads
from urllib.parse import quote_plus
from requests.status_codes import codes
from rucio.client.baseclient import BaseClient, choice
from rucio.common.utils import build_url, chunks, render_json
class ReplicaClient(BaseClient):
    """Replica client class for working with replicas"""

    # Path component shared by every replica REST endpoint built below.
    REPLICAS_BASEURL = 'replicas'
    # Size argument handed to ``chunks()`` when a bulk request is split into several POSTs.
    REPLICAS_CHUNK_SIZE = 1000

    def quarantine_replicas(self, replicas, rse=None, rse_id=None):
        """
        Add quarantined replicas for RSE.

        The replicas are sent in chunks of ``REPLICAS_CHUNK_SIZE``; the first
        non-OK response aborts the loop by raising.

        :param replicas: List of replica infos: {'scope': <scope> (optional), 'name': <name> (optional), 'path':<path> (required)}.
        :param rse: RSE name.
        :param rse_id: RSE id. Either RSE name or RSE id must be specified, but not both
        :raises ValueError: if both or neither of ``rse`` and ``rse_id`` are given.
        :raises: the exception resolved by ``_get_exception`` when the server does not answer with OK.
        """
        # Exactly one of the two identifiers must be provided.
        if (rse is None) == (rse_id is None):
            raise ValueError("Either RSE name or RSE id must be specified, but not both")

        url = build_url(self.host, path='/'.join([self.REPLICAS_BASEURL, 'quarantine']))
        headers = {}
        for chunk in chunks(replicas, self.REPLICAS_CHUNK_SIZE):
            data = {'rse': rse, 'rse_id': rse_id, 'replicas': chunk}
            r = self._send_request(url, headers=headers, type_='POST', data=dumps(data))
            if r.status_code != codes.ok:
                exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
                raise exc_cls(exc_msg)

    def declare_bad_file_replicas(self, replicas, reason, force=False):
        """
        Declare a list of bad replicas.

        :param replicas: Either a list of PFNs (string) or a list of dicts {'scope': <scope>, 'name': <name>, 'rse_id': <rse_id> or 'rse': <rse_name>}
        :param reason: The reason of the loss.
        :param force: boolean, tell the server to ignore existing replica status in the bad_replicas table. Default: False
        :returns: Dictionary {"rse_name": ["did: error",...]} - list of strings for DIDs failed to declare, by RSE
        :raises: the exception resolved by ``_get_exception`` when the server answers with neither CREATED nor OK.
        """
        out = {}    # {rse: ["did: error text",...]}
        url = build_url(self.host, path='/'.join([self.REPLICAS_BASEURL, 'bad']))
        headers = {}
        for chunk in chunks(replicas, self.REPLICAS_CHUNK_SIZE):
            data = {'reason': reason, 'replicas': chunk, 'force': force}
            r = self._send_request(url, headers=headers, type_='POST', data=dumps(data))
            if r.status_code not in (codes.created, codes.ok):
                exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
                raise exc_cls(exc_msg)
            # Merge the per-RSE error lists of this chunk into the overall result.
            chunk_result = loads(r.text)
            if chunk_result:
                for rse, lst in chunk_result.items():
                    out.setdefault(rse, []).extend(lst)
        return out

    def declare_bad_did_replicas(self, rse, dids, reason):
        """
        Declare a list of bad replicas, identified by their DIDs on one RSE.

        :param rse: The RSE where the bad replicas reside
        :param dids: The DIDs of the bad replicas
        :param reason: The reason of the loss.
        :returns: The JSON-decoded response body when the server answers with CREATED.
        :raises: the exception resolved by ``_get_exception`` for any other status code.
        """
        data = {'reason': reason, 'rse': rse, 'dids': dids}
        url = build_url(self.host, path='/'.join([self.REPLICAS_BASEURL, 'bad/dids']))
        headers = {}
        r = self._send_request(url, headers=headers, type_='POST', data=dumps(data))
        if r.status_code == codes.created:
            return loads(r.text)
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def declare_suspicious_file_replicas(self, pfns, reason):
        """
        Declare a list of suspicious replicas.

        :param pfns: The list of PFNs.
        :param reason: The reason of the loss.
        :returns: The JSON-decoded response body when the server answers with CREATED.
        :raises: the exception resolved by ``_get_exception`` for any other status code.
        """
        data = {'reason': reason, 'pfns': pfns}
        url = build_url(self.host, path='/'.join([self.REPLICAS_BASEURL, 'suspicious']))
        headers = {}
        r = self._send_request(url, headers=headers, type_='POST', data=dumps(data))
        if r.status_code == codes.created:
            return loads(r.text)
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def get_did_from_pfns(self, pfns, rse=None):
        """
        Get the DIDs associated to a PFN on one given RSE

        :param pfns: The list of PFNs.
        :param rse: The RSE name.
        :returns: A list of dictionaries {pfn: {'scope': scope, 'name': name}}
        :raises: the exception resolved by ``_get_exception`` when the server does not answer with OK.
        """
        data = {'rse': rse, 'pfns': pfns}
        url = build_url(self.host, path='/'.join([self.REPLICAS_BASEURL, 'dids']))
        headers = {}
        r = self._send_request(url, headers=headers, type_='POST', data=dumps(data))
        if r.status_code == codes.ok:
            return self._load_json_data(r)
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def list_replicas(self, dids, schemes=None, ignore_availability=True,
                      all_states=False, metalink=False, rse_expression=None,
                      client_location=None, sort=None, domain=None,
                      signature_lifetime=None, nrandom=None,
                      resolve_archives=True, resolve_parents=False,
                      updated_after=None):
        """
        List file replicas for a list of data identifiers (DIDs).

        :param dids: The list of data identifiers (DIDs) like :
            [{'scope': <scope1>, 'name': <name1>}, {'scope': <scope2>, 'name': <name2>}, ...]
        :param schemes: A list of schemes to filter the replicas. (e.g. file, http, ...)
        :param ignore_availability: Also include replicas from blocked RSEs into the list
        :param all_states: Forwarded to the server unchanged; presumably selects replicas in every state
                           instead of only available ones (confirm against the server API).
        :param metalink: ``False`` (default) retrieves as JSON,
                         ``True`` retrieves as metalink4+xml.
        :param rse_expression: The RSE expression to restrict replicas on a set of RSEs.
        :param client_location: Client location dictionary for PFN modification {'ip', 'fqdn', 'site', 'latitude', 'longitude'}
        :param sort: Sort the replicas: ``geoip`` - based on src/dst IP topographical distance
                                        ``closeness`` - based on src/dst closeness
                                        ``dynamic`` - Rucio Dynamic Smart Sort (tm)
        :param domain: Define the domain. None is fallback to 'wan', otherwise 'wan, 'lan', or 'all'
        :param signature_lifetime: If supported, in seconds, restrict the lifetime of the signed PFN.
        :param nrandom: pick N random replicas. If the initial number of replicas is smaller than N, returns all replicas.
        :param resolve_archives: When set to True, find archives which contain the replicas.
        :param resolve_parents: When set to True, find all parent datasets which contain the replicas.
        :param updated_after: epoch timestamp or datetime object (UTC time), only return replicas updated after this time

        :returns: A list of dictionaries with replica information, or the raw response text when ``metalink`` is True.
        :raises: the exception resolved by ``_get_exception`` when the server does not answer with OK.
        """
        data = {'dids': dids,
                'domain': domain}

        # Optional filters are only added to the payload when they carry a value.
        if schemes:
            data['schemes'] = schemes
        if ignore_availability is not None:
            data['ignore_availability'] = ignore_availability
        data['all_states'] = all_states

        if rse_expression:
            data['rse_expression'] = rse_expression

        if client_location:
            data['client_location'] = client_location

        if sort:
            data['sort'] = sort

        if updated_after:
            if isinstance(updated_after, datetime):
                # encode in UTC string with format '%Y-%m-%dT%H:%M:%S'  e.g. '2020-03-02T12:01:38'
                data['updated_after'] = updated_after.strftime('%Y-%m-%dT%H:%M:%S')
            else:
                data['updated_after'] = updated_after

        if signature_lifetime:
            data['signature_lifetime'] = signature_lifetime

        if nrandom:
            data['nrandom'] = nrandom

        data['resolve_archives'] = resolve_archives
        data['resolve_parents'] = resolve_parents

        url = build_url(choice(self.list_hosts),
                        path='/'.join([self.REPLICAS_BASEURL, 'list']))

        headers = {}
        if metalink:
            headers['Accept'] = 'application/metalink4+xml'

        # The filter dictionary is sent JSON-encoded as the POST body.
        r = self._send_request(url, headers=headers, type_='POST', data=dumps(data), stream=True)
        if r.status_code == codes.ok:
            if not metalink:
                return self._load_json_data(r)
            return r.text
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def list_suspicious_replicas(self, rse_expression=None, younger_than=None, nattempts=None):
        """
        List file replicas tagged as suspicious.

        Only arguments with a truthy value are sent as query parameters.

        :param rse_expression: The RSE expression to restrict replicas on a set of RSEs.
        :param younger_than: Datetime object to select the replicas which were declared since younger_than date. Default value = 10 days ago.
        :param nattempts: The minimum number of replica appearances in the bad_replica DB table from younger_than date. Default value = 0.
        :returns: The decoded JSON data of the response.
        :raises: the exception resolved by ``_get_exception`` when the server does not answer with OK.
        """
        params = {}
        if rse_expression:
            params['rse_expression'] = rse_expression

        if younger_than:
            params['younger_than'] = younger_than

        if nattempts:
            params['nattempts'] = nattempts

        url = build_url(choice(self.list_hosts),
                        path='/'.join([self.REPLICAS_BASEURL, 'suspicious']))
        r = self._send_request(url, type_='GET', params=params)
        if r.status_code == codes.ok:
            return self._load_json_data(r)
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def add_replica(self, rse, scope, name, bytes_, adler32, pfn=None, md5=None, meta=None):
        """
        Add file replicas to a RSE.

        Thin wrapper that builds a single file description and delegates to
        :meth:`add_replicas`.

        :param rse: the RSE name.
        :param scope: The scope of the file.
        :param name: The name of the file.
        :param bytes_: The size in bytes.
        :param adler32: adler32 checksum.
        :param pfn: PFN of the file for non deterministic RSE.
        :param md5: md5 checksum.
        :param meta: Metadata attributes. ``None`` (default) is sent as an empty dictionary.

        :return: True if files were created successfully.
        """
        # A fresh dict per call: a ``meta={}`` default would be one object shared by every call.
        if meta is None:
            meta = {}
        dict_ = {'scope': scope, 'name': name, 'bytes': bytes_, 'meta': meta, 'adler32': adler32}
        if md5:
            dict_['md5'] = md5
        if pfn:
            dict_['pfn'] = pfn
        return self.add_replicas(rse=rse, files=[dict_])

    def add_replicas(self, rse, files, ignore_availability=True):
        """
        Bulk add file replicas to a RSE.

        :param rse: the RSE name.
        :param files: The list of files. This is a list of DIDs like :
            [{'scope': <scope1>, 'name': <name1>}, {'scope': <scope2>, 'name': <name2>}, ...]
        :param ignore_availability: Ignore the RSE blocklist.

        :return: True if files were created successfully.
        :raises: the exception resolved by ``_get_exception`` when the server does not answer with CREATED.
        """
        url = build_url(choice(self.list_hosts), path=self.REPLICAS_BASEURL)
        data = {'rse': rse, 'files': files, 'ignore_availability': ignore_availability}
        r = self._send_request(url, type_='POST', data=render_json(**data))
        if r.status_code == codes.created:
            return True
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def delete_replicas(self, rse, files, ignore_availability=True):
        """
        Bulk delete file replicas from a RSE.

        :param rse: the RSE name.
        :param files: The list of files. This is a list of DIDs like :
            [{'scope': <scope1>, 'name': <name1>}, {'scope': <scope2>, 'name': <name2>}, ...]
        :param ignore_availability: Ignore the RSE blocklist.

        :return: True if files have been deleted successfully.
        :raises: the exception resolved by ``_get_exception`` when the server does not answer with OK.
        """
        url = build_url(choice(self.list_hosts), path=self.REPLICAS_BASEURL)
        data = {'rse': rse, 'files': files, 'ignore_availability': ignore_availability}
        r = self._send_request(url, type_='DEL', data=render_json(**data))
        if r.status_code == codes.ok:
            return True
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def update_replicas_states(self, rse, files):
        """
        Bulk update the file replicas states from a RSE.

        :param rse: the RSE name.
        :param files: The list of files. This is a list of DIDs like :
            [{'scope': <scope1>, 'name': <name1>, 'state': <state1>}, {'scope': <scope2>, 'name': <name2>, 'state': <state2>}, ...],
            where a state value can be either of:
              'A' (AVAILABLE)
              'U' (UNAVAILABLE)
              'C' (COPYING)
              'B' (BEING_DELETED)
              'D' (BAD)
              'T' (TEMPORARY_UNAVAILABLE)
        :return: True if replica states have been updated successfully, otherwise an exception is raised.
        """
        url = build_url(choice(self.list_hosts), path=self.REPLICAS_BASEURL)
        data = {'rse': rse, 'files': files}
        r = self._send_request(url, type_='PUT', data=render_json(**data))
        if r.status_code == codes.ok:
            return True
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def list_dataset_replicas(self, scope, name, deep=False):
        """
        List dataset replicas for a did (scope:name).

        :param scope: The scope of the dataset.
        :param name: The name of the dataset.
        :param deep: Lookup at the file level.

        :returns: A list of dict dataset replicas.
        :raises: the exception resolved by ``_get_exception`` when the server does not answer with OK.
        """
        # The 'deep' query parameter is only added when requested.
        payload = {}
        if deep:
            payload = {'deep': True}

        # Scope and name are URL-quoted because they become path segments.
        url = build_url(self.host,
                        path='/'.join([self.REPLICAS_BASEURL, quote_plus(scope), quote_plus(name), 'datasets']),
                        params=payload)
        r = self._send_request(url, type_='GET')
        if r.status_code == codes.ok:
            return self._load_json_data(r)
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def list_dataset_replicas_bulk(self, dids):
        """
        List dataset replicas for a list of DIDs.

        :param dids: The list of DIDs of the datasets.

        :returns: A list of dict dataset replicas.
        :raises: the exception resolved by ``_get_exception`` when the server does not answer with OK.
        """
        # Materialize the iterable so that it can be JSON-encoded.
        payload = {'dids': list(dids)}
        url = build_url(self.host, path='/'.join([self.REPLICAS_BASEURL, 'datasets_bulk']))
        r = self._send_request(url, type_='POST', data=dumps(payload))
        if r.status_code == codes.ok:
            return self._load_json_data(r)
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def list_dataset_replicas_vp(self, scope, name, deep=False):
        """
        List dataset replicas for a DID (scope:name) using the
        Virtual Placement service.

        NOTICE: This is an RnD function and might change or go away at any time.

        :param scope: The scope of the dataset.
        :param name: The name of the dataset.
        :param deep: Lookup at the file level.

        :returns: If VP exists a list of dicts of sites
        :raises: the exception resolved by ``_get_exception`` when the server does not answer with OK.
        """
        # The 'deep' query parameter is only added when requested.
        payload = {}
        if deep:
            payload = {'deep': True}

        url = build_url(self.host,
                        path='/'.join([self.REPLICAS_BASEURL, quote_plus(scope), quote_plus(name), 'datasets_vp']),
                        params=payload)
        r = self._send_request(url, type_='GET')
        if r.status_code == codes.ok:
            return self._load_json_data(r)
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def list_datasets_per_rse(self, rse, filters=None, limit=None):
        """
        List datasets at a RSE.

        NOTE(review): ``filters`` and ``limit`` are accepted but are not
        forwarded to the server by this method; the request only carries the
        RSE name. Confirm the intended server API before relying on them.

        :param rse: the rse name.
        :param filters: dictionary of attributes by which the results should be filtered (currently unused).
        :param limit: limit number (currently unused).

        :returns: A list of dict dataset replicas.
        :raises: the exception resolved by ``_get_exception`` when the server does not answer with OK.
        """
        url = build_url(self.host, path='/'.join([self.REPLICAS_BASEURL, 'rse', rse]))
        r = self._send_request(url, type_='GET')
        if r.status_code == codes.ok:
            return self._load_json_data(r)
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def add_bad_pfns(self, pfns, reason, state, expires_at):
        """
        Declare a list of bad replicas.

        :param pfns: The list of PFNs.
        :param reason: The reason of the loss.
        :param state: The state of the replica. Either BAD, SUSPICIOUS, TEMPORARY_UNAVAILABLE
        :param expires_at: Specify a timeout for the TEMPORARY_UNAVAILABLE replicas. None for BAD files.

        :return: True if PFNs were created successfully.
        :raises: the exception resolved by ``_get_exception`` when the server does not answer with CREATED.
        """
        data = {'reason': reason, 'pfns': pfns, 'state': state, 'expires_at': expires_at}
        url = build_url(self.host, path='/'.join([self.REPLICAS_BASEURL, 'bad/pfns']))
        headers = {}
        r = self._send_request(url, headers=headers, type_='POST', data=dumps(data))
        if r.status_code == codes.created:
            return True
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)

    def set_tombstone(self, replicas):
        """
        Set a tombstone on a list of replicas.

        :param replicas: list of replicas.

        :return: True if the server answers with CREATED.
        :raises: the exception resolved by ``_get_exception`` for any other status code.
        """
        url = build_url(self.host, path='/'.join([self.REPLICAS_BASEURL, 'tombstone']))
        data = {'replicas': replicas}
        r = self._send_request(url, type_='POST', data=render_json(**data))
        if r.status_code == codes.created:
            return True
        exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
        raise exc_cls(exc_msg)