From d4781f8447c38b020dfd0ec17fc752161f368faa Mon Sep 17 00:00:00 2001 From: rdimaio Date: Wed, 17 Apr 2024 14:15:26 +0200 Subject: [PATCH] Testing: Add type annotations to core/permission; #6588 --- lib/rucio/core/permission/__init__.py | 14 ++- lib/rucio/core/permission/generic.py | 148 +++++++++++++------------- 2 files changed, 85 insertions(+), 77 deletions(-) diff --git a/lib/rucio/core/permission/__init__.py b/lib/rucio/core/permission/__init__.py index 095188444e..8b55301991 100644 --- a/lib/rucio/core/permission/__init__.py +++ b/lib/rucio/core/permission/__init__.py @@ -15,7 +15,7 @@ import importlib from configparser import NoOptionError, NoSectionError from os import environ -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from rucio.common import config, exception from rucio.common.utils import check_policy_package_version @@ -25,6 +25,8 @@ from sqlalchemy.orm import Session + from rucio.common.types import InternalAccount + # dictionary of permission modules for each VO permission_modules = {} @@ -72,7 +74,7 @@ permission_modules["def"] = module -def load_permission_for_vo(vo): +def load_permission_for_vo(vo: str) -> None: GENERIC_FALLBACK = 'generic_multi_vo' if config.config_has_section('policy'): try: @@ -101,7 +103,13 @@ def load_permission_for_vo(vo): permission_modules[vo] = module -def has_permission(issuer, action, kwargs, *, session: "Optional[Session]" = None): +def has_permission( + issuer: "InternalAccount", + action: str, + kwargs: dict[str, Any], + *, + session: "Optional[Session]" = None +) -> bool: if issuer.vo not in permission_modules: load_permission_for_vo(issuer.vo) return permission_modules[issuer.vo].has_permission(issuer, action, kwargs, session=session) diff --git a/lib/rucio/core/permission/generic.py b/lib/rucio/core/permission/generic.py index 69c4b2401c..3869736742 100644 --- a/lib/rucio/core/permission/generic.py +++ b/lib/rucio/core/permission/generic.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import rucio.core.scope from rucio.core.account import has_account_attribute, list_account_attributes @@ -30,7 +30,7 @@ from rucio.common.types import InternalAccount -def has_permission(issuer, action, kwargs, *, session: "Optional[Session]" = None): +def has_permission(issuer: "InternalAccount", action: str, kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account has the specified permission to execute an action with parameters. @@ -125,11 +125,11 @@ def has_permission(issuer, action, kwargs, *, session: "Optional[Session]" = Non return perm.get(action, perm_default)(issuer=issuer, kwargs=kwargs, session=session) -def _is_root(issuer): +def _is_root(issuer) -> bool: return issuer.external == 'root' -def perm_default(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_default(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Default permission. @@ -141,7 +141,7 @@ def perm_default(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_add_rse(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_add_rse(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can add a RSE. @@ -153,7 +153,7 @@ def perm_add_rse(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_update_rse(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_update_rse(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can update a RSE. @@ -165,7 +165,7 @@ def perm_update_rse(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_add_rule(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_add_rule(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can add a replication rule. @@ -181,7 +181,7 @@ def perm_add_rule(issuer, kwargs, *, session: "Optional[Session]" = None): return False -def perm_add_subscription(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_add_subscription(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can add a subscription. @@ -195,7 +195,7 @@ def perm_add_subscription(issuer, kwargs, *, session: "Optional[Session]" = None return False -def perm_add_rse_attribute(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_add_rse_attribute(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can add a RSE attribute. @@ -209,7 +209,7 @@ def perm_add_rse_attribute(issuer, kwargs, *, session: "Optional[Session]" = Non return False -def perm_del_rse_attribute(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_del_rse_attribute(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can delete a RSE attribute. @@ -223,7 +223,7 @@ def perm_del_rse_attribute(issuer, kwargs, *, session: "Optional[Session]" = Non return False -def perm_del_rse(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_del_rse(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can delete a RSE. @@ -235,7 +235,7 @@ def perm_del_rse(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_add_account(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_add_account(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can add an account. @@ -247,7 +247,7 @@ def perm_add_account(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) -def perm_del_account(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_del_account(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can del an account. @@ -259,7 +259,7 @@ def perm_del_account(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) -def perm_update_account(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_update_account(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can update an account. @@ -271,7 +271,7 @@ def perm_update_account(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_add_scope(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_add_scope(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can add a scop to a account. @@ -283,7 +283,7 @@ def perm_add_scope(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_get_auth_token_user_pass(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_get_auth_token_user_pass(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if a user can request a token with user_pass for an account. @@ -297,7 +297,7 @@ def perm_get_auth_token_user_pass(issuer, kwargs, *, session: "Optional[Session] return False -def perm_get_auth_token_gss(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_get_auth_token_gss(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if a user can request a token with user_pass for an account. @@ -311,7 +311,7 @@ def perm_get_auth_token_gss(issuer, kwargs, *, session: "Optional[Session]" = No return False -def perm_get_auth_token_x509(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_get_auth_token_x509(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if a user can request a token with user_pass for an account. @@ -325,7 +325,7 @@ def perm_get_auth_token_x509(issuer, kwargs, *, session: "Optional[Session]" = N return False -def perm_get_auth_token_saml(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_get_auth_token_saml(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if a user can request a token with user_pass for an account. @@ -339,7 +339,7 @@ def perm_get_auth_token_saml(issuer, kwargs, *, session: "Optional[Session]" = N return False -def perm_add_account_identity(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_add_account_identity(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can add an identity to an account. @@ -352,7 +352,7 @@ def perm_add_account_identity(issuer, kwargs, *, session: "Optional[Session]" = return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_del_account_identity(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_del_account_identity(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can delete an identity to an account. @@ -365,7 +365,7 @@ def perm_del_account_identity(issuer, kwargs, *, session: "Optional[Session]" = return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_del_identity(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_del_identity(issuer: "InternalAccount", kwargs, *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can delete an identity. @@ -378,7 +378,7 @@ def perm_del_identity(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) or issuer.external in kwargs.get('accounts') -def perm_add_did(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_add_did(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can add an data identifier to a scope. @@ -399,7 +399,7 @@ def perm_add_did(issuer, kwargs, *, session: "Optional[Session]" = None): or kwargs['scope'].external == 'mock' -def perm_add_dids(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_add_dids(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can bulk add data identifiers. @@ -418,7 +418,7 @@ def perm_add_dids(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_attach_dids(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_attach_dids(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can append an data identifier to the other data identifier. @@ -433,7 +433,7 @@ def perm_attach_dids(issuer, kwargs, *, session: "Optional[Session]" = None): or kwargs['scope'].external == 'mock' -def perm_attach_dids_to_dids(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_attach_dids_to_dids(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can append an data identifier to the other data identifier. @@ -454,7 +454,7 @@ def perm_attach_dids_to_dids(issuer, kwargs, *, session: "Optional[Session]" = N return True -def perm_create_did_sample(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_create_did_sample(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can create a sample of a data identifier collection. @@ -469,7 +469,7 @@ def perm_create_did_sample(issuer, kwargs, *, session: "Optional[Session]" = Non or kwargs['scope'].external == 'mock' -def perm_del_rule(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_del_rule(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an issuer can delete a replication rule. @@ -483,7 +483,7 @@ def perm_del_rule(issuer, kwargs, *, session: "Optional[Session]" = None): return False -def perm_update_rule(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_update_rule(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an issuer can update a replication rule. @@ -497,7 +497,7 @@ def perm_update_rule(issuer, kwargs, *, session: "Optional[Session]" = None): return False -def perm_approve_rule(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_approve_rule(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an issuer can approve a replication rule. @@ -511,7 +511,7 @@ def perm_approve_rule(issuer, kwargs, *, session: "Optional[Session]" = None): return False -def perm_reduce_rule(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_reduce_rule(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an issuer can reduce a replication rule. @@ -525,7 +525,7 @@ def perm_reduce_rule(issuer, kwargs, *, session: "Optional[Session]" = None): return False -def perm_move_rule(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_move_rule(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an issuer can move a replication rule. @@ -539,7 +539,7 @@ def perm_move_rule(issuer, kwargs, *, session: "Optional[Session]" = None): return False -def perm_update_subscription(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_update_subscription(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can update a subscription. @@ -554,7 +554,7 @@ def perm_update_subscription(issuer, kwargs, *, session: "Optional[Session]" = N return False -def perm_detach_dids(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_detach_dids(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can detach an data identifier from the other data identifier. @@ -566,7 +566,7 @@ def perm_detach_dids(issuer, kwargs, *, session: "Optional[Session]" = None): return perm_attach_dids(issuer, kwargs, session=session) -def perm_set_metadata_bulk(issuer: "InternalAccount", kwargs: dict, *, session: "Optional[Session]" = None) -> bool: +def perm_set_metadata_bulk(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can set a metadata on a data identifier. @@ -578,7 +578,7 @@ def perm_set_metadata_bulk(issuer: "InternalAccount", kwargs: dict, *, session: return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) or rucio.core.scope.is_scope_owner(scope=kwargs['scope'], account=issuer, session=session) -def perm_set_metadata(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_set_metadata(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can set a metadata on a data identifier. @@ -590,7 +590,7 @@ def perm_set_metadata(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) or rucio.core.scope.is_scope_owner(scope=kwargs['scope'], account=issuer, session=session) -def perm_set_status(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_set_status(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can set status on an data identifier. @@ -606,7 +606,7 @@ def perm_set_status(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) or rucio.core.scope.is_scope_owner(scope=kwargs['scope'], account=issuer, session=session) -def perm_add_protocol(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_add_protocol(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can add a protocol to an RSE. @@ -618,7 +618,7 @@ def perm_add_protocol(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_del_protocol(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_del_protocol(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can delete protocols from an RSE. @@ -630,7 +630,7 @@ def perm_del_protocol(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_update_protocol(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_update_protocol(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can update protocols of an RSE. @@ -642,7 +642,7 @@ def perm_update_protocol(issuer, kwargs, *, session: "Optional[Session]" = None) return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_add_qos_policy(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_add_qos_policy(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can add QoS policies to an RSE. @@ -654,7 +654,7 @@ def perm_add_qos_policy(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_delete_qos_policy(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_delete_qos_policy(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can delete QoS policies from an RSE. @@ -666,7 +666,7 @@ def perm_delete_qos_policy(issuer, kwargs, *, session: "Optional[Session]" = Non return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_declare_bad_file_replicas(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_declare_bad_file_replicas(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can declare bad file replicas. @@ -678,7 +678,7 @@ def perm_declare_bad_file_replicas(issuer, kwargs, *, session: "Optional[Session return _is_root(issuer) -def perm_declare_suspicious_file_replicas(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_declare_suspicious_file_replicas(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can declare suspicious file replicas. @@ -690,7 +690,7 @@ def perm_declare_suspicious_file_replicas(issuer, kwargs, *, session: "Optional[ return True -def perm_add_replicas(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_add_replicas(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can add replicas. @@ -707,7 +707,7 @@ def perm_add_replicas(issuer, kwargs, *, session: "Optional[Session]" = None): or has_account_attribute(account=issuer, key='admin', session=session) -def perm_skip_availability_check(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_skip_availability_check(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can skip the availabity check to add/delete file replicas. @@ -719,7 +719,7 @@ def perm_skip_availability_check(issuer, kwargs, *, session: "Optional[Session]" return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_delete_replicas(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_delete_replicas(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can delete replicas. @@ -731,7 +731,7 @@ def perm_delete_replicas(issuer, kwargs, *, session: "Optional[Session]" = None) return False -def perm_update_replicas_states(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_update_replicas_states(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can delete replicas. @@ -743,7 +743,7 @@ def perm_update_replicas_states(issuer, kwargs, *, session: "Optional[Session]" return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_queue_requests(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_queue_requests(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can submit transfer or deletion requests on destination RSEs for data identifiers. @@ -755,7 +755,7 @@ def perm_queue_requests(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) -def perm_list_requests(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_list_requests(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can list requests. @@ -767,7 +767,7 @@ def perm_list_requests(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_list_requests_history(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_list_requests_history(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can list historical requests. @@ -779,7 +779,7 @@ def perm_list_requests_history(issuer, kwargs, *, session: "Optional[Session]" = return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_get_request_by_did(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_get_request_by_did(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can get a request by DID. @@ -791,7 +791,7 @@ def perm_get_request_by_did(issuer, kwargs, *, session: "Optional[Session]" = No return True -def perm_get_request_history_by_did(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_get_request_history_by_did(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can get a historical request by DID. @@ -803,7 +803,7 @@ def perm_get_request_history_by_did(issuer, kwargs, *, session: "Optional[Sessio return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_cancel_request(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_cancel_request(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can cancel a request. @@ -815,7 +815,7 @@ def perm_cancel_request(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) -def perm_get_next(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_get_next(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can retrieve the next request matching the request type and state. @@ -827,7 +827,7 @@ def perm_get_next(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) -def perm_set_rse_usage(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_set_rse_usage(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can set RSE usage information. @@ -839,7 +839,7 @@ def perm_set_rse_usage(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) -def perm_set_rse_limits(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_set_rse_limits(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can set RSE limits. @@ -851,7 +851,7 @@ def perm_set_rse_limits(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_set_local_account_limit(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_set_local_account_limit(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can set an account limit. @@ -872,7 +872,7 @@ def perm_set_local_account_limit(issuer, kwargs, *, session: "Optional[Session]" return False -def perm_set_global_account_limit(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_set_global_account_limit(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can set a global account limit. @@ -895,7 +895,7 @@ def perm_set_global_account_limit(issuer, kwargs, *, session: "Optional[Session] return False -def perm_delete_local_account_limit(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_delete_local_account_limit(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can delete an account limit. @@ -916,7 +916,7 @@ def perm_delete_local_account_limit(issuer, kwargs, *, session: "Optional[Sessio return False -def perm_delete_global_account_limit(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_delete_global_account_limit(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can delete a global account limit. @@ -940,7 +940,7 @@ def perm_delete_global_account_limit(issuer, kwargs, *, session: "Optional[Sessi return False -def perm_config(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_config(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can read/write the configuration. @@ -952,7 +952,7 @@ def perm_config(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_get_local_account_usage(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_get_local_account_usage(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can get the account usage of an account. @@ -970,7 +970,7 @@ def perm_get_local_account_usage(issuer, kwargs, *, session: "Optional[Session]" return False -def perm_get_global_account_usage(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_get_global_account_usage(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can get the account usage of an account. @@ -989,7 +989,7 @@ def perm_get_global_account_usage(issuer, kwargs, *, session: "Optional[Session] return False -def perm_add_account_attribute(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_add_account_attribute(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can add attributes to accounts. @@ -1001,7 +1001,7 @@ def perm_add_account_attribute(issuer, kwargs, *, session: "Optional[Session]" = return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_del_account_attribute(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_del_account_attribute(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can add attributes to accounts. @@ -1013,7 +1013,7 @@ def perm_del_account_attribute(issuer, kwargs, *, session: "Optional[Session]" = return perm_add_account_attribute(issuer, kwargs, session=session) -def perm_list_heartbeats(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_list_heartbeats(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can list heartbeats. @@ -1025,7 +1025,7 @@ def perm_list_heartbeats(issuer, kwargs, *, session: "Optional[Session]" = None) return _is_root(issuer) -def perm_resurrect(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_resurrect(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can resurrect DIDS. @@ -1037,7 +1037,7 @@ def perm_resurrect(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) -def perm_update_lifetime_exceptions(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_update_lifetime_exceptions(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can approve/reject Lifetime Model exceptions. @@ -1063,7 +1063,7 @@ def perm_get_auth_token_ssh(issuer: "InternalAccount", kwargs: dict, *, session: return True -def perm_get_signed_url(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_get_signed_url(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can request a signed URL. @@ -1074,7 +1074,7 @@ def perm_get_signed_url(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) -def perm_add_bad_pfns(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_add_bad_pfns(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can declare bad PFNs. @@ -1086,7 +1086,7 @@ def perm_add_bad_pfns(issuer, kwargs, *, session: "Optional[Session]" = None): return _is_root(issuer) -def perm_remove_did_from_followed(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_remove_did_from_followed(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can remove did from followed table. @@ -1101,7 +1101,7 @@ def perm_remove_did_from_followed(issuer, kwargs, *, session: "Optional[Session] or kwargs['scope'].external == 'mock' -def perm_remove_dids_from_followed(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_remove_dids_from_followed(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can bulk remove dids from followed table. @@ -1117,7 +1117,7 @@ def perm_remove_dids_from_followed(issuer, kwargs, *, session: "Optional[Session return True -def perm_export(issuer, kwargs, *, session: "Optional[Session]" = None): +def perm_export(issuer: "InternalAccount", kwargs: dict[str, Any], *, session: "Optional[Session]" = None) -> bool: """ Checks if an account can export the RSE info.