Skip to content

Commit

Permalink
Transfers: metrics, allow to group response by RSE attribute; fix ruc…
Browse files Browse the repository at this point in the history
  • Loading branch information
rdimaio committed Mar 20, 2024
1 parent 1fc0c97 commit ffb279b
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 82 deletions.
5 changes: 3 additions & 2 deletions lib/rucio/api/request.py
Expand Up @@ -220,13 +220,14 @@ def list_requests_history(src_rses, dst_rses, states, issuer, vo='def', offset=N


@read_session
def get_request_metrics(src_rse: Optional[str], dst_rse: Optional[str], activity: Optional[str], issuer, vo='def', *, session: "Session"):
def get_request_metrics(src_rse: Optional[str], dst_rse: Optional[str], activity: Optional[str], group_by_rse_attribute: Optional[str], issuer, vo='def', *, session: "Session"):
"""
Get statistics of requests in a specific state grouped by source RSE, destination RSE, and activity.
:param src_rse: source RSE.
:param dst_rse: destination RSE.
:param activity: activity
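:param group_by_rse_attribute: Name of the RSE attribute whose value is used to group the source and destination RSEs in the response; if not set, results are keyed by RSE name.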
:param issuer: Issuing account as a string.
:param session: The database session in use.
"""
Expand All @@ -240,4 +241,4 @@ def get_request_metrics(src_rse: Optional[str], dst_rse: Optional[str], activity
if not permission.has_permission(issuer=issuer, vo=vo, action='get_request_metrics', kwargs=kwargs, session=session):
raise exception.AccessDenied(f'{issuer} cannot get request statistics')

return request.get_request_metrics(dest_rse_id=dst_rse_id, src_rse_id=src_rse_id, activity=activity, session=session)
return request.get_request_metrics(dest_rse_id=dst_rse_id, src_rse_id=src_rse_id, activity=activity, group_by_rse_attribute=group_by_rse_attribute, session=session)
10 changes: 8 additions & 2 deletions lib/rucio/core/request.py
Expand Up @@ -112,7 +112,6 @@ def __init__(
transfertool: str,
requested_at: Optional[datetime.datetime] = None,
):

self.request_id = id_
self.request_type = request_type
self.rule_id = rule_id
Expand Down Expand Up @@ -1849,6 +1848,7 @@ def get_request_metrics(
dest_rse_id: "Optional[str]" = None,
src_rse_id: "Optional[str]" = None,
activity: "Optional[str]" = None,
group_by_rse_attribute: "Optional[str]" = None,
*,
session: "Session"
):
Expand Down Expand Up @@ -1927,7 +1927,13 @@ def get_request_metrics(
metric['src_rse'] = src_rse.name
metric['dst_rse'] = dst_rse.name

response[f'{src_rse.name}:{dst_rse.name}'] = metric
if group_by_rse_attribute:
src_rse_group = src_rse.attributes.get(group_by_rse_attribute, 'UNKNOWN')
dst_rse_group = dst_rse.attributes.get(group_by_rse_attribute, 'UNKNOWN')
if src_rse_group is not None and dst_rse_group is not None:
response[f'{src_rse_group}:{dst_rse_group}'] = metric
else:
response[f'{src_rse.name}:{dst_rse.name}'] = metric

return response

Expand Down
7 changes: 7 additions & 0 deletions lib/rucio/web/rest/flaskapi/v1/requests.py
Expand Up @@ -849,6 +849,11 @@ def get(self):
description: The activity
schema:
type: string
- name: group_by_rse_attribute
in: query
description: The name of the RSE attribute to group the RSEs by.
schema:
type: string
responses:
200:
description: OK
Expand Down Expand Up @@ -949,12 +954,14 @@ def get(self):
dst_rse = flask.request.args.get('dst_rse', default=None)
src_rse = flask.request.args.get('src_rse', default=None)
activity = flask.request.args.get('activity', default=None)
group_by_rse_attribute = flask.request.args.get('group_by_rse_attribute', default=None)
format = flask.request.args.get('format', default=None)

metrics = request.get_request_metrics(
dst_rse=dst_rse,
src_rse=src_rse,
activity=activity,
group_by_rse_attribute=group_by_rse_attribute,
issuer=flask.request.environ.get('issuer'),
vo=flask.request.environ.get('vo')
)
Expand Down
165 changes: 87 additions & 78 deletions tests/test_request.py
Expand Up @@ -289,85 +289,94 @@ def check_error_api(params, exception_class, exception_message, code):
params = {'request_states': 'S', 'src_site': source_site, 'dst_site': 'unknown'}
check_error_api(params, 'NotFound', 'Could not resolve site name unknown to RSE', 404)


@pytest.mark.usefixtures("rest_client")
@pytest.mark.parametrize("file_config_mock", [{"overrides": [
('transfers', 'stats_enabled', 'True'),
]}], indirect=True)
def test_api_metrics(vo, rest_client, auth_token, rse_factory, did_factory, root_account, file_config_mock):

src_rse, src_rse_id = rse_factory.make_mock_rse()
dst_rse, dst_rse_id = rse_factory.make_mock_rse()
add_distance(src_rse_id, dst_rse_id, distance=10)

replica_bytes = 20

did1 = did_factory.random_file_did()
activity1 = 'User Subscription'
add_replica(rse_id=src_rse_id, bytes_=replica_bytes, adler32='beefdead', account=root_account, **did1)

did2 = did_factory.random_file_did()
activity2 = 'Test'
add_replica(rse_id=src_rse_id, bytes_=replica_bytes, adler32='beefdead', account=root_account, **did2)

requests = [
{
'dest_rse_id': dst_rse_id,
'source_rse_id': src_rse_id,
'request_type': RequestType.TRANSFER,
'request_id': generate_uuid(),
'name': did1['name'],
'scope': did1['scope'],
'rule_id': generate_uuid(),
'retry_count': 1,
'attributes': {
'activity': activity,
'bytes': replica_bytes,
'md5': '',
'adler32': ''
class TestApiMetrics:

@pytest.fixture(autouse=True, scope='class')
def setup_test_api_metrics(self, rse_factory, did_factory, root_account, auth_token, vo):
self.src_rse, self.src_rse_id = rse_factory.make_mock_rse()
self.dst_rse, self.dst_rse_id = rse_factory.make_mock_rse()
add_distance(self.src_rse_id, self.dst_rse_id, distance=10)
self.replica_bytes = 20

did1 = did_factory.random_file_did()
self.activity1 = 'User Subscription'
add_replica(rse_id=self.src_rse_id, bytes_=self.replica_bytes, adler32='beefdead', account=root_account, **did1)

did2 = did_factory.random_file_did()
self.activity2 = 'Test'
add_replica(rse_id=self.src_rse_id, bytes_=self.replica_bytes, adler32='beefdead', account=root_account, **did2)

requests = [
{
'dest_rse_id': self.dst_rse_id,
'source_rse_id': self.src_rse_id,
'request_type': RequestType.TRANSFER,
'request_id': generate_uuid(),
'name': did1['name'],
'scope': did1['scope'],
'rule_id': generate_uuid(),
'retry_count': 1,
'attributes': {
'activity': activity,
'bytes': self.replica_bytes,
'md5': '',
'adler32': ''
}
}
}
for did, activity in ((did1, activity1), (did2, activity2))
]
queue_requests(requests)

stats_manager = TransferStatsManager()
stats_manager.observe(
src_rse_id=src_rse_id,
dst_rse_id=dst_rse_id,
activity=activity1,
state=RequestState.DONE,
file_size=367,
)
stats_manager.observe(
src_rse_id=src_rse_id,
dst_rse_id=dst_rse_id,
activity=activity2,
state=RequestState.FAILED,
file_size=1020,
)
stats_manager.force_save()
stats_manager.downsample_and_cleanup()

api_endpoint = '/requests/metrics'
params = {'dst_rse': dst_rse, 'src_rse': src_rse}
headers_dict = {'X-Rucio-Type': 'user', 'X-Rucio-Account': root_account.external}
response = rest_client.get(api_endpoint, query_string=params, headers=headers(auth(auth_token), vohdr(vo), hdrdict(headers_dict)))
metric = json.loads(response.get_data(as_text=True))
assert metric['distance'] == 10
assert metric['bytes']['queued'][activity1] == replica_bytes
assert metric['bytes']['queued'][activity2] == replica_bytes
assert metric['bytes']['queued-total'] == 2 * replica_bytes
assert metric['files']['queued'][activity1] == 1
assert metric['files']['queued'][activity2] == 1
assert metric['files']['queued-total'] == 2
assert metric['files']['done'][activity1]['1h'] == 1
assert metric['bytes']['done'][activity1]['1h'] == 367
assert metric['files']['failed'][activity2]['1h'] == 1
assert metric['src_rse'] == src_rse
assert metric['dst_rse'] == dst_rse

params = {'dst_rse': dst_rse, 'src_rse': src_rse, 'format': 'panda'}
response = rest_client.get(api_endpoint, query_string=params, headers=headers(auth(auth_token), vohdr(vo), hdrdict(headers_dict)))
response = json.loads(response.get_data(as_text=True))
metric = response.get(f'{src_rse}:{dst_rse}')
assert metric is not None
for did, activity in ((did1, self.activity1), (did2, self.activity2))
]
queue_requests(requests)

stats_manager = TransferStatsManager()
stats_manager.observe(
src_rse_id=self.src_rse_id,
dst_rse_id=self.dst_rse_id,
activity=self.activity1,
state=RequestState.DONE,
file_size=367,
)
stats_manager.observe(
src_rse_id=self.src_rse_id,
dst_rse_id=self.dst_rse_id,
activity=self.activity2,
state=RequestState.FAILED,
file_size=1020,
)
stats_manager.force_save()
stats_manager.downsample_and_cleanup()

self.api_endpoint = '/requests/metrics'
self.headers = headers(auth(auth_token), vohdr(vo), hdrdict({'X-Rucio-Type': 'user', 'X-Rucio-Account': root_account.external}))

def test_api_metrics(self):
params = {'dst_rse': self.dst_rse, 'src_rse': self.src_rse}

response = self.rest_client.get(self.api_endpoint, query_string=params, headers=self.headers)
metric = json.loads(response.get_data(as_text=True))
assert metric['distance'] == 10
assert metric['bytes']['queued'][self.activity1] == self.replica_bytes
assert metric['bytes']['queued'][self.activity2] == self.replica_bytes
assert metric['bytes']['queued-total'] == 2 * self.replica_bytes
assert metric['files']['queued'][self.activity1] == 1
assert metric['files']['queued'][self.activity2] == 1
assert metric['files']['queued-total'] == 2
assert metric['files']['done'][self.activity1]['1h'] == 1
assert metric['bytes']['done'][self.activity1]['1h'] == 367
assert metric['files']['failed'][self.activity2]['1h'] == 1
assert metric['src_rse'] == self.src_rse
assert metric['dst_rse'] == self.dst_rse

def test_api_metrics_panda(self):
params = {'dst_rse': self.dst_rse, 'src_rse': self.src_rse, 'format': 'panda'}
response = self.rest_client.get(self.api_endpoint, query_string=params, headers=self.headers)
response = json.loads(response.get_data(as_text=True))
metric = response.get(f'{self.src_rse}:{self.dst_rse}')
assert metric is not None

def test_api_metrics_group_response_by_rse_attribute_existing(self):

def test_api_metrics_group_response_by_rse_attribute_missing(self):

0 comments on commit ffb279b

Please sign in to comment.