Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk list_files : Closes #5465 #5830

Merged
merged 2 commits into from Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 19 additions & 1 deletion lib/rucio/api/did.py
Expand Up @@ -28,7 +28,7 @@
from rucio.db.sqla.session import read_session, stream_session, transactional_session

if TYPE_CHECKING:
from typing import Any, Optional
from typing import Any, Optional, Iterator
from sqlalchemy.orm import Session


Expand Down Expand Up @@ -327,6 +327,24 @@ def list_content_history(scope, name, vo='def', *, session: "Session"):
yield api_update_return_dict(d, session=session)


@stream_session
def bulk_list_files(dids: "list[dict[str, Any]]", long: bool = False, vo: str = 'def', *, session: "Session") -> "Iterator[dict[str, Any]]":
    """
    List file contents of a list of data identifiers.

    :param dids: A list of DIDs ({'scope': ..., 'name': ...} dictionaries).
    :param long: A boolean to choose if more metadata are returned or not.
    :param vo: The VO to act on.
    :param session: The database session in use.
    :returns: An iterator of file dictionaries.
    """
    # Convert each scope to an InternalScope on a copy so the caller's
    # input dictionaries are not mutated as a side effect.
    internal_dids = [dict(did_, scope=InternalScope(did_['scope'], vo=vo)) for did_ in dids]

    for file_ in did.bulk_list_files(dids=internal_dids, long=long, session=session):
        yield api_update_return_dict(file_, session=session)


@stream_session
def list_files(scope, name, long, vo='def', *, session: "Session"):
"""
Expand Down
21 changes: 20 additions & 1 deletion lib/rucio/client/didclient.py
Expand Up @@ -15,6 +15,7 @@

from datetime import datetime
from json import dumps
from typing import Optional, Any
from urllib.parse import quote_plus

from requests.status_codes import codes
Expand Down Expand Up @@ -342,7 +343,7 @@ def list_content_history(self, scope, name):
exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
raise exc_cls(exc_msg)

def list_files(self, scope, name, long=None):
def list_files(self, scope: str, name: str, long: Optional[bool] = None):
"""
List data identifier file contents.

Expand All @@ -364,6 +365,24 @@ def list_files(self, scope, name, long=None):
exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
raise exc_cls(exc_msg)

def bulk_list_files(self, dids: list[dict[str, Any]]):
    """
    List data identifier file contents.

    :param dids: The list of DIDs.
    """
    path = '/'.join([self.DIDS_BASEURL, 'bulkfiles'])
    url = build_url(choice(self.list_hosts), path=path)
    payload = dumps({'dids': dids})

    response = self._send_request(url, type_='POST', data=payload, stream=True)
    if response.status_code != codes.ok:
        exc_cls, exc_msg = self._get_exception(headers=response.headers, status_code=response.status_code, data=response.content)
        raise exc_cls(exc_msg)
    return self._load_json_data(response)

def get_did(self, scope, name, dynamic=False, dynamic_depth=None):
"""
Retrieve a single data identifier.
Expand Down
26 changes: 22 additions & 4 deletions lib/rucio/core/did.py
Expand Up @@ -42,13 +42,12 @@
from rucio.db.sqla.util import temp_table_mngr

if TYPE_CHECKING:
from collections.abc import Callable, Sequence
from collections.abc import Iterator, Sequence
from typing import Any, Optional, Union
from sqlalchemy.orm import Session
from sqlalchemy.schema import Table
from rucio.common.types import InternalAccount, InternalScope
from rucio.common.types import InternalAccount, InternalScope, LoggerFunction

LoggerFunction = Callable[..., Any]

METRICS = MetricManager(module=__name__)

Expand Down Expand Up @@ -1888,7 +1887,26 @@ def list_child_datasets(


@stream_session
def bulk_list_files(dids: "list[dict[str, Any]]", long: bool = False, *, session: "Session") -> "Iterator[dict[str, Any]]":
    """
    List file contents of a list of data identifiers.

    DIDs that cannot be found are silently skipped, so the stream may
    cover fewer parents than were requested.

    :param dids: A list of DIDs ({'scope': ..., 'name': ...} dictionaries).
    :param long: A boolean to choose if more metadata are returned or not.
    :param session: The database session in use.
    :returns: An iterator of file dictionaries, each tagged with the
              'parent_scope'/'parent_name' of the DID it was listed from.
    """
    for did_ in dids:
        try:
            for file_dict in list_files(scope=did_['scope'], name=did_['name'], long=long, session=session):
                # Tag each file with its parent DID so callers of the bulk
                # API can regroup the flat stream by the requested DIDs.
                file_dict['parent_scope'] = did_['scope']
                file_dict['parent_name'] = did_['name']
                yield file_dict
        except exception.DataIdentifierNotFound:
            # Best-effort semantics: skip missing DIDs rather than
            # aborting the whole bulk listing.
            pass


@stream_session
def list_files(scope: "InternalScope", name: str, long: bool = False, *, session: "Session") -> "Iterator[dict[str, Any]]":
"""
List data identifier file contents.

