diff --git a/lib/rucio/core/rule.py b/lib/rucio/core/rule.py index 2a9aceb518..fa2976ddd4 100644 --- a/lib/rucio/core/rule.py +++ b/lib/rucio/core/rule.py @@ -24,7 +24,7 @@ from typing import TYPE_CHECKING, Any, Callable, Optional, Type, TypeVar from dogpile.cache.api import NO_VALUE -from sqlalchemy import select, update, delete, desc +from sqlalchemy import delete, desc, select, update from sqlalchemy.exc import IntegrityError, StatementError from sqlalchemy.exc import NoResultFound # https://pydoc.dev/sqlalchemy/latest/sqlalchemy.exc.NoResultFound.html from sqlalchemy.sql import func @@ -252,10 +252,12 @@ def add_rule(dids, account, copies, rse_expression, grouping, weight, lifetime, if did.did_type == DIDType.FILE and did.constituent: # Check if a single replica of this DID exists; Do not put rule on file if there are only replicas on TAPE stmt = select( - func.count(models.RSEFileAssociation.rse_id) + func.count() + ).select_from( + models.RSEFileAssociation ).join( models.RSE, - models.RSE.id == models.RSEFileAssociation.rse_id + models.RSEFileAssociation.rse_id == models.RSE.id ).where( and_(models.RSEFileAssociation.scope == did.scope, models.RSEFileAssociation.name == did.name, @@ -390,9 +392,9 @@ def add_rule(dids, account, copies, rse_expression, grouping, weight, lifetime, models.DatasetLock ).where( models.DatasetLock.rule_id == new_rule.id - ).values( - {'state': LockState.STUCK} - ) + ).values({ + models.DatasetLock.state: LockState.STUCK + }) session.execute(stmt) elif new_rule.locks_replicating_cnt == 0: new_rule.state = RuleState.OK @@ -400,10 +402,10 @@ def add_rule(dids, account, copies, rse_expression, grouping, weight, lifetime, stmt = update( models.DatasetLock ).where( - models.DatasetLock.scope == new_rule.scope - ).values( - {'state': LockState.OK} - ) + models.DatasetLock.rule_id == new_rule.id + ).values({ + models.DatasetLock.state: LockState.OK + }) session.execute(stmt) session.flush() if new_rule.notification == RuleNotification.YES: @@ -415,10 +417,10 @@ def add_rule(dids, account, copies, rse_expression, grouping, weight, lifetime, stmt = update( models.DatasetLock ).where( - models.DatasetLock.scope == new_rule.scope - ).values( - {'state': LockState.REPLICATING} - ) + models.DatasetLock.rule_id == new_rule.id + ).values({ + models.DatasetLock.state: LockState.REPLICATING + }) session.execute(stmt) # Add rule to History @@ -488,9 +490,11 @@ def add_rules(dids, rules, *, session: "Session", logger=logging.log): if did.did_type == DIDType.FILE and did.constituent: # Check if a single replica of this DID exists stmt = select( func.count(models.RSEFileAssociation.rse_id) + ).select_from( + models.RSEFileAssociation ).join( models.RSE, - models.RSE.id == models.RSEFileAssociation.rse_id + models.RSEFileAssociation.rse_id == models.RSE.id ).where( and_(models.RSEFileAssociation.scope == did.scope, models.RSEFileAssociation.name == did.name, @@ -520,7 +524,7 @@ def add_rules(dids, rules, *, session: "Session", logger=logging.log): and_(models.DataIdentifier.scope == elem['scope'], models.DataIdentifier.name == elem['name']) ) - did = session.execute(stmt).scalars().one() + did = session.execute(stmt).scalar_one() except NoResultFound as exc: raise DataIdentifierNotFound('Data identifier %s:%s is not valid.' % (elem['scope'], elem['name'])) from exc except TypeError as error: @@ -693,9 +697,9 @@ def add_rules(dids, rules, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == new_rule.id - ).values( - {'state': LockState.STUCK} - ) + ).values({ + models.DatasetLock.state: LockState.STUCK + }) session.execute(stmt) elif new_rule.locks_replicating_cnt == 0: new_rule.state = RuleState.OK @@ -704,9 +708,9 @@ def add_rules(dids, rules, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == new_rule.id - ).values( - {'state': LockState.OK} - ) + ).values({ + models.DatasetLock.state: LockState.OK + }) session.execute(stmt) session.flush() if new_rule.notification == RuleNotification.YES: @@ -719,9 +723,9 @@ def add_rules(dids, rules, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == new_rule.id - ).values( - {'state': LockState.REPLICATING} - ) + ).values({ + models.DatasetLock.state: LockState.REPLICATING + }) session.execute(stmt) # Add rule to History @@ -851,9 +855,9 @@ def inject_rule(rule_id, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - {'state': LockState.STUCK} - ) + ).values({ + models.DatasetLock.state: LockState.STUCK + }) session.execute(stmt) elif rule.locks_replicating_cnt == 0: rule.state = RuleState.OK @@ -862,9 +866,9 @@ def inject_rule(rule_id, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - {'state': LockState.OK} - ) + ).values({ + models.DatasetLock.state: LockState.OK + }) session.execute(stmt) session.flush() if rule.notification == RuleNotification.YES: @@ -879,9 +883,9 @@ def inject_rule(rule_id, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - {'state': LockState.REPLICATING} - ) + ).values({ + models.DatasetLock.state: LockState.REPLICATING + }) session.execute(stmt) # Add rule to History @@ -964,7 +968,7 @@ def list_rule_history(rule_id, *, session: "Session"): """ stmt = select( - models.ReplicationRuleHistory.id, + models.ReplicationRuleHistory.updated_at, models.ReplicationRuleHistory.state, models.ReplicationRuleHistory.locks_ok_cnt, models.ReplicationRuleHistory.locks_stuck_cnt, @@ -972,12 +976,12 @@ def list_rule_history(rule_id, *, session: "Session"): ).where( models.ReplicationRuleHistory.id == rule_id ).order_by( - models.ReplicationRuleHistory.created_at + models.ReplicationRuleHistory.updated_at ) try: for rule in session.execute(stmt).yield_per(5): - yield {'updated_at': rule[0], 'state': rule[1], 'locks_ok_cnt': rule[2], 'locks_stuck_cnt': rule[3], 'locks_replicating_cnt': rule[4]} + yield rule._asdict() except StatementError as exc: raise RucioException('Badly formatted input (IDs?)') from exc @@ -994,7 +998,7 @@ def list_rule_full_history(scope, name, *, session: "Session"): """ stmt = select( - models.ReplicationRuleHistory.id, + models.ReplicationRuleHistory.id.label('rule_id'), models.ReplicationRuleHistory.created_at, models.ReplicationRuleHistory.updated_at, models.ReplicationRuleHistory.rse_expression, @@ -1004,7 +1008,7 @@ def list_rule_full_history(scope, name, *, session: "Session"): models.ReplicationRuleHistory.locks_stuck_cnt, models.ReplicationRuleHistory.locks_replicating_cnt ).with_hint( - models.ReplicationRuleHistory, "INDEX(RULES_HISTORY_SCOPENAME_IDX)", 'oracle' + models.ReplicationRuleHistory, 'INDEX(RULES_HISTORY_SCOPENAME_IDX)', 'oracle' ).where( and_(models.ReplicationRuleHistory.scope == scope, models.ReplicationRuleHistory.name == name) @@ -1013,8 +1017,7 @@ def list_rule_full_history(scope, name, *, session: "Session"): models.ReplicationRuleHistory.updated_at ) for rule in session.execute(stmt).yield_per(5): - yield {'rule_id': rule[0], 'created_at': rule[1], 'updated_at': rule[2], 'rse_expression': rule[3], 'state': rule[4], - 'account': rule[5], 'locks_ok_cnt': rule[6], 'locks_stuck_cnt': rule[7], 'locks_replicating_cnt': rule[8]} + yield rule._asdict() @stream_session @@ -1031,15 +1034,16 @@ def list_associated_rules_for_file(scope, name, *, session: "Session"): rucio.core.did.get_did(scope=scope, name=name, session=session) # Check if the did acually exists stmt = select( models.ReplicationRule - ).with_hint( - models.ReplicationRule, "INDEX(RULES RULES_PK)", 'oracle' + ).distinct( ).join( models.ReplicaLock, models.ReplicationRule.id == models.ReplicaLock.rule_id + ).with_hint( + models.ReplicaLock, 'INDEX(LOCKS LOCKS_PK)', 'oracle' ).where( and_(models.ReplicaLock.scope == scope, models.ReplicaLock.name == name) - ).distinct() + ) try: for result in session.execute(stmt).yield_per(5): @@ -1069,10 +1073,10 @@ def delete_rule(rule_id, purge_replicas=None, soft=False, delete_parent=False, n try: stmt = select( models.ReplicationRule - ).with_for_update( - nowait=nowait ).where( models.ReplicationRule.id == rule_id + ).with_for_update( + nowait=nowait ) rule = session.execute(stmt).scalar_one() except NoResultFound as exc: @@ -1098,10 +1102,10 @@ def delete_rule(rule_id, purge_replicas=None, soft=False, delete_parent=False, n stmt = select( models.ReplicaLock - ).with_for_update( - nowait=nowait ).where( models.ReplicaLock.rule_id == rule_id + ).with_for_update( + nowait=nowait ) results = session.execute(stmt).yield_per(100) @@ -1170,10 +1174,10 @@ def repair_rule(rule_id, *, session: "Session", logger=logging.log): try: stmt = select( models.ReplicationRule - ).with_for_update( - nowait=True ).where( models.ReplicationRule.id == rule_id + ).with_for_update( + nowait=True ) rule = session.execute(stmt).scalar_one() rule.updated_at = datetime.utcnow() @@ -1211,9 +1215,9 @@ def repair_rule(rule_id, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - {'state': LockState.STUCK} - ) + ).values({ + models.DatasetLock.state: LockState.STUCK + }) session.execute(stmt) logger(logging.DEBUG, '%s while repairing rule %s', str(error), rule_id) return @@ -1238,9 +1242,9 @@ def repair_rule(rule_id, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - {'state': LockState.STUCK} - ) + ).values({ + models.DatasetLock.state: LockState.STUCK + }) session.execute(stmt) logger(logging.DEBUG, '%s while repairing rule %s', type(error).__name__, rule_id) return @@ -1252,7 +1256,7 @@ def repair_rule(rule_id, *, session: "Session", logger=logging.log): rule.locks_stuck_cnt = 0 stmt = select( models.ReplicaLock.state, - func.count(models.ReplicaLock.state) + func.count(models.ReplicaLock.state).label('state_counter') ).where( models.ReplicaLock.rule_id == rule.id ).group_by( @@ -1260,12 +1264,12 @@ def repair_rule(rule_id, *, session: "Session", logger=logging.log): ) rule_counts = session.execute(stmt).all() for count in rule_counts: - if count[0] == LockState.OK: - rule.locks_ok_cnt = count[1] - elif count[0] == LockState.REPLICATING: - rule.locks_replicating_cnt = count[1] - elif count[0] == LockState.STUCK: - rule.locks_stuck_cnt = count[1] + if count._mapping['state'] == LockState.OK: + rule.locks_ok_cnt = count._mapping['state_counter'] + elif count._mapping['state'] == LockState.REPLICATING: + rule.locks_replicating_cnt = count._mapping['state_counter'] + elif count._mapping['state'] == LockState.STUCK: + rule.locks_stuck_cnt = count._mapping['state_counter'] logger(logging.DEBUG, "Finished resetting counters for rule %s [%d/%d/%d]", str(rule.id), rule.locks_ok_cnt, rule.locks_replicating_cnt, rule.locks_stuck_cnt) # Get the did @@ -1324,9 +1328,9 @@ def repair_rule(rule_id, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - {'state': LockState.STUCK} - ) + ).values({ + models.DatasetLock.state: LockState.STUCK + }) session.execute(stmt) logger(logging.DEBUG, '%s while repairing rule %s', type(error).__name__, rule_id) return @@ -1368,15 +1372,24 @@ def repair_rule(rule_id, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - {'state': LockState.STUCK} - ) + ).values({ + models.DatasetLock.state: LockState.STUCK + }) session.execute(stmt) logger(logging.DEBUG, '%s while repairing rule %s', type(error).__name__, rule_id) return # Delete Datasetlocks which are not relevant anymore - validated_datasetlock_rse_ids = [rse_id[0] for rse_id in session.query(models.ReplicaLock.rse_id).filter(models.ReplicaLock.rule_id == rule.id).group_by(models.ReplicaLock.rse_id).all()] + stmt = select( + models.ReplicaLock.rse_id + ).where( + models.ReplicaLock.rule_id == rule.id + ).group_by( + models.ReplicaLock.rse_id + ) + results = session.execute(stmt).all() + validated_datasetlock_rse_ids = [rse_id._mapping['rse_id'] for rse_id in results] + stmt = select( models.DatasetLock ).where( @@ -1398,9 +1411,9 @@ def repair_rule(rule_id, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - {'state': LockState.STUCK} - ) + ).values({ + models.DatasetLock.state: LockState.STUCK + }) session.execute(stmt) # TODO: Increase some kind of Stuck Counter here, The rule should at some point be SUSPENDED return @@ -1419,9 +1432,9 @@ def repair_rule(rule_id, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - {'state': LockState.REPLICATING} - ) + ).values({ + models.DatasetLock.state: LockState.REPLICATING + }) session.execute(stmt) return @@ -1436,9 +1449,9 @@ def repair_rule(rule_id, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - {'state': LockState.OK} - ) + ).values({ + models.DatasetLock.state: LockState.OK + }) session.execute(stmt) session.flush() if rule.notification == RuleNotification.YES: @@ -1592,9 +1605,9 @@ def update_rule(rule_id: str, options: dict[str, Any], *, session: "Session") -> locktype ).where( locktype.rule_id == rule.id - ).values( - account=options['account'] - ) + ).values({ + locktype.account: options['account'] + }) session.execute(query) # Update counters @@ -1662,18 +1675,18 @@ def update_rule(rule_id: str, options: dict[str, Any], *, session: "Session") -> ).where( models.ReplicationRule.id == rid, models.ReplicationRule.state != RuleState.SUSPENDED - ).values( - state=RuleState.STUCK - ) + ).values({ + models.ReplicationRule.state: RuleState.STUCK + }) session.execute(query) query = update( models.DatasetLock ).where( models.DatasetLock.rule_id == rid - ).values( - state=LockState.STUCK - ) + ).values({ + models.DatasetLock.state: LockState.STUCK + }) session.execute(query) if options['state'].lower() == 'suspended': @@ -1688,18 +1701,18 @@ def update_rule(rule_id: str, options: dict[str, Any], *, session: "Session") -> ).where( models.ReplicaLock.rule_id == rule.id, models.ReplicaLock.state == LockState.REPLICATING - ).values( - state=LockState.STUCK - ) + ).values({ + models.ReplicaLock.state: LockState.STUCK + }) session.execute(query) query = update( models.DatasetLock ).where( models.DatasetLock.rule_id == rule_id - ).values( - state=LockState.STUCK - ) + ).values({ + models.DatasetLock.state: LockState.STUCK + }) session.execute(query) elif key == 'cancel_requests': @@ -1964,7 +1977,7 @@ def re_evaluate_did(scope, name, rule_evaluation_action, *, session: "Session"): func.sum(models.DataIdentifierAssociation.bytes), func.count(1) ).with_hint( - models.DataIdentifierAssociation, "index(CONTENTS CONTENTS_PK)", 'oracle' + models.DataIdentifierAssociation, 'INDEX(CONTENTS CONTENTS_PK)', 'oracle' ).where( and_(models.DataIdentifierAssociation.scope == scope, models.DataIdentifierAssociation.name == name) @@ -2066,18 +2079,18 @@ def get_expired_rules(total_workers, worker_number, limit=100, blocked_rules=[], ).where( and_(models.ReplicationRule.expires_at < datetime.utcnow(), models.ReplicationRule.locked == false(), - models.ReplicationRule.child_rule_id == None) # NOQA: E711 - ).with_hint( - models.ReplicationRule, "index(rules RULES_EXPIRES_AT_IDX)", 'oracle' + models.ReplicationRule.child_rule_id == null()) ).order_by( models.ReplicationRule.expires_at + ).with_hint( + models.ReplicationRule, 'INDEX(RULES RULES_EXPIRES_AT_IDX)', 'oracle' ) stmt = filter_thread_work(session=session, query=stmt, total_threads=total_workers, thread_id=worker_number, hash_variable='name') if limit: stmt = stmt.limit(limit) result = session.execute(stmt).all() - filtered_rules = [rule for rule in result if rule[0] not in blocked_rules] + filtered_rules = [rule for rule in result if rule._mapping['id'] not in blocked_rules] if len(result) == limit and not filtered_rules: return get_expired_rules(total_workers=total_workers, worker_number=worker_number, @@ -2087,7 +2100,7 @@ def get_expired_rules(total_workers, worker_number, limit=100, blocked_rules=[], else: return filtered_rules else: - return [rule for rule in session.execute(stmt).all() if rule[0] not in blocked_rules] + return [rule for rule in session.execute(stmt).all() if rule._mapping['id'] not in blocked_rules] @read_session @@ -2104,11 +2117,11 @@ def get_injected_rules(total_workers, worker_number, limit=100, blocked_rules=[] stmt = select( models.ReplicationRule.id + ).with_hint( + models.ReplicationRule, 'INDEX(RULES RULES_STATE_IDX)', 'oracle' ).where( and_(models.ReplicationRule.state == RuleState.INJECT, models.ReplicationRule.created_at <= datetime.utcnow()) - ).with_hint( - models.ReplicationRule, "index(rules RULES_STATE_IDX)", 'oracle' ).order_by( models.ReplicationRule.created_at ) @@ -2117,7 +2130,7 @@ def get_injected_rules(total_workers, worker_number, limit=100, blocked_rules=[] if limit: stmt = stmt.limit(limit) result = session.execute(stmt).all() - filtered_rules = [rule for rule in result if rule[0] not in blocked_rules] + filtered_rules = [rule for rule in result if rule._mapping['id'] not in blocked_rules] if len(result) == limit and not filtered_rules: return get_injected_rules(total_workers=total_workers, worker_number=worker_number, @@ -2127,7 +2140,7 @@ def get_injected_rules(total_workers, worker_number, limit=100, blocked_rules=[] else: return filtered_rules else: - return [rule for rule in session.execute(stmt).all() if rule[0] not in blocked_rules] + return [rule for rule in session.execute(stmt).all() if rule._mapping['id'] not in blocked_rules] @read_session @@ -2144,14 +2157,14 @@ def get_stuck_rules(total_workers, worker_number, delta=600, limit=10, blocked_r """ stmt = select( models.ReplicationRule.id + ).with_hint( + models.ReplicationRule, 'INDEX(RULES RULES_STATE_IDX)', 'oracle' ).where( and_(models.ReplicationRule.state == RuleState.STUCK, models.ReplicationRule.updated_at < datetime.utcnow() - timedelta(seconds=delta), or_(models.ReplicationRule.expires_at == null(), models.ReplicationRule.expires_at > datetime.utcnow(), models.ReplicationRule.locked == true())) - ).with_hint( - models.ReplicationRule, "index(rules RULES_STATE_IDX)", 'oracle' ).order_by( models.ReplicationRule.updated_at ) @@ -2160,7 +2173,7 @@ def get_stuck_rules(total_workers, worker_number, delta=600, limit=10, blocked_r if limit: stmt = stmt.limit(limit) result = session.execute(stmt).all() - filtered_rules = [rule for rule in result if rule[0] not in blocked_rules] + filtered_rules = [rule for rule in result if rule._mapping['id'] not in blocked_rules] if len(result) == limit and not filtered_rules: return get_stuck_rules(total_workers=total_workers, worker_number=worker_number, @@ -2171,7 +2184,7 @@ def get_stuck_rules(total_workers, worker_number, delta=600, limit=10, blocked_r else: return filtered_rules else: - return [rule for rule in session.execute(stmt).all() if rule[0] not in blocked_rules] + return [rule for rule in session.execute(stmt).all() if rule._mapping['id'] not in blocked_rules] @transactional_session @@ -2209,7 +2222,9 @@ def update_rules_for_lost_replica(scope, name, rse_id, nowait=False, *, session: and_(models.ReplicaLock.scope == scope, models.ReplicaLock.name == name, models.ReplicaLock.rse_id == rse_id) - ).with_for_update(nowait=nowait) + ).with_for_update( + nowait=nowait + ) locks = session.execute(stmt).scalars().all() stmt = select( @@ -2218,7 +2233,9 @@ def update_rules_for_lost_replica(scope, name, rse_id, nowait=False, *, session: and_(models.RSEFileAssociation.scope == scope, models.RSEFileAssociation.name == name, models.RSEFileAssociation.rse_id == rse_id) - ).with_for_update(nowait=nowait) + ).with_for_update( + nowait=nowait + ) replica = session.execute(stmt).scalar_one() stmt = select( @@ -2227,8 +2244,10 @@ def update_rules_for_lost_replica(scope, name, rse_id, nowait=False, *, session: and_(models.Request.scope == scope, models.Request.name == name, models.Request.dest_rse_id == rse_id) - ).with_for_update(nowait=nowait) - requests = session.execute(stmt).all() + ).with_for_update( + nowait=nowait + ) + requests = session.execute(stmt).scalars().all() rse = get_rse_name(rse_id, session=session) @@ -2270,9 +2289,9 @@ def update_rules_for_lost_replica(scope, name, rse_id, nowait=False, *, session: models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - state=LockState.OK - ) + ).values({ + models.DatasetLock.state: LockState.OK + }) session.execute(stmt) session.flush() if rule_state_before != RuleState.OK: @@ -2296,9 +2315,9 @@ def update_rules_for_lost_replica(scope, name, rse_id, nowait=False, *, session: ).where( and_(models.DataIdentifier.scope == scope, models.DataIdentifier.name == name) - ).values( - {'availability': DIDAvailability.LOST} - ) + ).values({ + models.DataIdentifier.availability: DIDAvailability.LOST + }) session.execute(stmt) stmt = update( @@ -2308,10 +2327,10 @@ def update_rules_for_lost_replica(scope, name, rse_id, nowait=False, *, session: models.BadReplicas.name == name, models.BadReplicas.rse_id == rse_id, models.BadReplicas.state == BadFilesStatus.BAD) - ).values( - {'state': BadFilesStatus.LOST, - 'updated_at': datetime.utcnow()} - ) + ).values({ + models.BadReplicas.state: BadFilesStatus.LOST, + models.BadReplicas.updated_at: datetime.utcnow() + }) session.execute(stmt) for dts in datasets: logger(logging.INFO, 'File %s:%s bad at site %s is completely lost from dataset %s:%s. Will be marked as LOST and detached', scope, name, rse, dts['scope'], dts['name']) @@ -2419,9 +2438,9 @@ def update_rules_for_bad_replica(scope, name, rse_id, nowait=False, *, session: models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - {'state': LockState.REPLICATING} - ) + ).values({ + models.DatasetLock.state: LockState.REPLICATING + }) session.execute(stmt) # Insert rule history insert_rule_history(rule=rule, recent=True, longterm=False, session=session) @@ -2429,12 +2448,12 @@ def update_rules_for_bad_replica(scope, name, rse_id, nowait=False, *, session: stmt = update( models.RSEFileAssociation ).where( - and_(models.DataIdentifier.scope == scope, - models.RSEFileAssociation.rse_id == rse_id, - models.DataIdentifier.name == name) - ).values( - {'state': ReplicaState.COPYING} - ) + and_(models.RSEFileAssociation.scope == scope, + models.RSEFileAssociation.name == name, + models.RSEFileAssociation.rse_id == rse_id) + ).values({ + models.RSEFileAssociation.state: ReplicaState.COPYING + }) session.execute(stmt) else: logger(logging.INFO, 'File %s:%s at site %s has no locks. Will be deleted now.', scope, name, get_rse_name(rse_id=rse_id, session=session)) @@ -2445,10 +2464,10 @@ def update_rules_for_bad_replica(scope, name, rse_id, nowait=False, *, session: and_(models.RSEFileAssociation.scope == scope, models.RSEFileAssociation.name == name, models.RSEFileAssociation.rse_id == rse_id) - ).values( - {'state': ReplicaState.UNAVAILABLE, - 'tombstone': tombstone} - ) + ).values({ + models.RSEFileAssociation.state: ReplicaState.UNAVAILABLE, + models.RSEFileAssociation.tombstone: tombstone + }) session.execute(stmt) @@ -2898,10 +2917,10 @@ def release_parent_rule(child_rule_id, remove_parent_expiration=False, *, sessio stmt = select( models.ReplicationRule + ).with_hint( + models.ReplicationRule, 'INDEX(RULES RULES_CHILD_RULE_ID_IDX)', 'oracle' ).where( models.ReplicationRule.child_rule_id == child_rule_id - ).with_hint( - models.ReplicationRule, "index(RULES RULES_CHILD_RULE_ID_IDX)", 'oracle' ) parent_rules = session.execute(stmt).scalars().all() for rule in parent_rules: @@ -3163,9 +3182,9 @@ def __evaluate_did_detach(eval_did, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - state=LockState.OK - ) + ).values({ + models.DatasetLock.state: LockState.OK + }) session.execute(stmt) session.flush() if rule_locks_ok_cnt_before != rule.locks_ok_cnt: @@ -3239,12 +3258,12 @@ def __evaluate_did_attach(eval_did, *, session: "Session", logger=logging.log): with METRICS.timer('evaluate_did_attach.list_new_child_dids'): stmt = select( models.DataIdentifierAssociation + ).with_hint( + models.DataIdentifierAssociation, 'INDEX_RS_ASC(CONTENTS CONTENTS_PK)', 'oracle' ).where( and_(models.DataIdentifierAssociation.scope == eval_did.scope, models.DataIdentifierAssociation.name == eval_did.name, - models.DataIdentifierAssociation.rule_evaluation == True) # pylint: disable=C0121:singleton-comparison # noqa: E712 - ).with_hint( - models.DataIdentifierAssociation, "INDEX_RS_ASC(contents contents_pk)", 'oracle' + models.DataIdentifierAssociation.rule_evaluation == true()) # pylint: disable=C0121:singleton-comparison # noqa: E712 ) new_child_dids = session.execute(stmt).scalars().all() if new_child_dids: @@ -3260,9 +3279,9 @@ def __evaluate_did_attach(eval_did, *, session: "Session", logger=logging.log): models.ReplicationRule ).where( and_(or_(*rule_clauses), - models.ReplicationRule.state != RuleState.SUSPENDED, - models.ReplicationRule.state != RuleState.WAITING_APPROVAL, - models.ReplicationRule.state != RuleState.INJECT) + models.ReplicationRule.state.not_in([RuleState.SUSPENDED, + RuleState.WAITING_APPROVAL, + RuleState.INJECT])) ).with_for_update( nowait=True ) @@ -3323,9 +3342,9 @@ def __evaluate_did_attach(eval_did, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - state=LockState.STUCK - ) + ).values({ + models.DatasetLock.state: LockState.STUCK + }) session.execute(stmt) continue @@ -3349,9 +3368,9 @@ def __evaluate_did_attach(eval_did, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - state=LockState.STUCK - ) + ).values({ + models.DatasetLock.state: LockState.STUCK + }) session.execute(stmt) continue @@ -3399,9 +3418,9 @@ def __evaluate_did_attach(eval_did, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - state=LockState.STUCK - ) + ).values({ + models.DatasetLock.state: LockState.STUCK + }) session.execute(stmt) continue @@ -3416,9 +3435,9 @@ def __evaluate_did_attach(eval_did, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - state=LockState.STUCK - ) + ).values({ + models.DatasetLock.state: LockState.STUCK + }) session.execute(stmt) elif rule.locks_replicating_cnt > 0: rule.state = RuleState.REPLICATING @@ -3427,9 +3446,9 @@ def __evaluate_did_attach(eval_did, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - state=LockState.REPLICATING - ) + ).values({ + models.DatasetLock.state: LockState.REPLICATING + }) session.execute(stmt) elif rule.locks_replicating_cnt == 0 and rule.locks_stuck_cnt == 0: rule.state = RuleState.OK @@ -3438,9 +3457,9 @@ def __evaluate_did_attach(eval_did, *, session: "Session", logger=logging.log): models.DatasetLock ).where( models.DatasetLock.rule_id == rule.id - ).values( - state=LockState.OK - ) + ).values({ + models.DatasetLock.state: LockState.OK + }) session.execute(stmt) session.flush() if rule_locks_ok_cnt_before < rule.locks_ok_cnt: @@ -3615,11 +3634,11 @@ def __resolve_dids_to_locks_and_replicas(dids, nowait=False, restrict_rses=[], s if locks_rse_clause: stmt = select( models.ReplicaLock + ).with_hint( + models.ReplicaLock, 'INDEX(LOCKS LOCKS_PK)', 'oracle' ).where( and_(or_(*lock_clause_chunk), or_(*locks_rse_clause)) - ).with_hint( - models.ReplicaLock, "index(LOCKS LOCKS_PK)", 'oracle' ).with_for_update( nowait=nowait ) @@ -3627,10 +3646,10 @@ def __resolve_dids_to_locks_and_replicas(dids, nowait=False, restrict_rses=[], s else: stmt = select( models.ReplicaLock + ).with_hint( + models.ReplicaLock, 'INDEX(LOCKS LOCKS_PK)', 'oracle' ).where( or_(*lock_clause_chunk) - ).with_hint( - models.ReplicaLock, "index(LOCKS LOCKS_PK)", 'oracle' ).with_for_update( nowait=nowait ) @@ -3645,12 +3664,12 @@ def __resolve_dids_to_locks_and_replicas(dids, nowait=False, restrict_rses=[], s if replicas_rse_clause: stmt = select( models.RSEFileAssociation + ).with_hint( + models.RSEFileAssociation, 'INDEX(REPLICAS REPLICAS_PK)', 'oracle' ).where( and_(or_(*replica_clause_chunk), or_(*replicas_rse_clause), models.RSEFileAssociation.state != ReplicaState.BEING_DELETED) - ).with_hint( - models.RSEFileAssociation, "index(REPLICAS REPLICAS_PK)", 'oracle' ).with_for_update( nowait=nowait ) @@ -3658,11 +3677,11 @@ def __resolve_dids_to_locks_and_replicas(dids, nowait=False, restrict_rses=[], s else: stmt = select( models.RSEFileAssociation + ).with_hint( + models.RSEFileAssociation, 'INDEX(REPLICAS REPLICAS_PK)', 'oracle' ).where( and_(or_(*replica_clause_chunk), models.RSEFileAssociation.state != ReplicaState.BEING_DELETED) - ).with_hint( - models.RSEFileAssociation, "index(REPLICAS REPLICAS_PK)", 'oracle' ).with_for_update( nowait=nowait ) @@ -3679,12 +3698,12 @@ def __resolve_dids_to_locks_and_replicas(dids, nowait=False, restrict_rses=[], s models.RSEFileAssociation.scope, models.RSEFileAssociation.name, models.RSEFileAssociation.rse_id + ).with_hint( + models.RSEFileAssociation, 'INDEX(REPLICAS REPLICAS_PK)', 'oracle' ).where( and_(or_(*replica_clause_chunk), or_(*source_replicas_rse_clause), models.RSEFileAssociation.state == ReplicaState.AVAILABLE) - ).with_hint( - models.RSEFileAssociation, "index(REPLICAS REPLICAS_PK)", 'oracle' ) tmp_source_replicas = session.execute(stmt).all() for scope, name, rse_id in tmp_source_replicas: