/
lockclient.py
98 lines (76 loc) · 3.85 KB
/
lockclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# -*- coding: utf-8 -*-
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from urllib.parse import quote_plus
from requests.status_codes import codes
from rucio.client.baseclient import BaseClient, choice
from rucio.common.utils import build_url, render_json
class LockClient(BaseClient):
"""Lock client class for working with rucio locks"""
LOCKS_BASEURL = 'locks'
def get_dataset_locks(self, scope, name):
"""
Get a dataset locks of the specified dataset.
:param scope: the scope of the did of the locks to list.
:param name: the name of the did of the locks to list.
"""
path = '/'.join([self.LOCKS_BASEURL, quote_plus(scope), quote_plus(name)])
url = build_url(choice(self.list_hosts), path=path, params={'did_type': 'dataset'})
result = self._send_request(url)
if result.status_code == codes.ok: # pylint: disable-msg=E1101
locks = self._load_json_data(result)
return locks
else:
exc_cls, exc_msg = self._get_exception(headers=result.headers,
status_code=result.status_code)
raise exc_cls(exc_msg)
def get_locks_for_dids(self, dids, **filter_args):
"""
Get list of locks for for all the files found, recursively, in the listed datasets or containers.
:param dids: list of dictionaries {"scope":..., "name":..., "type":...}
type can be either "dataset" or "container"
type is optional, but if specified, improves the query performance
:returns: list of dictionaries with lock info
"""
# convert did list to list of dictionaries
assert all(did.get("type", "dataset") in ("dataset", "container") for did in dids), "did type can be either 'container' or 'dataset'"
path = '/'.join([self.LOCKS_BASEURL, "bulk_locks_for_dids"])
url = build_url(choice(self.list_hosts), path=path)
result = self._send_request(url, type_='POST', data=render_json(dids=dids))
if result.status_code == codes.ok: # pylint: disable-msg=E1101
out = []
for lock in self._load_json_data(result):
filter_ok = (not filter_args) or all(lock.get(name) == value for name, value in filter_args.items())
if filter_ok:
out.append(lock)
return out
else:
exc_cls, exc_msg = self._get_exception(headers=result.headers,
status_code=result.status_code)
raise exc_cls(exc_msg)
def get_dataset_locks_by_rse(self, rse):
"""
Get all dataset locks of the specified rse.
:param rse: the rse of the locks to list.
"""
path = '/'.join([self.LOCKS_BASEURL, rse])
url = build_url(choice(self.list_hosts), path=path, params={'did_type': 'dataset'})
result = self._send_request(url)
if result.status_code == codes.ok: # pylint: disable-msg=E1101
locks = self._load_json_data(result)
return locks
else:
exc_cls, exc_msg = self._get_exception(headers=result.headers,
status_code=result.status_code)
raise exc_cls(exc_msg)