forked from getredash/redash
-
Notifications
You must be signed in to change notification settings - Fork 0
/
permissions.py
131 lines (85 loc) · 3.9 KB
/
permissions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import functools
from flask_login import current_user
from flask_restful import abort
from funcy import flatten
# Named boolean values; presumably meant to be passed as the
# ``need_view_only`` argument of the access checks below -- confirm at call sites.
view_only = True
not_view_only = False

# String identifiers for the kinds of access; ACCESS_TYPES groups all three.
ACCESS_TYPE_VIEW = "view"
ACCESS_TYPE_MODIFY = "modify"
ACCESS_TYPE_DELETE = "delete"
ACCESS_TYPES = (ACCESS_TYPE_VIEW, ACCESS_TYPE_MODIFY, ACCESS_TYPE_DELETE)
# Beat customization: has_access takes a fourth parameter, beat_ro_access, that defaults to False,
# so Redash's default has_access behavior is not affected.
# The parameter is set to True in only one place (redash/handlers/query_results.py#L290).
# More information is in the README.md file (section "Beat Customization").
def has_access(obj, user, need_view_only, beat_ro_access=False):
    """Decide whether ``user`` may access ``obj``.

    When ``obj`` has an ``api_key`` attribute and ``user`` reports itself as
    an API user, the decision is delegated to ``has_access_to_object``;
    in every other case it is delegated to ``has_access_to_groups``.

    Args:
        obj: The object being accessed.
        user: The user requesting access; must provide ``is_api_user()``.
        need_view_only: Forwarded unchanged to the delegated check.
        beat_ro_access: Forwarded to the group check only; defaults to
            ``False``.

    Returns:
        Whatever the delegated check returns.
    """
    checked_by_api_key = hasattr(obj, "api_key") and user.is_api_user()
    if not checked_by_api_key:
        return has_access_to_groups(obj, user, need_view_only, beat_ro_access)

    # For API users, ``user.id`` is the value compared against the API keys.
    return has_access_to_object(obj, user.id, need_view_only)
def has_access_to_object(obj, api_key, need_view_only):
    """Check an API key against ``obj``.

    Args:
        obj: Object with an ``api_key`` attribute and, optionally, a
            ``dashboard_api_keys`` collection.
        api_key: The key presented by the caller.
        need_view_only: Returned as-is when the key matches, so a matching
            key only ever grants view-only access.

    Returns:
        ``need_view_only`` when ``api_key`` equals ``obj.api_key`` or is
        contained in ``obj.dashboard_api_keys``; ``False`` otherwise.
    """
    if obj.api_key == api_key:
        return need_view_only

    if not hasattr(obj, "dashboard_api_keys"):
        return False

    # Check whether api_key belongs to a dashboard containing this query.
    if api_key in obj.dashboard_api_keys:
        return need_view_only
    return False
# Beat customization: has_access_to_groups takes a fourth parameter, beat_ro_access, that defaults to False,
# so Redash's default has_access_to_groups behavior is not affected.
# More information is in the README.md file (section "Beat Customization").
def has_access_to_groups(obj, user, need_view_only, beat_ro_access=False):
    """Check ``user``'s group membership against the groups of ``obj``.

    Args:
        obj: Either an object exposing a ``groups`` mapping or the mapping
            itself. Keys are group ids; the values are flattened and tested
            for truthiness (presumably per-group view-only flags -- confirm
            against callers).
        user: Must expose ``permissions`` and ``group_ids``.
        need_view_only: Truthy when view-only access is sufficient.
        beat_ro_access: Beat customization. When truthy, a user whose only
            group in common with ``obj`` is the default group (id 2) is
            denied, so that such users cannot trigger a refresh.

    Returns:
        ``True`` if the user is an admin or the matching groups grant the
        required level, ``False`` otherwise.
    """
    groups = obj.groups if hasattr(obj, "groups") else obj

    if "admin" in user.permissions:
        return True

    matching_groups = set(groups.keys()).intersection(user.group_ids)
    if not matching_groups:
        return False

    # If the only overlapping group is the default group, do not allow access.
    # The flag is tested for truthiness rather than ``== True`` so that any
    # truthy flag enables the restriction. The id is compared as a string so
    # both 2 and "2" match.
    if beat_ro_access and len(matching_groups) == 1:
        (only_group,) = matching_groups
        if str(only_group) == "2":
            return False

    # Level 1 is enough for view-only requests; anything else needs level 2.
    required_level = 1 if need_view_only else 2
    # The groups provide level 1 only when every flattened flag is truthy.
    all_flags_set = all(flatten([groups[group] for group in matching_groups]))
    group_level = 1 if all_flags_set else 2
    return required_level <= group_level
