diff --git a/lib/rucio/core/rule.py b/lib/rucio/core/rule.py index 6d9cd8f3cb..d0cdcd60c3 100644 --- a/lib/rucio/core/rule.py +++ b/lib/rucio/core/rule.py @@ -25,7 +25,7 @@ from typing import TYPE_CHECKING, Any, Callable, Literal, Optional, Type, TypeVar, Union from dogpile.cache.api import NoValue -from sqlalchemy import select, update +from sqlalchemy import delete, desc, select, update from sqlalchemy.exc import IntegrityError, StatementError from sqlalchemy.exc import NoResultFound # https://pydoc.dev/sqlalchemy/latest/sqlalchemy.exc.NoResultFound.html from sqlalchemy.sql import func @@ -260,8 +260,13 @@ def add_rule( # 3. Get the did with METRICS.timer('add_rule.get_did'): try: - did = session.query(models.DataIdentifier).filter(models.DataIdentifier.scope == elem['scope'], - models.DataIdentifier.name == elem['name']).one() + stmt = select( + models.DataIdentifier + ).where( + and_(models.DataIdentifier.scope == elem['scope'], + models.DataIdentifier.name == elem['name']) + ) + did = session.execute(stmt).scalar_one() except NoResultFound as exc: raise DataIdentifierNotFound('Data identifier %s:%s is not valid.' % (elem['scope'], elem['name'])) from exc except TypeError as error: @@ -270,23 +275,44 @@ def add_rule( # 3.1 If the did is a constituent, relay the rule to the archive if did.did_type == DIDType.FILE and did.constituent: # Check if a single replica of this DID exists; Do not put rule on file if there are only replicas on TAPE - replica_cnt = session.query(models.RSEFileAssociation).join(models.RSE, models.RSEFileAssociation.rse_id == models.RSE.id)\ - .filter(models.RSEFileAssociation.scope == did.scope, - models.RSEFileAssociation.name == did.name, - models.RSEFileAssociation.state == ReplicaState.AVAILABLE, - models.RSE.rse_type != RSEType.TAPE).count() + stmt = select( + func.count() + ).select_from( + models.RSEFileAssociation + ).join( + models.RSE, + models.RSEFileAssociation.rse_id == models.RSE.id + ).where( + and_(models.RSEFileAssociation.scope == did.scope, + models.RSEFileAssociation.name == did.name, + models.RSEFileAssociation.state == ReplicaState.AVAILABLE, + models.RSE.rse_type != RSEType.TAPE) + ) + replica_cnt = session.execute(stmt).scalar() + if replica_cnt == 0: # Put the rule on the archive - archive = session.query(models.ConstituentAssociation).join(models.RSEFileAssociation, - and_(models.ConstituentAssociation.scope == models.RSEFileAssociation.scope, - models.ConstituentAssociation.name == models.RSEFileAssociation.name))\ - .filter(models.ConstituentAssociation.child_scope == did.scope, - models.ConstituentAssociation.child_name == did.name).first() + stmt = select( + models.ConstituentAssociation + ).join( + models.RSEFileAssociation, + and_(models.ConstituentAssociation.scope == models.RSEFileAssociation.scope, + models.ConstituentAssociation.name == models.RSEFileAssociation.name) + ).where( + and_(models.ConstituentAssociation.child_scope == did.scope, + models.ConstituentAssociation.child_name == did.name) + ) + archive = session.execute(stmt).scalars().first() if archive is not None: elem['name'] = archive.name elem['scope'] = archive.scope try: - did = session.query(models.DataIdentifier).filter(models.DataIdentifier.scope == elem['scope'], - models.DataIdentifier.name == elem['name']).one() + stmt = select( + models.DataIdentifier + ).where( + and_(models.DataIdentifier.scope == elem['scope'], + models.DataIdentifier.name == elem['name']) + ) + did = session.execute(stmt).scalar_one() except NoResultFound as exc: raise DataIdentifierNotFound('Data identifier %s:%s is not valid.' % (elem['scope'], elem['name'])) from exc except TypeError as error: @@ -386,11 +412,25 @@ def add_rule( new_rule.state = RuleState.STUCK new_rule.error = 'MissingSourceReplica' if new_rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=new_rule.id).update({'state': LockState.STUCK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == new_rule.id + ).values({ + models.DatasetLock.state: LockState.STUCK + }) + session.execute(stmt) elif new_rule.locks_replicating_cnt == 0: new_rule.state = RuleState.OK if new_rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=new_rule.id).update({'state': LockState.OK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == new_rule.id + ).values({ + models.DatasetLock.state: LockState.OK + }) + session.execute(stmt) session.flush() if new_rule.notification == RuleNotification.YES: generate_email_for_rule_ok_notification(rule=new_rule, session=session) @@ -398,7 +438,14 @@ def add_rule( else: new_rule.state = RuleState.REPLICATING if new_rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=new_rule.id).update({'state': LockState.REPLICATING}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == new_rule.id + ).values({ + models.DatasetLock.state: LockState.REPLICATING + }) + session.execute(stmt) # Add rule to History insert_rule_history(rule=new_rule, recent=True, longterm=True, session=session) @@ -457,9 +504,13 @@ def add_rules( # 2. Get the did with METRICS.timer('add_rules.get_did'): try: - did = session.query(models.DataIdentifier).filter( - models.DataIdentifier.scope == elem['scope'], - models.DataIdentifier.name == elem['name']).one() + stmt = select( + models.DataIdentifier + ).where( + and_(models.DataIdentifier.scope == elem['scope'], + models.DataIdentifier.name == elem['name']) + ) + did = session.execute(stmt).scalar_one() except NoResultFound as exc: raise DataIdentifierNotFound('Data identifier %s:%s is not valid.' % (elem['scope'], elem['name'])) from exc except TypeError as error: @@ -467,23 +518,43 @@ def add_rules( # 2.1 If the did is a constituent, relay the rule to the archive if did.did_type == DIDType.FILE and did.constituent: # Check if a single replica of this DID exists - replica_cnt = session.query(models.RSEFileAssociation).join(models.RSE, models.RSEFileAssociation.rse_id == models.RSE.id)\ - .filter(models.RSEFileAssociation.scope == did.scope, - models.RSEFileAssociation.name == did.name, - models.RSEFileAssociation.state == ReplicaState.AVAILABLE, - models.RSE.rse_type != RSEType.TAPE).count() + stmt = select( + func.count() + ).select_from( + models.RSEFileAssociation + ).join( + models.RSE, + models.RSEFileAssociation.rse_id == models.RSE.id + ).where( + and_(models.RSEFileAssociation.scope == did.scope, + models.RSEFileAssociation.name == did.name, + models.RSEFileAssociation.state == ReplicaState.AVAILABLE, + models.RSE.rse_type != RSEType.TAPE) + ) + replica_cnt = session.execute(stmt).scalar() if replica_cnt == 0: # Put the rule on the archive - archive = session.query(models.ConstituentAssociation).join(models.RSEFileAssociation, - and_(models.ConstituentAssociation.scope == models.RSEFileAssociation.scope, - models.ConstituentAssociation.name == models.RSEFileAssociation.name))\ - .filter(models.ConstituentAssociation.child_scope == did.scope, - models.ConstituentAssociation.child_name == did.name).first() + stmt = select( + models.ConstituentAssociation + ).join( + models.RSEFileAssociation, + and_(models.ConstituentAssociation.scope == models.RSEFileAssociation.scope, + models.ConstituentAssociation.name == models.RSEFileAssociation.name) + ).where( + and_(models.ConstituentAssociation.child_scope == did.scope, + models.ConstituentAssociation.child_name == did.name) + ) + archive = session.execute(stmt).scalars().first() if archive is not None: elem['name'] = archive.name elem['scope'] = archive.scope try: - did = session.query(models.DataIdentifier).filter(models.DataIdentifier.scope == elem['scope'], - models.DataIdentifier.name == elem['name']).one() + stmt = select( + models.DataIdentifier + ).where( + and_(models.DataIdentifier.scope == elem['scope'], + models.DataIdentifier.name == elem['name']) + ) + did = session.execute(stmt).scalar_one() except NoResultFound as exc: raise DataIdentifierNotFound('Data identifier %s:%s is not valid.' % (elem['scope'], elem['name'])) from exc except TypeError as error: @@ -653,11 +724,25 @@ def add_rules( new_rule.state = RuleState.STUCK new_rule.error = 'MissingSourceReplica' if new_rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=new_rule.id).update({'state': LockState.STUCK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == new_rule.id + ).values({ + models.DatasetLock.state: LockState.STUCK + }) + session.execute(stmt) elif new_rule.locks_replicating_cnt == 0: new_rule.state = RuleState.OK if new_rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=new_rule.id).update({'state': LockState.OK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == new_rule.id + ).values({ + models.DatasetLock.state: LockState.OK + }) + session.execute(stmt) session.flush() if new_rule.notification == RuleNotification.YES: generate_email_for_rule_ok_notification(rule=new_rule, session=session) @@ -665,7 +750,14 @@ def add_rules( else: new_rule.state = RuleState.REPLICATING if new_rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=new_rule.id).update({'state': LockState.REPLICATING}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == new_rule.id + ).values({ + models.DatasetLock.state: LockState.REPLICATING + }) + session.execute(stmt) # Add rule to History insert_rule_history(rule=new_rule, recent=True, longterm=True, session=session) @@ -693,7 +785,14 @@ def inject_rule( """ try: - rule = session.query(models.ReplicationRule).filter(models.ReplicationRule.id == rule_id).with_for_update(nowait=True).one() + stmt = select( + models.ReplicationRule + ).where( + models.ReplicationRule.id == rule_id + ).with_for_update( + nowait=True + ) + rule = session.execute(stmt).scalar_one() except NoResultFound as exc: raise RuleNotFound('No rule with the id %s found' % (rule_id)) from exc @@ -710,7 +809,13 @@ def inject_rule( # Remove duplicates from the list of dictionaries dids = [dict(t) for t in {tuple(d.items()) for d in dids}] # Remove dids which already have a similar rule - dids = [did for did in dids if session.query(models.ReplicationRule).filter_by(scope=did['scope'], name=did['name'], account=rule.account, rse_expression=rule.rse_expression).count() == 0] + stmt = select( + models.ReplicationRule.id + ).where( + and_(models.ReplicationRule.account == rule.account, + models.ReplicationRule.rse_expression == rule.rse_expression) + ) + dids = [did for did in dids if session.execute(stmt.where(and_(models.ReplicationRule.scope == did['scope'], models.ReplicationRule.name == did['name']))).scalar_one_or_none() is None] if rule.expires_at: lifetime = (rule.expires_at - datetime.utcnow()).days * 24 * 3600 + (rule.expires_at - datetime.utcnow()).seconds else: @@ -759,8 +864,13 @@ def inject_rule( # 3. Get the did with METRICS.timer('inject_rule.get_did'): try: - did = session.query(models.DataIdentifier).filter(models.DataIdentifier.scope == rule.scope, - models.DataIdentifier.name == rule.name).one() + stmt = select( + models.DataIdentifier + ).where( + and_(models.DataIdentifier.scope == rule.scope, + models.DataIdentifier.name == rule.name) + ) + did = session.execute(stmt).scalar_one() except NoResultFound as exc: raise DataIdentifierNotFound('Data identifier %s:%s is not valid.' % (rule.scope, rule.name)) from exc except TypeError as error: @@ -777,11 +887,25 @@ def inject_rule( rule.state = RuleState.STUCK rule.error = 'MissingSourceReplica' if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.STUCK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.STUCK + }) + session.execute(stmt) elif rule.locks_replicating_cnt == 0: rule.state = RuleState.OK if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.OK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.OK + }) + session.execute(stmt) session.flush() if rule.notification == RuleNotification.YES: generate_email_for_rule_ok_notification(rule=rule, session=session) @@ -791,7 +915,14 @@ def inject_rule( else: rule.state = RuleState.REPLICATING if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.REPLICATING}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.REPLICATING + }) + session.execute(stmt) # Add rule to History insert_rule_history(rule=rule, recent=True, longterm=True, session=session) @@ -880,15 +1011,21 @@ def list_rule_history( :raises: RucioException """ - query = session.query(models.ReplicationRuleHistoryRecent.updated_at, - models.ReplicationRuleHistoryRecent.state, - models.ReplicationRuleHistoryRecent.locks_ok_cnt, - models.ReplicationRuleHistoryRecent.locks_stuck_cnt, - models.ReplicationRuleHistoryRecent.locks_replicating_cnt).filter_by(id=rule_id).order_by(models.ReplicationRuleHistoryRecent.updated_at) + stmt = select( + models.ReplicationRuleHistory.updated_at, + models.ReplicationRuleHistory.state, + models.ReplicationRuleHistory.locks_ok_cnt, + models.ReplicationRuleHistory.locks_stuck_cnt, + models.ReplicationRuleHistory.locks_replicating_cnt + ).where( + models.ReplicationRuleHistory.id == rule_id + ).order_by( + models.ReplicationRuleHistory.updated_at + ) try: - for rule in query.yield_per(5): - yield {'updated_at': rule[0], 'state': rule[1], 'locks_ok_cnt': rule[2], 'locks_stuck_cnt': rule[3], 'locks_replicating_cnt': rule[4]} + for rule in session.execute(stmt).yield_per(5): + yield rule._asdict() except StatementError as exc: raise RucioException('Badly formatted input (IDs?)') from exc @@ -909,22 +1046,27 @@ def list_rule_full_history( :raises: RucioException """ - query = session.query(models.ReplicationRuleHistory.id, - models.ReplicationRuleHistory.created_at, - models.ReplicationRuleHistory.updated_at, - models.ReplicationRuleHistory.rse_expression, - models.ReplicationRuleHistory.state, - models.ReplicationRuleHistory.account, - models.ReplicationRuleHistory.locks_ok_cnt, - models.ReplicationRuleHistory.locks_stuck_cnt, - models.ReplicationRuleHistory.locks_replicating_cnt).\ - with_hint(models.ReplicationRuleHistory, "INDEX(RULES_HISTORY_SCOPENAME_IDX)", 'oracle').\ - filter(models.ReplicationRuleHistory.scope == scope, models.ReplicationRuleHistory.name == name).\ - order_by(models.ReplicationRuleHistory.created_at, models.ReplicationRuleHistory.updated_at) - - for rule in query.yield_per(5): - yield {'rule_id': rule[0], 'created_at': rule[1], 'updated_at': rule[2], 'rse_expression': rule[3], 'state': rule[4], - 'account': rule[5], 'locks_ok_cnt': rule[6], 'locks_stuck_cnt': rule[7], 'locks_replicating_cnt': rule[8]} + stmt = select( + models.ReplicationRuleHistory.id.label('rule_id'), + models.ReplicationRuleHistory.created_at, + models.ReplicationRuleHistory.updated_at, + models.ReplicationRuleHistory.rse_expression, + models.ReplicationRuleHistory.state, + models.ReplicationRuleHistory.account, + models.ReplicationRuleHistory.locks_ok_cnt, + models.ReplicationRuleHistory.locks_stuck_cnt, + models.ReplicationRuleHistory.locks_replicating_cnt + ).with_hint( + models.ReplicationRuleHistory, 'INDEX(RULES_HISTORY_SCOPENAME_IDX)', 'oracle' + ).where( + and_(models.ReplicationRuleHistory.scope == scope, + models.ReplicationRuleHistory.name == name) + ).order_by( + models.ReplicationRuleHistory.created_at, + models.ReplicationRuleHistory.updated_at + ) + for rule in session.execute(stmt).yield_per(5): + yield rule._asdict() @stream_session @@ -944,13 +1086,22 @@ def list_associated_rules_for_file( """ rucio.core.did.get_did(scope=scope, name=name, session=session) # Check if the did acually exists - query = session.query(models.ReplicationRule).\ - with_hint(models.ReplicaLock, "INDEX(LOCKS LOCKS_PK)", 'oracle').\ - join(models.ReplicaLock, models.ReplicationRule.id == models.ReplicaLock.rule_id).\ - filter(models.ReplicaLock.scope == scope, models.ReplicaLock.name == name).distinct() + stmt = select( + models.ReplicationRule + ).distinct( + ).join( + models.ReplicaLock, + models.ReplicationRule.id == models.ReplicaLock.rule_id + ).with_hint( + models.ReplicaLock, 'INDEX(LOCKS LOCKS_PK)', 'oracle' + ).where( + and_(models.ReplicaLock.scope == scope, + models.ReplicaLock.name == name) + ) + try: - for rule in query.yield_per(5): - yield rule.to_dict() + for result in session.execute(stmt).yield_per(5): + yield result[0].to_dict() except StatementError as exc: raise RucioException('Badly formatted input (IDs?)') from exc @@ -982,9 +1133,14 @@ def delete_rule( with METRICS.timer('delete_rule.total'): try: - rule = session.query(models.ReplicationRule)\ - .filter(models.ReplicationRule.id == rule_id)\ - .with_for_update(nowait=nowait).one() + stmt = select( + models.ReplicationRule + ).where( + models.ReplicationRule.id == rule_id + ).with_for_update( + nowait=nowait + ) + rule = session.execute(stmt).scalar_one() except NoResultFound as exc: raise RuleNotFound('No rule with the id %s found' % rule_id) from exc if rule.locked and not ignore_rule_lock: @@ -1006,15 +1162,21 @@ def delete_rule( insert_rule_history(rule=rule, recent=True, longterm=False, session=session) return - locks = session.query(models.ReplicaLock)\ - .filter(models.ReplicaLock.rule_id == rule_id)\ - .with_for_update(nowait=nowait).yield_per(100) + stmt = select( + models.ReplicaLock + ).where( + models.ReplicaLock.rule_id == rule_id + ).with_for_update( + nowait=nowait + ) + results = session.execute(stmt).yield_per(100) # Remove locks, set tombstone if applicable transfers_to_delete = [] # [{'scope': , 'name':, 'rse_id':}] account_counter_decreases = {} # {'rse_id': [file_size, file_size, file_size]} - for lock in locks: + for result in results: + lock = result[0] if __delete_lock_and_update_replica(lock=lock, purge_replicas=rule.purge_replicas, nowait=nowait, session=session): transfers_to_delete.append({'scope': lock.scope, 'name': lock.name, 'rse_id': lock.rse_id}) @@ -1023,9 +1185,14 @@ def delete_rule( account_counter_decreases[lock.rse_id].append(lock.bytes) # Delete the DatasetLocks - session.query(models.DatasetLock)\ - .filter(models.DatasetLock.rule_id == rule_id)\ - .delete(synchronize_session=False) + stmt = delete( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule_id + ).execution_options( + synchronize_session=False + ) + session.execute(stmt) # Decrease account_counters for rse_id in account_counter_decreases.keys(): @@ -1072,7 +1239,14 @@ def repair_rule( # start_time = time.time() try: - rule = session.query(models.ReplicationRule).filter(models.ReplicationRule.id == rule_id).with_for_update(nowait=True).one() + stmt = select( + models.ReplicationRule + ).where( + models.ReplicationRule.id == rule_id + ).with_for_update( + nowait=True + ) + rule = session.execute(stmt).scalar_one() rule.updated_at = datetime.utcnow() # Check if rule is longer than 2 weeks in STUCK @@ -1104,7 +1278,14 @@ def repair_rule( insert_rule_history(rule=rule, recent=True, longterm=False, session=session) # Try to update the DatasetLocks if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.STUCK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.STUCK + }) + session.execute(stmt) logger(logging.DEBUG, '%s while repairing rule %s', str(error), rule_id) return @@ -1124,7 +1305,14 @@ def repair_rule( insert_rule_history(rule=rule, recent=True, longterm=False, session=session) # Try to update the DatasetLocks if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.STUCK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.STUCK + }) + session.execute(stmt) logger(logging.DEBUG, '%s while repairing rule %s', type(error).__name__, rule_id) return @@ -1133,19 +1321,32 @@ def repair_rule( rule.locks_ok_cnt = 0 rule.locks_replicating_cnt = 0 rule.locks_stuck_cnt = 0 - rule_counts = session.query(models.ReplicaLock.state, func.count(models.ReplicaLock.state)).filter(models.ReplicaLock.rule_id == rule.id).group_by(models.ReplicaLock.state).all() + stmt = select( + models.ReplicaLock.state, + func.count(models.ReplicaLock.state).label('state_counter') + ).where( + models.ReplicaLock.rule_id == rule.id + ).group_by( + models.ReplicaLock.state + ) + rule_counts = session.execute(stmt).all() for count in rule_counts: - if count[0] == LockState.OK: - rule.locks_ok_cnt = count[1] - elif count[0] == LockState.REPLICATING: - rule.locks_replicating_cnt = count[1] - elif count[0] == LockState.STUCK: - rule.locks_stuck_cnt = count[1] + if count.state == LockState.OK: + rule.locks_ok_cnt = count.state_counter + elif count.state == LockState.REPLICATING: + rule.locks_replicating_cnt = count.state_counter + elif count.state == LockState.STUCK: + rule.locks_stuck_cnt = count.state_counter logger(logging.DEBUG, "Finished resetting counters for rule %s [%d/%d/%d]", str(rule.id), rule.locks_ok_cnt, rule.locks_replicating_cnt, rule.locks_stuck_cnt) # Get the did - did = session.query(models.DataIdentifier).filter(models.DataIdentifier.scope == rule.scope, - models.DataIdentifier.name == rule.name).one() + stmt = select( + models.DataIdentifier + ).where( + and_(models.DataIdentifier.scope == rule.scope, + models.DataIdentifier.name == rule.name) + ) + did = session.execute(stmt).scalar_one() # Detect if there is something wrong with the dataset and # make the decisison on soft or hard repair. @@ -1190,7 +1391,14 @@ def repair_rule( insert_rule_history(rule=rule, recent=True, longterm=False, session=session) # Try to update the DatasetLocks if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.STUCK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.STUCK + }) + session.execute(stmt) logger(logging.DEBUG, '%s while repairing rule %s', type(error).__name__, rule_id) return @@ -1223,13 +1431,32 @@ def repair_rule( insert_rule_history(rule=rule, recent=True, longterm=False, session=session) # Try to update the DatasetLocks if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.STUCK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.STUCK + }) + session.execute(stmt) logger(logging.DEBUG, '%s while repairing rule %s', type(error).__name__, rule_id) return # Delete Datasetlocks which are not relevant anymore - validated_datasetlock_rse_ids = [rse_id[0] for rse_id in session.query(models.ReplicaLock.rse_id).filter(models.ReplicaLock.rule_id == rule.id).group_by(models.ReplicaLock.rse_id).all()] - dataset_locks = session.query(models.DatasetLock).filter_by(rule_id=rule.id).all() + stmt = select( + models.ReplicaLock.rse_id + ).distinct( + ).where( + models.ReplicaLock.rule_id == rule.id + ) + validated_datasetlock_rse_ids = session.execute(stmt).scalars().all() + + stmt = select( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ) + dataset_locks = session.execute(stmt).scalars().all() for dataset_lock in dataset_locks: if dataset_lock.rse_id not in validated_datasetlock_rse_ids: dataset_lock.delete(session=session) @@ -1241,7 +1468,14 @@ def repair_rule( insert_rule_history(rule=rule, recent=True, longterm=False, session=session) # Try to update the DatasetLocks if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.STUCK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.STUCK + }) + session.execute(stmt) # TODO: Increase some kind of Stuck Counter here, The rule should at some point be SUSPENDED return @@ -1255,7 +1489,14 @@ def repair_rule( insert_rule_history(rule=rule, recent=True, longterm=False, session=session) # Try to update the DatasetLocks if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.REPLICATING}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.REPLICATING + }) + session.execute(stmt) return rule.state = RuleState.OK @@ -1265,7 +1506,14 @@ def repair_rule( logger(logging.INFO, 'Rule %s [%d/%d/%d] state=OK', str(rule.id), rule.locks_ok_cnt, rule.locks_replicating_cnt, rule.locks_stuck_cnt) if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.OK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.OK + }) + session.execute(stmt) session.flush() if rule.notification == RuleNotification.YES: generate_email_for_rule_ok_notification(rule=rule, session=session) @@ -1295,7 +1543,12 @@ def get_rule( """ try: - rule = session.query(models.ReplicationRule).filter_by(id=rule_id).one() + stmt = select( + models.ReplicationRule + ).where( + models.ReplicationRule.id == rule_id + ) + rule = session.execute(stmt).scalar_one() return rule.to_dict() except NoResultFound as exc: raise RuleNotFound('No rule with the id %s found' % (rule_id)) from exc @@ -1422,9 +1675,9 @@ def update_rule( locktype ).where( locktype.rule_id == rule.id - ).values( - account=options['account'] - ) + ).values({ + locktype.account: options['account'] + }) session.execute(query) # Update counters @@ -1492,18 +1745,18 @@ def update_rule( ).where( models.ReplicationRule.id == rid, models.ReplicationRule.state != RuleState.SUSPENDED - ).values( - state=RuleState.STUCK - ) + ).values({ + models.ReplicationRule.state: RuleState.STUCK + }) session.execute(query) query = update( models.DatasetLock ).where( models.DatasetLock.rule_id == rid - ).values( - state=LockState.STUCK - ) + ).values({ + models.DatasetLock.state: LockState.STUCK + }) session.execute(query) if options['state'].lower() == 'suspended': @@ -1518,18 +1771,18 @@ def update_rule( ).where( models.ReplicaLock.rule_id == rule.id, models.ReplicaLock.state == LockState.REPLICATING - ).values( - state=LockState.STUCK - ) + ).values({ + models.ReplicaLock.state: LockState.STUCK + }) session.execute(query) query = update( models.DatasetLock ).where( models.DatasetLock.rule_id == rule_id - ).values( - state=LockState.STUCK - ) + ).values({ + models.DatasetLock.state: LockState.STUCK + }) session.execute(query) elif key == 'cancel_requests': @@ -1628,7 +1881,12 @@ def reduce_rule( :raises: RuleReplaceFailed, RuleNotFound """ try: - rule = session.query(models.ReplicationRule).filter_by(id=rule_id).one() + stmt = select( + models.ReplicationRule + ).where( + models.ReplicationRule.id == rule_id + ) + rule = session.execute(stmt).scalar_one() if copies >= rule.copies: raise RuleReplaceFailed('Copies of the new rule must be smaller than the old rule.') @@ -1668,7 +1926,12 @@ def reduce_rule( session.flush() - new_rule = session.query(models.ReplicationRule).filter_by(id=new_rule_id[0]).one() + stmt = select( + models.ReplicationRule + ).where( + models.ReplicationRule.id == new_rule_id[0] + ) + new_rule = session.execute(stmt).scalar_one() if new_rule.state != RuleState.OK: raise RuleReplaceFailed('The replacement of the rule failed.') @@ -1702,7 +1965,12 @@ def move_rule( override = override or {} try: - rule = session.query(models.ReplicationRule).filter_by(id=rule_id).one() + stmt = select( + models.ReplicationRule + ).where( + models.ReplicationRule.id == rule_id + ) + rule = session.execute(stmt).scalar_one() if rule.child_rule_id: raise RuleReplaceFailed('The rule must not have a child rule.') @@ -1776,8 +2044,13 @@ def re_evaluate_did( """ try: - did = session.query(models.DataIdentifier).filter(models.DataIdentifier.scope == scope, - models.DataIdentifier.name == name).one() + stmt = select( + models.DataIdentifier + ).where( + and_(models.DataIdentifier.scope == scope, + models.DataIdentifier.name == name) + ) + did = session.execute(stmt).scalar_one() except NoResultFound as exc: raise DataIdentifierNotFound() from exc @@ -1788,13 +2061,16 @@ def re_evaluate_did( # Update size and length of did if session.bind.dialect.name == 'oracle': - stmt = session.query(func.sum(models.DataIdentifierAssociation.bytes), - func.count(1)).\ - with_hint(models.DataIdentifierAssociation, - "index(CONTENTS CONTENTS_PK)", 'oracle').\ - filter(models.DataIdentifierAssociation.scope == scope, - models.DataIdentifierAssociation.name == name) - for bytes_, length in stmt: + stmt = select( + func.sum(models.DataIdentifierAssociation.bytes), + func.count(1) + ).with_hint( + models.DataIdentifierAssociation, 'INDEX(CONTENTS CONTENTS_PK)', 'oracle' + ).where( + and_(models.DataIdentifierAssociation.scope == scope, + models.DataIdentifierAssociation.name == name) + ) + for bytes_, length in session.execute(stmt): did.bytes = bytes_ did.length = length @@ -1823,20 +2099,21 @@ def get_updated_dids( :param blocked_dids: Blocked dids to filter. :param session: Database session in use. """ - query = session.query(models.UpdatedDID.id, - models.UpdatedDID.scope, - models.UpdatedDID.name, - models.UpdatedDID.rule_evaluation_action) - - query = filter_thread_work(session=session, query=query, total_threads=total_workers, thread_id=worker_number, hash_variable='name') + stmt = select( + models.UpdatedDID.id, + models.UpdatedDID.scope, + models.UpdatedDID.name, + models.UpdatedDID.rule_evaluation_action + ) + stmt = filter_thread_work(session=session, query=stmt, total_threads=total_workers, thread_id=worker_number, hash_variable='name') # Remove blocked dids from query, but only do the first 30 ones, not to overload the query if blocked_dids: chunk = list(chunks(blocked_dids, 30))[0] - query = query.filter(tuple_(models.UpdatedDID.scope, models.UpdatedDID.name).notin_(chunk)) + stmt = stmt.where(tuple_(models.UpdatedDID.scope, models.UpdatedDID.name).notin_(chunk)) if limit: - fetched_dids = query.order_by(models.UpdatedDID.created_at).limit(limit).all() + fetched_dids = stmt.order_by(models.UpdatedDID.created_at).limit(limit).all() filtered_dids = [did for did in fetched_dids if (did.scope, did.name) not in blocked_dids] if len(fetched_dids) == limit and not filtered_dids: return get_updated_dids(total_workers=total_workers, @@ -1847,7 +2124,7 @@ def get_updated_dids( else: return filtered_dids else: - return [did for did in query.order_by(models.UpdatedDID.created_at).all() if (did.scope, did.name) not in blocked_dids] + return [did._tuple() for did in session.execute(stmt.order_by(models.UpdatedDID.created_at)).all() if (did.scope, did.name) not in blocked_dids] @read_session @@ -1872,18 +2149,21 @@ def get_rules_beyond_eol( :param total_workers: Number of total workers. :param session: Database session in use. """ - query = session.query(models.ReplicationRule.scope, - models.ReplicationRule.name, - models.ReplicationRule.rse_expression, - models.ReplicationRule.locked, - models.ReplicationRule.id, - models.ReplicationRule.eol_at, - models.ReplicationRule.expires_at, - models.ReplicationRule.account).\ - filter(models.ReplicationRule.eol_at < date_check) + stmt = select( + models.ReplicationRule.scope, + models.ReplicationRule.name, + models.ReplicationRule.rse_expression, + models.ReplicationRule.locked, + models.ReplicationRule.id, + models.ReplicationRule.eol_at, + models.ReplicationRule.expires_at, + models.ReplicationRule.account + ).where( + models.ReplicationRule.eol_at < date_check + ) - query = filter_thread_work(session=session, query=query, total_threads=total_workers, thread_id=worker_number, hash_variable='name') - return [rule for rule in query.all()] + stmt = filter_thread_work(session=session, query=stmt, total_threads=total_workers, thread_id=worker_number, hash_variable='name') + return [row._tuple() for row in session.execute(stmt).all()] @read_session @@ -1905,18 +2185,25 @@ def get_expired_rules( :param session: Database session in use. """ - query = session.query(models.ReplicationRule.id, models.ReplicationRule.rse_expression).filter(models.ReplicationRule.expires_at < datetime.utcnow(), - models.ReplicationRule.locked == false(), - models.ReplicationRule.child_rule_id == None).\ - with_hint(models.ReplicationRule, "index(rules RULES_EXPIRES_AT_IDX)", 'oracle').\ - order_by(models.ReplicationRule.expires_at) # NOQA - - query = filter_thread_work(session=session, query=query, total_threads=total_workers, thread_id=worker_number, hash_variable='name') + stmt = select( + models.ReplicationRule.id, + models.ReplicationRule.rse_expression + ).with_hint( + models.ReplicationRule, 'INDEX(RULES RULES_EXPIRES_AT_IDX)', 'oracle' + ).where( + and_(models.ReplicationRule.expires_at < datetime.utcnow(), + models.ReplicationRule.locked == false(), + models.ReplicationRule.child_rule_id == null()) + ).order_by( + models.ReplicationRule.expires_at + ) + stmt = filter_thread_work(session=session, query=stmt, total_threads=total_workers, thread_id=worker_number, hash_variable='name') if limit: - fetched_rules = query.limit(limit).all() - filtered_rules = [rule for rule in fetched_rules if rule[0] not in blocked_rules] - if len(fetched_rules) == limit and not filtered_rules: + stmt = stmt.limit(limit) + result = session.execute(stmt).all() + filtered_rules = [rule._tuple() for rule in result if rule.id not in blocked_rules] + if len(result) == limit and not filtered_rules: return get_expired_rules(total_workers=total_workers, worker_number=worker_number, limit=None, @@ -1925,7 +2212,7 @@ def get_expired_rules( else: return filtered_rules else: - return [rule for rule in query.all() if rule[0] not in blocked_rules] + return [rule._tuple() for rule in session.execute(stmt).all() if rule.id not in blocked_rules] @read_session @@ -1947,18 +2234,23 @@ def get_injected_rules( :param session: Database session in use. """ - query = session.query(models.ReplicationRule.id).\ - with_hint(models.ReplicationRule, "index(rules RULES_STATE_IDX)", 'oracle').\ - filter(models.ReplicationRule.state == RuleState.INJECT).\ - order_by(models.ReplicationRule.created_at).\ - filter(models.ReplicationRule.created_at <= datetime.utcnow()) - - query = filter_thread_work(session=session, query=query, total_threads=total_workers, thread_id=worker_number, hash_variable='name') + stmt = select( + models.ReplicationRule.id + ).with_hint( + models.ReplicationRule, 'INDEX(RULES RULES_STATE_IDX)', 'oracle' + ).where( + and_(models.ReplicationRule.state == RuleState.INJECT, + models.ReplicationRule.created_at <= datetime.utcnow()) + ).order_by( + models.ReplicationRule.created_at + ) + stmt = filter_thread_work(session=session, query=stmt, total_threads=total_workers, thread_id=worker_number, hash_variable='name') if limit: - fetched_rules = query.limit(limit).all() - filtered_rules = [rule[0] for rule in fetched_rules if rule[0] not in blocked_rules] - if len(fetched_rules) == limit and not filtered_rules: + stmt = stmt.limit(limit) + result = session.execute(stmt).scalars().all() + filtered_rules = [rule for rule in result if rule not in blocked_rules] + if len(result) == limit and not filtered_rules: return get_injected_rules(total_workers=total_workers, worker_number=worker_number, limit=None, @@ -1967,7 +2259,7 @@ def get_injected_rules( else: return filtered_rules else: - return [rule[0] for rule in query.all() if rule[0] not in blocked_rules] + return [rule for rule in session.execute(stmt).scalars().all() if rule not in blocked_rules] @read_session @@ -1990,21 +2282,26 @@ def get_stuck_rules( :param blocked_rules: Blocked rules to filter out. :param session: Database session in use. """ - query = session.query(models.ReplicationRule.id).\ - with_hint(models.ReplicationRule, "index(rules RULES_STATE_IDX)", 'oracle').\ - filter(models.ReplicationRule.state == RuleState.STUCK).\ - filter(models.ReplicationRule.updated_at < datetime.utcnow() - timedelta(seconds=delta)).\ - filter(or_(models.ReplicationRule.expires_at == null(), - models.ReplicationRule.expires_at > datetime.utcnow(), - models.ReplicationRule.locked == true())).\ - order_by(models.ReplicationRule.updated_at) - - query = filter_thread_work(session=session, query=query, total_threads=total_workers, thread_id=worker_number, hash_variable='name') + stmt = select( + models.ReplicationRule.id + ).with_hint( + models.ReplicationRule, 'INDEX(RULES RULES_STATE_IDX)', 'oracle' + ).where( + and_(models.ReplicationRule.state == RuleState.STUCK, + models.ReplicationRule.updated_at < datetime.utcnow() - timedelta(seconds=delta), + or_(models.ReplicationRule.expires_at == null(), + models.ReplicationRule.expires_at > datetime.utcnow(), + models.ReplicationRule.locked == true())) + ).order_by( + models.ReplicationRule.updated_at + ) + stmt = filter_thread_work(session=session, query=stmt, total_threads=total_workers, thread_id=worker_number, hash_variable='name') if limit: - fetched_rules = query.limit(limit).all() - filtered_rules = [rule[0] for rule in fetched_rules if rule[0] not in blocked_rules] - if len(fetched_rules) == limit and not filtered_rules: + stmt = stmt.limit(limit) + result = session.execute(stmt).scalars().all() + filtered_rules = [rule for rule in result if rule not in blocked_rules] + if len(result) == limit and not filtered_rules: return get_stuck_rules(total_workers=total_workers, worker_number=worker_number, delta=delta, @@ -2014,7 +2311,7 @@ def get_stuck_rules( else: return filtered_rules else: - return [rule[0] for rule in query.all() if rule[0] not in blocked_rules] + return [rule for rule in session.execute(stmt).scalars().all() if rule not in blocked_rules] @transactional_session @@ -2029,7 +2326,12 @@ def delete_updated_did( :param id_: Id of the row not to delete. :param session: The database session in use. """ - session.query(models.UpdatedDID).filter(models.UpdatedDID.id == id_).delete() + stmt = delete( + models.UpdatedDID + ).where( + models.UpdatedDID.id == id_ + ) + session.execute(stmt) @transactional_session @@ -2053,9 +2355,38 @@ def update_rules_for_lost_replica( :param logger: Optional decorated logger that can be passed from the calling daemons or servers. """ - locks = session.query(models.ReplicaLock).filter(models.ReplicaLock.scope == scope, models.ReplicaLock.name == name, models.ReplicaLock.rse_id == rse_id).with_for_update(nowait=nowait).all() - replica = session.query(models.RSEFileAssociation).filter(models.RSEFileAssociation.scope == scope, models.RSEFileAssociation.name == name, models.RSEFileAssociation.rse_id == rse_id).with_for_update(nowait=nowait).one() - requests = session.query(models.Request).filter(models.Request.scope == scope, models.Request.name == name, models.Request.dest_rse_id == rse_id).with_for_update(nowait=nowait).all() + stmt = select( + models.ReplicaLock + ).where( + and_(models.ReplicaLock.scope == scope, + models.ReplicaLock.name == name, + models.ReplicaLock.rse_id == rse_id) + ).with_for_update( + nowait=nowait + ) + locks = session.execute(stmt).scalars().all() + + stmt = select( + models.RSEFileAssociation + ).where( + and_(models.RSEFileAssociation.scope == scope, + models.RSEFileAssociation.name == name, + models.RSEFileAssociation.rse_id == rse_id) + ).with_for_update( + nowait=nowait + ) + replica = session.execute(stmt).scalar_one() + + stmt = select( + models.Request + ).where( + and_(models.Request.scope == scope, + models.Request.name == name, + models.Request.dest_rse_id == rse_id) + ).with_for_update( + nowait=nowait + ) + requests = session.execute(stmt).scalars().all() rse = get_rse_name(rse_id, session=session) @@ -2069,7 +2400,14 @@ def update_rules_for_lost_replica( session.delete(request) for lock in locks: - rule = session.query(models.ReplicationRule).filter(models.ReplicationRule.id == lock.rule_id).with_for_update(nowait=nowait).one() + stmt = select( + models.ReplicationRule + ).where( + models.ReplicationRule.id == lock.rule_id + ).with_for_update( + nowait=nowait + ) + rule = session.execute(stmt).scalar_one() rule_state_before = rule.state replica.lock_cnt -= 1 if lock.state == LockState.OK: @@ -2086,7 +2424,14 @@ def update_rules_for_lost_replica( elif rule.locks_replicating_cnt == 0 and rule.locks_stuck_cnt == 0: rule.state = RuleState.OK if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.OK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.OK + }) + session.execute(stmt) session.flush() if rule_state_before != RuleState.OK: generate_rule_notifications(rule=rule, session=session) @@ -2104,8 +2449,28 @@ def update_rules_for_lost_replica( replica.tombstone = OBSOLETE replica.state = ReplicaState.UNAVAILABLE - session.query(models.DataIdentifier).filter_by(scope=scope, name=name).update({'availability': DIDAvailability.LOST}) - session.query(models.BadReplicas).filter_by(state=BadFilesStatus.BAD, rse_id=rse_id, scope=scope, name=name).update({'state': BadFilesStatus.LOST, 'updated_at': datetime.utcnow()}) + stmt = update( + models.DataIdentifier + ).where( + and_(models.DataIdentifier.scope == scope, + models.DataIdentifier.name == name) + ).values({ + models.DataIdentifier.availability: DIDAvailability.LOST + }) + session.execute(stmt) + + stmt = update( + models.BadReplicas + ).where( + and_(models.BadReplicas.scope == scope, + models.BadReplicas.name == name, + models.BadReplicas.rse_id == rse_id, + models.BadReplicas.state == BadFilesStatus.BAD) + ).values({ + models.BadReplicas.state: BadFilesStatus.LOST, + models.BadReplicas.updated_at: datetime.utcnow() + }) + session.execute(stmt) for dts in datasets: logger(logging.INFO, 'File %s:%s bad at site %s is completely lost from dataset %s:%s. Will be marked as LOST and detached', scope, name, rse, dts['scope'], dts['name']) rucio.core.did.detach_dids(scope=dts['scope'], name=dts['name'], dids=[{'scope': scope, 'name': name}], session=session) @@ -2140,15 +2505,40 @@ def update_rules_for_bad_replica( :param session: The database session in use. :param logger: Optional decorated logger that can be passed from the calling daemons or servers. """ + stmt = select( + models.ReplicaLock + ).where( + and_(models.ReplicaLock.scope == scope, + models.ReplicaLock.name == name, + models.ReplicaLock.rse_id == rse_id) + ).with_for_update( + nowait=nowait + ) + locks = session.execute(stmt).scalars().all() - locks = session.query(models.ReplicaLock).filter(models.ReplicaLock.scope == scope, models.ReplicaLock.name == name, models.ReplicaLock.rse_id == rse_id).with_for_update(nowait=nowait).all() - replica = session.query(models.RSEFileAssociation).filter(models.RSEFileAssociation.scope == scope, models.RSEFileAssociation.name == name, models.RSEFileAssociation.rse_id == rse_id).with_for_update(nowait=nowait).one() + stmt = select( + models.RSEFileAssociation + ).where( + and_(models.RSEFileAssociation.scope == scope, + models.RSEFileAssociation.name == name, + models.RSEFileAssociation.rse_id == rse_id) + ).with_for_update( + nowait=nowait + ) + replica = session.execute(stmt).scalar_one() nlock = 0 datasets = [] for lock in locks: nlock += 1 - rule = session.query(models.ReplicationRule).filter(models.ReplicationRule.id == lock.rule_id).with_for_update(nowait=nowait).one() + stmt = select( + models.ReplicationRule + ).where( + models.ReplicationRule.id == lock.rule_id + ).with_for_update( + nowait=nowait + ) + rule = session.execute(stmt).scalar_one() # If source replica expression exists, we remove it if rule.source_replica_expression: rule.source_replica_expression = None @@ -2191,15 +2581,41 @@ def update_rules_for_bad_replica( else: rule.state = RuleState.REPLICATING if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.REPLICATING}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.REPLICATING + }) + session.execute(stmt) # Insert rule history insert_rule_history(rule=rule, recent=True, longterm=False, session=session) if nlock: - session.query(models.RSEFileAssociation).filter(models.RSEFileAssociation.scope == scope, models.RSEFileAssociation.name == name, models.RSEFileAssociation.rse_id == rse_id).update({'state': ReplicaState.COPYING}) + stmt = update( + models.RSEFileAssociation + ).where( + and_(models.RSEFileAssociation.scope == scope, + models.RSEFileAssociation.name == name, + models.RSEFileAssociation.rse_id == rse_id) + ).values({ + models.RSEFileAssociation.state: ReplicaState.COPYING + }) + session.execute(stmt) else: logger(logging.INFO, 'File %s:%s at site %s has no locks. Will be deleted now.', scope, name, get_rse_name(rse_id=rse_id, session=session)) tombstone = OBSOLETE - session.query(models.RSEFileAssociation).filter(models.RSEFileAssociation.scope == scope, models.RSEFileAssociation.name == name, models.RSEFileAssociation.rse_id == rse_id).update({'state': ReplicaState.UNAVAILABLE, 'tombstone': tombstone}) + stmt = update( + models.RSEFileAssociation + ).where( + and_(models.RSEFileAssociation.scope == scope, + models.RSEFileAssociation.name == name, + models.RSEFileAssociation.rse_id == rse_id) + ).values({ + models.RSEFileAssociation.state: ReplicaState.UNAVAILABLE, + models.RSEFileAssociation.tombstone: tombstone + }) + session.execute(stmt) @transactional_session @@ -2262,7 +2678,12 @@ def generate_rule_notifications( if rule.grouping != RuleGrouping.NONE: # Only send DATASETLOCK_OK callbacks for ALL/DATASET grouped rules if rule.notification == RuleNotification.YES: - dataset_locks = session.query(models.DatasetLock).filter_by(rule_id=rule.id).all() + stmt = select( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ) + dataset_locks = session.execute(stmt).scalars().all() for dataset_lock in dataset_locks: payload = {'scope': dataset_lock.scope.external, 'name': dataset_lock.name, @@ -2275,7 +2696,12 @@ def generate_rule_notifications( add_message(event_type='DATASETLOCK_OK', payload=payload, session=session) elif rule.notification == RuleNotification.CLOSE: - dataset_locks = session.query(models.DatasetLock).filter_by(rule_id=rule.id).all() + stmt = select( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ) + dataset_locks = session.execute(stmt).scalars().all() for dataset_lock in dataset_locks: try: did = rucio.core.did.get_did(scope=dataset_lock.scope, name=dataset_lock.name, session=session) @@ -2426,7 +2852,12 @@ def approve_rule( """ try: - rule = session.query(models.ReplicationRule).filter_by(id=rule_id).one() + stmt = select( + models.ReplicationRule + ).where( + models.ReplicationRule.id == rule_id + ) + rule = session.execute(stmt).scalar_one() if rule.state == RuleState.WAITING_APPROVAL: rule.ignore_account_limit = True rule.state = RuleState.INJECT @@ -2492,7 +2923,12 @@ def deny_rule( """ try: - rule = session.query(models.ReplicationRule).filter_by(id=rule_id).one() + stmt = select( + models.ReplicationRule + ).where( + models.ReplicationRule.id == rule_id + ) + rule = session.execute(stmt).scalar_one() if rule.state == RuleState.WAITING_APPROVAL: with open('%s/rule_denied_user.tmpl' % config_get('common', 'mailtemplatedir'), 'r') as templatefile: template = Template(templatefile.read()) @@ -2555,7 +2991,12 @@ def examine_rule( 'transfers': []} try: - rule = session.query(models.ReplicationRule).filter_by(id=rule_id).one() + stmt = select( + models.ReplicationRule + ).where( + models.ReplicationRule.id == rule_id + ) + rule = session.execute(stmt).scalar_one() if rule.state == RuleState.OK: result['rule_error'] = 'This replication rule is OK' elif rule.state == RuleState.REPLICATING: @@ -2565,10 +3006,25 @@ def examine_rule( else: result['rule_error'] = rule.error # Get the stuck locks - stuck_locks = session.query(models.ReplicaLock).filter_by(rule_id=rule_id, state=LockState.STUCK).all() + stmt = select( + models.ReplicaLock + ).where( + and_(models.ReplicaLock.rule_id == rule_id, + models.ReplicaLock.state == LockState.STUCK) + ) + stuck_locks = session.execute(stmt).scalars().all() for lock in stuck_locks: # Get the count of requests in the request_history for each lock - transfers = session.query(models.RequestHistory).filter_by(scope=lock.scope, name=lock.name, dest_rse_id=lock.rse_id).order_by(models.RequestHistory.created_at.desc()).all() # pylint: disable=no-member + stmt = select( + models.RequestHistory + ).where( + and_(models.RequestHistory.scope == lock.scope, + models.RequestHistory.name == lock.name, + models.RequestHistory.dest_rse_id == lock.rse_id) + ).order_by( + desc(models.RequestHistory.created_at) + ) + transfers = session.execute(stmt).scalars().all() transfer_cnt = len(transfers) # Get the error of the last request that has been tried and also the SOURCE used for the last request last_error, last_source, last_time, sources = None, None, None, [] @@ -2577,7 +3033,14 @@ def examine_rule( last_error = last_request.state last_time = last_request.created_at last_source = None if last_request.source_rse_id is None else get_rse_name(rse_id=last_request.source_rse_id, session=session) - available_replicas = session.query(models.RSEFileAssociation).filter_by(scope=lock.scope, name=lock.name, state=ReplicaState.AVAILABLE).all() + stmt = select( + models.RSEFileAssociation + ).where( + and_(models.RSEFileAssociation.scope == lock.scope, + models.RSEFileAssociation.name == lock.name, + models.RSEFileAssociation.state == ReplicaState.AVAILABLE) + ) + available_replicas = session.execute(stmt).scalars().all() for replica in available_replicas: sources.append((get_rse_name(rse_id=replica.rse_id, session=session), @@ -2641,8 +3104,14 @@ def release_parent_rule( session.flush() - parent_rules = session.query(models.ReplicationRule).filter_by(child_rule_id=child_rule_id).\ - with_hint(models.ReplicationRule, "index(RULES RULES_CHILD_RULE_ID_IDX)", 'oracle').all() + stmt = select( + models.ReplicationRule + ).with_hint( + models.ReplicationRule, 'INDEX(RULES RULES_CHILD_RULE_ID_IDX)', 'oracle' + ).where( + models.ReplicationRule.child_rule_id == child_rule_id + ) + parent_rules = session.execute(stmt).scalars().all() for rule in parent_rules: if remove_parent_expiration: rule.expires_at = None @@ -2918,9 +3387,25 @@ def __evaluate_did_detach( parent_dids = rucio.core.did.list_all_parent_dids(scope=eval_did.scope, name=eval_did.name, session=session) # Get all RR from parents and eval_did - rules = session.query(models.ReplicationRule).filter_by(scope=eval_did.scope, name=eval_did.name).with_for_update(nowait=True).all() + stmt = select( + models.ReplicationRule + ).where( + and_(models.ReplicationRule.scope == eval_did.scope, + models.ReplicationRule.name == eval_did.name) + ).with_for_update( + nowait=True + ) + rules = list(session.execute(stmt).scalars().all()) for did in parent_dids: - rules.extend(session.query(models.ReplicationRule).filter_by(scope=did['scope'], name=did['name']).with_for_update(nowait=True).all()) + stmt = select( + models.ReplicationRule + ).where( + and_(models.ReplicationRule.scope == did['scope'], + models.ReplicationRule.name == did['name']) + ).with_for_update( + nowait=True + ) + rules.extend(session.execute(stmt).scalars().all()) # Iterate rules and delete locks transfers_to_delete = [] # [{'scope': , 'name':, 'rse_id':}] @@ -2932,8 +3417,12 @@ def __evaluate_did_detach( files[(file['scope'], file['name'])] = True logger(logging.DEBUG, "Removing locks for rule %s [%d/%d/%d]", str(rule.id), rule.locks_ok_cnt, rule.locks_replicating_cnt, rule.locks_stuck_cnt) rule_locks_ok_cnt_before = rule.locks_ok_cnt - query = session.query(models.ReplicaLock).filter_by(rule_id=rule.id) - for lock in query: + stmt = select( + models.ReplicaLock + ).where( + models.ReplicaLock.rule_id == rule.id + ) + for lock in session.execute(stmt).scalars().all(): if (lock.scope, lock.name) not in files: if __delete_lock_and_update_replica(lock=lock, purge_replicas=force_epoch or rule.purge_replicas, nowait=True, session=session): transfers_to_delete.append({'scope': lock.scope, 'name': lock.name, 'rse_id': lock.rse_id}) @@ -2954,7 +3443,12 @@ def __evaluate_did_detach( for ds in rucio.core.did.list_child_datasets(scope=rule.scope, name=rule.name, session=session): child_datasets[(ds['scope'], ds['name'])] = True logger(logging.DEBUG, "Removing dataset_locks for rule %s [%d/%d/%d]", str(rule.id), rule.locks_ok_cnt, rule.locks_replicating_cnt, rule.locks_stuck_cnt) - query = session.query(models.DatasetLock).filter_by(rule_id=rule.id) + stmt = select( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ) + query = session.execute(stmt).scalars().all() for ds_lock in query: if (ds_lock.scope, ds_lock.name) not in child_datasets: ds_lock.delete(flush=False, session=session) @@ -2967,7 +3461,14 @@ def __evaluate_did_detach( elif rule.locks_replicating_cnt == 0 and rule.locks_stuck_cnt == 0: rule.state = RuleState.OK if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.OK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.OK + }) + session.execute(stmt) session.flush() if rule_locks_ok_cnt_before != rule.locks_ok_cnt: generate_rule_notifications(rule=rule, session=session) @@ -3004,10 +3505,15 @@ def __oldest_file_under( :param name: dataset or container name :returns: tuple (scope, name) or None """ - children = session.query(models.DataIdentifierAssociation) \ - .filter(models.DataIdentifierAssociation.scope == scope, - models.DataIdentifierAssociation.name == name) \ - .order_by(models.DataIdentifierAssociation.created_at) + stmt = select( + models.DataIdentifierAssociation + ).where( + and_(models.DataIdentifierAssociation.scope == scope, + models.DataIdentifierAssociation.name == name) + ).order_by( + models.DataIdentifierAssociation.created_at + ) + children = session.execute(stmt).scalars().all() for child in children: if child.child_type == DIDType.FILE: return child.child_scope, child.child_name @@ -3043,12 +3549,16 @@ def __evaluate_did_attach( # Get immediate new child DID's with METRICS.timer('evaluate_did_attach.list_new_child_dids'): - new_child_dids = session.query(models.DataIdentifierAssociation)\ - .with_hint(models.DataIdentifierAssociation, "INDEX_RS_ASC(contents contents_pk)", 'oracle')\ - .filter(models.DataIdentifierAssociation.scope == eval_did.scope, - models.DataIdentifierAssociation.name == eval_did.name, - models.DataIdentifierAssociation.rule_evaluation == True).all() # noqa - + stmt = select( + models.DataIdentifierAssociation + ).with_hint( + models.DataIdentifierAssociation, 'INDEX_RS_ASC(CONTENTS CONTENTS_PK)', 'oracle' + ).where( + and_(models.DataIdentifierAssociation.scope == eval_did.scope, + models.DataIdentifierAssociation.name == eval_did.name, + models.DataIdentifierAssociation.rule_evaluation == true()) + ) + new_child_dids = session.execute(stmt).scalars().all() if new_child_dids: # Get all unsuspended RR from parents and eval_did with METRICS.timer('evaluate_did_attach.get_rules'): @@ -3058,12 +3568,17 @@ def __evaluate_did_attach( models.ReplicationRule.name == did['name'])) rule_clauses.append(and_(models.ReplicationRule.scope == eval_did.scope, models.ReplicationRule.name == eval_did.name)) - rules = session.query(models.ReplicationRule).filter( - or_(*rule_clauses), - models.ReplicationRule.state != RuleState.SUSPENDED, - models.ReplicationRule.state != RuleState.WAITING_APPROVAL, - models.ReplicationRule.state != RuleState.INJECT).with_for_update(nowait=True).all() - + stmt = select( + models.ReplicationRule + ).where( + and_(or_(*rule_clauses), + models.ReplicationRule.state.not_in([RuleState.SUSPENDED, + RuleState.WAITING_APPROVAL, + RuleState.INJECT])) + ).with_for_update( + nowait=True + ) + rules = session.execute(stmt).scalars().all() if rules: # Resolve the new_child_dids to its locks with METRICS.timer('evaluate_did_attach.resolve_did_to_locks_and_replicas'): @@ -3116,7 +3631,14 @@ def __evaluate_did_attach( insert_rule_history(rule=rule, recent=True, longterm=False, session=session) # Try to update the DatasetLocks if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.STUCK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.STUCK + }) + session.execute(stmt) continue # 2. Create the RSE Selector @@ -3135,7 +3657,14 @@ def __evaluate_did_attach( insert_rule_history(rule=rule, recent=True, longterm=False, session=session) # Try to update the DatasetLocks if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.STUCK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.STUCK + }) + session.execute(stmt) continue # 3. Apply the Replication rule to the Files @@ -3178,7 +3707,14 @@ def __evaluate_did_attach( insert_rule_history(rule=rule, recent=True, longterm=False, session=session) # Try to update the DatasetLocks if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.STUCK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.STUCK + }) + session.execute(stmt) continue # 4. Update the Rule State @@ -3188,15 +3724,36 @@ def __evaluate_did_attach( if locks_stuck_before != rule.locks_stuck_cnt: rule.state = RuleState.STUCK rule.error = 'MissingSourceReplica' - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.STUCK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.STUCK + }) + session.execute(stmt) elif rule.locks_replicating_cnt > 0: rule.state = RuleState.REPLICATING if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.REPLICATING}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.REPLICATING + }) + session.execute(stmt) elif rule.locks_replicating_cnt == 0 and rule.locks_stuck_cnt == 0: rule.state = RuleState.OK if rule.grouping != RuleGrouping.NONE: - session.query(models.DatasetLock).filter_by(rule_id=rule.id).update({'state': LockState.OK}) + stmt = update( + models.DatasetLock + ).where( + models.DatasetLock.rule_id == rule.id + ).values({ + models.DatasetLock.state: LockState.OK + }) + session.execute(stmt) session.flush() if rule_locks_ok_cnt_before < rule.locks_ok_cnt: generate_rule_notifications(rule=rule, session=session) @@ -3389,13 +3946,28 @@ def __resolve_dids_to_locks_and_replicas( for lock_clause_chunk in lock_clause_chunks: if locks_rse_clause: - tmp_locks = session.query(models.ReplicaLock).filter(or_(*lock_clause_chunk), or_(*locks_rse_clause))\ - .with_hint(models.ReplicaLock, "index(LOCKS LOCKS_PK)", 'oracle')\ - .with_for_update(nowait=nowait).all() + stmt = select( + models.ReplicaLock + ).with_hint( + models.ReplicaLock, 'INDEX(LOCKS LOCKS_PK)', 'oracle' + ).where( + and_(or_(*lock_clause_chunk), + or_(*locks_rse_clause)) + ).with_for_update( + nowait=nowait + ) + tmp_locks = session.execute(stmt).scalars().all() else: - tmp_locks = session.query(models.ReplicaLock).filter(or_(*lock_clause_chunk))\ - .with_hint(models.ReplicaLock, "index(LOCKS LOCKS_PK)", 'oracle')\ - .with_for_update(nowait=nowait).all() + stmt = select( + models.ReplicaLock + ).with_hint( + models.ReplicaLock, 'INDEX(LOCKS LOCKS_PK)', 'oracle' + ).where( + or_(*lock_clause_chunk) + ).with_for_update( + nowait=nowait + ) + tmp_locks = session.execute(stmt).scalars().all() for lock in tmp_locks: if (lock.scope, lock.name) not in locks: locks[(lock.scope, lock.name)] = [lock] @@ -3404,13 +3976,30 @@ def __resolve_dids_to_locks_and_replicas( for replica_clause_chunk in replica_clause_chunks: if replicas_rse_clause: - tmp_replicas = session.query(models.RSEFileAssociation).filter(or_(*replica_clause_chunk), or_(*replicas_rse_clause), models.RSEFileAssociation.state != ReplicaState.BEING_DELETED)\ - .with_hint(models.RSEFileAssociation, "index(REPLICAS REPLICAS_PK)", 'oracle')\ - .with_for_update(nowait=nowait).all() + stmt = select( + models.RSEFileAssociation + ).with_hint( + models.RSEFileAssociation, 'INDEX(REPLICAS REPLICAS_PK)', 'oracle' + ).where( + and_(or_(*replica_clause_chunk), + or_(*replicas_rse_clause), + models.RSEFileAssociation.state != ReplicaState.BEING_DELETED) + ).with_for_update( + nowait=nowait + ) + tmp_replicas = session.execute(stmt).scalars().all() else: - tmp_replicas = session.query(models.RSEFileAssociation).filter(or_(*replica_clause_chunk), models.RSEFileAssociation.state != ReplicaState.BEING_DELETED)\ - .with_hint(models.RSEFileAssociation, "index(REPLICAS REPLICAS_PK)", 'oracle')\ - .with_for_update(nowait=nowait).all() + stmt = select( + models.RSEFileAssociation + ).with_hint( + models.RSEFileAssociation, 'INDEX(REPLICAS REPLICAS_PK)', 'oracle' + ).where( + and_(or_(*replica_clause_chunk), + models.RSEFileAssociation.state != ReplicaState.BEING_DELETED) + ).with_for_update( + nowait=nowait + ) + tmp_replicas = session.execute(stmt).scalars().all() for replica in tmp_replicas: if (replica.scope, replica.name) not in replicas: replicas[(replica.scope, replica.name)] = [replica] @@ -3419,9 +4008,18 @@ def __resolve_dids_to_locks_and_replicas( if source_rses: for replica_clause_chunk in replica_clause_chunks: - tmp_source_replicas = session.query(models.RSEFileAssociation.scope, models.RSEFileAssociation.name, models.RSEFileAssociation.rse_id).\ - filter(or_(*replica_clause_chunk), or_(*source_replicas_rse_clause), models.RSEFileAssociation.state == ReplicaState.AVAILABLE)\ - .with_hint(models.RSEFileAssociation, "index(REPLICAS REPLICAS_PK)", 'oracle').all() + stmt = select( + models.RSEFileAssociation.scope, + models.RSEFileAssociation.name, + models.RSEFileAssociation.rse_id + ).with_hint( + models.RSEFileAssociation, 'INDEX(REPLICAS REPLICAS_PK)', 'oracle' + ).where( + and_(or_(*replica_clause_chunk), + or_(*source_replicas_rse_clause), + models.RSEFileAssociation.state == ReplicaState.AVAILABLE) + ) + tmp_source_replicas = session.execute(stmt).all() for scope, name, rse_id in tmp_source_replicas: if (scope, name) not in source_replicas: source_replicas[(scope, name)] = [rse_id] @@ -3430,7 +4028,13 @@ def __resolve_dids_to_locks_and_replicas( else: # The evaluate_dids will be containers and/or datasets for did in dids: - real_did = session.query(models.DataIdentifier).filter(models.DataIdentifier.scope == did.child_scope, models.DataIdentifier.name == did.child_name).one() + stmt = select( + models.DataIdentifier + ).where( + and_(models.DataIdentifier.scope == did.child_scope, + models.DataIdentifier.name == did.child_name) + ) + real_did = session.execute(stmt).scalar_one() tmp_datasetfiles, tmp_locks, tmp_replicas, tmp_source_replicas = __resolve_did_to_locks_and_replicas(did=real_did, nowait=nowait, restrict_rses=restrict_rses, @@ -3531,10 +4135,16 @@ def __delete_lock_and_update_replica( logger(logging.DEBUG, "Deleting lock %s:%s for rule %s", lock.scope, lock.name, str(lock.rule_id)) lock.delete(session=session, flush=False) try: - replica = session.query(models.RSEFileAssociation).filter( - models.RSEFileAssociation.scope == lock.scope, - models.RSEFileAssociation.name == lock.name, - models.RSEFileAssociation.rse_id == lock.rse_id).with_for_update(nowait=nowait).one() + stmt = select( + models.RSEFileAssociation + ).where( + and_(models.RSEFileAssociation.scope == lock.scope, + models.RSEFileAssociation.name == lock.name, + models.RSEFileAssociation.rse_id == lock.rse_id) + ).with_for_update( + nowait=nowait + ) + replica = session.execute(stmt).scalar_one() replica.lock_cnt -= 1 if replica.lock_cnt == 0: if purge_replicas: