Skip to content

Commit

Permalink
Core & Internals: Syntax conventions and small type changes
Browse files Browse the repository at this point in the history
  • Loading branch information
erlingstaff committed Mar 13, 2024
1 parent 0585b91 commit 45e497e
Showing 1 changed file with 13 additions and 19 deletions.
32 changes: 13 additions & 19 deletions lib/rucio/core/rule.py
Expand Up @@ -22,9 +22,9 @@
from os import path
from re import match
from string import Template
from typing import TYPE_CHECKING, Any, Callable, Literal, Optional, Type, TypeVar
from typing import TYPE_CHECKING, Any, Callable, Literal, Optional, Type, TypeVar, Union

from dogpile.cache.api import NO_VALUE
from dogpile.cache.api import NoValue
from sqlalchemy import delete, desc, select, update
from sqlalchemy.exc import IntegrityError, StatementError
from sqlalchemy.exc import NoResultFound # https://pydoc.dev/sqlalchemy/latest/sqlalchemy.exc.NoResultFound.html
Expand Down Expand Up @@ -1379,13 +1379,11 @@ def repair_rule(rule_id: str, *, session: "Session", logger: LoggerFunction = lo
# Delete Datasetlocks which are not relevant anymore
stmt = select(
models.ReplicaLock.rse_id
).distinct(
).where(
models.ReplicaLock.rule_id == rule.id
).group_by(
models.ReplicaLock.rse_id
)
results = session.execute(stmt).all()
validated_datasetlock_rse_ids = [rse_id.rse_id for rse_id in results]
validated_datasetlock_rse_ids = session.execute(stmt).scalars().all()

stmt = select(
models.DatasetLock
Expand Down Expand Up @@ -1979,8 +1977,7 @@ def re_evaluate_did(scope: InternalScope, name: str, rule_evaluation_action: DID
and_(models.DataIdentifierAssociation.scope == scope,
models.DataIdentifierAssociation.name == name)
)
result = session.execute(stmt)
for bytes_, length in result:
for bytes_, length in session.execute(stmt):
did.bytes = bytes_
did.length = length

Expand Down Expand Up @@ -2054,8 +2051,7 @@ def get_rules_beyond_eol(date_check: datetime, worker_number: int, total_workers
)

stmt = filter_thread_work(session=session, query=stmt, total_threads=total_workers, thread_id=worker_number, hash_variable='name')
result = session.execute(stmt).all()
return [rule for rule in result] # type: ignore
return session.execute(stmt).all() # type: ignore


@read_session
Expand Down Expand Up @@ -2889,16 +2885,16 @@ def get_evaluation_backlog(expiration_time: int = 600, *, session: "Session") ->
:returns: Tuple (Count, Datetime of oldest entry)
"""

cached_backlog = REGION.get('rule_evaluation_backlog', expiration_time=expiration_time)
if cached_backlog is NO_VALUE:
cached_backlog: Union[NoValue, tuple[int, datetime]] = REGION.get('rule_evaluation_backlog', expiration_time=expiration_time)
if isinstance(cached_backlog, NoValue):
stmt = select(
func.count(models.UpdatedDID.created_at),
func.min(models.UpdatedDID.created_at)
)
result = session.execute(stmt).scalars().one()
REGION.set('rule_evaluation_backlog', result)
return result # type: ignore
return cached_backlog # type: ignore
return result
return cached_backlog


@transactional_session
Expand Down Expand Up @@ -3186,8 +3182,7 @@ def __evaluate_did_detach(eval_did: models.DataIdentifier, *, session: "Session"
).with_for_update(
nowait=True
)
parent_rules = list(session.execute(stmt).scalars().all())
rules.extend(parent_rules)
rules.extend(session.execute(stmt).scalars().all())

# Iterate rules and delete locks
transfers_to_delete = [] # [{'scope': , 'name':, 'rse_id':}]
Expand All @@ -3204,8 +3199,7 @@ def __evaluate_did_detach(eval_did: models.DataIdentifier, *, session: "Session"
).where(
models.ReplicaLock.rule_id == rule.id
)
locks = session.execute(stmt).scalars().all()
for lock in locks:
for lock in session.execute(stmt).scalars().all():
if (lock.scope, lock.name) not in files:
if __delete_lock_and_update_replica(lock=lock, purge_replicas=force_epoch or rule.purge_replicas, nowait=True, session=session):
transfers_to_delete.append({'scope': lock.scope, 'name': lock.name, 'rse_id': lock.rse_id})
Expand Down Expand Up @@ -3329,7 +3323,7 @@ def __evaluate_did_attach(eval_did: models.DataIdentifier, *, session: "Session"
).where(
and_(models.DataIdentifierAssociation.scope == eval_did.scope,
models.DataIdentifierAssociation.name == eval_did.name,
models.DataIdentifierAssociation.rule_evaluation == true()) # pylint: disable=C0121:singleton-comparison # noqa: E712
models.DataIdentifierAssociation.rule_evaluation == true())
)
new_child_dids = session.execute(stmt).scalars().all()
if new_child_dids:
Expand Down

0 comments on commit 45e497e

Please sign in to comment.