def require_access(obj, user, need_view_only):
    """Abort the request with HTTP 403 unless ``has_access`` grants access.

    ``has_access`` is called with its default ``beat_ro_access`` value.
    """
    if has_access(obj, user, need_view_only):
        return
    abort(403)
class require_permissions(object):
    """Decorator that aborts with HTTP 403 unless the current user is permitted.

    Args:
        permissions: Iterable of permission names to check.
        allow_one: When true, holding any single permission is enough;
            otherwise ``current_user.has_permissions`` decides for the whole
            collection.
    """

    def __init__(self, permissions, allow_one=False):
        self.permissions = permissions
        self.allow_one = allow_one

    def __call__(self, fn):
        """Wrap ``fn`` so the permission check runs before every call."""

        @functools.wraps(fn)
        def decorated(*args, **kwargs):
            if self.allow_one:
                # Generator expression so any() stops at the first granted
                # permission instead of evaluating every one up front.
                has_permissions = any(
                    current_user.has_permission(permission)
                    for permission in self.permissions
                )
            else:
                has_permissions = current_user.has_permissions(self.permissions)

            if has_permissions:
                return fn(*args, **kwargs)
            else:
                abort(403)

        return decorated
def require_permission(permission):
    """Return a ``require_permissions`` decorator for one permission."""
    required = (permission,)
    return require_permissions(required)
def require_any_of_permission(permissions):
    """Return a decorator satisfied by any one of ``permissions``."""
    return require_permissions(permissions, allow_one=True)
def require_admin(fn):
    """Decorate ``fn`` so that it requires the "admin" permission."""
    admin_only = require_permission("admin")
    return admin_only(fn)
def require_super_admin(fn):
    """Decorate ``fn`` so that it requires the "super_admin" permission."""
    super_admin_only = require_permission("super_admin")
    return super_admin_only(fn)
def has_permission_or_owner(permission, object_owner_id):
    """Tell whether the current user owns the object or holds ``permission``.

    ``object_owner_id`` is converted with ``int()`` before being compared to
    ``current_user.id``. The permission lookup only runs when the ownership
    comparison is falsy.
    """
    is_owner = int(object_owner_id) == current_user.id
    return is_owner or current_user.has_permission(permission)
def is_admin_or_owner(object_owner_id):
    """Tell whether the current user is the owner or has "admin" permission."""
    return has_permission_or_owner(
        permission="admin",
        object_owner_id=object_owner_id,
    )
def require_permission_or_owner(permission, object_owner_id):
    """Abort with HTTP 403 unless the current user owns the object or holds ``permission``."""
    if has_permission_or_owner(permission, object_owner_id):
        return
    abort(403)
def require_admin_or_owner(object_owner_id):
    """Abort with HTTP 403 and an explanatory message unless the current user is an admin or the owner."""
    if is_admin_or_owner(object_owner_id):
        return
    abort(403, message="You don't have permission to edit this resource.")
def can_modify(obj, user):
    """Tell whether ``obj`` may be modified.

    True-ish when the current user is an admin or owns ``obj`` (by
    ``obj.user_id``); otherwise the result of asking ``user`` for
    ``ACCESS_TYPE_MODIFY`` access to ``obj``.
    """
    privileged = is_admin_or_owner(obj.user_id)
    return privileged or user.has_access(obj, ACCESS_TYPE_MODIFY)
def require_object_modify_permission(obj, user):
    """Abort with HTTP 403 unless ``can_modify`` allows changing ``obj``."""
    if can_modify(obj, user):
        return
    abort(403)