diff --git a/lib/rucio/core/did.py b/lib/rucio/core/did.py index 3e5b15dd69..68a00f2ac2 100644 --- a/lib/rucio/core/did.py +++ b/lib/rucio/core/did.py @@ -18,7 +18,7 @@ from enum import Enum from hashlib import md5 from re import match -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, Literal, Optional, Union from sqlalchemy import and_, delete, exists, insert, or_, update from sqlalchemy.exc import DatabaseError, IntegrityError, NoResultFound @@ -40,11 +40,11 @@ from rucio.db.sqla.util import temp_table_mngr if TYPE_CHECKING: - from collections.abc import Iterator, Sequence - from typing import Any, Optional, Union + from collections.abc import Iterable, Iterator, Mapping, Sequence from sqlalchemy.orm import Session - from sqlalchemy.schema import Table + from sqlalchemy.sql._typing import ColumnExpressionArgument + from sqlalchemy.sql.selectable import Select from rucio.common.types import InternalAccount, InternalScope, LoggerFunction @@ -54,12 +54,12 @@ @read_session def list_expired_dids( - worker_number: int = None, - total_workers: int = None, - limit: int = None, + worker_number: Optional[int] = None, + total_workers: Optional[int] = None, + limit: Optional[int] = None, *, session: "Session" -): +) -> list[dict[str, Any]]: """ List expired data identifiers. @@ -119,17 +119,17 @@ def list_expired_dids( def add_did( scope: "InternalScope", name: str, - did_type: "Union[str, DIDType]", + did_type: Union[str, DIDType], account: "InternalAccount", - statuses: "Optional[dict[str, Any]]" = None, - meta: "Optional[dict[str, Any]]" = None, - rules: "Optional[Sequence[str]]" = None, - lifetime: "Optional[int]" = None, - dids: "Optional[Sequence[dict[str, Any]]]" = None, - rse_id: "Optional[str]" = None, + statuses: Optional["Mapping[str, Any]"] = None, + meta: Optional["Mapping[str, Any]"] = None, + rules: Optional["Sequence[str]"] = None, + lifetime: Optional[int] = None, + dids: Optional["Sequence[Mapping[str, Any]]"] = None, + rse_id: Optional[str] = None, *, session: "Session", -): +) -> None: """ Add data identifier. @@ -158,7 +158,7 @@ def add_dids( account: "InternalAccount", *, session: "Session", -): +) -> None: """ Bulk add data identifiers. @@ -250,12 +250,12 @@ def add_dids( def attach_dids( scope: "InternalScope", name: str, - dids: "Sequence[dict[str, Any]]", + dids: "Sequence[Mapping[str, Any]]", account: "InternalAccount", - rse_id: "Optional[str]" = None, + rse_id: Optional[str] = None, *, session: "Session", -): +) -> None: """ Append data identifier. @@ -271,12 +271,12 @@ def attach_dids( @transactional_session def attach_dids_to_dids( - attachments: "dict[str, Any]", + attachments: "Sequence[Mapping[str, Any]]", account: "InternalAccount", ignore_duplicate: bool = False, *, session: "Session", -): +) -> None: children_temp_table = temp_table_mngr(session).create_scope_name_table() parent_dids = list() first_iteration = True @@ -349,7 +349,15 @@ def attach_dids_to_dids( session.execute(insert(models.UpdatedDID), parent_dids) -def __add_files_to_archive(parent_did, files_temp_table, files, account, ignore_duplicate=False, *, session: "Session"): +def __add_files_to_archive( + parent_did: models.DataIdentifier, + files_temp_table: Any, + files: "Mapping[tuple[InternalScope, str], Mapping[str, Any]]", + account: "InternalAccount", + ignore_duplicate: bool = False, + *, + session: "Session" +) -> None: """ Add files to archive. @@ -504,7 +512,16 @@ def __add_files_to_archive(parent_did, files_temp_table, files, account, ignore_ @transactional_session -def __add_files_to_dataset(parent_did, files_temp_table, files, account, rse_id, ignore_duplicate=False, *, session: "Session"): +def __add_files_to_dataset( + parent_did: models.DataIdentifier, + files_temp_table: Any, + files: "Mapping[tuple[InternalScope, str], Mapping[str, Any]]", + account: "InternalAccount", + rse_id: str, + ignore_duplicate: bool = False, + *, + session: "Session" +) -> dict[tuple["InternalScope", str], dict[str, Any]]: """ Add files to dataset. @@ -629,7 +646,14 @@ def __add_files_to_dataset(parent_did, files_temp_table, files, account, rse_id, @transactional_session -def __add_collections_to_container(parent_did, collections_temp_table, collections, account, *, session: "Session"): +def __add_collections_to_container( + parent_did: models.DataIdentifier, + collections_temp_table: Any, + collections: "Mapping[tuple[InternalScope, str], Mapping[str, Any]]", + account: "InternalAccount", + *, + session: "Session" +) -> None: """ Add collections (datasets or containers) to container. @@ -729,7 +753,15 @@ def __add_collections_to_container(parent_did, collections_temp_table, collectio raise exception.RucioException(error.args) -def __add_files_to_archive_without_temp_tables(scope, name, files, account, ignore_duplicate=False, *, session: "Session"): +def __add_files_to_archive_without_temp_tables( + scope: "InternalScope", + name: str, + files: "Sequence[dict[str, Any]]", + account: "InternalAccount", + ignore_duplicate: bool = False, + *, + session: "Session" +) -> None: """ Add files to archive. @@ -886,7 +918,16 @@ def __add_files_to_archive_without_temp_tables(scope, name, files, account, igno @transactional_session -def __add_files_to_dataset_without_temp_tables(scope, name, files, account, rse_id, ignore_duplicate=False, *, session: "Session"): +def __add_files_to_dataset_without_temp_tables( + scope: "InternalScope", + name: str, + files: "Sequence[Mapping[str, str]]", + account: "InternalAccount", + rse_id: str, + ignore_duplicate: bool = False, + *, + session: "Session" +) -> list[dict[str, Any]]: """ Add files to dataset. @@ -992,7 +1033,14 @@ def __add_files_to_dataset_without_temp_tables(scope, name, files, account, rse_ @transactional_session -def __add_collections_to_container_without_temp_tables(scope, name, collections, account, *, session: "Session"): +def __add_collections_to_container_without_temp_tables( + scope: "InternalScope", + name: str, + collections: "Sequence[Mapping[str, Any]]", + account: "InternalAccount", + *, + session: "Session" +) -> None: """ Add collections (datasets or containers) to container. @@ -1095,13 +1143,13 @@ def __add_collections_to_container_without_temp_tables(scope, name, collections, @transactional_session def delete_dids( - dids: "Sequence[dict[str, Any]]", + dids: "Sequence[Mapping[str, Any]]", account: "InternalAccount", expire_rules: bool = False, *, session: "Session", logger: "LoggerFunction" = logging.log, -): +) -> None: """ Delete data identifiers @@ -1440,7 +1488,13 @@ def delete_dids( @transactional_session -def detach_dids(scope, name, dids, *, session: "Session"): +def detach_dids( + scope: "InternalScope", + name: str, + dids: "Sequence[Mapping[str, Any]]", + *, + session: "Session" +) -> None: """ Detach data identifier @@ -1560,7 +1614,14 @@ def detach_dids(scope, name, dids, *, session: "Session"): @stream_session -def list_new_dids(did_type, thread=None, total_threads=None, chunk_size=1000, *, session: "Session"): +def list_new_dids( + did_type: Union[str, "DIDType"], + thread: Optional[int] = None, + total_threads: Optional[int] = None, + chunk_size: int = 1000, + *, + session: "Session", +) -> "Iterator[dict[str, Any]]": """ List recent identifiers. @@ -1609,7 +1670,12 @@ def list_new_dids(did_type, thread=None, total_threads=None, chunk_size=1000, *, @transactional_session -def set_new_dids(dids, new_flag, *, session: "Session"): +def set_new_dids( + dids: "Sequence[Mapping[str, Any]]", + new_flag: Optional[bool], + *, + session: "Session" +) -> bool: """ Set/reset the flag new @@ -1646,7 +1712,12 @@ def set_new_dids(dids, new_flag, *, session: "Session"): @stream_session -def list_content(scope, name, *, session: "Session"): +def list_content( + scope: "InternalScope", + name: str, + *, + session: "Session" +) -> "Iterator[dict[str, Any]]": """ List data identifier contents. @@ -1673,7 +1744,12 @@ def list_content(scope, name, *, session: "Session"): @stream_session -def list_content_history(scope, name, *, session: "Session"): +def list_content_history( + scope: "InternalScope", + name: str, + *, + session: "Session" +) -> "Iterator[dict[str, Any]]": """ List data identifier contents history. @@ -1699,7 +1775,12 @@ def list_content_history(scope, name, *, session: "Session"): @stream_session -def list_parent_dids(scope, name, *, session: "Session"): +def list_parent_dids( + scope: "InternalScope", + name: str, + *, + session: "Session" +) -> "Iterator[dict[str, Any]]": """ List parent datasets and containers of a did. @@ -1723,7 +1804,12 @@ def list_parent_dids(scope, name, *, session: "Session"): @stream_session -def list_all_parent_dids(scope, name, *, session: "Session"): +def list_all_parent_dids( + scope: "InternalScope", + name: str, + *, + session: "Session" +) -> "Iterator[dict[str, Any]]": """ List all parent datasets and containers of a did, no matter on what level. @@ -1750,9 +1836,9 @@ def list_all_parent_dids(scope, name, *, session: "Session"): def list_child_dids_stmt( - input_dids_table: "Table", + input_dids_table: Any, did_type: DIDType, -): +) -> "Select[tuple[InternalScope, str]]": """ Build and returns a query which recursively lists children dids of type `did_type` for the dids given as input in a scope/name (temporary) table. @@ -1811,7 +1897,7 @@ def list_one_did_childs_stmt( scope: "InternalScope", name: str, did_type: DIDType, -): +) -> "Select[tuple[InternalScope, str]]": """ Returns the sqlalchemy query for recursively fetching the child dids of type 'did_type' for the input did. @@ -1868,7 +1954,7 @@ def list_child_datasets( name: str, *, session: "Session" -): +) -> list[dict[str, Union["InternalScope", str]]]: """ List all child datasets of a container. @@ -1887,7 +1973,12 @@ def list_child_datasets( @stream_session -def bulk_list_files(dids: "list[dict[str, Any]]", long: bool = False, *, session: "Session") -> "Optional[Iterator[dict[str, Any]]]": +def bulk_list_files( + dids: "Iterable[Mapping[str, Any]]", + long: bool = False, + *, + session: "Session" +) -> "Optional[Iterator[dict[str, Any]]]": """ List file contents of a list of data identifier. @@ -2012,7 +2103,13 @@ def list_files(scope: "InternalScope", name: str, long: bool = False, *, session @stream_session -def scope_list(scope, name=None, recursive=False, *, session: "Session"): +def scope_list( + scope: "InternalScope", + name: Optional[str] = None, + recursive: bool = False, + *, + session: "Session" +) -> "Iterator[dict[str, Any]]": """ List data identifiers in a scope. @@ -2050,7 +2147,7 @@ def __topdids(scope): else: yield {'scope': scope, 'name': row.name, 'type': row.did_type, 'parent': None, 'level': 0, 'bytes': None} - def __diddriller(pdid): + def __diddriller(pdid: "Mapping[str, Any]") -> "Iterator[dict[str, Any]]": stmt = select( models.DataIdentifierAssociation ).filter_by( @@ -2096,7 +2193,12 @@ def __diddriller(pdid): @read_session -def __get_did(scope, name, *, session: "Session"): +def __get_did( + scope: "InternalScope", + name: str, + *, + session: "Session" +) -> "models.DataIdentifier": try: stmt = select( models.DataIdentifier @@ -2141,7 +2243,7 @@ def get_did(scope: "InternalScope", name: str, dynamic_depth: "Optional[DIDType] @read_session -def get_files(files, *, session: "Session"): +def get_files(files: "Sequence[Mapping[str, Any]]", *, session: "Session") -> list[dict[str, Any]]: """ Retrieve a list of files. @@ -2196,8 +2298,17 @@ def get_files(files, *, session: "Session"): @transactional_session -def set_metadata(scope, name, key, value, did_type=None, did=None, - recursive=False, *, session: "Session"): +def set_metadata( + scope: "InternalScope", + name: str, + key: str, + value: Any, + did_type: Optional[DIDType] = None, + did: Optional["Mapping[str, Any]"] = None, + recursive: bool = False, + *, + session: "Session" +) -> None: """ Add single metadata to a data identifier. @@ -2213,7 +2324,14 @@ def set_metadata(scope, name, key, value, did_type=None, did=None, @transactional_session -def set_metadata_bulk(scope, name, meta, recursive=False, *, session: "Session"): +def set_metadata_bulk( + scope: "InternalScope", + name: str, + meta: "Mapping[str, Any]", + recursive: bool = False, + *, + session: "Session" +) -> None: """ Add metadata to a data identifier. @@ -2227,7 +2345,12 @@ def set_metadata_bulk(scope, name, meta, recursive=False, *, session: "Session") @transactional_session -def set_dids_metadata_bulk(dids, recursive=False, *, session: "Session"): +def set_dids_metadata_bulk( + dids: "Iterable[Mapping[str, Any]]", + recursive: bool = False, + *, + session: "Session" +) -> None: """ Add metadata to a list of data identifiers. @@ -2241,7 +2364,13 @@ def set_dids_metadata_bulk(dids, recursive=False, *, session: "Session"): @read_session -def get_metadata(scope, name, plugin='DID_COLUMN', *, session: "Session"): +def get_metadata( + scope: "InternalScope", + name: str, + plugin: str = 'DID_COLUMN', + *, + session: "Session" +) -> dict[str, Any]: """ Get data identifier metadata @@ -2255,7 +2384,11 @@ def get_metadata(scope, name, plugin='DID_COLUMN', *, session: "Session"): @stream_session -def list_parent_dids_bulk(dids, *, session: "Session"): +def list_parent_dids_bulk( + dids: "Iterable[Mapping[str, Any]]", + *, + session: "Session" +) -> "Iterator[dict[str, Any]]": """ List parent datasets and containers of a did. @@ -2287,7 +2420,12 @@ def list_parent_dids_bulk(dids, *, session: "Session"): @stream_session -def get_metadata_bulk(dids, inherit=False, *, session: "Session"): +def get_metadata_bulk( + dids: list["Mapping[Any, Any]"], + inherit: bool = False, + *, + session: "Session" +) -> "Iterator[dict[str, Any]]": """ Get metadata for a list of dids :param dids: A list of dids. @@ -2355,7 +2493,13 @@ def get_metadata_bulk(dids, inherit=False, *, session: "Session"): @transactional_session -def delete_metadata(scope, name, key, *, session: "Session"): +def delete_metadata( + scope: "InternalScope", + name: str, + key: str, + *, + session: "Session" +) -> None: """ Delete a key from the metadata column @@ -2367,7 +2511,13 @@ def delete_metadata(scope, name, key, *, session: "Session"): @transactional_session -def set_status(scope, name, *, session: "Session", **kwargs): +def set_status( + scope: "InternalScope", + name: str, + *, + session: "Session", + **kwargs +) -> None: """ Set data identifier status @@ -2476,8 +2626,19 @@ def set_status(scope, name, *, session: "Session", **kwargs): @read_session -def list_dids(scope, filters, did_type='collection', ignore_case=False, limit=None, - offset=None, long=False, recursive=False, ignore_dids=None, *, session: "Session"): +def list_dids( + scope: "InternalScope", + filters: "Mapping[Any, Any]", + did_type: Literal['all', 'collection', 'dataset', 'container', 'file'] = 'collection', + ignore_case: bool = False, + limit: Optional[int] = None, + offset: Optional[int] = None, + long: bool = False, + recursive: bool = False, + ignore_dids: Optional["Sequence[str]"] = None, + *, + session: "Session" +) -> "Iterator[dict[str, Any]]": """ Search data identifiers. @@ -2496,7 +2657,12 @@ def list_dids(scope, filters, did_type='collection', ignore_case=False, limit=No @read_session -def get_did_atime(scope, name, *, session: "Session"): +def get_did_atime( + scope: "InternalScope", + name: str, + *, + session: "Session" +) -> datetime: """ Get the accessed_at timestamp for a did. Just for testing. :param scope: the scope name. @@ -2515,7 +2681,12 @@ def get_did_atime(scope, name, *, session: "Session"): @read_session -def get_did_access_cnt(scope, name, *, session: "Session"): +def get_did_access_cnt( + scope: "InternalScope", + name: str, + *, + session: "Session" +) -> int: """ Get the access_cnt for a did. Just for testing. :param scope: the scope name. @@ -2534,7 +2705,11 @@ def get_did_access_cnt(scope, name, *, session: "Session"): @stream_session -def get_dataset_by_guid(guid, *, session: "Session"): +def get_dataset_by_guid( + guid: str, + *, + session: "Session" +) -> "Iterator[dict[str, Union[InternalScope, str]]]": """ Get the parent datasets for a given GUID. :param guid: The GUID. @@ -2569,7 +2744,11 @@ def get_dataset_by_guid(guid, *, session: "Session"): @transactional_session -def touch_dids(dids, *, session: "Session"): +def touch_dids( + dids: "Iterable[Mapping[str, Any]]", + *, + session: "Session" +) -> bool: """ Update the accessed_at timestamp and the access_cnt of the given dids. @@ -2604,7 +2783,16 @@ def touch_dids(dids, *, session: "Session"): @transactional_session -def create_did_sample(input_scope, input_name, output_scope, output_name, account, nbfiles, *, session: "Session"): +def create_did_sample( + input_scope: "InternalScope", + input_name: str, + output_scope: "InternalScope", + output_name: str, + account: "InternalAccount", + nbfiles: str, + *, + session: "Session" +) -> None: """ Create a sample from an input collection. @@ -2690,7 +2878,7 @@ def __resolve_bytes_length_events_did( @transactional_session -def resurrect(dids, *, session: "Session"): +def resurrect(dids: "Iterable[Mapping[str, Any]]", *, session: "Session") -> None: """ Resurrect data identifiers. @@ -2749,7 +2937,12 @@ def resurrect(dids, *, session: "Session"): @stream_session -def list_archive_content(scope, name, *, session: "Session"): +def list_archive_content( + scope: "InternalScope", + name, + *, + session: "Session" +) -> "Iterator[dict[str, Any]]": """ List archive contents. @@ -2775,7 +2968,13 @@ def list_archive_content(scope, name, *, session: "Session"): @transactional_session -def add_did_to_followed(scope, name, account, *, session: "Session"): +def add_did_to_followed( + scope: "InternalScope", + name: str, + account: "InternalAccount", + *, + session: "Session" +) -> None: """ Mark a did as followed by the given account @@ -2789,7 +2988,12 @@ def add_did_to_followed(scope, name, account, *, session: "Session"): @transactional_session -def add_dids_to_followed(dids, account, *, session: "Session"): +def add_dids_to_followed( + dids: "Iterable[Mapping[str, Any]]", + account: "InternalAccount", + *, + session: "Session" +) -> None: """ Bulk mark datasets as followed @@ -2819,7 +3023,12 @@ def add_dids_to_followed(dids, account, *, session: "Session"): @stream_session -def get_users_following_did(scope, name, *, session: "Session"): +def get_users_following_did( + scope: "InternalScope", + name: str, + *, + session: "Session" +) -> "Iterator[dict[str, InternalAccount]]": """ Return list of users following a did @@ -2843,7 +3052,13 @@ def get_users_following_did(scope, name, *, session: "Session"): @transactional_session -def remove_did_from_followed(scope, name, account, *, session: "Session"): +def remove_did_from_followed( + scope: "InternalScope", + name: str, + account: "InternalAccount", + *, + session: "Session" +) -> None: """ Mark a did as not followed @@ -2857,7 +3072,12 @@ def remove_did_from_followed(scope, name, account, *, session: "Session"): @transactional_session -def remove_dids_from_followed(dids, account, *, session: "Session"): +def remove_dids_from_followed( + dids: "Iterable[Mapping[str, Any]]", + account: "InternalAccount", + *, + session: "Session" +) -> None: """ Bulk mark datasets as not followed @@ -2882,7 +3102,14 @@ def remove_dids_from_followed(dids, account, *, session: "Session"): @transactional_session -def trigger_event(scope, name, event_type, payload, *, session: "Session"): +def trigger_event( + scope: "InternalScope", + name: str, + event_type: str, + payload: str, + *, + session: "Session" +) -> None: """ Records changes occuring in the did to the FollowEvents table @@ -2911,7 +3138,12 @@ def trigger_event(scope, name, event_type, payload, *, session: "Session"): @read_session -def create_reports(total_workers, worker_number, *, session: "Session"): +def create_reports( + total_workers: int, + worker_number: int, + *, + session: "Session" +) -> None: """ Create a summary report of the events affecting a dataset, for its followers. @@ -2973,7 +3205,12 @@ def create_reports(total_workers, worker_number, *, session: "Session"): @transactional_session -def insert_content_history(filter_, did_created_at, *, session: "Session"): +def insert_content_history( + filter_: "ColumnExpressionArgument[bool]", + did_created_at: datetime, + *, + session: "Session" +) -> None: """ Insert into content history a list of did @@ -3024,7 +3261,7 @@ def insert_content_history(filter_, did_created_at, *, session: "Session"): @transactional_session -def insert_deleted_dids(filter_, *, session: "Session"): +def insert_deleted_dids(filter_: "ColumnExpressionArgument[bool]", *, session: "Session") -> None: """ Insert into deleted_dids a list of did