From c9a4fcb5c32a851143e65487705bb08302d94496 Mon Sep 17 00:00:00 2001
From: Cedric Serfon
Date: Fri, 19 Aug 2022 10:49:39 +0200
Subject: [PATCH] Client : Introduce bulk_list_files method : Closes #5465

---
 lib/rucio/api/did.py                   | 22 ++++++-
 lib/rucio/client/didclient.py          | 18 ++++++
 lib/rucio/core/did.py                  | 24 ++++++-
 lib/rucio/web/rest/flaskapi/v1/dids.py | 86 +++++++++++++++++++++++++-
 tests/test_did.py                      | 61 +++++++++++++++++-
 5 files changed, 205 insertions(+), 6 deletions(-)

diff --git a/lib/rucio/api/did.py b/lib/rucio/api/did.py
index 39b5fb9d67..e08f9f25de 100644
--- a/lib/rucio/api/did.py
+++ b/lib/rucio/api/did.py
@@ -28,9 +28,11 @@
 from rucio.db.sqla.session import read_session, stream_session, transactional_session
 
 if TYPE_CHECKING:
-    from typing import Any, Optional
+    from typing import Any, Dict, Optional, List
     from sqlalchemy.orm import Session
 
+    DIDListType = List[Dict]
+
 
 @stream_session
 def list_dids(scope, filters, did_type='collection', ignore_case=False, limit=None, offset=None, long=False, recursive=False, vo='def', *, session: "Session"):
@@ -327,6 +329,24 @@ def list_content_history(scope, name, vo='def', *, session: "Session"):
         yield api_update_return_dict(d, session=session)
 
 
+@stream_session
+def bulk_list_files(dids: "DIDListType", long: bool = False, vo: str = 'def', *, session: "Session"):
+    """
+    List file contents of a list of data identifiers.
+
+    :param dids: A list of dids, each a dict holding an external 'scope' and a 'name'.
+    :param long: A boolean to choose if more metadata are returned or not.
+    :param vo: The VO to act on.
+    :param session: The database session in use.
+    :returns: A generator of the file dicts yielded by the core bulk_list_files.
+    """
+    # Build new dicts: rewriting the caller's dicts in place would re-wrap the scopes on a second call.
+    internal_dids = [{**did_, 'scope': InternalScope(did_['scope'], vo=vo)} for did_ in dids]
+
+    for file_ in did.bulk_list_files(dids=internal_dids, long=long, session=session):
+        yield api_update_return_dict(file_, session=session)
+
+
 @stream_session
 def list_files(scope, name, long, vo='def', *, session: "Session"):
     """
diff --git a/lib/rucio/client/didclient.py b/lib/rucio/client/didclient.py
index df89742713..c8bc18fb09 100644
--- a/lib/rucio/client/didclient.py
+++ b/lib/rucio/client/didclient.py
@@ -364,6 +364,24 @@ def list_files(self, scope, name, long=None):
             exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
             raise exc_cls(exc_msg)
 
+    def bulk_list_files(self, dids):
+        """
+        List the file contents of several data identifiers in a single request.
+
+        :param dids: The list of dids, sent to the server under the 'dids' key.
+        :returns: The response content as parsed by _load_json_data.
+        """
+        data = {'dids': dids}
+        path = '/'.join([self.DIDS_BASEURL, 'bulkfiles'])
+        url = build_url(choice(self.list_hosts), path=path)
+
+        r = self._send_request(url, type_='POST', data=dumps(data), stream=True)
+        if r.status_code == codes.ok:
+            return self._load_json_data(r)
+        else:
+            exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
+            raise exc_cls(exc_msg)
+
     def get_did(self, scope, name, dynamic=False, dynamic_depth=None):
         """
         Retrieve a single data identifier.
diff --git a/lib/rucio/core/did.py b/lib/rucio/core/did.py
index fbcda43d84..4815842eed 100644
--- a/lib/rucio/core/did.py
+++ b/lib/rucio/core/did.py
@@ -43,12 +43,13 @@
 
 if TYPE_CHECKING:
     from collections.abc import Callable, Sequence
-    from typing import Any, Optional, Union
+    from typing import Any, Dict, Optional, Union, List
     from sqlalchemy.orm import Session
     from sqlalchemy.schema import Table
     from rucio.common.types import InternalAccount, InternalScope
 
     LoggerFunction = Callable[..., Any]
+    DIDListType = List[Dict]
 
 METRICS = MetricManager(module=__name__)
 
@@ -1888,7 +1889,26 @@ def list_child_datasets(
 
 
 @stream_session
-def list_files(scope, name, long=False, *, session: "Session"):
+def bulk_list_files(dids: "DIDListType", long: bool = False, *, session: "Session"):
+    """
+    Stream the files of every listed did, tagging each file with its parent did.
+
+    :param dids: A list of dids, each a dict with 'scope' and 'name' keys.
+    :param long: A boolean to choose if more metadata are returned or not.
+    :param session: The database session in use.
+    """
+    for parent in dids:
+        parent_scope, parent_name = parent['scope'], parent['name']
+        try:
+            for file_ in list_files(scope=parent_scope, name=parent_name, long=long, session=session):
+                file_.update(parent_scope=parent_scope, parent_name=parent_name)
+                yield file_
+        except exception.DataIdentifierNotFound:
+            continue  # best effort: dids that cannot be found contribute no files
+
+
+@stream_session
+def list_files(scope: "InternalScope", name: str, long: bool = False, *, session: "Session"):
     """
     List data identifier file contents.
 
diff --git a/lib/rucio/web/rest/flaskapi/v1/dids.py b/lib/rucio/web/rest/flaskapi/v1/dids.py
index 4c1594752a..f7cbd17c08 100644
--- a/lib/rucio/web/rest/flaskapi/v1/dids.py
+++ b/lib/rucio/web/rest/flaskapi/v1/dids.py
@@ -22,12 +22,12 @@
     list_files, scope_list, get_did, set_metadata, get_metadata, get_metadata_bulk, set_status, attach_dids, \
     detach_dids, attach_dids_to_dids, get_dataset_by_guid, list_parent_dids, create_did_sample, list_new_dids, \
     resurrect, get_users_following_did, remove_did_from_followed, add_did_to_followed, delete_metadata, \
-    set_metadata_bulk, set_dids_metadata_bulk
+    set_metadata_bulk, set_dids_metadata_bulk, bulk_list_files
 from rucio.api.rule import list_replication_rules, list_associated_replication_rules_for_file
 from rucio.common.exception import ScopeNotFound, DatabaseException, DataIdentifierNotFound, DataIdentifierAlreadyExists, \
     DuplicateContent, AccessDenied, KeyNotFound, Duplicate, InvalidValueForKey, UnsupportedStatus, \
     UnsupportedOperation, RSENotFound, RuleNotFound, InvalidMetadata, InvalidPath, FileAlreadyExists, InvalidObject, FileConsistencyMismatch
-from rucio.common.utils import render_json, APIEncoder
+from rucio.common.utils import render_json, APIEncoder, parse_response
 from rucio.db.sqla.constants import DIDType
 from rucio.web.rest.flaskapi.authenticated_bp import AuthenticatedBlueprint
 from rucio.web.rest.flaskapi.v1.common import response_headers, check_accept_header_wrapper_flask, \
@@ -1111,6 +1111,86 @@ def generate(vo):
             return generate_http_error_flask(404, error)
 
 
+class BulkFiles(ErrorHandlingMethodView):
+
+    @check_accept_header_wrapper_flask(['application/x-json-stream'])
+    def post(self):
+        """
+        ---
+        summary: List files bulk
+        description: List files in multiple dids
+        tags:
+          - Data Identifiers
+        requestBody:
+          content:
+            application/json:
+              schema:
+                type: object
+                properties:
+                  dids:
+                    type: array
+                    items:
+                      description: One did to list files.
+                      type: object
+                      required: [scope, name]
+                      properties:
+                        scope:
+                          description: The did scope.
+                          type: string
+                        name:
+                          description: The did name.
+                          type: string
+        responses:
+          200:
+            description: OK
+            content:
+              application/x-json-stream:
+                schema:
+                  description: All collections file content.
+                  type: array
+                  items:
+                    description: Collections file content.
+                    type: object
+                    properties:
+                      parent_scope:
+                        description: The scope of the parent did.
+                        type: string
+                      parent_name:
+                        description: The name of the parent did.
+                        type: string
+                      scope:
+                        description: The scope of the did.
+                        type: string
+                      name:
+                        description: The name of the did.
+                        type: string
+                      bytes:
+                        description: The size of the did in bytes.
+                        type: integer
+                      guid:
+                        description: The guid of the did.
+                        type: string
+                      events:
+                        description: The number of events of the did.
+                        type: integer
+                      adler32:
+                        description: The adler32 checksum.
+                        type: string
+          401:
+            description: Invalid Auth Token
+        """
+        parameters = json_parameters(parse_response)
+        dids = param_get(parameters, 'dids', default=[])
+        try:
+            def generate(vo):
+                for did in bulk_list_files(dids=dids, vo=vo):
+                    yield render_json(**did) + '\n'
+
+            return try_stream(generate(vo=request.environ.get('vo')))
+        except AccessDenied as error:
+            return generate_http_error_flask(401, error)
+
+
 class Parents(ErrorHandlingMethodView):
 
     @check_accept_header_wrapper_flask(['application/x-json-stream'])
@@ -2195,6 +2275,8 @@ def blueprint():
     bp.add_url_rule('/resurrect', view_func=resurrect_view, methods=['post', ])
     bulkmeta_view = BulkMeta.as_view('bulkmeta')
     bp.add_url_rule('/bulkmeta', view_func=bulkmeta_view, methods=['post', ])
+    bulkfiles_view = BulkFiles.as_view('bulkfiles')
+    bp.add_url_rule('/bulkfiles', view_func=bulkfiles_view, methods=['post', ])
 
     bp.after_request(response_headers)
     return bp
diff --git a/tests/test_did.py b/tests/test_did.py
index 0b5f9af645..ae4ea46447 100644
--- a/tests/test_did.py
+++ b/tests/test_did.py
@@ -29,7 +29,7 @@
 from rucio.core.did import (list_dids, add_did, delete_dids, get_did_atime, touch_dids, attach_dids, detach_dids,
                             get_metadata, set_metadata, get_did, get_did_access_cnt, add_did_to_followed,
                             get_users_following_did, remove_did_from_followed, set_status, list_new_dids,
-                            set_new_dids)
+                            set_new_dids, bulk_list_files)
 from rucio.core.replica import add_replica, get_replica
 from rucio.db.sqla.constants import DIDType
 from rucio.tests.common import rse_name_generator, scope_name_generator, did_name_generator
@@ -253,6 +253,35 @@ def test_reevaluate_after_close(self, mock_scope, root_account, file_config_mock
         new_dids = [did for did in list_new_dids(did_type=None, thread=None, total_threads=None, chunk_size=100000, session=None)]
         assert {'scope': mock_scope, 'name': dsn, 'did_type': DIDType.DATASET} in new_dids
 
+    def test_bulk_list_files(self, mock_scope, root_account, rse_factory, did_factory):
+        """ Test the bulk_list_files method"""
+        _, rse_id = rse_factory.make_mock_rse()
+        nb_datasets = 5
+        nb_files = 10
+        # Try listing existing datasets
+        datasets = [did_factory.make_dataset() for _ in range(nb_datasets)]
+        files = []
+        for dataset in datasets:
+            new_files = []
+            for _ in range(nb_files):
+                new_files.append({'scope': mock_scope, 'name': did_name_generator('file'), 'bytes': 1, 'adler32': '0cc737eb', 'events': None, 'guid': None})
+            attach_dids(scope=dataset['scope'], name=dataset['name'], rse_id=rse_id, dids=new_files, account=root_account)
+            for file_ in new_files:
+                file_['parent_scope'] = dataset['scope']
+                file_['parent_name'] = dataset['name']
+            files.extend(new_files)
+        result = list(bulk_list_files(datasets))
+        assert len(result) == len(files)
+        for file_ in files:
+            assert file_ in result
+        # Non-existing datasets must be skipped: list again with them appended to the request
+        non_existing_datasets = [{'scope': mock_scope, 'name': did_name_generator('dataset')} for _ in range(nb_datasets)]
+        result = list(bulk_list_files(datasets + non_existing_datasets))
+        parent_datasets = [(dataset['parent_scope'], dataset['parent_name']) for dataset in result]
+        assert len(result) == len(files)
+        for dataset in non_existing_datasets:
+            assert (dataset['scope'], dataset['name']) not in parent_datasets
+
 
 class TestDIDApi:
 
@@ -1104,6 +1133,36 @@ def test_bulk_get_meta(self, did_client, replica_client, rse_factory):
         list_meta = [_ for _ in did_client.get_metadata_bulk(list_dids)]
         assert len(list_meta) == 0
 
+    def test_bulk_list_files(self, did_client, mock_scope, root_account, rse_factory, did_factory):
+        """ Test the bulk_list_files method"""
+        _, rse_id = rse_factory.make_mock_rse()
+        nb_datasets = 5
+        # Try listing existing datasets
+        datasets = [did_factory.make_dataset() for _ in range(nb_datasets)]
+        external_datasets = [{'scope': dataset['scope'].external, 'name': dataset['name']} for dataset in datasets]
+        files = []
+        for dataset in datasets:
+            new_files = []
+            for _ in range(10):
+                new_files.append({'scope': mock_scope, 'name': did_name_generator('file'), 'bytes': 1, 'adler32': '0cc737eb', 'events': None, 'guid': None})
+            attach_dids(scope=dataset['scope'], name=dataset['name'], rse_id=rse_id, dids=new_files, account=root_account)
+            for file_ in new_files:
+                file_['scope'] = dataset['scope'].external
+                file_['parent_scope'] = dataset['scope'].external
+                file_['parent_name'] = dataset['name']
+            files.extend(new_files)
+        result = list(did_client.bulk_list_files(external_datasets))
+        assert len(result) == len(files)
+        for file_ in files:
+            assert file_ in result
+        # Non-existing datasets must be skipped: list again with them appended to the request
+        non_existing_datasets = [{'scope': mock_scope.external, 'name': did_name_generator('dataset')} for _ in range(nb_datasets)]
+        result = list(did_client.bulk_list_files(external_datasets + non_existing_datasets))
+        parent_datasets = [(dataset['parent_scope'], dataset['parent_name']) for dataset in result]
+        assert len(result) == len(files)
+        for dataset in non_existing_datasets:
+            assert (dataset['scope'], dataset['name']) not in parent_datasets
+
 
 @pytest.mark.noparallel(reason='uses mock scope')
 def test_bulk_get_meta_inheritance(vo, rse_factory, mock_scope, did_factory, rucio_client):