Expand Down
86 changes: 84 additions & 2 deletions lib/rucio/web/rest/flaskapi/v1/dids.py
Expand Up @@ -22,12 +22,12 @@
list_files, scope_list, get_did, set_metadata, get_metadata, get_metadata_bulk, set_status, attach_dids, \
detach_dids, attach_dids_to_dids, get_dataset_by_guid, list_parent_dids, create_did_sample, list_new_dids, \
resurrect, get_users_following_did, remove_did_from_followed, add_did_to_followed, delete_metadata, \
set_metadata_bulk, set_dids_metadata_bulk
set_metadata_bulk, set_dids_metadata_bulk, bulk_list_files
from rucio.api.rule import list_replication_rules, list_associated_replication_rules_for_file
from rucio.common.exception import ScopeNotFound, DatabaseException, DataIdentifierNotFound, DataIdentifierAlreadyExists, \
DuplicateContent, AccessDenied, KeyNotFound, Duplicate, InvalidValueForKey, UnsupportedStatus, \
UnsupportedOperation, RSENotFound, RuleNotFound, InvalidMetadata, InvalidPath, FileAlreadyExists, InvalidObject, FileConsistencyMismatch
from rucio.common.utils import render_json, APIEncoder
from rucio.common.utils import render_json, APIEncoder, parse_response
from rucio.db.sqla.constants import DIDType
from rucio.web.rest.flaskapi.authenticated_bp import AuthenticatedBlueprint
from rucio.web.rest.flaskapi.v1.common import response_headers, check_accept_header_wrapper_flask, \
Expand Down Expand Up @@ -1111,6 +1111,86 @@ def generate(vo):
return generate_http_error_flask(404, error)


class BulkFiles(ErrorHandlingMethodView):

    @check_accept_header_wrapper_flask(['application/x-json-stream'])
    def post(self):
        """
        ---
        summary: List files bulk
        description: List files in multiple dids
        tags:
          - Data Identifiers
        requestBody:
          content:
            application/json:
              schema:
                description: An object wrapping the list of dids to list files for.
                type: object
                required:
                  - dids
                properties:
                  dids:
                    description: The list of dids.
                    type: array
                    items:
                      description: One did to list files.
                      type: object
                      required:
                        - scope
                        - name
                      properties:
                        scope:
                          description: The did scope.
                          type: string
                        name:
                          description: The did name.
                          type: string
        responses:
          201:
            description: OK
            content:
              application/x-json-stream:
                schema:
                  description: All collections file content.
                  type: array
                  items:
                    description: Collections file content.
                    type: object
                    properties:
                      parent_scope:
                        description: The scope of the parent did.
                        type: string
                      parent_name:
                        description: The name of the parent did.
                        type: string
                      scope:
                        description: The scope of the did.
                        type: string
                      name:
                        description: The name of the did.
                        type: string
                      bytes:
                        description: The size of the did in bytes.
                        type: integer
                      guid:
                        description: The guid of the did.
                        type: string
                      events:
                        description: The number of events of the did.
                        type: integer
                      adler32:
                        description: The adler32 checksum.
                        type: string
          401:
            description: Invalid Auth Token
        """
        # The request body is {'dids': [{'scope': ..., 'name': ...}, ...]},
        # matching what DIDClient.bulk_list_files sends.
        parameters = json_parameters(parse_response)
        dids = param_get(parameters, 'dids', default=[])

        def generate(vo):
            for did in bulk_list_files(dids=dids, vo=vo):
                yield render_json(**did) + '\n'

        try:
            return try_stream(generate(vo=request.environ.get('vo')))
        except AccessDenied as error:
            return generate_http_error_flask(401, error)


class Parents(ErrorHandlingMethodView):

@check_accept_header_wrapper_flask(['application/x-json-stream'])
Expand Down Expand Up @@ -2195,6 +2275,8 @@ def blueprint():
bp.add_url_rule('/resurrect', view_func=resurrect_view, methods=['post', ])
bulkmeta_view = BulkMeta.as_view('bulkmeta')
bp.add_url_rule('/bulkmeta', view_func=bulkmeta_view, methods=['post', ])
files_view = BulkFiles.as_view('bulkfiles')
bp.add_url_rule('/bulkfiles', view_func=files_view, methods=['post', ])

