forked from rucio/rucio
/
subscriptionclient.py
161 lines (145 loc) · 7.81 KB
/
subscriptionclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# -*- coding: utf-8 -*-
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from json import dumps
from requests.status_codes import codes
from rucio.client.baseclient import BaseClient, choice
from rucio.common.utils import build_url
class SubscriptionClient(BaseClient):
"""SubscriptionClient class for working with subscriptions"""
SUB_BASEURL = 'subscriptions'
def add_subscription(self, name, account, filter_, replication_rules, comments, lifetime, retroactive, dry_run, priority=3):
"""
Adds a new subscription which will be verified against every new added file and dataset
:param name: Name of the subscription
:type: String
:param account: Account identifier
:type account: String
:param filter_: Dictionary of attributes by which the input data should be filtered
**Example**: ``{'dsn': 'data11_hi*.express_express.*,data11_hi*physics_MinBiasOverlay*', 'account': 'tzero'}``
:type filter_: Dict
:param replication_rules: Replication rules to be set : Dictionary with keys copies, rse_expression, weight, rse_expression
:type replication_rules: Dict
:param comments: Comments for the subscription
:type comments: String
:param lifetime: Subscription's lifetime (days); False if subscription has no lifetime
:type lifetime: Integer or False
:param retroactive: Flag to know if the subscription should be applied on previous data
:type retroactive: Boolean
:param dry_run: Just print the subscriptions actions without actually executing them (Useful if retroactive flag is set)
:type dry_run: Boolean
:param priority: The priority of the subscription (3 by default)
:type priority: Integer
"""
path = self.SUB_BASEURL + '/' + account + '/' + name
url = build_url(choice(self.list_hosts), path=path)
if retroactive:
raise NotImplementedError('Retroactive mode is not implemented')
if filter_ and not isinstance(filter_, dict):
raise TypeError('filter should be a dict')
if replication_rules and not isinstance(replication_rules, list):
raise TypeError('replication_rules should be a list')
data = dumps({'options': {'filter': filter_, 'replication_rules': replication_rules, 'comments': comments,
'lifetime': lifetime, 'retroactive': retroactive, 'dry_run': dry_run, 'priority': priority}})
result = self._send_request(url, type_='POST', data=data)
if result.status_code == codes.created: # pylint: disable=no-member
return result.text
else:
exc_cls, exc_msg = self._get_exception(headers=result.headers, status_code=result.status_code, data=result.content)
raise exc_cls(exc_msg)
def list_subscriptions(self, name=None, account=None):
"""
Returns a dictionary with the subscription information :
Examples: ``{'status': 'INACTIVE/ACTIVE/BROKEN', 'last_modified_date': ...}``
:param name: Name of the subscription
:type: String
:param account: Account identifier
:type account: String
:returns: Dictionary containing subscription parameter
:rtype: Dict
:raises: exception.NotFound if subscription is not found
"""
path = self.SUB_BASEURL
if account:
path += '/%s' % (account)
if name:
path += '/%s' % (name)
elif name:
path += '/Name/%s' % (name)
else:
path += '/'
url = build_url(choice(self.list_hosts), path=path)
result = self._send_request(url, type_='GET')
if result.status_code == codes.ok: # pylint: disable=no-member
return self._load_json_data(result)
if result.status_code == codes.not_found:
return []
exc_cls, exc_msg = self._get_exception(headers=result.headers, status_code=result.status_code, data=result.content)
raise exc_cls(exc_msg)
def update_subscription(self, name, account=None, filter_=None, replication_rules=None, comments=None, lifetime=None, retroactive=None, dry_run=None, priority=None):
"""
Updates a subscription
:param name: Name of the subscription
:type: String
:param account: Account identifier
:type account: String
:param filter_: Dictionary of attributes by which the input data should be filtered
**Example**: ``{'dsn': 'data11_hi*.express_express.*,data11_hi*physics_MinBiasOverlay*', 'account': 'tzero'}``
:type filter_: Dict
:param replication_rules: Replication rules to be set : Dictionary with keys copies, rse_expression, weight, rse_expression
:type replication_rules: Dict
:param comments: Comments for the subscription
:type comments: String
:param lifetime: Subscription's lifetime (days); False if subscription has no lifetime
:type lifetime: Integer or False
:param retroactive: Flag to know if the subscription should be applied on previous data
:type retroactive: Boolean
:param dry_run: Just print the subscriptions actions without actually executing them (Useful if retroactive flag is set)
:type dry_run: Boolean
:param priority: The priority of the subscription
:type priority: Integer
:raises: exception.NotFound if subscription is not found
"""
if not account:
account = self.account
if retroactive:
raise NotImplementedError('Retroactive mode is not implemented')
path = self.SUB_BASEURL + '/' + account + '/' + name
url = build_url(choice(self.list_hosts), path=path)
if filter_ and not isinstance(filter_, dict):
raise TypeError('filter should be a dict')
if replication_rules and not isinstance(replication_rules, list):
raise TypeError('replication_rules should be a list')
data = dumps({'options': {'filter': filter_, 'replication_rules': replication_rules, 'comments': comments,
'lifetime': lifetime, 'retroactive': retroactive, 'dry_run': dry_run, 'priority': priority}})
result = self._send_request(url, type_='PUT', data=data)
if result.status_code == codes.created: # pylint: disable=no-member
return True
else:
exc_cls, exc_msg = self._get_exception(headers=result.headers, status_code=result.status_code, data=result.content)
raise exc_cls(exc_msg)
def list_subscription_rules(self, account, name):
"""
List the associated rules of a subscription.
:param account: Account of the subscription.
:param name: Name of the subscription.
"""
path = '/'.join([self.SUB_BASEURL, account, name, 'Rules'])
url = build_url(choice(self.list_hosts), path=path)
result = self._send_request(url, type_='GET')
if result.status_code == codes.ok: # pylint: disable=no-member
return self._load_json_data(result)
else:
exc_cls, exc_msg = self._get_exception(headers=result.headers, status_code=result.status_code, data=result.content)
raise exc_cls(exc_msg)