/
subscription.py
255 lines (215 loc) · 10.3 KB
/
subscription.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# -*- coding: utf-8 -*-
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
from json import dumps, loads
from typing import TYPE_CHECKING
from rucio.api.permission import has_permission
from rucio.common.exception import AccessDenied, InvalidObject
from rucio.common.schema import validate_schema
from rucio.common.types import InternalAccount, InternalScope
from rucio.core import subscription
from rucio.db.sqla.session import read_session, stream_session, transactional_session
if TYPE_CHECKING:
from sqlalchemy.orm import Session
SubscriptionRuleState = namedtuple('SubscriptionRuleState', ['account', 'name', 'state', 'count'])
@transactional_session
def add_subscription(name, account, filter_, replication_rules, comments, lifetime, retroactive, dry_run, priority=None, issuer=None, vo='def', *, session: "Session"):
"""
Adds a new subscription which will be verified against every new added file and dataset
:param account: Account identifier
:type account: String
:param name: Name of the subscription
:type: String
:param filter_: Dictionary of attributes by which the input data should be filtered
**Example**: ``{'dsn': 'data11_hi*.express_express.*,data11_hi*physics_MinBiasOverlay*', 'account': 'tzero'}``
:type filter_: Dict
:param replication_rules: Replication rules to be set : Dictionary with keys copies, rse_expression, weight, rse_expression
:type replication_rules: Dict
:param comments: Comments for the subscription
:type comments: String
:param lifetime: Subscription's lifetime (seconds); False if subscription has no lifetime
:type lifetime: Integer or False
:param retroactive: Flag to know if the subscription should be applied on previous data
:type retroactive: Boolean
:param dry_run: Just print the subscriptions actions without actually executing them (Useful if retroactive flag is set)
:type dry_run: Boolean
:param priority: The priority of the subscription
:type priority: Integer
:param issuer: The account issuing this operation.
:type issuer: String
:param vo: The VO to act on.
:type vo: String
:param session: The database session in use.
:returns: subscription_id
:rtype: String
"""
if not has_permission(issuer=issuer, vo=vo, action='add_subscription', kwargs={'account': account}, session=session):
raise AccessDenied('Account %s can not add subscription' % (issuer))
try:
if filter_:
if not isinstance(filter_, dict):
raise TypeError('filter should be a dict')
validate_schema(name='subscription_filter', obj=filter_, vo=vo)
if replication_rules:
if not isinstance(replication_rules, list):
raise TypeError('replication_rules should be a list')
else:
for rule in replication_rules:
validate_schema(name='activity', obj=rule.get('activity', 'default'), vo=vo)
else:
raise InvalidObject('You must specify a rule')
except ValueError as error:
raise TypeError(error)
account = InternalAccount(account, vo=vo)
keys = ['scope', 'account']
types = [InternalScope, InternalAccount]
for _key, _type in zip(keys, types):
if _key in filter_:
if isinstance(filter_[_key], list):
filter_[_key] = [_type(val, vo=vo).internal for val in filter_[_key]]
else:
filter_[_key] = _type(filter_[_key], vo=vo).internal
return subscription.add_subscription(name=name, account=account, filter_=dumps(filter_), replication_rules=dumps(replication_rules),
comments=comments, lifetime=lifetime, retroactive=retroactive, dry_run=dry_run, priority=priority,
session=session)
@transactional_session
def update_subscription(name, account, metadata=None, issuer=None, vo='def', *, session: "Session"):
"""
Updates a subscription
:param name: Name of the subscription
:type: String
:param account: Account identifier
:type account: String
:param metadata: Dictionary of metadata to update. Supported keys : filter, replication_rules, comments, lifetime, retroactive, dry_run, priority, last_processed
:type metadata: Dict
:param issuer: The account issuing this operation.
:type issuer: String
:param vo: The VO to act on.
:type vo: String
:param session: The database session in use.
:raises: SubscriptionNotFound if subscription is not found
"""
if not has_permission(issuer=issuer, vo=vo, action='update_subscription', kwargs={'account': account}, session=session):
raise AccessDenied('Account %s can not update subscription' % (issuer))
try:
if not isinstance(metadata, dict):
raise TypeError('metadata should be a dict')
if 'filter' in metadata and metadata['filter']:
if not isinstance(metadata['filter'], dict):
raise TypeError('filter should be a dict')
validate_schema(name='subscription_filter', obj=metadata['filter'], vo=vo)
if 'replication_rules' in metadata and metadata['replication_rules']:
if not isinstance(metadata['replication_rules'], list):
raise TypeError('replication_rules should be a list')
else:
for rule in metadata['replication_rules']:
validate_schema(name='activity', obj=rule.get('activity', 'default'), vo=vo)
except ValueError as error:
raise TypeError(error)
account = InternalAccount(account, vo=vo)
if 'filter' in metadata and metadata['filter'] is not None:
filter_ = metadata['filter']
keys = ['scope', 'account']
types = [InternalScope, InternalAccount]
for _key, _type in zip(keys, types):
if _key in filter_ and filter_[_key] is not None:
if isinstance(filter_[_key], list):
filter_[_key] = [_type(val, vo=vo).internal for val in filter_[_key]]
else:
filter_[_key] = _type(filter_[_key], vo=vo).internal
return subscription.update_subscription(name=name, account=account, metadata=metadata, session=session)
@stream_session
def list_subscriptions(name=None, account=None, state=None, vo='def', *, session: "Session"):
"""
Returns a dictionary with the subscription information :
Examples: ``{'status': 'INACTIVE/ACTIVE/BROKEN', 'last_modified_date': ...}``
:param name: Name of the subscription
:type: String
:param account: Account identifier
:type account: String
:param state: Filter for subscription state
:type state: String
:param vo: The VO to act on.
:type vo: String
:param session: The database session in use.
:returns: Dictionary containing subscription parameter
:rtype: Dict
:raises: exception.NotFound if subscription is not found
"""
if account:
account = InternalAccount(account, vo=vo)
else:
account = InternalAccount('*', vo=vo)
subs = subscription.list_subscriptions(name, account, state, session=session)
for sub in subs:
sub['account'] = sub['account'].external
if 'filter' in sub:
fil = loads(sub['filter'])
if 'account' in fil:
fil['account'] = [InternalAccount(acc, fromExternal=False).external for acc in fil['account']]
if 'scope' in fil:
fil['scope'] = [InternalScope(sco, fromExternal=False).external for sco in fil['scope']]
sub['filter'] = dumps(fil)
yield sub
@stream_session
def list_subscription_rule_states(name=None, account=None, vo='def', *, session: "Session"):
"""Returns a list of with the number of rules per state for a subscription.
:param name: Name of the subscription
:param account: Account identifier
:param vo: The VO to act on.
:param session: The database session in use.
:returns: Sequence with SubscriptionRuleState named tuples (account, name, state, count)
"""
if account is not None:
account = InternalAccount(account, vo=vo)
else:
account = InternalAccount('*', vo=vo)
subs = subscription.list_subscription_rule_states(name, account, session=session)
for sub in subs:
# sub is an immutable Row so return new named tuple with edited entries
d = sub._asdict()
d['account'] = d['account'].external
yield SubscriptionRuleState(**d)
@transactional_session
def delete_subscription(subscription_id, vo='def', *, session: "Session"):
"""
Deletes a subscription
:param subscription_id: Subscription identifier
:param vo: The VO of the user issuing command
:param session: The database session in use.
:type subscription_id: String
"""
raise NotImplementedError
@read_session
def get_subscription_by_id(subscription_id, vo='def', *, session: "Session"):
"""
Get a specific subscription by id.
:param subscription_id: The subscription_id to select.
:param vo: The VO of the user issuing command.
:param session: The database session in use.
:raises: SubscriptionNotFound if no Subscription can be found.
"""
sub = subscription.get_subscription_by_id(subscription_id, session=session)
if sub['account'].vo != vo:
raise AccessDenied('Unable to get subscription')
sub['account'] = sub['account'].external
if 'filter' in sub:
fil = loads(sub['filter'])
if 'account' in fil:
fil['account'] = [InternalAccount(acc, fromExternal=False).external for acc in fil['account']]
if 'scope' in fil:
fil['scope'] = [InternalScope(sco, fromExternal=False).external for sco in fil['scope']]
sub['filter'] = dumps(fil)
return sub