bp.after_request(response_headers)
return bp
Expand Down
61 changes: 60 additions & 1 deletion tests/test_did.py
Expand Up @@ -29,7 +29,7 @@
from rucio.core.did import (list_dids, add_did, delete_dids, get_did_atime, touch_dids, attach_dids, detach_dids,
get_metadata, set_metadata, get_did, get_did_access_cnt, add_did_to_followed,
get_users_following_did, remove_did_from_followed, set_status, list_new_dids,
set_new_dids)
set_new_dids, bulk_list_files)
from rucio.core.replica import add_replica, get_replica
from rucio.db.sqla.constants import DIDType
from rucio.tests.common import rse_name_generator, scope_name_generator, did_name_generator
Expand Down Expand Up @@ -253,6 +253,35 @@ def test_reevaluate_after_close(self, mock_scope, root_account, file_config_mock
new_dids = [did for did in list_new_dids(did_type=None, thread=None, total_threads=None, chunk_size=100000, session=None)]
assert {'scope': mock_scope, 'name': dsn, 'did_type': DIDType.DATASET} in new_dids

def test_bulk_list_files(self, mock_scope, root_account, rse_factory, did_factory):
    """ Test the bulk_list_files method"""
    _, rse_id = rse_factory.make_mock_rse()
    nb_datasets = 5
    nb_files = 10
    # Try listing existing datasets
    datasets = [did_factory.make_dataset() for _ in range(nb_datasets)]
    files = []
    for dataset in datasets:
        new_files = []
        for _ in range(nb_files):
            new_files.append({'scope': mock_scope, 'name': did_name_generator('file'), 'bytes': 1, 'adler32': '0cc737eb', 'events': None, 'guid': None})
        attach_dids(scope=dataset['scope'], name=dataset['name'], rse_id=rse_id, dids=new_files, account=root_account)
        for file_ in new_files:
            file_['parent_scope'] = dataset['scope']
            file_['parent_name'] = dataset['name']
        files.extend(new_files)
    result = [file_ for file_ in bulk_list_files(datasets)]
    assert len(result) == len(files)
    for file_ in files:
        assert file_ in result
    # Try listing non-existing datasets: they must be silently skipped,
    # so re-listing with the extra DIDs yields the same files as before.
    non_existing_datasets = [{'scope': mock_scope, 'name': did_name_generator('dataset')} for _ in range(nb_datasets)]
    datasets.extend(non_existing_datasets)
    result = [file_ for file_ in bulk_list_files(datasets)]
    parent_datasets = [(file_['parent_scope'], file_['parent_name']) for file_ in result]
    assert len(result) == len(files)
    for dataset in non_existing_datasets:
        assert (dataset['scope'], dataset['name']) not in parent_datasets


class TestDIDApi:

Expand Down Expand Up @@ -1104,6 +1133,36 @@ def test_bulk_get_meta(self, did_client, replica_client, rse_factory):
list_meta = [_ for _ in did_client.get_metadata_bulk(list_dids)]
assert len(list_meta) == 0

def test_bulk_list_files(self, did_client, mock_scope, root_account, rse_factory, did_factory):
    """ Test the bulk_list_files method"""
    _, rse_id = rse_factory.make_mock_rse()
    nb_datasets = 5
    # Try listing existing datasets
    datasets = [did_factory.make_dataset() for _ in range(nb_datasets)]
    external_datasets = [{'scope': dataset['scope'].external, 'name': dataset['name']} for dataset in datasets]
    files = []
    for dataset in datasets:
        new_files = []
        for _ in range(10):
            new_files.append({'scope': mock_scope, 'name': did_name_generator('file'), 'bytes': 1, 'adler32': '0cc737eb', 'events': None, 'guid': None})
        attach_dids(scope=dataset['scope'], name=dataset['name'], rse_id=rse_id, dids=new_files, account=root_account)
        for file_ in new_files:
            # The client returns external (string) scopes, so compare against those.
            file_['scope'] = dataset['scope'].external
            file_['parent_scope'] = dataset['scope'].external
            file_['parent_name'] = dataset['name']
        files.extend(new_files)
    result = [file_ for file_ in did_client.bulk_list_files(external_datasets)]
    assert len(result) == len(files)
    for file_ in files:
        assert file_ in result
    # Try listing non-existing datasets: they must be silently skipped,
    # so re-listing with the extra DIDs yields the same files as before.
    non_existing_datasets = [{'scope': mock_scope.external, 'name': did_name_generator('dataset')} for _ in range(nb_datasets)]
    external_datasets.extend(non_existing_datasets)
    result = [file_ for file_ in did_client.bulk_list_files(external_datasets)]
    parent_datasets = [(file_['parent_scope'], file_['parent_name']) for file_ in result]
    assert len(result) == len(files)
    for dataset in non_existing_datasets:
        assert (dataset['scope'], dataset['name']) not in parent_datasets


@pytest.mark.noparallel(reason='uses mock scope')
def test_bulk_get_meta_inheritance(vo, rse_factory, mock_scope, did_factory, rucio_client):
Expand Down