From 1fcac3fe548bbdf8a031c568979d14b18abe1d7e Mon Sep 17 00:00:00 2001 From: rdimaio Date: Thu, 22 Feb 2024 17:51:15 +0100 Subject: [PATCH 1/4] Rules: remove unnecessary arguments in __find_surplus_locks_and_remove_them --- lib/rucio/core/rule.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/lib/rucio/core/rule.py b/lib/rucio/core/rule.py index c580d15ef9..6aaa506ad6 100644 --- a/lib/rucio/core/rule.py +++ b/lib/rucio/core/rule.py @@ -1134,11 +1134,7 @@ def repair_rule(rule_id, *, session: "Session", logger=logging.log): if hard_repair: __find_surplus_locks_and_remove_them(datasetfiles=datasetfiles, locks=locks, - replicas=replicas, - source_replicas=source_replicas, - rseselector=rseselector, rule=rule, - source_rses=[rse['id'] for rse in source_rses], session=session) session.flush() @@ -2568,17 +2564,13 @@ def __find_missing_locks_and_create_them(datasetfiles, locks, replicas, source_r @transactional_session -def __find_surplus_locks_and_remove_them(datasetfiles, locks, replicas, source_replicas, rseselector, rule, source_rses, *, session: "Session", logger=logging.log): +def __find_surplus_locks_and_remove_them(datasetfiles, locks, rule, *, session: "Session", logger=logging.log): """ Find surplocks locks for a rule and delete them. :param datasetfiles: Dict holding all datasets and files. :param locks: Dict holding locks. - :param replicas: Dict holding replicas. - :param source_replicas: Dict holding all source replicas. - :param rseselector: The RSESelector to be used. :param rule: The rule. - :param source_rses: RSE ids for eglible source RSEs. :param session: Session of the db. :param logger: Optional decorated logger that can be passed from the calling daemons or servers. 
:raises: InsufficientAccountLimit, IntegrityError, InsufficientTargetRSEs From d12748d0ee17498edff3b9ee86653ff9206201ae Mon Sep 17 00:00:00 2001 From: rdimaio Date: Thu, 22 Feb 2024 18:14:51 +0100 Subject: [PATCH 2/4] Rules: add type hints; #6454 --- lib/rucio/core/rule.py | 152 ++++++++++++++++++-------------- lib/rucio/core/rule_grouping.py | 21 ++++- 2 files changed, 102 insertions(+), 71 deletions(-) diff --git a/lib/rucio/core/rule.py b/lib/rucio/core/rule.py index 6aaa506ad6..3665e7b8f4 100644 --- a/lib/rucio/core/rule.py +++ b/lib/rucio/core/rule.py @@ -15,14 +15,14 @@ import json import logging -from collections.abc import Iterator +from collections.abc import Iterable, Iterator from configparser import NoOptionError, NoSectionError from copy import deepcopy from datetime import datetime, timedelta from os import path from re import match from string import Template -from typing import TYPE_CHECKING, Any, Callable, Optional, Type, TypeVar +from typing import TYPE_CHECKING, Any, Callable, Optional, Type, TypeVar, Sequence from dogpile.cache.api import NO_VALUE from sqlalchemy import select, update @@ -45,7 +45,7 @@ InvalidSourceReplicaExpression) from rucio.common.policy import policy_filter, get_scratchdisk_lifetime from rucio.common.schema import validate_schema -from rucio.common.types import InternalScope, InternalAccount +from rucio.common.types import InternalScope, InternalAccount, LoggerFunction from rucio.common.utils import str_to_date, sizefmt, chunks from rucio.core import account_counter, rse_counter, request as request_core, transfer as transfer_core from rucio.core.account import get_account @@ -143,10 +143,10 @@ def default(rule: models.ReplicationRule, did: models.DataIdentifier, session: ' @transactional_session -def add_rule(dids, account, copies, rse_expression, grouping, weight, lifetime, locked, subscription_id, - source_replica_expression=None, activity='User Subscriptions', notify=None, purge_replicas=False, - 
ignore_availability=False, comment=None, ask_approval=False, asynchronous=False, ignore_account_limit=False, - priority=3, delay_injection=None, split_container=False, meta=None, *, session: "Session", logger=logging.log): +def add_rule(dids: Sequence[dict[str, Any]], account: InternalAccount, copies: int, rse_expression: str, weight: str, lifetime: int, locked: bool, subscription_id: str, + grouping: RuleGrouping = RuleGrouping.DATASET, source_replica_expression: Optional[str] = None, activity: str = 'User Subscriptions', notify: RuleNotification = RuleNotification.NO, purge_replicas: bool = False, + ignore_availability: bool = False, comment: Optional[str] = None, ask_approval: bool = False, asynchronous: bool = False, ignore_account_limit: bool = False, + priority: int = 3, delay_injection: Optional[int] = None, split_container: bool = False, meta: Optional[dict[str, Any]] = None, *, session: "Session", logger: LoggerFunction = logging.log) -> list[str]: """ Adds a replication rule for every did in dids @@ -163,7 +163,7 @@ def add_rule(dids, account, copies, rse_expression, grouping, weight, lifetime, :param subscription_id: The subscription_id, if the rule is created by a subscription. :param source_replica_expression: Only use replicas as source from this RSEs. :param activity: Activity to be passed on to the conveyor. - :param notify: Notification setting of the rule ('Y', 'N', 'C'; None = 'N'). + :param notify: Notification setting of the rule. :param purge_replicas: Purge setting if a replica should be directly deleted after the rule is deleted. :param ignore_availability: Option to ignore the availability of RSEs. :param comment: Comment about the rule. @@ -185,8 +185,6 @@ def add_rule(dids, account, copies, rse_expression, grouping, weight, lifetime, rule_ids = [] - grouping = {'ALL': RuleGrouping.ALL, 'NONE': RuleGrouping.NONE}.get(grouping, RuleGrouping.DATASET) - with METRICS.timer('add_rule.total'): # 1. 
Resolve the rse_expression into a list of RSE-ids with METRICS.timer('add_rule.parse_rse_expression'): @@ -231,8 +229,6 @@ def add_rule(dids, account, copies, rse_expression, grouping, weight, lifetime, expires_at = datetime.utcnow() + timedelta(seconds=lifetime) if lifetime is not None else None - notify = {'Y': RuleNotification.YES, 'C': RuleNotification.CLOSE, 'P': RuleNotification.PROGRESS}.get(notify, RuleNotification.NO) - for elem in dids: # 3. Get the did with METRICS.timer('add_rule.get_did'): @@ -387,13 +383,13 @@ def add_rule(dids, account, copies, rse_expression, grouping, weight, lifetime, @transactional_session -def add_rules(dids, rules, *, session: "Session", logger=logging.log): +def add_rules(dids: Sequence[dict[str, Any]], rules: Sequence[models.ReplicationRule], *, session: "Session", logger: LoggerFunction = logging.log) -> dict[tuple[InternalScope, str], list[str]]: """ Adds a list of replication rules to every did in dids :params dids: List of data identifiers. :param rules: List of dictionaries defining replication rules. - {account, copies, rse_expression, grouping, weight, lifetime, locked, subscription_id, source_replica_expression, activity, notifiy, purge_replicas} + {account, copies, rse_expression, grouping, weight, lifetime, locked, subscription_id, source_replica_expression, activity, notify, purge_replicas} :param session: The database session in use. :param logger: Optional decorated logger that can be passed from the calling daemons or servers. :returns: Dictionary (scope, name) with list of created rule ids @@ -519,11 +515,11 @@ def add_rules(dids, rules, *, session: "Session", logger=logging.log): # 4. 
Create the replication rule with METRICS.timer('add_rules.create_rule'): - grouping = {'ALL': RuleGrouping.ALL, 'NONE': RuleGrouping.NONE}.get(rule.get('grouping'), RuleGrouping.DATASET) + grouping = {'ALL': RuleGrouping.ALL, 'NONE': RuleGrouping.NONE}.get(str(rule.get('grouping')), RuleGrouping.DATASET) - expires_at = datetime.utcnow() + timedelta(seconds=rule.get('lifetime')) if rule.get('lifetime') is not None else None + expires_at: Optional[datetime] = datetime.utcnow() + timedelta(seconds=rule.get('lifetime')) if rule.get('lifetime') is not None else None - notify = {'Y': RuleNotification.YES, 'C': RuleNotification.CLOSE, 'P': RuleNotification.PROGRESS}.get(rule.get('notify'), RuleNotification.NO) + notify = RuleNotification(str(rule.get('notify'))) if rule.get('meta') is not None: try: @@ -646,7 +642,7 @@ def add_rules(dids, rules, *, session: "Session", logger=logging.log): @transactional_session -def inject_rule(rule_id, *, session: "Session", logger=logging.log): +def inject_rule(rule_id: str, *, session: "Session", logger: LoggerFunction = logging.log) -> None: """ Inject a replication rule. @@ -765,7 +761,7 @@ def inject_rule(rule_id, *, session: "Session", logger=logging.log): @stream_session -def list_rules(filters={}, *, session: "Session"): +def list_rules(filters: dict[str, Any] = {}, *, session: "Session") -> Iterator[dict]: """ List replication rules. @@ -828,7 +824,7 @@ def list_rules(filters={}, *, session: "Session"): @stream_session -def list_rule_history(rule_id, *, session: "Session"): +def list_rule_history(rule_id: str, *, session: "Session") -> Iterator[dict[str, Any]]: """ List the rule history of a rule. @@ -851,7 +847,7 @@ def list_rule_history(rule_id, *, session: "Session"): @stream_session -def list_rule_full_history(scope, name, *, session: "Session"): +def list_rule_full_history(scope: InternalScope, name: str, *, session: "Session") -> Iterator[dict[str, Any]]: """ List the rule history of a DID. 
@@ -880,7 +876,7 @@ def list_rule_full_history(scope, name, *, session: "Session"): @stream_session -def list_associated_rules_for_file(scope, name, *, session: "Session"): +def list_associated_rules_for_file(scope: InternalScope, name: str, *, session: "Session") -> Iterator[dict[str, Any]]: """ List replication rules a file is affected from. @@ -903,8 +899,8 @@ def list_associated_rules_for_file(scope, name, *, session: "Session"): @transactional_session -def delete_rule(rule_id, purge_replicas=None, soft=False, delete_parent=False, nowait=False, *, session: "Session", - ignore_rule_lock=False): +def delete_rule(rule_id: str, purge_replicas: Optional[bool] = None, soft: bool = False, delete_parent: bool = False, nowait: bool = False, *, session: "Session", + ignore_rule_lock: bool = False) -> None: """ Delete a replication rule. @@ -987,7 +983,7 @@ def delete_rule(rule_id, purge_replicas=None, soft=False, delete_parent=False, n @transactional_session -def repair_rule(rule_id, *, session: "Session", logger=logging.log): +def repair_rule(rule_id: str, *, session: "Session", logger: LoggerFunction = logging.log) -> None: """ Repair a STUCK replication rule. @@ -1215,7 +1211,7 @@ def repair_rule(rule_id, *, session: "Session", logger=logging.log): @read_session -def get_rule(rule_id, *, session: "Session"): +def get_rule(rule_id: str, *, session: "Session") -> dict[str, Any]: """ Get a specific replication rule. @@ -1536,7 +1532,7 @@ def update_rule(rule_id: str, options: dict[str, Any], *, session: "Session") -> @transactional_session -def reduce_rule(rule_id, copies, exclude_expression=None, *, session: "Session"): +def reduce_rule(rule_id: str, copies: int, exclude_expression: Optional[str] = None, *, session: "Session") -> str: """ Reduce the number of copies for a rule by atomically replacing the rule. 
@@ -1602,7 +1598,7 @@ def reduce_rule(rule_id, copies, exclude_expression=None, *, session: "Session") @transactional_session -def move_rule(rule_id: str, rse_expression: str, override: Optional[dict[str, Any]] = None, *, session: "Session"): +def move_rule(rule_id: str, rse_expression: str, override: Optional[dict[str, Any]] = None, *, session: "Session") -> str: """ Move a replication rule to another RSE and, once done, delete the original one. @@ -1671,7 +1667,7 @@ def move_rule(rule_id: str, rse_expression: str, override: Optional[dict[str, An @transactional_session -def re_evaluate_did(scope, name, rule_evaluation_action, *, session: "Session"): +def re_evaluate_did(scope: InternalScope, name: str, rule_evaluation_action: DIDReEvaluation, *, session: "Session") -> None: """ Re-Evaluates a did. @@ -1713,7 +1709,7 @@ def re_evaluate_did(scope, name, rule_evaluation_action, *, session: "Session"): @read_session -def get_updated_dids(total_workers, worker_number, limit=100, blocked_dids=[], *, session: "Session"): +def get_updated_dids(total_workers: int, worker_number: int, limit: int = 100, blocked_dids: Iterable[tuple[str, str]] = [], *, session: "Session") -> list[tuple[str, str]]: """ Get updated dids. @@ -1751,7 +1747,7 @@ def get_updated_dids(total_workers, worker_number, limit=100, blocked_dids=[], * @read_session -def get_rules_beyond_eol(date_check, worker_number, total_workers, *, session: "Session"): +def get_rules_beyond_eol(date_check: datetime, worker_number: int, total_workers: int, *, session: "Session") -> list[dict[str, Any]]: """ Get rules which have eol_at before a certain date. 
@@ -1775,7 +1771,7 @@ def get_rules_beyond_eol(date_check, worker_number, total_workers, *, session: " @read_session -def get_expired_rules(total_workers, worker_number, limit=100, blocked_rules=[], *, session: "Session"): +def get_expired_rules(total_workers: int, worker_number: int, limit: int = 100, blocked_rules: Sequence[str] = [], *, session: "Session") -> list[tuple[str, str]]: """ Get expired rules. @@ -1810,7 +1806,7 @@ def get_expired_rules(total_workers, worker_number, limit=100, blocked_rules=[], @read_session -def get_injected_rules(total_workers, worker_number, limit=100, blocked_rules=[], *, session: "Session"): +def get_injected_rules(total_workers: int, worker_number: int, limit: int = 100, blocked_rules: Sequence[str] = [], *, session: "Session") -> list[tuple[str, str]]: """ Get rules to be injected. @@ -1845,7 +1841,7 @@ def get_injected_rules(total_workers, worker_number, limit=100, blocked_rules=[] @read_session -def get_stuck_rules(total_workers, worker_number, delta=600, limit=10, blocked_rules=[], *, session: "Session"): +def get_stuck_rules(total_workers: int, worker_number: int, delta: int = 600, limit: int = 10, blocked_rules: Sequence[str] = [], *, session: "Session") -> list[tuple[str, str]]: """ Get stuck rules. @@ -1884,7 +1880,7 @@ def get_stuck_rules(total_workers, worker_number, delta=600, limit=10, blocked_r @transactional_session -def delete_updated_did(id_, *, session: "Session"): +def delete_updated_did(id_: str, *, session: "Session") -> None: """ Delete an updated_did by id. @@ -1895,7 +1891,7 @@ def delete_updated_did(id_, *, session: "Session"): @transactional_session -def update_rules_for_lost_replica(scope, name, rse_id, nowait=False, *, session: "Session", logger=logging.log): +def update_rules_for_lost_replica(scope: InternalScope, name: str, rse_id: str, nowait: bool = False, *, session: "Session", logger: LoggerFunction = logging.log) -> None: """ Update rules if a file replica is lost. 
@@ -1975,7 +1971,7 @@ def update_rules_for_lost_replica(scope, name, rse_id, nowait=False, *, session: @transactional_session -def update_rules_for_bad_replica(scope, name, rse_id, nowait=False, *, session: "Session", logger=logging.log): +def update_rules_for_bad_replica(scope: InternalScope, name: str, rse_id: str, nowait: bool = False, *, session: "Session", logger: LoggerFunction = logging.log) -> None: """ Update rules if a file replica is bad and has to be recreated. @@ -2049,7 +2045,7 @@ def update_rules_for_bad_replica(scope, name, rse_id, nowait=False, *, session: @transactional_session -def generate_rule_notifications(rule, replicating_locks_before=None, *, session: "Session"): +def generate_rule_notifications(rule: models.ReplicationRule, replicating_locks_before: Optional[int] = None, *, session: "Session") -> None: """ Generate (If necessary) a callback for a rule (DATASETLOCK_OK, RULE_OK, DATASETLOCK_PROGRESS) @@ -2157,7 +2153,7 @@ def generate_rule_notifications(rule, replicating_locks_before=None, *, session: @transactional_session -def generate_email_for_rule_ok_notification(rule, *, session: "Session", logger=logging.log): +def generate_email_for_rule_ok_notification(rule: models.ReplicationRule, *, session: "Session", logger: LoggerFunction = logging.log) -> None: """ Generate (If necessary) an eMail for a rule with notification mode Y. @@ -2208,7 +2204,7 @@ def generate_email_for_rule_ok_notification(rule, *, session: "Session", logger= @transactional_session -def insert_rule_history(rule, recent=True, longterm=False, *, session: "Session"): +def insert_rule_history(rule: models.ReplicationRule, recent: bool = True, longterm: bool = False, *, session: "Session") -> None: """ Insert rule history to recent/longterm history. 
@@ -2238,7 +2234,7 @@ def insert_rule_history(rule, recent=True, longterm=False, *, session: "Session" @transactional_session -def approve_rule(rule_id, approver=None, notify_approvers=True, *, session: "Session"): +def approve_rule(rule_id: str, approver: Optional[str] = None, notify_approvers: bool = True, *, session: "Session") -> None: """ Approve a specific replication rule. @@ -2298,7 +2294,7 @@ def approve_rule(rule_id, approver=None, notify_approvers=True, *, session: "Ses @transactional_session -def deny_rule(rule_id, approver=None, reason=None, *, session: "Session"): +def deny_rule(rule_id: str, approver: Optional[str] = None, reason: Optional[str] = None, *, session: "Session") -> None: """ Deny a specific replication rule. @@ -2357,7 +2353,7 @@ def deny_rule(rule_id, approver=None, reason=None, *, session: "Session"): @transactional_session -def examine_rule(rule_id, *, session: "Session"): +def examine_rule(rule_id: str, *, session: "Session") -> dict[str, Any]: """ Examine a replication rule for transfer errors. @@ -2414,7 +2410,7 @@ def examine_rule(rule_id, *, session: "Session"): @transactional_session -def get_evaluation_backlog(expiration_time=600, *, session: "Session"): +def get_evaluation_backlog(expiration_time: int = 600, *, session: "Session") -> tuple[int, datetime]: """ Counts the number of entries in the rule evaluation backlog. 
(Number of files to be evaluated) @@ -2422,15 +2418,17 @@ def get_evaluation_backlog(expiration_time=600, *, session: "Session"): :returns: Tuple (Count, Datetime of oldest entry) """ - result = REGION.get('rule_evaluation_backlog', expiration_time=expiration_time) - if result is NO_VALUE: + cached_backlog = REGION.get('rule_evaluation_backlog', expiration_time=expiration_time) + if cached_backlog is NO_VALUE: result = session.query(func.count(models.UpdatedDID.created_at), func.min(models.UpdatedDID.created_at)).one() REGION.set('rule_evaluation_backlog', result) - return result + return result + else: + return cached_backlog # type: ignore @transactional_session -def release_parent_rule(child_rule_id, remove_parent_expiration=False, *, session: "Session"): +def release_parent_rule(child_rule_id: str, remove_parent_expiration: bool = False, *, session: "Session") -> None: """ Release a potential parent rule, because the child_rule is OK. @@ -2517,11 +2515,14 @@ def list_rules_for_rse_decommissioning( @transactional_session -def __find_missing_locks_and_create_them(datasetfiles, locks, replicas, source_replicas, rseselector, rule, source_rses, *, session: "Session", logger=logging.log): +def __find_missing_locks_and_create_them(datasetfiles: Sequence[dict[str, Any]], locks: dict[tuple[str, str], Sequence[models.ReplicaLock]], + replicas: dict[tuple[str, str], Any], source_replicas: dict[tuple[str, str], Any], + rseselector: RSESelector, rule: models.ReplicationRule, source_rses: Sequence[str], *, + session: "Session", logger: LoggerFunction = logging.log) -> None: """ Find missing locks for a rule and create them. - :param datasetfiles: Dict holding all datasets and files. + :param datasetfiles: Sequence of dicts holding all datasets and files. :param locks: Dict holding locks. :param replicas: Dict holding replicas. :param source_replicas: Dict holding source replicas. 
@@ -2564,7 +2565,7 @@ def __find_missing_locks_and_create_them(datasetfiles, locks, replicas, source_r @transactional_session -def __find_surplus_locks_and_remove_them(datasetfiles, locks, rule, *, session: "Session", logger=logging.log): +def __find_surplus_locks_and_remove_them(datasetfiles: Sequence[dict[str, Any]], locks: dict[tuple[str, str], list[models.ReplicaLock]], rule: models.ReplicationRule, *, session: "Session", logger: LoggerFunction = logging.log) -> None: """ Find surplocks locks for a rule and delete them. @@ -2608,7 +2609,10 @@ def __find_surplus_locks_and_remove_them(datasetfiles, locks, rule, *, session: @transactional_session -def __find_stuck_locks_and_repair_them(datasetfiles, locks, replicas, source_replicas, rseselector, rule, source_rses, *, session: "Session", logger=logging.log): +def __find_stuck_locks_and_repair_them(datasetfiles: Sequence[dict[str, Any]], locks: dict[tuple[str, str], Sequence[models.ReplicaLock]], + replicas: dict[tuple[str, str], Any], source_replicas: dict[tuple[str, str], Any], + rseselector: RSESelector, rule: models.ReplicationRule, source_rses: Sequence[str], *, + session: "Session", logger: LoggerFunction = logging.log) -> None: """ Find stuck locks for a rule and repair them. @@ -2667,7 +2671,7 @@ def __find_stuck_locks_and_repair_them(datasetfiles, locks, replicas, source_rep @transactional_session -def __evaluate_did_detach(eval_did, *, session: "Session", logger=logging.log): +def __evaluate_did_detach(eval_did: models.DataIdentifier, *, session: "Session", logger: LoggerFunction = logging.log) -> None: """ Evaluate a parent did which has children removed. 
@@ -2756,7 +2760,7 @@ def __evaluate_did_detach(eval_did, *, session: "Session", logger=logging.log): @transactional_session -def __oldest_file_under(scope, name, *, session: "Session"): +def __oldest_file_under(scope: InternalScope, name: str, *, session: "Session") -> Optional[tuple[InternalScope, str]]: """ Finds oldest file in oldest container/dataset in the container or the dataset, recursively. Oldest means attached to its parent first. @@ -2780,7 +2784,7 @@ def __oldest_file_under(scope, name, *, session: "Session"): @transactional_session -def __evaluate_did_attach(eval_did, *, session: "Session", logger=logging.log): +def __evaluate_did_attach(eval_did: models.DataIdentifier, *, session: "Session", logger: LoggerFunction = logging.log) -> None: """ Evaluate a parent did which has new childs @@ -2970,7 +2974,12 @@ def __evaluate_did_attach(eval_did, *, session: "Session", logger=logging.log): @transactional_session -def __resolve_did_to_locks_and_replicas(did, nowait=False, restrict_rses=None, source_rses=None, only_stuck=False, *, session: "Session"): +def __resolve_did_to_locks_and_replicas(did: models.DataIdentifier, nowait: bool = False, restrict_rses: Optional[Sequence[str]] = None, + source_rses: Optional[Sequence[str]] = None, only_stuck: bool = False, *, + session: "Session") -> tuple[list[dict[str, Any]], + dict[tuple[str, str], models.ReplicaLock], + dict[tuple[str, str], Any], + dict[tuple[str, str], str]]: """ Resolves a did to its constituent childs and reads the locks and replicas of all the constituent files. @@ -2980,7 +2989,7 @@ def __resolve_did_to_locks_and_replicas(did, nowait=False, restrict_rses=None, s :param source_rses: Source rses for this rule. These replicas are not row-locked. :param only_stuck: Get results only for STUCK locks, if True. :param session: Session of the db. 
- :returns: (datasetfiles, locks, replicas) + :returns: (datasetfiles, locks, replicas, source_replicas) """ datasetfiles = [] # List of Datasets and their files in the Tree [{'scope':, 'name':, 'files': []}] @@ -3067,16 +3076,22 @@ def __resolve_did_to_locks_and_replicas(did, nowait=False, restrict_rses=None, s @transactional_session -def __resolve_dids_to_locks_and_replicas(dids, nowait=False, restrict_rses=[], source_rses=None, *, session: "Session"): +def __resolve_dids_to_locks_and_replicas(dids: Sequence[models.DataIdentifierAssociation], + nowait: bool = False, restrict_rses: Sequence[str] = [], + source_rses: Optional[Sequence[str]] = None, *, + session: "Session") -> tuple[list[dict[str, Any]], + dict[tuple[str, str], models.ReplicaLock], + dict[tuple[str, str], Any], + dict[tuple[str, str], str]]: """ Resolves a list of dids to its constituent childs and reads the locks and replicas of all the constituent files. - :param dids: The list of DIDAssociation objects. + :param dids: The list of DataIdentifierAssociation objects. :param nowait: Nowait parameter for the FOR UPDATE statement. :param restrict_rses: Possible rses of the rule, so only these replica/locks should be considered. :param source_rses: Source rses for this rule. These replicas are not row-locked. :param session: Session of the db. 
- :returns: (datasetfiles, locks, replicas) + :returns: (datasetfiles, locks, replicas, source_replicas) """ datasetfiles = [] # List of Datasets and their files in the Tree [{'scope':, 'name':, 'files': []}] @@ -3179,7 +3194,10 @@ def __resolve_dids_to_locks_and_replicas(dids, nowait=False, restrict_rses=[], s @transactional_session -def __create_locks_replicas_transfers(datasetfiles, locks, replicas, source_replicas, rseselector, rule, preferred_rse_ids=[], source_rses=[], *, session: "Session", logger=logging.log): +def __create_locks_replicas_transfers(datasetfiles: Sequence[dict[str, Any]], locks: dict[tuple[str, str], Sequence[models.ReplicaLock]], + replicas: dict[tuple[str, str], Any], source_replicas: dict[tuple[str, str], Any], + rseselector: RSESelector, rule: models.ReplicationRule, preferred_rse_ids: Sequence[str] = [], + source_rses: Sequence[str] = [], *, session: "Session", logger: LoggerFunction = logging.log) -> None: """ Apply a created replication rule to a set of files @@ -3232,7 +3250,7 @@ def __create_locks_replicas_transfers(datasetfiles, locks, replicas, source_repl @transactional_session -def __delete_lock_and_update_replica(lock, purge_replicas=False, nowait=False, *, session: "Session", logger=logging.log): +def __delete_lock_and_update_replica(lock: models.ReplicaLock, purge_replicas: bool = False, nowait: bool = False, *, session: "Session", logger: LoggerFunction = logging.log) -> bool: """ Delete a lock and update the associated replica. @@ -3271,7 +3289,7 @@ def __delete_lock_and_update_replica(lock, purge_replicas=False, nowait=False, * @transactional_session -def __create_rule_approval_email(rule: "models.ReplicationRule", *, session: "Session"): +def __create_rule_approval_email(rule: models.ReplicationRule, *, session: "Session") -> None: """ Create the rule notification email. 
@@ -3357,11 +3375,11 @@ def __create_rule_approval_email(rule: "models.ReplicationRule", *, session: "Se @transactional_session -def _create_recipients_list(rse_expression: str, filter_=None, *, session: "Session"): +def _create_recipients_list(rse_expression: str, filter_: Optional[str] = None, *, session: "Session") -> list[tuple[str, InternalAccount]]: """ Create a list of recipients for a notification email based on rse_expression. - :param rse_exoression: The rse_expression. + :param rse_expression: The rse_expression. :param session: The database session in use. """ @@ -3450,7 +3468,7 @@ def __progress_class(replicating_locks, total_locks): @policy_filter @transactional_session -def archive_localgroupdisk_datasets(scope, name, *, session: "Session", logger=logging.log): +def archive_localgroupdisk_datasets(scope: InternalScope, name: str, *, session: "Session", logger: LoggerFunction = logging.log) -> None: """ ATLAS policy to archive a dataset which has a replica on LOCALGROUPDISK @@ -3518,7 +3536,7 @@ def archive_localgroupdisk_datasets(scope, name, *, session: "Session", logger=l @policy_filter @read_session -def get_scratch_policy(account, rses, lifetime, *, session: "Session"): +def get_scratch_policy(account: str, rses: Sequence[dict[str, Any]], lifetime: int, *, session: "Session") -> int: """ ATLAS policy for rules on SCRATCHDISK diff --git a/lib/rucio/core/rule_grouping.py b/lib/rucio/core/rule_grouping.py index e8d9457137..9661c7d262 100644 --- a/lib/rucio/core/rule_grouping.py +++ b/lib/rucio/core/rule_grouping.py @@ -15,7 +15,7 @@ import logging from datetime import datetime -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Sequence, Any from sqlalchemy import func from sqlalchemy.orm.exc import NoResultFound @@ -26,6 +26,7 @@ from rucio.common.config import config_get_int from rucio.common.exception import InsufficientTargetRSEs from rucio.core import account_counter, rse_counter, request as request_core +from 
rucio.core.rse_selector import RSESelector from rucio.core.rse import get_rse, get_rse_attribute, get_rse_name from rucio.db.sqla import models from rucio.db.sqla.constants import LockState, RuleGrouping, ReplicaState, RequestType, DIDType, OBSOLETE @@ -36,7 +37,13 @@ @transactional_session -def apply_rule_grouping(datasetfiles, locks, replicas, source_replicas, rseselector, rule, preferred_rse_ids=[], source_rses=[], *, session: "Session"): +def apply_rule_grouping(datasetfiles: Sequence[dict[str, Any]], locks: dict[tuple[str, str], models.ReplicaLock], + replicas: dict[tuple[str, str], Any], source_replicas: dict[tuple[str, str], Any], + rseselector: RSESelector, rule: models.ReplicationRule, preferred_rse_ids: Sequence[str] = [], + source_rses: Sequence[str] = [], *, + session: "Session") -> tuple[dict[str, list[dict[str, models.RSEFileAssociation]]], + dict[str, list[dict[str, models.ReplicaLock]]], + list[dict[str, Any]]]: """ Apply rule grouping to files. @@ -49,7 +56,7 @@ def apply_rule_grouping(datasetfiles, locks, replicas, source_replicas, rseselec :param preferred_rse_ids: Preferred RSE's to select. :param source_rses: RSE ids of eglible source replicas. :param session: Session of the db. - :returns: List of replicas to create, List of locks to create, List of transfers to create + :returns: Dict of replicas to create, Dict of locks to create, List of transfers to create :raises: InsufficientQuota, InsufficientTargetRSEs, RSEOverQuota :attention: This method modifies the contents of the locks and replicas input parameters. 
""" @@ -96,7 +103,13 @@ def apply_rule_grouping(datasetfiles, locks, replicas, source_replicas, rseselec @transactional_session -def repair_stuck_locks_and_apply_rule_grouping(datasetfiles, locks, replicas, source_replicas, rseselector, rule, source_rses, *, session: "Session"): +def repair_stuck_locks_and_apply_rule_grouping(datasetfiles: Sequence[dict[str, Any]], locks: dict[tuple[str, str], models.ReplicaLock], + replicas: dict[tuple[str, str], Any], source_replicas: dict[tuple[str, str], Any], + rseselector: RSESelector, rule: models.ReplicationRule, source_rses: Sequence[str], *, + session: "Session") -> tuple[dict[str, list[dict[str, models.RSEFileAssociation]]], + dict[str, list[dict[str, models.ReplicaLock]]], + list[dict[str, Any]], + dict[str, list[dict[str, models.ReplicaLock]]]]: """ Apply rule grouping to files. From 0f016e9934528e55dd0621c26980c5a6681ebd90 Mon Sep 17 00:00:00 2001 From: rdimaio Date: Tue, 27 Feb 2024 16:25:52 +0100 Subject: [PATCH 3/4] Rules: update query to SQLAlchemy 2.0 syntax; #6057 --- lib/rucio/core/rule.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/rucio/core/rule.py b/lib/rucio/core/rule.py index 3665e7b8f4..f1363bbc7f 100644 --- a/lib/rucio/core/rule.py +++ b/lib/rucio/core/rule.py @@ -2420,7 +2420,8 @@ def get_evaluation_backlog(expiration_time: int = 600, *, session: "Session") -> cached_backlog = REGION.get('rule_evaluation_backlog', expiration_time=expiration_time) if cached_backlog is NO_VALUE: - result = session.query(func.count(models.UpdatedDID.created_at), func.min(models.UpdatedDID.created_at)).one() + stmt = select(func.count(models.UpdatedDID.created_at), func.min(models.UpdatedDID.created_at)) + result = session.execute(stmt).scalars().one() REGION.set('rule_evaluation_backlog', result) return result else: From b41d55556650baff71384664099c0e10514d4b99 Mon Sep 17 00:00:00 2001 From: rdimaio Date: Tue, 27 Feb 2024 16:38:04 +0100 Subject: [PATCH 4/4] Rules: Add RuleDict typed dict; #6454 
--- lib/rucio/common/types.py | 17 ++++++++++++++++- lib/rucio/core/rule.py | 22 +++++++++++++--------- 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/lib/rucio/common/types.py b/lib/rucio/common/types.py index c3b0628dd8..818d761428 100644 --- a/lib/rucio/common/types.py +++ b/lib/rucio/common/types.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any, Callable, Optional, TypedDict, Union +from typing import Any, Callable, Literal, Optional, TypedDict, Union class InternalType(object): @@ -159,3 +159,18 @@ class RSESettingsDict(TypedDict): deterministic: bool domain: list[str] protocols: list[RSEProtocolDict] + + +class RuleDict(TypedDict): + account: InternalAccount + copies: int + rse_expression: str + grouping: Literal['ALL', 'DATASET', 'NONE'] + weight: str + lifetime: int + locked: bool + subscription_id: str + source_replica_expression: Optional[str] + activity: str + notify: Optional[Literal['Y', 'N', 'C']] + purge_replicas: bool diff --git a/lib/rucio/core/rule.py b/lib/rucio/core/rule.py index f1363bbc7f..465bead963 100644 --- a/lib/rucio/core/rule.py +++ b/lib/rucio/core/rule.py @@ -22,7 +22,7 @@ from os import path from re import match from string import Template -from typing import TYPE_CHECKING, Any, Callable, Optional, Type, TypeVar, Sequence +from typing import TYPE_CHECKING, Any, Callable, Literal, Optional, Type, TypeVar, Sequence from dogpile.cache.api import NO_VALUE from sqlalchemy import select, update @@ -45,7 +45,7 @@ InvalidSourceReplicaExpression) from rucio.common.policy import policy_filter, get_scratchdisk_lifetime from rucio.common.schema import validate_schema -from rucio.common.types import InternalScope, InternalAccount, LoggerFunction +from rucio.common.types import InternalScope, InternalAccount, LoggerFunction, RuleDict from rucio.common.utils import str_to_date, sizefmt, chunks from rucio.core import account_counter, 
rse_counter, request as request_core, transfer as transfer_core from rucio.core.account import get_account @@ -143,8 +143,8 @@ def default(rule: models.ReplicationRule, did: models.DataIdentifier, session: ' @transactional_session -def add_rule(dids: Sequence[dict[str, Any]], account: InternalAccount, copies: int, rse_expression: str, weight: str, lifetime: int, locked: bool, subscription_id: str, - grouping: RuleGrouping = RuleGrouping.DATASET, source_replica_expression: Optional[str] = None, activity: str = 'User Subscriptions', notify: RuleNotification = RuleNotification.NO, purge_replicas: bool = False, +def add_rule(dids: Sequence[dict[str, Any]], account: InternalAccount, copies: int, rse_expression: str, grouping: Literal['ALL', 'DATASET', 'NONE'], weight: str, lifetime: int, locked: bool, subscription_id: str, + source_replica_expression: Optional[str] = None, activity: str = 'User Subscriptions', notify: Optional[Literal['Y', 'N', 'C']] = None, purge_replicas: bool = False, ignore_availability: bool = False, comment: Optional[str] = None, ask_approval: bool = False, asynchronous: bool = False, ignore_account_limit: bool = False, priority: int = 3, delay_injection: Optional[int] = None, split_container: bool = False, meta: Optional[dict[str, Any]] = None, *, session: "Session", logger: LoggerFunction = logging.log) -> list[str]: """ @@ -163,7 +163,7 @@ def add_rule(dids: Sequence[dict[str, Any]], account: InternalAccount, copies: i :param subscription_id: The subscription_id, if the rule is created by a subscription. :param source_replica_expression: Only use replicas as source from this RSEs. :param activity: Activity to be passed on to the conveyor. - :param notify: Notification setting of the rule. + :param notify: Notification setting of the rule ('Y', 'N', 'C'; None = 'N'). :param purge_replicas: Purge setting if a replica should be directly deleted after the rule is deleted. :param ignore_availability: Option to ignore the availability of RSEs. 
:param comment: Comment about the rule. @@ -185,6 +185,8 @@ def add_rule(dids: Sequence[dict[str, Any]], account: InternalAccount, copies: i rule_ids = [] + grouping_value = {'ALL': RuleGrouping.ALL, 'NONE': RuleGrouping.NONE}.get(grouping, RuleGrouping.DATASET) + with METRICS.timer('add_rule.total'): # 1. Resolve the rse_expression into a list of RSE-ids with METRICS.timer('add_rule.parse_rse_expression'): @@ -229,6 +231,8 @@ def add_rule(dids: Sequence[dict[str, Any]], account: InternalAccount, copies: i expires_at = datetime.utcnow() + timedelta(seconds=lifetime) if lifetime is not None else None + notify_value = {'Y': RuleNotification.YES, 'C': RuleNotification.CLOSE, 'P': RuleNotification.PROGRESS}.get(str(notify or ''), RuleNotification.NO) + for elem in dids: # 3. Get the did with METRICS.timer('add_rule.get_did'): @@ -286,13 +290,13 @@ def add_rule(dids: Sequence[dict[str, Any]], account: InternalAccount, copies: i copies=copies, rse_expression=rse_expression, locked=locked, - grouping=grouping, + grouping=grouping_value, expires_at=expires_at, weight=weight, source_replica_expression=source_replica_expression, activity=activity, subscription_id=subscription_id, - notification=notify, + notification=notify_value, purge_replicas=purge_replicas, ignore_availability=ignore_availability, comments=comment, @@ -383,7 +387,7 @@ def add_rule(dids: Sequence[dict[str, Any]], account: InternalAccount, copies: i @transactional_session -def add_rules(dids: Sequence[dict[str, Any]], rules: Sequence[models.ReplicationRule], *, session: "Session", logger: LoggerFunction = logging.log) -> dict[tuple[InternalScope, str], list[str]]: +def add_rules(dids: Sequence[dict[str, Any]], rules: Sequence[RuleDict], *, session: "Session", logger: LoggerFunction = logging.log) -> dict[tuple[InternalScope, str], list[str]]: """ Adds a list of replication rules to every did in dids @@ -519,7 +523,7 @@ def add_rules(dids: Sequence[dict[str, Any]], rules: Sequence[models.Replication 
expires_at: Optional[datetime] = datetime.utcnow() + timedelta(seconds=rule.get('lifetime')) if rule.get('lifetime') is not None else None - notify = RuleNotification(str(rule.get('notify'))) + notify = {'Y': RuleNotification.YES, 'C': RuleNotification.CLOSE, 'P': RuleNotification.PROGRESS, None: RuleNotification.NO}.get(rule.get('notify'), RuleNotification.NO) if rule.get('meta') is not None: try: