Skip to content

Commit

Permalink
Client : Introduce bulk_list_files method : Closes #5465
Browse files Browse the repository at this point in the history
  • Loading branch information
cserf committed Sep 15, 2023
1 parent fde0ec5 commit f546db2
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 6 deletions.
22 changes: 21 additions & 1 deletion lib/rucio/api/did.py
Expand Up @@ -28,9 +28,11 @@
from rucio.db.sqla.session import read_session, stream_session, transactional_session

if TYPE_CHECKING:
from typing import Any, Optional
from typing import Any, Dict, Optional, List
from sqlalchemy.orm import Session

DIDListType = List[Dict]


@stream_session
def list_dids(scope, filters, did_type='collection', ignore_case=False, limit=None, offset=None, long=False, recursive=False, vo='def', *, session: "Session"):
Expand Down Expand Up @@ -327,6 +329,24 @@ def list_content_history(scope, name, vo='def', *, session: "Session"):
yield api_update_return_dict(d, session=session)


@stream_session
def bulk_list_files(dids: "DIDListType", long: bool = False, vo: str = 'def', *, session: "Session"):
    """
    List file contents of a list of data identifiers.

    :param dids: A list of dids ({'scope': ..., 'name': ...} dictionaries).
    :param long: A boolean to choose if more metadata are returned or not.
    :param vo: The VO to act on.
    :param session: The database session in use.
    :returns: Generator of file dictionaries (with external scopes).
    """

    # Copy each did instead of mutating the caller's dictionaries in place
    # when converting the scope to an InternalScope.
    internal_dids = [dict(did_, scope=InternalScope(did_['scope'], vo=vo)) for did_ in dids]

    for file_ in did.bulk_list_files(dids=internal_dids, long=long, session=session):
        yield api_update_return_dict(file_, session=session)


@stream_session
def list_files(scope, name, long, vo='def', *, session: "Session"):
"""
Expand Down
18 changes: 18 additions & 0 deletions lib/rucio/client/didclient.py
Expand Up @@ -364,6 +364,24 @@ def list_files(self, scope, name, long=None):
exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
raise exc_cls(exc_msg)

def bulk_list_files(self, dids):
    """
    List data identifier file contents.

    :param dids: The list of dids.
    """

    payload = dumps({'dids': dids})
    path = '/'.join([self.DIDS_BASEURL, 'bulkfiles'])
    url = build_url(choice(self.list_hosts), path=path)

    response = self._send_request(url, type_='POST', data=payload, stream=True)
    if response.status_code != codes.ok:
        exc_cls, exc_msg = self._get_exception(headers=response.headers, status_code=response.status_code, data=response.content)
        raise exc_cls(exc_msg)
    return self._load_json_data(response)

def get_did(self, scope, name, dynamic=False, dynamic_depth=None):
"""
Retrieve a single data identifier.
Expand Down
24 changes: 22 additions & 2 deletions lib/rucio/core/did.py
Expand Up @@ -43,12 +43,13 @@

if TYPE_CHECKING:
from collections.abc import Callable, Sequence
from typing import Any, Optional, Union
from typing import Any, Dict, Optional, Union, List
from sqlalchemy.orm import Session
from sqlalchemy.schema import Table
from rucio.common.types import InternalAccount, InternalScope

LoggerFunction = Callable[..., Any]
DIDListType = List[Dict]

METRICS = MetricManager(module=__name__)

Expand Down Expand Up @@ -1886,7 +1887,26 @@ def list_child_datasets(


@stream_session
def bulk_list_files(dids: "DIDListType", long: bool = False, *, session: "Session"):
    """
    List file contents of a list of data identifiers.

    :param dids: A list of dids.
    :param long: A boolean to choose if more metadata are returned or not.
    :param session: The database session in use.
    """
    for entry in dids:
        try:
            for file_ in list_files(scope=entry['scope'], name=entry['name'], long=long, session=session):
                # Tag each file with the did it was listed from.
                file_['parent_scope'] = entry['scope']
                file_['parent_name'] = entry['name']
                yield file_
        except exception.DataIdentifierNotFound:
            # Unknown dids are silently skipped so that one missing did
            # does not abort the whole bulk listing.
            continue


@stream_session
def list_files(scope: "InternalScope", name: str, long: bool = False, *, session: "Session"):
"""
List data identifier file contents.
Expand Down
86 changes: 84 additions & 2 deletions lib/rucio/web/rest/flaskapi/v1/dids.py
Expand Up @@ -22,12 +22,12 @@
list_files, scope_list, get_did, set_metadata, get_metadata, get_metadata_bulk, set_status, attach_dids, \
detach_dids, attach_dids_to_dids, get_dataset_by_guid, list_parent_dids, create_did_sample, list_new_dids, \
resurrect, get_users_following_did, remove_did_from_followed, add_did_to_followed, delete_metadata, \
set_metadata_bulk, set_dids_metadata_bulk
set_metadata_bulk, set_dids_metadata_bulk, bulk_list_files
from rucio.api.rule import list_replication_rules, list_associated_replication_rules_for_file
from rucio.common.exception import ScopeNotFound, DataIdentifierNotFound, DataIdentifierAlreadyExists, \
DuplicateContent, AccessDenied, KeyNotFound, Duplicate, InvalidValueForKey, UnsupportedStatus, \
UnsupportedOperation, RSENotFound, RuleNotFound, InvalidMetadata, InvalidPath, FileAlreadyExists, InvalidObject, FileConsistencyMismatch
from rucio.common.utils import render_json, APIEncoder
from rucio.common.utils import render_json, APIEncoder, parse_response
from rucio.db.sqla.constants import DIDType
from rucio.web.rest.flaskapi.authenticated_bp import AuthenticatedBlueprint
from rucio.web.rest.flaskapi.v1.common import response_headers, check_accept_header_wrapper_flask, \
Expand Down Expand Up @@ -1102,6 +1102,86 @@ def generate(vo):
return generate_http_error_flask(404, error)


class BulkFiles(ErrorHandlingMethodView):
    """REST view streaming the file contents of multiple dids in one call."""

    @check_accept_header_wrapper_flask(['application/x-json-stream'])
    def post(self):
        """
        ---
        summary: List files bulk
        description: List files in multiple dids
        tags:
          - Data Identifiers
        requestBody:
          content:
            application/json:
              schema:
                type: object
                required:
                  - dids
                properties:
                  dids:
                    description: The dids to list files for.
                    type: array
                    items:
                      description: One did to list files.
                      type: object
                      required:
                        - scope
                        - name
                      properties:
                        scope:
                          description: The did scope.
                          type: string
                        name:
                          description: The did name.
                          type: string
        responses:
          200:
            description: OK
            content:
              application/x-json-stream:
                schema:
                  description: All collections file content.
                  type: array
                  items:
                    description: Collections file content.
                    type: object
                    properties:
                      parent_scope:
                        description: The scope of the parent did.
                        type: string
                      parent_name:
                        description: The name of the parent did.
                        type: string
                      scope:
                        description: The scope of the did.
                        type: string
                      name:
                        description: The name of the did.
                        type: string
                      bytes:
                        description: The size of the did in bytes.
                        type: integer
                      guid:
                        description: The guid of the did.
                        type: string
                      events:
                        description: The number of events of the did.
                        type: integer
                      adler32:
                        description: The adler32 checksum.
                        type: string
          401:
            description: Invalid Auth Token
        """
        parameters = json_parameters(parse_response)
        dids = param_get(parameters, 'dids', default=[])
        try:
            def generate(vo):
                for did in bulk_list_files(dids=dids, vo=vo):
                    yield render_json(**did) + '\n'

            return try_stream(generate(vo=request.environ.get('vo')))
        except AccessDenied as error:
            return generate_http_error_flask(401, error)


class Parents(ErrorHandlingMethodView):

@check_accept_header_wrapper_flask(['application/x-json-stream'])
Expand Down Expand Up @@ -2183,6 +2263,8 @@ def blueprint():
bp.add_url_rule('/resurrect', view_func=resurrect_view, methods=['post', ])
bulkmeta_view = BulkMeta.as_view('bulkmeta')
bp.add_url_rule('/bulkmeta', view_func=bulkmeta_view, methods=['post', ])
files_view = BulkFiles.as_view('bulkfiles')
bp.add_url_rule('/bulkfiles', view_func=files_view, methods=['post', ])

bp.after_request(response_headers)
return bp
Expand Down
61 changes: 60 additions & 1 deletion tests/test_did.py
Expand Up @@ -29,7 +29,7 @@
from rucio.core.did import (list_dids, add_did, delete_dids, get_did_atime, touch_dids, attach_dids, detach_dids,
get_metadata, set_metadata, get_did, get_did_access_cnt, add_did_to_followed,
get_users_following_did, remove_did_from_followed, set_status, list_new_dids,
set_new_dids)
set_new_dids, bulk_list_files)
from rucio.core.replica import add_replica, get_replica
from rucio.db.sqla.constants import DIDType
from rucio.tests.common import rse_name_generator, scope_name_generator, did_name_generator
Expand Down Expand Up @@ -253,6 +253,35 @@ def test_reevaluate_after_close(self, mock_scope, root_account, file_config_mock
new_dids = [did for did in list_new_dids(did_type=None, thread=None, total_threads=None, chunk_size=100000, session=None)]
assert {'scope': mock_scope, 'name': dsn, 'did_type': DIDType.DATASET} in new_dids

def test_bulk_list_files(self, mock_scope, root_account, rse_factory, did_factory):
    """ Test the bulk_list_files method"""
    _, rse_id = rse_factory.make_mock_rse()
    nb_datasets = 5
    nb_files = 10
    # Try listing existing datasets
    datasets = [did_factory.make_dataset() for _ in range(nb_datasets)]
    files = []
    for dataset in datasets:
        new_files = []
        for _ in range(nb_files):
            new_files.append({'scope': mock_scope, 'name': did_name_generator('file'), 'bytes': 1, 'adler32': '0cc737eb', 'events': None, 'guid': None})
        attach_dids(scope=dataset['scope'], name=dataset['name'], rse_id=rse_id, dids=new_files, account=root_account)
        for file_ in new_files:
            file_['parent_scope'] = dataset['scope']
            file_['parent_name'] = dataset['name']
        files.extend(new_files)
    result = [file_ for file_ in bulk_list_files(datasets)]
    assert len(result) == len(files)
    for file_ in files:
        assert file_ in result
    # Try listing non-existing datasets: they must be silently skipped,
    # so re-querying with the extended list returns the same files.
    non_existing_datasets = [{'scope': mock_scope, 'name': did_name_generator('dataset')} for _ in range(nb_datasets)]
    datasets.extend(non_existing_datasets)
    # Re-query with the extended list; the original test reused the stale
    # result, which made the assertions below pass vacuously.
    result = [file_ for file_ in bulk_list_files(datasets)]
    parent_datasets = [(file_['parent_scope'], file_['parent_name']) for file_ in result]
    assert len(result) == len(files)
    for dataset in non_existing_datasets:
        assert (dataset['scope'], dataset['name']) not in parent_datasets


class TestDIDApi:

Expand Down Expand Up @@ -1101,6 +1130,36 @@ def test_bulk_get_meta(self, did_client, replica_client, rse_factory):
list_meta = [_ for _ in did_client.get_metadata_bulk(list_dids)]
assert len(list_meta) == 0

def test_bulk_list_files(self, did_client, mock_scope, root_account, rse_factory, did_factory):
    """ Test the bulk_list_files method"""
    _, rse_id = rse_factory.make_mock_rse()
    nb_datasets = 5
    # Try listing existing datasets
    datasets = [did_factory.make_dataset() for _ in range(nb_datasets)]
    external_datasets = [{'scope': dataset['scope'].external, 'name': dataset['name']} for dataset in datasets]
    files = []
    for dataset in datasets:
        new_files = []
        for _ in range(10):
            new_files.append({'scope': mock_scope, 'name': did_name_generator('file'), 'bytes': 1, 'adler32': '0cc737eb', 'events': None, 'guid': None})
        attach_dids(scope=dataset['scope'], name=dataset['name'], rse_id=rse_id, dids=new_files, account=root_account)
        for file_ in new_files:
            # The client returns external scopes
            file_['scope'] = dataset['scope'].external
            file_['parent_scope'] = dataset['scope'].external
            file_['parent_name'] = dataset['name']
        files.extend(new_files)
    result = [file_ for file_ in did_client.bulk_list_files(external_datasets)]
    assert len(result) == len(files)
    for file_ in files:
        assert file_ in result
    # Try listing non-existing datasets: they must be silently skipped,
    # so re-querying with the extended list returns the same files.
    non_existing_datasets = [{'scope': mock_scope.external, 'name': did_name_generator('dataset')} for _ in range(nb_datasets)]
    # Extend the list actually sent to the client (external scopes) and
    # re-query; the original test extended the unused internal list and
    # reused the stale result, making the assertions pass vacuously.
    external_datasets.extend(non_existing_datasets)
    result = [file_ for file_ in did_client.bulk_list_files(external_datasets)]
    parent_datasets = [(file_['parent_scope'], file_['parent_name']) for file_ in result]
    assert len(result) == len(files)
    for dataset in non_existing_datasets:
        assert (dataset['scope'], dataset['name']) not in parent_datasets


@pytest.mark.noparallel(reason='uses mock scope')
def test_bulk_get_meta_inheritance(vo, rse_factory, mock_scope, did_factory, rucio_client):
Expand Down

0 comments on commit f546db2

Please sign in to comment.