diff --git a/bin/rucio-reaper b/bin/rucio-reaper index 711fa836e2..4068533a45 100755 --- a/bin/rucio-reaper +++ b/bin/rucio-reaper @@ -18,10 +18,9 @@ ''' import argparse -import signal from rucio.common.utils import StoreAndDeprecateWarningAction -from rucio.daemons.reaper.reaper import run, stop +from rucio.daemons.reaper.reaper import Reaper def get_parser(): @@ -61,24 +60,22 @@ def get_parser(): if __name__ == "__main__": - # Bind our callback to the SIGTERM signal and run the daemon: - signal.signal(signal.SIGTERM, stop) - parser = get_parser() args = parser.parse_args() try: - run(threads=args.threads, - chunk_size=args.chunk_size, - include_rses=args.include_rses, - exclude_rses=args.exclude_rses, - rses=args.rses, - vos=args.vos, - once=args.run_once, - greedy=args.greedy, - scheme=args.scheme, - delay_seconds=args.delay_seconds, - sleep_time=args.sleep_time, - auto_exclude_threshold=args.auto_exclude_threshold, - auto_exclude_timeout=args.auto_exclude_timeout) + reaper = Reaper(threads=args.threads, + chunk_size=args.chunk_size, + include_rses=args.include_rses, + exclude_rses=args.exclude_rses, + rses=args.rses, + vos=args.vos, + once=args.run_once, + greedy=args.greedy, + scheme=args.scheme, + delay_seconds=args.delay_seconds, + sleep_time=args.sleep_time, + auto_exclude_threshold=args.auto_exclude_threshold, + auto_exclude_timeout=args.auto_exclude_timeout) + reaper.run() except KeyboardInterrupt: - stop() + reaper.stop() diff --git a/lib/rucio/common/exception.py b/lib/rucio/common/exception.py index d975fb3b7e..2993a0d33e 100644 --- a/lib/rucio/common/exception.py +++ b/lib/rucio/common/exception.py @@ -1090,3 +1090,13 @@ def __init__(self, *args, **kwargs): super(DeprecationError, self).__init__(*args, **kwargs) self._message = 'Command or function has been deprecated.' self.error_code = 105 + + +class ReaperNoRSEsToProcess(RucioException): + """ + Reaper found no RSEs to process. 
+ """ + def __init__(self, *args, **kwargs): + super(ReaperNoRSEsToProcess, self).__init__(*args, **kwargs) + self._message = 'Reaper: No RSEs to process found.' + self.error_code = 106 diff --git a/lib/rucio/core/rse_expression_parser.py b/lib/rucio/core/rse_expression_parser.py index 5a87d937fd..a739d4e678 100644 --- a/lib/rucio/core/rse_expression_parser.py +++ b/lib/rucio/core/rse_expression_parser.py @@ -42,7 +42,7 @@ @transactional_session -def parse_expression(expression, filter_=None, *, session: "Session"): +def parse_expression(expression: str, filter_=None, *, session: "Session"): """ Parse a RSE expression and return the list of RSE dictionaries. diff --git a/lib/rucio/daemons/common.py b/lib/rucio/daemons/common.py index 424cfa7992..d4478c8810 100644 --- a/lib/rucio/daemons/common.py +++ b/lib/rucio/daemons/common.py @@ -76,9 +76,17 @@ def _pre_run_checks() -> None: raise DatabaseException("Database was not updated, daemon won't start") @abstractmethod - def _run_once(self, heartbeat_handler: "HeartbeatHandler", **_kwargs) -> None: + def _run_once( + self, heartbeat_handler: "HeartbeatHandler", **_kwargs + ) -> tuple[bool, Any]: """ Daemon-specific logic (to be defined in child classes) for a single iteration + + :param heartbeat_handler: Handler to set and manage the heartbeat for this execution. + + :returns: Tuple of (must_sleep, ret_value). 
+ must_sleep: set to True to signal to the calling context that it must sleep before next execution, False otherwise + ret_value: Daemon-specific return value """ pass @@ -107,20 +115,23 @@ def run(self) -> None: """ self._pre_run_checks() - if self.once: - self._call_daemon() - else: - logging.info("main: starting threads") - threads = [ - threading.Thread(target=self._call_daemon) - for _ in range(0, self.total_workers) - ] - [t.start() for t in threads] - logging.info("main: waiting for interrupts") + logging.info(f"{self.daemon_name}: starting threads") + thread_list = [ + threading.Thread(target=self._call_daemon) + for _ in range(0, self.total_workers) + ] + [t.start() for t in thread_list] - # Interruptible joins require a timeout. - while threads[0].is_alive(): - [t.join(timeout=3.14) for t in threads] + if not self.once: + logging.info(f"{self.daemon_name}: waiting for interrupts") + + # Interruptible joins require a timeout. + while thread_list: + thread_list = [ + thread.join(timeout=3.14) + for thread in thread_list + if thread and thread.is_alive() + ] def stop( self, signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None diff --git a/lib/rucio/daemons/reaper/reaper.py b/lib/rucio/daemons/reaper/reaper.py index 9272ba5ffd..301349ab7f 100644 --- a/lib/rucio/daemons/reaper/reaper.py +++ b/lib/rucio/daemons/reaper/reaper.py @@ -13,11 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -''' +""" Reaper is a daemon to manage file deletion. 
-''' +""" -import functools import logging import random import threading @@ -25,670 +24,991 @@ import traceback from configparser import NoOptionError, NoSectionError from datetime import datetime, timedelta -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional, Any from dogpile.cache.api import NoValue from math import log2 from sqlalchemy.exc import DatabaseError, IntegrityError -import rucio.db.sqla.util from rucio.common.cache import make_region_memcached from rucio.common.config import config_get, config_get_bool, config_get_int -from rucio.common.exception import (DatabaseException, RSENotFound, - ReplicaUnAvailable, ReplicaNotFound, ServiceUnavailable, - RSEAccessDenied, ResourceTemporaryUnavailable, SourceNotFound, - VONotFound, RSEProtocolNotSupported) -from rucio.common.logging import setup_logging +from rucio.common.exception import ( + DatabaseException, + RSENotFound, + ReplicaUnAvailable, + ReplicaNotFound, + ServiceUnavailable, + RSEAccessDenied, + ResourceTemporaryUnavailable, + SourceNotFound, + VONotFound, + RSEProtocolNotSupported, + ReaperNoRSEsToProcess, +) from rucio.common.stopwatch import Stopwatch from rucio.common.utils import chunks +from rucio.common.types import RSESettingsDict from rucio.core.credential import get_signed_url from rucio.core.heartbeat import list_payload_counts from rucio.core.message import add_message from rucio.core.monitor import MetricManager from rucio.core.oidc import request_token from rucio.core.replica import list_and_mark_unlocked_replicas, delete_replicas -from rucio.core.rse import (determine_audience_for_rse, determine_scope_for_rse, - list_rses, RseData) +from rucio.core.rse import ( + determine_audience_for_rse, + determine_scope_for_rse, + list_rses, + RseData, +) from rucio.core.rse_expression_parser import parse_expression from rucio.core.rule import get_evaluation_backlog from rucio.core.vo import list_vos -from rucio.daemons.common import run_daemon +from 
rucio.daemons.common import Daemon, HeartbeatHandler from rucio.rse import rsemanager as rsemgr if TYPE_CHECKING: from collections.abc import Callable - from types import FrameType - from typing import Any, Optional - - from rucio.daemons.common import HeartbeatHandler GRACEFUL_STOP = threading.Event() METRICS = MetricManager(module=__name__) REGION = make_region_memcached(expiration_time=600) -DAEMON_NAME = 'reaper' - -EXCLUDED_RSE_GAUGE = METRICS.gauge('excluded_rses.{rse}', documentation='Temporarly excluded RSEs') - - -def get_rses_to_process(rses, include_rses, exclude_rses, vos): - """ - Return the list of RSEs to process based on rses, include_rses and exclude_rses - - :param rses: List of RSEs the reaper should work against. If empty, it considers all RSEs. - :param exclude_rses: RSE expression to exclude RSEs from the Reaper. - :param include_rses: RSE expression to include RSEs. - :param vos: VOs on which to look for RSEs. Only used in multi-VO mode. - If None, we either use all VOs if run from "def" - - :returns: A list of RSEs to process - """ - multi_vo = config_get_bool('common', 'multi_vo', raise_exception=False, default=False) - if not multi_vo: - if vos: - logging.log(logging.WARNING, 'Ignoring argument vos, this is only applicable in a multi-VO setup.') - vos = ['def'] - else: - if vos: - invalid = set(vos) - set([v['vo'] for v in list_vos()]) - if invalid: - msg = 'VO{} {} cannot be found'.format('s' if len(invalid) > 1 else '', ', '.join([repr(v) for v in invalid])) - raise VONotFound(msg) - else: - vos = [v['vo'] for v in list_vos()] - logging.log(logging.INFO, 'Reaper: This instance will work on VO%s: %s' % ('s' if len(vos) > 1 else '', ', '.join([v for v in vos]))) +EXCLUDED_RSE_GAUGE = METRICS.gauge( + "excluded_rses.{rse}", documentation="Temporarily excluded RSEs" +) + + +class Reaper(Daemon): + def __init__( + self, + rses: Optional[list[str]] = None, + scheme: Optional[str] = None, + exclude_rses: Optional[str] = None, + include_rses: 
Optional[str] = None, + chunk_size: int = 100, + greedy: bool = False, + vos: Optional[list[str]] = None, + delay_seconds: int = 0, + auto_exclude_threshold: int = 100, + auto_exclude_timeout: int = 600, + **_kwargs, + ) -> None: + """ + :param rses: List of RSEs the reaper should work against. + If empty, it considers all RSEs. + :param scheme: Force the reaper to use a particular protocol/scheme, e.g., mock. + :param exclude_rses: RSE expression to exclude RSEs from the Reaper. + :param include_rses: RSE expression to include RSEs. + :param chunk_size: The size of chunk for deletion. + :param threads_per_worker: Total number of threads created by each worker. + :param greedy: If True, delete right away replicas with tombstone. + :param vos: VOs on which to look for RSEs. Only used in multi-VO mode. + If None, we either use all VOs if run from "def", + or the current VO otherwise. + :param delay_seconds: The delay to query replicas in BEING_DELETED state. + :param auto_exclude_threshold: Number of service unavailable exceptions after which the RSE gets temporarily excluded. + :param auto_exclude_timeout: Timeout for temporarily excluded RSEs. 
+ """ + super().__init__(daemon_name="reaper", **_kwargs) + self.rses = rses + self.scheme = scheme + self.exclude_rses = exclude_rses + self.include_rses = include_rses + self.chunk_size = chunk_size + self.greedy = greedy + self.vos = vos + self.delay_seconds = delay_seconds + self.auto_exclude_threshold = auto_exclude_threshold + self.auto_exclude_timeout = auto_exclude_timeout + + def _pre_run_checks(self): + super()._pre_run_checks() + rses_to_process = self._get_rses_to_process() + if not rses_to_process: + logging.log(logging.ERROR, "Reaper: No RSEs to process found.") + raise ReaperNoRSEsToProcess("Reaper: No RSEs to process found.") + logging.log( + logging.INFO, + "Reaper: This instance will work on RSEs: %s", + ", ".join([rse["rse"] for rse in rses_to_process]), + ) - cache_key = 'rses_to_process_1%s2%s3%s' % (str(rses), str(include_rses), str(exclude_rses)) - if multi_vo: - cache_key += '@%s' % '-'.join(vo for vo in vos) + def _run_once( + self, heartbeat_handler: "HeartbeatHandler", **_kwargs + ) -> tuple[bool, Any]: - result = REGION.get(cache_key) - if not isinstance(result, NoValue): - return result + must_sleep = True + + _, total_workers, logger = heartbeat_handler.live() + logger(logging.INFO, "Reaper started") - all_rses = [] - for vo in vos: - all_rses.extend(list_rses(filters={'vo': vo})) - - if rses: - invalid = set(rses) - set([rse['rse'] for rse in all_rses]) - if invalid: - msg = 'RSE{} {} cannot be found'.format('s' if len(invalid) > 1 else '', - ', '.join([repr(rse) for rse in invalid])) - raise RSENotFound(msg) - rses = [rse for rse in all_rses if rse['rse'] in rses] - else: - rses = all_rses - - if include_rses: - included_rses = parse_expression(include_rses) - rses = [rse for rse in rses if rse in included_rses] - - if exclude_rses: - excluded_rses = parse_expression(exclude_rses) - rses = [rse for rse in rses if rse not in excluded_rses] - - REGION.set(cache_key, rses) - logging.log(logging.INFO, 'Reaper: This instance will work on 
RSEs: %s', ', '.join([rse['rse'] for rse in rses])) - return rses - - -def delete_from_storage(heartbeat_handler, hb_payload, replicas, prot, rse_info, is_staging, auto_exclude_threshold, logger=logging.log): - deleted_files = [] - rse_name = rse_info['rse'] - rse_id = rse_info['id'] - noaccess_attempts = 0 - pfns_to_bulk_delete = [] - try: - prot.connect() - for replica in replicas: - # Physical deletion - _, _, logger = heartbeat_handler.live(payload=hb_payload) - stopwatch = Stopwatch() - deletion_dict = {'scope': replica['scope'].external, - 'name': replica['name'], - 'rse': rse_name, - 'file-size': replica['bytes'], - 'bytes': replica['bytes'], - 'url': replica['pfn'], - 'protocol': prot.attributes['scheme'], - 'datatype': replica['datatype']} + # Try to get auto exclude parameters from the config table. Otherwise use CLI parameters. + # It's best to access these at every iteration, instead of in the constructor, + # as the config table can be changed at any moment by Rucio administrators. 
+ auto_exclude_threshold = config_get_int( + "reaper", + "auto_exclude_threshold", + default=self.auto_exclude_threshold, + raise_exception=False, + ) + auto_exclude_timeout = config_get_int( + "reaper", + "auto_exclude_timeout", + default=self.auto_exclude_timeout, + raise_exception=False, + ) + # Check if there is a Judge Evaluator backlog + max_evaluator_backlog_count = config_get_int( + "reaper", "max_evaluator_backlog_count", default=None, raise_exception=False + ) + max_evaluator_backlog_duration = config_get_int( + "reaper", + "max_evaluator_backlog_duration", + default=None, + raise_exception=False, + ) + if max_evaluator_backlog_count or max_evaluator_backlog_duration: + backlog = get_evaluation_backlog() + count_is_hit = ( + max_evaluator_backlog_count + and backlog[0] + and backlog[0] > max_evaluator_backlog_count + ) + duration_is_hit = ( + max_evaluator_backlog_duration + and backlog[1] + and backlog[1] + < datetime.utcnow() - timedelta(minutes=max_evaluator_backlog_duration) + ) + if count_is_hit and duration_is_hit: + logger( + logging.ERROR, + "Reaper: Judge evaluator backlog count and duration hit, stopping operation", + ) + return must_sleep, None + elif count_is_hit: + logger( + logging.ERROR, + "Reaper: Judge evaluator backlog count hit, stopping operation", + ) + return must_sleep, None + elif duration_is_hit: + logger( + logging.ERROR, + "Reaper: Judge evaluator backlog duration hit, stopping operation", + ) + return must_sleep, None + + rses_to_process = self._get_rses_to_process() + rses_to_process = [ + RseData(id_=rse["id"], name=rse["rse"], columns=rse) + for rse in rses_to_process + ] + if not rses_to_process: + logger(logging.ERROR, "Reaper: No RSEs found. Will sleep for 30 seconds") + return must_sleep, None + + # On big deletion campaigns, we desire to re-iterate fast on RSEs which have a lot of data to delete. + # The called function will return the RSEs which have more work remaining. 
+ # Call the deletion routine again on this returned subset of RSEs. + # Scale the number of allowed iterations with the number of total reaper workers + iteration = 0 + max_fast_reiterations = int(log2(total_workers)) + while rses_to_process and iteration <= max_fast_reiterations: + rses_to_process = self._deletion_routine( + rses_to_process=rses_to_process, + auto_exclude_threshold=auto_exclude_threshold, + auto_exclude_timeout=auto_exclude_timeout, + heartbeat_handler=heartbeat_handler, + ) + if rses_to_process and iteration < max_fast_reiterations: + logger( + logging.INFO, + "Will perform fast-reiteration %d/%d with rses: %s", + iteration + 1, + max_fast_reiterations, + [str(rse) for rse in rses_to_process], + ) + iteration += 1 + + if rses_to_process: + # There is still more work to be performed. + # Inform the calling context that it must call reaper again (on the full list of rses) + must_sleep = False + + return must_sleep, None + + def _deletion_routine( + self, + rses_to_process: list[RseData], + auto_exclude_threshold: int, + auto_exclude_timeout: int, + heartbeat_handler: "HeartbeatHandler", + ) -> Optional[list[RseData]]: + + dict_rses = {} + _, total_workers, logger = heartbeat_handler.live() + tot_needed_free_space = 0 + for rse in rses_to_process: + # Check if RSE is blocklisted + if not rse.columns["availability_delete"]: + logger(logging.DEBUG, "RSE %s is blocklisted for delete", rse.name) + continue + rse.ensure_loaded(load_attributes=True) + enable_greedy = rse.attributes.get("greedyDeletion", False) or self.greedy + needed_free_space, only_delete_obsolete = Reaper.__check_rse_usage_cached( + rse, greedy=enable_greedy, logger=logger + ) + if needed_free_space: + dict_rses[rse] = [ + needed_free_space, + only_delete_obsolete, + enable_greedy, + ] + tot_needed_free_space += needed_free_space + elif only_delete_obsolete: + dict_rses[rse] = [ + needed_free_space, + only_delete_obsolete, + enable_greedy, + ] + else: + logger(logging.DEBUG, "Nothing 
to delete on %s", rse.name) + + rses_with_params = [ + (rse, needed_free_space, only_delete_obsolete, enable_greedy) + for rse, ( + needed_free_space, + only_delete_obsolete, + enable_greedy, + ) in dict_rses.items() + ] + + # Ordering the RSEs based on the needed free space + sorted_rses = sorted(rses_with_params, key=lambda x: x[1], reverse=True) + log_msg_str = ", ".join( + f"{rse}:{needed_free_space}:{only_delete_obsolete}:{enable_greedy}" + for rse, needed_free_space, only_delete_obsolete, enable_greedy in sorted_rses + ) + logger( + logging.DEBUG, + "List of RSEs to process ordered by needed space desc: %s", + log_msg_str, + ) + + random.shuffle(rses_with_params) + + work_remaining_by_rse = {} + paused_rses = [] + for ( + rse, + needed_free_space, + only_delete_obsolete, + enable_greedy, + ) in rses_with_params: + result = REGION.get("pause_deletion_%s" % rse.id, expiration_time=120) + if not isinstance(result, NoValue): + paused_rses.append(rse.name) + logger( + logging.DEBUG, + "Not enough replicas to delete on %s during the previous cycle. Deletion paused for a while", + rse.name, + ) + continue + + result = REGION.get( + "temporary_exclude_%s" % rse.id, expiration_time=auto_exclude_timeout + ) + if not isinstance(result, NoValue): + logger( + logging.WARNING, + "Too many failed attempts for %s in last cycle. RSE is temporarily excluded.", + rse.name, + ) + EXCLUDED_RSE_GAUGE.labels(rse=rse.name).set(1) + continue + EXCLUDED_RSE_GAUGE.labels(rse=rse.name).set(0) + + percent = 0 + if tot_needed_free_space: + percent = needed_free_space / tot_needed_free_space * 100 + logger( + logging.DEBUG, + "Working on %s. 
Percentage of the total space needed %.2f", + rse.name, + percent, + ) + + rse_hostname = self._rse_deletion_hostname(rse) + if not rse_hostname: + if self.scheme: + logger( + logging.WARNING, + "Protocol %s not supported on %s", + self.scheme, + rse.name, + ) + else: + logger( + logging.WARNING, "No default delete protocol for %s", rse.name + ) + REGION.set("pause_deletion_%s" % rse.id, True) + continue + + hb_payload = Reaper.__try_reserve_worker_slot( + heartbeat_handler=heartbeat_handler, rse=rse, hostname=rse_hostname + ) + if not hb_payload: + # Might need to reschedule a try on this RSE later in the same cycle + continue + + # List and mark BEING_DELETED the files to delete + del_start_time = time.time() try: - if replica['scope'].vo != 'def': - deletion_dict['vo'] = replica['scope'].vo - logger(logging.DEBUG, 'Deletion ATTEMPT of %s:%s as %s on %s', replica['scope'], replica['name'], replica['pfn'], rse_name) - # For STAGING RSEs, no physical deletion - if is_staging: - logger(logging.WARNING, 'Deletion STAGING of %s:%s as %s on %s, will only delete the catalog and not do physical deletion', replica['scope'], replica['name'], replica['pfn'], rse_name) - deleted_files.append({'scope': replica['scope'], 'name': replica['name']}) - continue - - if replica['pfn']: - pfn = replica['pfn'] - # sign the URL if necessary - if prot.attributes['scheme'] == 'https' and rse_info['sign_url'] is not None: - pfn = get_signed_url(rse_id, rse_info['sign_url'], 'delete', pfn) - if prot.attributes['scheme'] == 'globus': - pfns_to_bulk_delete.append(replica['pfn']) - else: - prot.delete(pfn) + with METRICS.timer("list_unlocked_replicas"): + if only_delete_obsolete: + logger( + logging.DEBUG, + "Will run list_and_mark_unlocked_replicas on %s. 
No space needed, will only delete EPOCH tombstoned replicas", + rse.name, + ) + replicas = list_and_mark_unlocked_replicas( + limit=self.chunk_size, + bytes_=needed_free_space, + rse_id=rse.id, + delay_seconds=self.delay_seconds, + only_delete_obsolete=only_delete_obsolete, + session=None, + ) + logger( + logging.DEBUG, + "list_and_mark_unlocked_replicas on %s for %s bytes in %s seconds: %s replicas", + rse.name, + needed_free_space, + time.time() - del_start_time, + len(replicas), + ) + if (len(replicas) == 0 and enable_greedy) or ( + len(replicas) < self.chunk_size and not enable_greedy + ): + logger( + logging.DEBUG, + "Not enough replicas to delete on %s (%s requested vs %s returned). Will skip any new attempts on this RSE until next cycle", + rse.name, + self.chunk_size, + len(replicas), + ) + REGION.set("pause_deletion_%s" % rse.id, True) + work_remaining_by_rse[rse] = False else: - logger(logging.WARNING, 'Deletion UNAVAILABLE of %s:%s as %s on %s', replica['scope'], replica['name'], replica['pfn'], rse_name) - - duration = stopwatch.elapsed - METRICS.timer('delete.{scheme}.{rse}').labels(scheme=prot.attributes['scheme'], rse=rse_name).observe(duration) - - deleted_files.append({'scope': replica['scope'], 'name': replica['name']}) - - deletion_dict['duration'] = duration - add_message('deletion-done', deletion_dict) - logger(logging.INFO, 'Deletion SUCCESS of %s:%s as %s on %s in %.2f seconds', replica['scope'], replica['name'], replica['pfn'], rse_name, duration) - - except SourceNotFound: - duration = stopwatch.elapsed - err_msg = 'Deletion NOTFOUND of %s:%s as %s on %s in %.2f seconds' % (replica['scope'], replica['name'], replica['pfn'], rse_name, duration) - logger(logging.WARNING, '%s', err_msg) - deletion_dict['reason'] = 'File Not Found' - deletion_dict['duration'] = duration - add_message('deletion-not-found', deletion_dict) - deleted_files.append({'scope': replica['scope'], 'name': replica['name']}) - - except (ServiceUnavailable, RSEAccessDenied, 
ResourceTemporaryUnavailable) as error: - duration = stopwatch.elapsed - logger(logging.WARNING, 'Deletion NOACCESS of %s:%s as %s on %s: %s in %.2f', replica['scope'], replica['name'], replica['pfn'], rse_name, str(error), duration) - deletion_dict['reason'] = str(error) - deletion_dict['duration'] = duration - add_message('deletion-failed', deletion_dict) - noaccess_attempts += 1 - if noaccess_attempts >= auto_exclude_threshold: - logger(logging.INFO, 'Too many (%d) NOACCESS attempts for %s. RSE will be temporarly excluded.', noaccess_attempts, rse_name) - REGION.set('temporary_exclude_%s' % rse_id, True) - METRICS.gauge('excluded_rses.{rse}').labels(rse=rse_name).set(1) - - EXCLUDED_RSE_GAUGE.labels(rse=rse_name).set(1) - break - - except Exception as error: - duration = stopwatch.elapsed - logger(logging.CRITICAL, 'Deletion CRITICAL of %s:%s as %s on %s in %.2f seconds : %s', replica['scope'], replica['name'], replica['pfn'], rse_name, duration, str(traceback.format_exc())) - deletion_dict['reason'] = str(error) - deletion_dict['duration'] = duration - add_message('deletion-failed', deletion_dict) - - if pfns_to_bulk_delete and prot.attributes['scheme'] == 'globus': - logger(logging.DEBUG, 'Attempting bulk delete on RSE %s for scheme %s', rse_name, prot.attributes['scheme']) - prot.bulk_delete(pfns_to_bulk_delete) - - except (ServiceUnavailable, RSEAccessDenied, ResourceTemporaryUnavailable) as error: - for replica in replicas: - logger(logging.WARNING, 'Deletion NOACCESS of %s:%s as %s on %s: %s', replica['scope'], replica['name'], replica['pfn'], rse_name, str(error)) - payload = {'scope': replica['scope'].external, - 'name': replica['name'], - 'rse': rse_name, - 'file-size': replica['bytes'], - 'bytes': replica['bytes'], - 'url': replica['pfn'], - 'reason': str(error), - 'protocol': prot.attributes['scheme']} - if replica['scope'].vo != 'def': - payload['vo'] = replica['scope'].vo - add_message('deletion-failed', payload) - logger(logging.INFO, 'Cannot 
connect to %s. RSE will be temporarly excluded.', rse_name) - REGION.set('temporary_exclude_%s' % rse_id, True) - EXCLUDED_RSE_GAUGE.labels(rse=rse_name).set(1) - finally: - prot.close() - return deleted_files - - -def _rse_deletion_hostname(rse: RseData, scheme: "Optional[str]") -> "Optional[str]": - """ - Retrieves the hostname of the default deletion protocol - """ - rse.ensure_loaded(load_info=True) - for prot in rse.info['protocols']: - if scheme: - if prot['scheme'] == scheme and prot['domains']['wan']['delete'] != 0: - return prot['hostname'] + work_remaining_by_rse[rse] = True + + except (DatabaseException, IntegrityError, DatabaseError) as error: + logger(logging.ERROR, "%s", str(error)) + continue + except Exception: + logger(logging.CRITICAL, "Exception", exc_info=True) + continue + # Physical deletion will take place there + try: + rse.ensure_loaded(load_info=True, load_attributes=True) + prot = rsemgr.create_protocol( + rse.info, "delete", scheme=self.scheme, logger=logger + ) + if ( + rse.attributes.get("oidc_support") is True + and prot.attributes["scheme"] == "davs" + ): + audience = config_get( + "reaper", "oidc_audience", False + ) or determine_audience_for_rse(rse.id) + # FIXME: At the time of writing, StoRM requires `storage.read` + # in order to perform a stat operation. 
+ scope = determine_scope_for_rse( + rse.id, scopes=["storage.modify", "storage.read"] + ) + auth_token = request_token(audience, scope) + if auth_token: + logger( + logging.INFO, "Using a token to delete on RSE %s", rse.name + ) + prot = rsemgr.create_protocol( + rse.info, + "delete", + scheme=self.scheme, + auth_token=auth_token, + logger=logger, + ) + else: + logger( + logging.WARNING, + "Failed to procure a token to delete on RSE %s", + rse.name, + ) + for file_replicas in chunks(replicas, self.chunk_size): + # Refresh heartbeat + _, total_workers, logger = heartbeat_handler.live( + payload=hb_payload + ) + del_start_time = time.time() + for replica in file_replicas: + try: + replica["pfn"] = str( + list( + rsemgr.lfns2pfns( + rse_settings=rse.info, + lfns=[ + { + "scope": replica["scope"].external, + "name": replica["name"], + "path": replica["path"], + } + ], + operation="delete", + scheme=self.scheme, + ).values() + )[0] + ) + except (ReplicaUnAvailable, ReplicaNotFound) as error: + logger( + logging.WARNING, + "Failed get pfn UNAVAILABLE replica %s:%s on %s with error %s", + replica["scope"], + replica["name"], + rse.name, + str(error), + ) + replica["pfn"] = None + + except Exception: + logger(logging.CRITICAL, "Exception", exc_info=True) + + is_staging = rse.columns["staging_area"] + deleted_files = Reaper._delete_from_storage( + heartbeat_handler=heartbeat_handler, + hb_payload=hb_payload, + replicas=file_replicas, + prot=prot, + rse_info=rse.info, + is_staging=is_staging, + auto_exclude_threshold=auto_exclude_threshold, + ) + logger( + logging.INFO, + "%i files processed in %s seconds", + len(file_replicas), + time.time() - del_start_time, + ) + + # Then finally delete the replicas + del_start = time.time() + delete_replicas(rse_id=rse.id, files=deleted_files) + logger( + logging.DEBUG, + "delete_replicas successed on %s : %s replicas in %s seconds", + rse.name, + len(deleted_files), + time.time() - del_start, + ) + 
METRICS.counter("deletion.done").inc(len(deleted_files)) + except RSEProtocolNotSupported: + logger( + logging.WARNING, + "Protocol %s not supported on %s", + self.scheme, + rse.name, + ) + except Exception: + logger(logging.CRITICAL, "Exception", exc_info=True) + + if paused_rses: + logger( + logging.INFO, + "Deletion paused for a while for following RSEs: %s", + ", ".join(paused_rses), + ) + + rses_with_more_work = [ + rse for rse, has_more_work in work_remaining_by_rse.items() if has_more_work + ] + return rses_with_more_work + + def _get_rses_to_process(self): + """ + Return the list of RSEs to process based on rses, include_rses and exclude_rses + + :returns: A list of RSEs to process + """ + multi_vo = config_get_bool( + "common", "multi_vo", raise_exception=False, default=False + ) + if not multi_vo: + if self.vos: + logging.log( + logging.WARNING, + "Ignoring argument VOs, this is only applicable in a multi-VO setup.", + ) + self.vos = ["def"] else: - if prot['domains']['wan']['delete'] == 1: - return prot['hostname'] - return None + if self.vos: + invalid = set(self.vos) - set([v["vo"] for v in list_vos()]) + if invalid: + msg = "VO{} {} cannot be found".format( + "s" if len(invalid) > 1 else "", + ", ".join([repr(v) for v in invalid]), + ) + raise VONotFound(msg) + else: + self.vos = [v["vo"] for v in list_vos()] + logging.log( + logging.INFO, + "Reaper: This instance will work on VO%s: %s" + % ("s" if len(self.vos) > 1 else "", ", ".join([v for v in self.vos])), + ) + + cache_key = "rses_to_process_1%s2%s3%s" % ( + str(self.rses), + str(self.include_rses), + str(self.exclude_rses), + ) + if multi_vo: + cache_key += "@%s" % "-".join(vo for vo in self.vos) + + result = REGION.get(cache_key) + if not isinstance(result, NoValue): + return result + all_rses = [] + for vo in self.vos: + all_rses.extend(list_rses(filters={"vo": vo})) -def get_max_deletion_threads_by_hostname(hostname: str) -> int: - """ - Internal method to check RSE usage and limits. 
+ if self.rses: + invalid = set(self.rses) - set([rse["rse"] for rse in all_rses]) + if invalid: + msg = "RSE{} {} cannot be found".format( + "s" if len(invalid) > 1 else "", + ", ".join([repr(rse) for rse in invalid]), + ) + raise RSENotFound(msg) + rses = [rse for rse in all_rses if rse["rse"] in self.rses] + else: + rses = all_rses - :param hostname: the hostname of the SE + if self.include_rses: + included_rses = parse_expression(self.include_rses) + rses = [rse for rse in rses if rse in included_rses] - :returns: The maximum deletion thread for the SE. - """ - result = REGION.get('max_deletion_threads_%s' % hostname) - if isinstance(result, NoValue): + if self.exclude_rses: + excluded_rses = parse_expression(self.exclude_rses) + rses = [rse for rse in rses if rse not in excluded_rses] + + REGION.set(cache_key, rses) + logging.log( + logging.INFO, + "Reaper: This instance will work on RSEs: %s", + ", ".join([rse["rse"] for rse in rses]), + ) + return rses + + def _rse_deletion_hostname(self, rse: RseData) -> "Optional[str]": + """ + Retrieves the hostname of the default deletion protocol + """ + rse.ensure_loaded(load_info=True) + for prot in rse.info["protocols"]: + if self.scheme: + if ( + prot["scheme"] == self.scheme + and prot["domains"]["wan"]["delete"] != 0 + ): + return prot["hostname"] + else: + if prot["domains"]["wan"]["delete"] == 1: + return prot["hostname"] + return None + + @staticmethod + def _delete_from_storage( + heartbeat_handler: "HeartbeatHandler", + hb_payload: Optional[str], + replicas: list[dict], + prot: Any, # TODO: define type for protocol, currently does not exist + rse_info: RSESettingsDict, + is_staging: bool, + auto_exclude_threshold: int, + ) -> list[dict]: + deleted_files = [] + rse_name = rse_info["rse"] + rse_id = rse_info["id"] + noaccess_attempts = 0 + pfns_to_bulk_delete = [] try: - max_deletion_thread = config_get_int('reaper', 'max_deletion_threads_%s' % hostname) - except (NoOptionError, NoSectionError, RuntimeError): + 
prot.connect() + for replica in replicas: + # Physical deletion + _, _, logger = heartbeat_handler.live(payload=hb_payload) + stopwatch = Stopwatch() + deletion_dict = { + "scope": replica["scope"].external, + "name": replica["name"], + "rse": rse_name, + "file-size": replica["bytes"], + "bytes": replica["bytes"], + "url": replica["pfn"], + "protocol": prot.attributes["scheme"], + "datatype": replica["datatype"], + } + try: + if replica["scope"].vo != "def": + deletion_dict["vo"] = replica["scope"].vo + logger( + logging.DEBUG, + "Deletion ATTEMPT of %s:%s as %s on %s", + replica["scope"], + replica["name"], + replica["pfn"], + rse_name, + ) + # For STAGING RSEs, no physical deletion + if is_staging: + logger( + logging.WARNING, + "Deletion STAGING of %s:%s as %s on %s, will only delete the catalog and not do physical deletion", + replica["scope"], + replica["name"], + replica["pfn"], + rse_name, + ) + deleted_files.append( + {"scope": replica["scope"], "name": replica["name"]} + ) + continue + + if replica["pfn"]: + pfn = replica["pfn"] + # sign the URL if necessary + if ( + prot.attributes["scheme"] == "https" + and rse_info["sign_url"] is not None + ): + pfn = get_signed_url( + rse_id, rse_info["sign_url"], "delete", pfn + ) + if prot.attributes["scheme"] == "globus": + pfns_to_bulk_delete.append(replica["pfn"]) + else: + prot.delete(pfn) + else: + logger( + logging.WARNING, + "Deletion UNAVAILABLE of %s:%s as %s on %s", + replica["scope"], + replica["name"], + replica["pfn"], + rse_name, + ) + + duration = stopwatch.elapsed + METRICS.timer("delete.{scheme}.{rse}").labels( + scheme=prot.attributes["scheme"], rse=rse_name + ).observe(duration) + + deleted_files.append( + {"scope": replica["scope"], "name": replica["name"]} + ) + + deletion_dict["duration"] = duration + add_message("deletion-done", deletion_dict) + logger( + logging.INFO, + "Deletion SUCCESS of %s:%s as %s on %s in %.2f seconds", + replica["scope"], + replica["name"], + replica["pfn"], + rse_name, 
+ duration, + ) + + except SourceNotFound: + duration = stopwatch.elapsed + err_msg = ( + "Deletion NOTFOUND of %s:%s as %s on %s in %.2f seconds" + % ( + replica["scope"], + replica["name"], + replica["pfn"], + rse_name, + duration, + ) + ) + logger(logging.WARNING, "%s", err_msg) + deletion_dict["reason"] = "File Not Found" + deletion_dict["duration"] = duration + add_message("deletion-not-found", deletion_dict) + deleted_files.append( + {"scope": replica["scope"], "name": replica["name"]} + ) + + except ( + ServiceUnavailable, + RSEAccessDenied, + ResourceTemporaryUnavailable, + ) as error: + duration = stopwatch.elapsed + logger( + logging.WARNING, + "Deletion NOACCESS of %s:%s as %s on %s: %s in %.2f", + replica["scope"], + replica["name"], + replica["pfn"], + rse_name, + str(error), + duration, + ) + deletion_dict["reason"] = str(error) + deletion_dict["duration"] = duration + add_message("deletion-failed", deletion_dict) + noaccess_attempts += 1 + if noaccess_attempts >= auto_exclude_threshold: + logger( + logging.INFO, + "Too many (%d) NOACCESS attempts for %s. 
RSE will be temporarily excluded.", + noaccess_attempts, + rse_name, + ) + REGION.set("temporary_exclude_%s" % rse_id, True) + METRICS.gauge("excluded_rses.{rse}").labels(rse=rse_name).set(1) + + EXCLUDED_RSE_GAUGE.labels(rse=rse_name).set(1) + break + + except Exception as error: + duration = stopwatch.elapsed + logger( + logging.CRITICAL, + "Deletion CRITICAL of %s:%s as %s on %s in %.2f seconds : %s", + replica["scope"], + replica["name"], + replica["pfn"], + rse_name, + duration, + str(traceback.format_exc()), + ) + deletion_dict["reason"] = str(error) + deletion_dict["duration"] = duration + add_message("deletion-failed", deletion_dict) + + if pfns_to_bulk_delete and prot.attributes["scheme"] == "globus": + logger( + logging.DEBUG, + "Attempting bulk delete on RSE %s for scheme %s", + rse_name, + prot.attributes["scheme"], + ) + prot.bulk_delete(pfns_to_bulk_delete) + + except ( + ServiceUnavailable, + RSEAccessDenied, + ResourceTemporaryUnavailable, + ) as error: + for replica in replicas: + logger( + logging.WARNING, + "Deletion NOACCESS of %s:%s as %s on %s: %s", + replica["scope"], + replica["name"], + replica["pfn"], + rse_name, + str(error), + ) + payload = { + "scope": replica["scope"].external, + "name": replica["name"], + "rse": rse_name, + "file-size": replica["bytes"], + "bytes": replica["bytes"], + "url": replica["pfn"], + "reason": str(error), + "protocol": prot.attributes["scheme"], + } + if replica["scope"].vo != "def": + payload["vo"] = replica["scope"].vo + add_message("deletion-failed", payload) + logger( + logging.INFO, + "Cannot connect to %s. RSE will be temporarily excluded.", + rse_name, + ) + REGION.set("temporary_exclude_%s" % rse_id, True) + EXCLUDED_RSE_GAUGE.labels(rse=rse_name).set(1) + finally: + prot.close() + return deleted_files + + @staticmethod + def _get_max_deletion_threads_by_hostname(hostname: str) -> int: + """ + Internal method to check RSE usage and limits. 
+ + :param hostname: the hostname of the SE + + :returns: The maximum deletion thread for the SE. + """ + result = REGION.get("max_deletion_threads_%s" % hostname) + if isinstance(result, NoValue): try: - max_deletion_thread = config_get_int('reaper', 'nb_workers_by_hostname') + max_deletion_thread = config_get_int( + "reaper", "max_deletion_threads_%s" % hostname + ) except (NoOptionError, NoSectionError, RuntimeError): - max_deletion_thread = 5 - REGION.set('max_deletion_threads_%s' % hostname, max_deletion_thread) - result = max_deletion_thread - return result - - -def __try_reserve_worker_slot(heartbeat_handler: "HeartbeatHandler", rse: RseData, hostname: str, logger: "Callable[..., Any]") -> "Optional[str]": - """ - The maximum number of concurrent workers is limited per hostname and per RSE due to storage performance reasons. - This function tries to reserve a slot to run the deletion worker for the given RSE and hostname. - - The function doesn't guarantee strong consistency: the number of total workers may end being slightly - higher than the configured limit. - - The reservation is done using the "payload" field of the rucio heart-beats. - if reservation successful, returns the heartbeat payload used for the reservation. Otherwise, returns None - """ - - rse_hostname_key = '%s,%s' % (rse.id, hostname) - payload_cnt = list_payload_counts(heartbeat_handler.executable, older_than=heartbeat_handler.older_than) - tot_threads_for_hostname = 0 - tot_threads_for_rse = 0 - for key in payload_cnt: - if key and key.find(',') > -1: - if key.split(',')[1] == hostname: - tot_threads_for_hostname += payload_cnt[key] - if key.split(',')[0] == str(rse.id): - tot_threads_for_rse += payload_cnt[key] - max_deletion_thread = get_max_deletion_threads_by_hostname(hostname) - if rse_hostname_key in payload_cnt and tot_threads_for_hostname >= max_deletion_thread: - logger(logging.DEBUG, 'Too many deletion threads for %s on RSE %s. 
Back off', hostname, rse.name) - return None - logger(logging.INFO, 'Nb workers on %s smaller than the limit (current %i vs max %i). Starting new worker on RSE %s', hostname, tot_threads_for_hostname, max_deletion_thread, rse.name) - _, total_workers, logger = heartbeat_handler.live(payload=rse_hostname_key) - logger(logging.DEBUG, 'Total deletion workers for %s : %i', hostname, tot_threads_for_hostname + 1) - return rse_hostname_key - - -def __check_rse_usage_cached(rse: RseData, greedy: bool = False, logger: "Callable[..., Any]" = logging.log) -> tuple[int, bool]: - """ - Wrapper around __check_rse_usage which manages the cache entry. - """ - cache_key = 'rse_usage_%s' % rse.id - result = REGION.get(cache_key) - if isinstance(result, NoValue): - result = __check_rse_usage(rse=rse, greedy=greedy, logger=logger) - REGION.set(cache_key, result) - return result - - -def __check_rse_usage(rse: RseData, greedy: bool = False, logger: "Callable[..., Any]" = logging.log) -> tuple[int, bool]: - """ - Internal method to check RSE usage and limits. - - :param rse_name: The RSE name. - :param rse_id: The RSE id. - :param greedy: If True, needed_free_space will be set to 1TB regardless of actual rse usage. - - :returns: needed_free_space, only_delete_obsolete. 
- """ - - needed_free_space = 0 - # First of all check if greedy mode is enabled for this RSE - if greedy: - return 1000000000000, False - - rse.ensure_loaded(load_limits=True, load_usage=True, load_attributes=True) - available_sources = {} - available_sources['total'] = {key['source']: key['total'] for key in rse.usage} - available_sources['used'] = {key['source']: key['used'] for key in rse.usage} - - # Get RSE limits - min_free_space = rse.limits.get('MinFreeSpace', 0) - - # Check from which sources to get used and total spaces (default storage) - # If specified sources do not exist, only delete obsolete - source_for_total_space = rse.attributes.get('source_for_total_space', 'storage') - if source_for_total_space not in available_sources['total']: - logger(logging.WARNING, 'RSE: %s, \'%s\' requested for source_for_total_space but cannot be found. Will only delete obsolete', - rse.name, source_for_total_space) - return 0, True - source_for_used_space = rse.attributes.get('source_for_used_space', 'storage') - if source_for_used_space not in available_sources['used']: - logger(logging.WARNING, 'RSE: %s, \'%s\' requested for source_for_used_space but cannot be found. 
Will only delete obsolete', - rse.name, source_for_used_space) - return 0, True + try: + max_deletion_thread = config_get_int( + "reaper", "nb_workers_by_hostname" + ) + except (NoOptionError, NoSectionError, RuntimeError): + max_deletion_thread = 5 + REGION.set("max_deletion_threads_%s" % hostname, max_deletion_thread) + result = max_deletion_thread + return result - logger(logging.DEBUG, 'RSE: %s, source_for_total_space: %s, source_for_used_space: %s', - rse.name, source_for_total_space, source_for_used_space) - - # Get total and used space - total = available_sources['total'][source_for_total_space] - used = available_sources['used'][source_for_used_space] - - free = total - used - if min_free_space: - needed_free_space = min_free_space - free - - # If needed_free_space negative, nothing to delete except if some Epoch tombstoned replicas - if needed_free_space > 0: - return needed_free_space, False - - return 0, True - - -def reaper(rses, include_rses, exclude_rses, vos=None, chunk_size=100, once=False, greedy=False, - scheme=None, delay_seconds=0, sleep_time=60, auto_exclude_threshold=100, auto_exclude_timeout=600): - """ - Main loop to select and delete files. - - :param rses: List of RSEs the reaper should work against. If empty, it considers all RSEs. - :param include_rses: RSE expression to include RSEs. - :param exclude_rses: RSE expression to exclude RSEs from the Reaper. - :param vos: VOs on which to look for RSEs. Only used in multi-VO mode. - If None, we either use all VOs if run from "def", or the current VO otherwise. - :param chunk_size: The size of chunk for deletion. - :param once: If True, only runs one iteration of the main loop. - :param greedy: If True, delete right away replicas with tombstone. - :param scheme: Force the reaper to use a particular protocol, e.g., mock. - :param delay_seconds: The delay to query replicas in BEING_DELETED state. - :param sleep_time: Time between two cycles. 
- :param auto_exclude_threshold: Number of service unavailable exceptions after which the RSE gets temporarily excluded. - :param auto_exclude_timeout: Timeout for temporarily excluded RSEs. - """ - run_daemon( - once=once, - graceful_stop=GRACEFUL_STOP, - executable=DAEMON_NAME, - partition_wait_time=0 if once else 10, - sleep_time=sleep_time, - run_once_fnc=functools.partial( - run_once, - rses=rses, - include_rses=include_rses, - exclude_rses=exclude_rses, - vos=vos, - chunk_size=chunk_size, - greedy=greedy, - scheme=scheme, - delay_seconds=delay_seconds, - auto_exclude_threshold=auto_exclude_threshold, - auto_exclude_timeout=auto_exclude_timeout, + @staticmethod + def __try_reserve_worker_slot( + heartbeat_handler: "HeartbeatHandler", rse: RseData, hostname: str + ) -> "Optional[str]": + """ + The maximum number of concurrent workers is limited per hostname and per RSE due to storage performance reasons. + This function tries to reserve a slot to run the deletion worker for the given RSE and hostname. + + The function doesn't guarantee strong consistency: the number of total workers may end being slightly + higher than the configured limit. + + The reservation is done using the "payload" field of the rucio heart-beats. + if reservation successful, returns the heartbeat payload used for the reservation. Otherwise, returns None + """ + + rse_hostname_key = "%s,%s" % (rse.id, hostname) + _, total_workers, logger = heartbeat_handler.live(payload=rse_hostname_key) + payload_cnt = list_payload_counts( + heartbeat_handler.executable, older_than=heartbeat_handler.older_than ) - ) - - -def run_once(rses, include_rses, exclude_rses, vos, chunk_size, greedy, scheme, - delay_seconds, auto_exclude_threshold, auto_exclude_timeout, - heartbeat_handler, **_kwargs): - - must_sleep = True - - _, total_workers, logger = heartbeat_handler.live() - logger(logging.INFO, 'Reaper started') - - # try to get auto exclude parameters from the config table. Otherwise use CLI parameters. 
- auto_exclude_threshold = config_get_int('reaper', 'auto_exclude_threshold', default=auto_exclude_threshold, raise_exception=False) - auto_exclude_timeout = config_get_int('reaper', 'auto_exclude_timeout', default=auto_exclude_timeout, raise_exception=False) - # Check if there is a Judge Evaluator backlog - max_evaluator_backlog_count = config_get_int('reaper', 'max_evaluator_backlog_count', default=None, raise_exception=False) - max_evaluator_backlog_duration = config_get_int('reaper', 'max_evaluator_backlog_duration', default=None, raise_exception=False) - if max_evaluator_backlog_count or max_evaluator_backlog_duration: - backlog = get_evaluation_backlog() - count_is_hit = max_evaluator_backlog_count and backlog[0] and backlog[0] > max_evaluator_backlog_count - duration_is_hit = max_evaluator_backlog_duration and backlog[1] and backlog[1] < datetime.utcnow() - timedelta(minutes=max_evaluator_backlog_duration) - if count_is_hit and duration_is_hit: - logger(logging.ERROR, 'Reaper: Judge evaluator backlog count and duration hit, stopping operation') - return must_sleep - elif count_is_hit: - logger(logging.ERROR, 'Reaper: Judge evaluator backlog count hit, stopping operation') - return must_sleep - elif duration_is_hit: - logger(logging.ERROR, 'Reaper: Judge evaluator backlog duration hit, stopping operation') - return must_sleep - - rses_to_process = get_rses_to_process(rses, include_rses, exclude_rses, vos) - rses_to_process = [RseData(id_=rse['id'], name=rse['rse'], columns=rse) for rse in rses_to_process] - if not rses_to_process: - logger(logging.ERROR, 'Reaper: No RSEs found. Will sleep for 30 seconds') - return must_sleep - - # On big deletion campaigns, we desire to re-iterate fast on RSEs which have a lot of data to delete. - # The called function will return the RSEs which have more work remaining. - # Call the deletion routine again on this returned subset of RSEs. 
- # Scale the number of allowed iterations with the number of total reaper workers - iteration = 0 - max_fast_reiterations = int(log2(total_workers)) - while rses_to_process and iteration <= max_fast_reiterations: - rses_to_process = _run_once( - rses_to_process=rses_to_process, - chunk_size=chunk_size, - greedy=greedy, - scheme=scheme, - delay_seconds=delay_seconds, - auto_exclude_threshold=auto_exclude_threshold, - auto_exclude_timeout=auto_exclude_timeout, - heartbeat_handler=heartbeat_handler, + tot_threads_for_hostname = 0 + tot_threads_for_rse = 0 + for key in payload_cnt: + if key and key.find(",") > -1: + if key.split(",")[1] == hostname: + tot_threads_for_hostname += payload_cnt[key] + if key.split(",")[0] == str(rse.id): + tot_threads_for_rse += payload_cnt[key] + max_deletion_thread = Reaper._get_max_deletion_threads_by_hostname(hostname) + if ( + rse_hostname_key in payload_cnt + and tot_threads_for_hostname >= max_deletion_thread + ): + logger( + logging.DEBUG, + "Too many deletion threads for %s on RSE %s. Back off", + hostname, + rse.name, + ) + return None + logger( + logging.INFO, + "Nb workers on %s smaller than the limit (current %i vs max %i). Starting new worker on RSE %s", + hostname, + tot_threads_for_hostname, + max_deletion_thread, + rse.name, ) - if rses_to_process and iteration < max_fast_reiterations: - logger(logging.INFO, "Will perform fast-reiteration %d/%d with rses: %s", iteration + 1, max_fast_reiterations, [str(rse) for rse in rses_to_process]) - iteration += 1 - - if rses_to_process: - # There is still more work to be performed. 
- # Inform the calling context that it must call reaper again (on the full list of rses) - must_sleep = False - - return must_sleep - - -def _run_once(rses_to_process, chunk_size, greedy, scheme, - delay_seconds, auto_exclude_threshold, auto_exclude_timeout, - heartbeat_handler, **_kwargs): - - dict_rses = {} - _, total_workers, logger = heartbeat_handler.live() - tot_needed_free_space = 0 - for rse in rses_to_process: - # Check if RSE is blocklisted - if not rse.columns['availability_delete']: - logger(logging.DEBUG, 'RSE %s is blocklisted for delete', rse.name) - continue - rse.ensure_loaded(load_attributes=True) - enable_greedy = rse.attributes.get('greedyDeletion', False) or greedy - needed_free_space, only_delete_obsolete = __check_rse_usage_cached(rse, greedy=enable_greedy, logger=logger) - if needed_free_space: - dict_rses[rse] = [needed_free_space, only_delete_obsolete, enable_greedy] - tot_needed_free_space += needed_free_space - elif only_delete_obsolete: - dict_rses[rse] = [needed_free_space, only_delete_obsolete, enable_greedy] - else: - logger(logging.DEBUG, 'Nothing to delete on %s', rse.name) - - rses_with_params = [(rse, needed_free_space, only_delete_obsolete, enable_greedy) - for rse, (needed_free_space, only_delete_obsolete, enable_greedy) in dict_rses.items()] - - # Ordering the RSEs based on the needed free space - sorted_rses = sorted(rses_with_params, key=lambda x: x[1], reverse=True) - log_msg_str = ', '.join(f'{rse}:{needed_free_space}:{only_delete_obsolete}:{enable_greedy}' - for rse, needed_free_space, only_delete_obsolete, enable_greedy in sorted_rses) - logger(logging.DEBUG, 'List of RSEs to process ordered by needed space desc: %s', log_msg_str) + logger( + logging.DEBUG, + "Total deletion workers for %s : %i", + hostname, + tot_threads_for_hostname + 1, + ) + return rse_hostname_key + + @staticmethod + def __check_rse_usage_cached( + rse: RseData, + greedy: bool = False, + logger: "Callable[..., Any]" = logging.log, + ) -> tuple[int, 
bool]: + """ + Wrapper around __check_rse_usage which manages the cache entry. + """ + cache_key = "rse_usage_%s" % rse.id + result = REGION.get(cache_key) + if isinstance(result, NoValue): + result = Reaper.__check_rse_usage(rse=rse, greedy=greedy, logger=logger) + REGION.set(cache_key, result) + return result - random.shuffle(rses_with_params) + @staticmethod + def __check_rse_usage( + rse: RseData, greedy: bool = False, logger: "Callable[..., Any]" = logging.log + ) -> tuple[int, bool]: + """ + Internal method to check RSE usage and limits. + + :param rse_name: The RSE name. + :param rse_id: The RSE id. + :param greedy: If True, needed_free_space will be set to 1TB regardless of actual rse usage. + + :returns: needed_free_space, only_delete_obsolete. + """ + + needed_free_space = 0 + # First of all check if greedy mode is enabled for this RSE + if greedy: + return 1000000000000, False + + rse.ensure_loaded(load_limits=True, load_usage=True, load_attributes=True) + available_sources = {} + available_sources["total"] = {key["source"]: key["total"] for key in rse.usage} + available_sources["used"] = {key["source"]: key["used"] for key in rse.usage} + + # Get RSE limits + min_free_space = rse.limits.get("MinFreeSpace", 0) + + # Check from which sources to get used and total spaces (default storage) + # If specified sources do not exist, only delete obsolete + source_for_total_space = rse.attributes.get("source_for_total_space", "storage") + if source_for_total_space not in available_sources["total"]: + logger( + logging.WARNING, + "RSE: %s, '%s' requested for source_for_total_space but cannot be found. Will only delete obsolete", + rse.name, + source_for_total_space, + ) + return 0, True + source_for_used_space = rse.attributes.get("source_for_used_space", "storage") + if source_for_used_space not in available_sources["used"]: + logger( + logging.WARNING, + "RSE: %s, '%s' requested for source_for_used_space but cannot be found. 
Will only delete obsolete", + rse.name, + source_for_used_space, + ) + return 0, True + + logger( + logging.DEBUG, + "RSE: %s, source_for_total_space: %s, source_for_used_space: %s", + rse.name, + source_for_total_space, + source_for_used_space, + ) - work_remaining_by_rse = {} - paused_rses = [] - for rse, needed_free_space, only_delete_obsolete, enable_greedy in rses_with_params: - result = REGION.get('pause_deletion_%s' % rse.id, expiration_time=120) - if not isinstance(result, NoValue): - paused_rses.append(rse.name) - logger(logging.DEBUG, 'Not enough replicas to delete on %s during the previous cycle. Deletion paused for a while', rse.name) - continue + # Get total and used space + total = available_sources["total"][source_for_total_space] + used = available_sources["used"][source_for_used_space] - result = REGION.get('temporary_exclude_%s' % rse.id, expiration_time=auto_exclude_timeout) - if not isinstance(result, NoValue): - logger(logging.WARNING, 'Too many failed attempts for %s in last cycle. RSE is temporarly excluded.', rse.name) - EXCLUDED_RSE_GAUGE.labels(rse=rse.name).set(1) - continue - EXCLUDED_RSE_GAUGE.labels(rse=rse.name).set(0) - - percent = 0 - if tot_needed_free_space: - percent = needed_free_space / tot_needed_free_space * 100 - logger(logging.DEBUG, 'Working on %s. 
Percentage of the total space needed %.2f', rse.name, percent) - - rse_hostname = _rse_deletion_hostname(rse, scheme) - if not rse_hostname: - if scheme: - logger(logging.WARNING, 'Protocol %s not supported on %s', scheme, rse.name) - else: - logger(logging.WARNING, 'No default delete protocol for %s', rse.name) - REGION.set('pause_deletion_%s' % rse.id, True) - continue + free = total - used + if min_free_space: + needed_free_space = min_free_space - free - hb_payload = __try_reserve_worker_slot(heartbeat_handler=heartbeat_handler, rse=rse, hostname=rse_hostname, logger=logger) - if not hb_payload: - # Might need to reschedule a try on this RSE later in the same cycle - continue + # If needed_free_space negative, nothing to delete except if some Epoch tombstoned replicas + if needed_free_space > 0: + return needed_free_space, False - # List and mark BEING_DELETED the files to delete - del_start_time = time.time() - try: - with METRICS.timer('list_unlocked_replicas'): - if only_delete_obsolete: - logger(logging.DEBUG, 'Will run list_and_mark_unlocked_replicas on %s. No space needed, will only delete EPOCH tombstoned replicas', rse.name) - replicas = list_and_mark_unlocked_replicas(limit=chunk_size, - bytes_=needed_free_space, - rse_id=rse.id, - delay_seconds=delay_seconds, - only_delete_obsolete=only_delete_obsolete, - session=None) - logger(logging.DEBUG, 'list_and_mark_unlocked_replicas on %s for %s bytes in %s seconds: %s replicas', rse.name, needed_free_space, time.time() - del_start_time, len(replicas)) - if (len(replicas) == 0 and enable_greedy) or (len(replicas) < chunk_size and not enable_greedy): - logger(logging.DEBUG, 'Not enough replicas to delete on %s (%s requested vs %s returned). 
Will skip any new attempts on this RSE until next cycle', rse.name, chunk_size, len(replicas)) - REGION.set('pause_deletion_%s' % rse.id, True) - work_remaining_by_rse[rse] = False - else: - work_remaining_by_rse[rse] = True - - except (DatabaseException, IntegrityError, DatabaseError) as error: - logger(logging.ERROR, '%s', str(error)) - continue - except Exception: - logger(logging.CRITICAL, 'Exception', exc_info=True) - continue - # Physical deletion will take place there - try: - rse.ensure_loaded(load_info=True, load_attributes=True) - prot = rsemgr.create_protocol(rse.info, 'delete', scheme=scheme, logger=logger) - if rse.attributes.get('oidc_support') is True and prot.attributes['scheme'] == 'davs': - audience = config_get('reaper', 'oidc_audience', False) or determine_audience_for_rse(rse.id) - # FIXME: At the time of writing, StoRM requires `storage.read` - # in order to perform a stat operation. - scope = determine_scope_for_rse(rse.id, scopes=['storage.modify', 'storage.read']) - auth_token = request_token(audience, scope) - if auth_token: - logger(logging.INFO, 'Using a token to delete on RSE %s', rse.name) - prot = rsemgr.create_protocol(rse.info, 'delete', scheme=scheme, auth_token=auth_token, logger=logger) - else: - logger(logging.WARNING, 'Failed to procure a token to delete on RSE %s', rse.name) - for file_replicas in chunks(replicas, chunk_size): - # Refresh heartbeat - _, total_workers, logger = heartbeat_handler.live(payload=hb_payload) - del_start_time = time.time() - for replica in file_replicas: - try: - replica['pfn'] = str(list(rsemgr.lfns2pfns(rse_settings=rse.info, - lfns=[{'scope': replica['scope'].external, 'name': replica['name'], 'path': replica['path']}], - operation='delete', scheme=scheme).values())[0]) - except (ReplicaUnAvailable, ReplicaNotFound) as error: - logger(logging.WARNING, 'Failed get pfn UNAVAILABLE replica %s:%s on %s with error %s', replica['scope'], replica['name'], rse.name, str(error)) - replica['pfn'] = None - - 
except Exception: - logger(logging.CRITICAL, 'Exception', exc_info=True) - - is_staging = rse.columns['staging_area'] - deleted_files = delete_from_storage(heartbeat_handler, hb_payload, file_replicas, prot, rse.info, is_staging, auto_exclude_threshold, logger=logger) - logger(logging.INFO, '%i files processed in %s seconds', len(file_replicas), time.time() - del_start_time) - - # Then finally delete the replicas - del_start = time.time() - delete_replicas(rse_id=rse.id, files=deleted_files) - logger(logging.DEBUG, 'delete_replicas successed on %s : %s replicas in %s seconds', rse.name, len(deleted_files), time.time() - del_start) - METRICS.counter('deletion.done').inc(len(deleted_files)) - except RSEProtocolNotSupported: - logger(logging.WARNING, 'Protocol %s not supported on %s', scheme, rse.name) - except Exception: - logger(logging.CRITICAL, 'Exception', exc_info=True) - - if paused_rses: - logger(logging.INFO, 'Deletion paused for a while for following RSEs: %s', ', '.join(paused_rses)) - - rses_with_more_work = [rse for rse, has_more_work in work_remaining_by_rse.items() if has_more_work] - return rses_with_more_work - - -def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None: - """ - Graceful exit. - """ - GRACEFUL_STOP.set() - - -def run(threads=1, chunk_size=100, once=False, greedy=False, rses=None, scheme=None, exclude_rses=None, include_rses=None, vos=None, delay_seconds=0, sleep_time=60, auto_exclude_threshold=100, auto_exclude_timeout=600): - """ - Starts up the reaper threads. - - :param threads: The total number of workers. - :param chunk_size: The size of chunk for deletion. - :param threads_per_worker: Total number of threads created by each worker. - :param once: If True, only runs one iteration of the main loop. - :param greedy: If True, delete right away replicas with tombstone. - :param rses: List of RSEs the reaper should work against. - If empty, it considers all RSEs. 
- :param scheme: Force the reaper to use a particular protocol/scheme, e.g., mock. - :param exclude_rses: RSE expression to exclude RSEs from the Reaper. - :param include_rses: RSE expression to include RSEs. - :param vos: VOs on which to look for RSEs. Only used in multi-VO mode. - If None, we either use all VOs if run from "def", - or the current VO otherwise. - :param delay_seconds: The delay to query replicas in BEING_DELETED state. - :param sleep_time: Time between two cycles. - :param auto_exclude_threshold: Number of service unavailable exceptions after which the RSE gets temporarily excluded. - :param auto_exclude_timeout: Timeout for temporarily excluded RSEs. - """ - setup_logging(process_name=DAEMON_NAME) - - if rucio.db.sqla.util.is_old_db(): - raise DatabaseException('Database was not updated, daemon won\'t start') - - logging.log(logging.INFO, 'main: starting processes') - rses_to_process = get_rses_to_process(rses, include_rses, exclude_rses, vos) - if not rses_to_process: - logging.log(logging.ERROR, 'Reaper: No RSEs found. Exiting.') - return - - logging.log(logging.INFO, 'Reaper: This instance will work on RSEs: %s', ', '.join([rse['rse'] for rse in rses_to_process])) - - logging.log(logging.INFO, 'starting reaper threads') - threads_list = [threading.Thread(target=reaper, kwargs={'once': once, - 'rses': rses, - 'include_rses': include_rses, - 'exclude_rses': exclude_rses, - 'vos': vos, - 'chunk_size': chunk_size, - 'greedy': greedy, - 'sleep_time': sleep_time, - 'delay_seconds': delay_seconds, - 'scheme': scheme, - 'auto_exclude_threshold': auto_exclude_threshold, - 'auto_exclude_timeout': auto_exclude_timeout}) for _ in range(0, threads)] - - for thread in threads_list: - thread.start() - - logging.log(logging.INFO, 'waiting for interrupts') - - # Interruptible joins require a timeout. 
- while threads_list: - threads_list = [thread.join(timeout=3.14) for thread in threads_list if thread and thread.is_alive()] + return 0, True diff --git a/lib/rucio/daemons/undertaker/undertaker.py b/lib/rucio/daemons/undertaker/undertaker.py index b1060a0ef9..634a701a4c 100644 --- a/lib/rucio/daemons/undertaker/undertaker.py +++ b/lib/rucio/daemons/undertaker/undertaker.py @@ -24,6 +24,7 @@ from datetime import datetime, timedelta from random import randint from re import match +from typing import Any from sqlalchemy.exc import DatabaseError from rucio.db.sqla.constants import ( @@ -43,18 +44,22 @@ METRICS = MetricManager(module=__name__) graceful_stop = threading.Event() -DAEMON_NAME = "undertaker" class Undertaker(Daemon): def __init__(self, chunk_size: int = 10, **_kwargs) -> None: + """ + :param chunk_size: Size of each chunk of DIDs to process + """ super().__init__(daemon_name="undertaker", **_kwargs) self.chunk_size = chunk_size self.paused_dids = {} - def _run_once(self, heartbeat_handler: HeartbeatHandler, **_kwargs) -> None: + def _run_once(self, heartbeat_handler: "HeartbeatHandler", **_kwargs) -> tuple[bool, Any]: worker_number, total_workers, logger = heartbeat_handler.live() + must_sleep = False + try: # Refresh paused dids iter_paused_dids = deepcopy(self.paused_dids) @@ -74,7 +79,7 @@ def _run_once(self, heartbeat_handler: HeartbeatHandler, **_kwargs) -> None: if not dids: logger(logging.INFO, "did not get any work") - return + return must_sleep, None for chunk in chunks(dids, self.chunk_size): _, _, logger = heartbeat_handler.live() @@ -109,3 +114,4 @@ def _run_once(self, heartbeat_handler: HeartbeatHandler, **_kwargs) -> None: logger(logging.ERROR, "Got database error %s.", str(e)) except: logging.critical(traceback.format_exc()) + return must_sleep, None diff --git a/tests/test_abacus_account.py b/tests/test_abacus_account.py index f9ab722e8f..5dd29d06e3 100644 --- a/tests/test_abacus_account.py +++ b/tests/test_abacus_account.py @@ -22,7 +22,7 
@@ from rucio.core.account_limit import get_local_account_usage, set_local_account_limit from rucio.daemons.abacus.account import account_update from rucio.daemons.judge import cleaner -from rucio.daemons.reaper import reaper +from rucio.daemons.reaper.reaper import Reaper from rucio.db.sqla import models from rucio.db.sqla.session import get_session @@ -71,6 +71,8 @@ def test_abacus_account(self, vo, root_account, mock_scope, rse_factory, did_fac assert account_usages['files'] == 0 if vo: - reaper.run(once=True, include_rses='vo=%s&(%s)' % (str(vo), rse), greedy=True) + reaper = Reaper(once=True, include_rses='vo=%s&(%s)' % (str(vo), rse), greedy=True) + reaper.run() else: - reaper.run(once=True, include_rses=rse, greedy=True) + reaper = Reaper(once=True, include_rses=rse, greedy=True) + reaper.run() diff --git a/tests/test_abacus_collection_replica.py b/tests/test_abacus_collection_replica.py index 5193ab5ad4..6ff405e879 100644 --- a/tests/test_abacus_collection_replica.py +++ b/tests/test_abacus_collection_replica.py @@ -23,7 +23,8 @@ from rucio.core.replica import delete_replicas, get_cleaned_updated_collection_replicas from rucio.daemons.abacus import collection_replica from rucio.daemons.judge import cleaner -from rucio.daemons.reaper import reaper +import rucio.daemons.reaper.reaper +from rucio.daemons.reaper.reaper import Reaper from rucio.daemons.undertaker.undertaker import Undertaker from rucio.db.sqla import models, session from rucio.db.sqla.constants import DIDType, ReplicaState @@ -61,9 +62,12 @@ def test_abacus_collection_replica_cleanup(self, vo, mock_scope, rse_factory, di undertaker.run() cleaner.run(once=True) if vo: - reaper.run(once=True, include_rses='vo=%s&(%s|%s)' % (str(vo), rse1, rse2), greedy=True) + + reaper = Reaper(once=True, include_rses='vo=%s&(%s|%s)' % (str(vo), rse1, rse2), greedy=True) + reaper.run() else: - reaper = 
Reaper(once=True, include_rses='(%s|%s)' % (rse1, rse2), greedy=True) + reaper.run() def test_abacus_collection_replica(self, vo, mock_scope, rse_factory, did_factory, rucio_client): """ ABACUS (COLLECTION REPLICA): Test update of collection replica. """ @@ -113,11 +117,13 @@ def test_abacus_collection_replica(self, vo, mock_scope, rse_factory, did_factor # Delete all files -> collection replica should be deleted # Old behaviour (doesn't delete the DID) cleaner.run(once=True) - reaper.REGION.invalidate() + rucio.daemons.reaper.reaper.REGION.invalidate() if vo: - reaper.run(once=True, include_rses='vo=%s&(%s)' % (str(vo), rse), greedy=True) + reaper = Reaper(once=True, include_rses='vo=%s&(%s)' % (str(vo), rse), greedy=True) + reaper.run() else: - reaper.run(once=True, include_rses=rse, greedy=True) + reaper = Reaper(once=True, include_rses=rse, greedy=True) + reaper.run() activity = get_schema_value('ACTIVITY')['enum'][0] rucio_client.add_replication_rule([{'scope': mock_scope.external, 'name': dataset}], 1, rse, lifetime=-1, activity=activity) collection_replica.run(once=True) diff --git a/tests/test_abacus_rse.py b/tests/test_abacus_rse.py index b1e276fccb..af2757d86c 100644 --- a/tests/test_abacus_rse.py +++ b/tests/test_abacus_rse.py @@ -19,7 +19,7 @@ from rucio.core.rse import get_rse_usage from rucio.daemons.abacus.rse import rse_update from rucio.daemons.judge import cleaner -from rucio.daemons.reaper import reaper +from rucio.daemons.reaper.reaper import Reaper from rucio.db.sqla import models from rucio.db.sqla.session import get_session @@ -57,9 +57,11 @@ def test_abacus_rse(self, vo, mock_scope, rse_factory, did_factory, rucio_client rucio_client.add_replication_rule([{'scope': mock_scope.external, 'name': dataset}], 1, rse, lifetime=-1, activity=activity) cleaner.run(once=True) if vo: - reaper.run(once=True, include_rses='vo=%s&(%s)' % (str(vo), rse), greedy=True) + reaper = Reaper(once=True, include_rses='vo=%s&(%s)' % (str(vo), rse), greedy=True) + 
reaper.run() else: - reaper.run(once=True, include_rses=rse, greedy=True) + reaper = Reaper(once=True, include_rses=rse, greedy=True) + reaper.run() rse_update(once=True) rse_usage = get_rse_usage(rse_id=rse_id)[0] assert rse_usage['used'] == 0 diff --git a/tests/test_api_external_representation.py b/tests/test_api_external_representation.py index 4fa6e23e3f..24f7370c2a 100644 --- a/tests/test_api_external_representation.py +++ b/tests/test_api_external_representation.py @@ -39,7 +39,7 @@ from rucio.core.vo import add_vo, vo_exists from rucio.daemons.abacus import rse as abacus_rse from rucio.daemons.judge import cleaner -from rucio.daemons.reaper import reaper +from rucio.daemons.reaper.reaper import Reaper from rucio.db.sqla import constants from rucio.tests.common import rse_name_generator, did_name_generator @@ -398,9 +398,11 @@ def test_api_rse(self, vo, rse_factory, vo2, account, account_name, scope_name): # clean up files cleaner.run(once=True) if vo2: - reaper.run(once=True, include_rses='vo=%s&(%s)' % (vo, rse_mock), greedy=True) + reaper = Reaper(once=True, include_rses='vo=%s&(%s)' % (vo, rse_mock), greedy=True) + reaper.run() else: - reaper.run(once=True, include_rses=rse_mock, greedy=True) + reaper = Reaper(once=True, include_rses=rse_mock, greedy=True) + reaper.run() abacus_rse.run(once=True) out = api_rse.parse_rse_expression(f'{rse1}|{rse2}', vo=vo) diff --git a/tests/test_conveyor.py b/tests/test_conveyor.py index 4a21a2d93c..c241728c8a 100644 --- a/tests/test_conveyor.py +++ b/tests/test_conveyor.py @@ -43,7 +43,7 @@ from rucio.daemons.conveyor.stager import stager from rucio.daemons.conveyor.throttler import throttler from rucio.daemons.conveyor.receiver import receiver, GRACEFUL_STOP as receiver_graceful_stop, Receiver -from rucio.daemons.reaper.reaper import reaper +from rucio.daemons.reaper.reaper import Reaper from rucio.db.sqla import models from rucio.db.sqla.constants import LockState, RequestState, RequestType, ReplicaState, RSEType, 
RuleState from rucio.db.sqla.session import read_session, transactional_session @@ -132,9 +132,9 @@ def do_GET(self): self.send_code_and_message(200, {'Content-Type': 'application/json'}, file_content) with MockServer(_SendScitagsJson) as mock_server: - core_config.set('packet-marking', 'enabled', True) - core_config.set('packet-marking', 'fetch_url', mock_server.base_url) - core_config.set('packet-marking', 'exp_name', 'atlas') + core_config.set('packet-marking', 'enabled', True) + core_config.set('packet-marking', 'fetch_url', mock_server.base_url) + core_config.set('packet-marking', 'exp_name', 'atlas') yield mock_server @@ -209,7 +209,8 @@ def __fake_source_ranking(*, session=None): # The intermediate replica is protected by its state (Copying) rucio.daemons.reaper.reaper.REGION.invalidate() - reaper(once=True, rses=[], include_rses=jump_rse_name, exclude_rses=None) + reaper = Reaper(once=True, rses=[], include_rses=jump_rse_name, exclude_rses=None) + reaper._call_daemon() replica = replica_core.get_replica(rse_id=jump_rse_id, **did) assert replica['state'] == ReplicaState.COPYING @@ -230,7 +231,8 @@ def __fake_source_ranking(*, session=None): assert replica['state'] == ReplicaState.AVAILABLE rucio.daemons.reaper.reaper.REGION.invalidate() - reaper(once=True, rses=[], include_rses='test_container_xrd=True', exclude_rses=None) + reaper2 = Reaper(once=True, rses=[], include_rses='test_container_xrd=True', exclude_rses=None) + reaper2._call_daemon() with pytest.raises(ReplicaNotFound): replica_core.get_replica(rse_id=jump_rse_id, **did) @@ -1496,7 +1498,8 @@ def on_submit(file): # One of the intermediate replicas is eligible for deletion. 
Others are blocked by entries in source table reaper_cache_region.invalidate() - reaper(once=True, rses=[], include_rses='|'.join([rse2, rse3, rse4, rse6]), exclude_rses=None) + reaper = Reaper(once=True, rses=[], include_rses='|'.join([rse2, rse3, rse4, rse6]), exclude_rses=None) + reaper._call_daemon() with pytest.raises(ReplicaNotFound): replica_core.get_replica(rse_id=rse_id_second_to_last_submit, **did) for rse_id in [rse2_id, rse3_id, rse_id_second_to_last_queued]: @@ -1509,7 +1512,7 @@ def on_submit(file): # All intermediate replicas can be deleted reaper_cache_region.invalidate() - reaper(once=True, rses=[], include_rses='|'.join([rse2, rse3, rse4, rse6]), exclude_rses=None) + reaper._call_daemon() for rse_id in [rse2_id, rse3_id, rse4_id, rse6_id]: with pytest.raises(ReplicaNotFound): replica_core.get_replica(rse_id=rse_id, **did) diff --git a/tests/test_conveyor_submitter.py b/tests/test_conveyor_submitter.py index 03187eccdf..06e1c9584c 100644 --- a/tests/test_conveyor_submitter.py +++ b/tests/test_conveyor_submitter.py @@ -29,7 +29,7 @@ from rucio.core import rule as rule_core from rucio.core import config as core_config from rucio.daemons.conveyor.submitter import submitter -from rucio.daemons.reaper.reaper import reaper +from rucio.daemons.reaper.reaper import Reaper from rucio.db.sqla.models import Request, Source from rucio.db.sqla.constants import RequestState from rucio.db.sqla.session import read_session, transactional_session @@ -218,7 +218,8 @@ def test_source_avoid_deletion(caches_mock, rse_factory, did_factory, root_accou # Reaper will not delete a file which only has one replica if there is any pending transfer for it reaper_region.invalidate() - reaper(once=True, rses=[], include_rses=any_source, exclude_rses=None) + reaper = Reaper(once=True, rses=[], include_rses=any_source, exclude_rses=None) + reaper._call_daemon() replica = next(iter(replica_core.list_replicas(dids=[did], rse_expression=any_source))) assert len(replica['pfns']) == 1 @@ 
-232,7 +233,7 @@ def test_source_avoid_deletion(caches_mock, rse_factory, did_factory, root_accou # None of the replicas will be removed. They are protected by an entry in the sources table reaper_region.invalidate() - reaper(once=True, rses=[], include_rses=any_source, exclude_rses=None) + reaper._call_daemon() replica = next(iter(replica_core.list_replicas(dids=[did], rse_expression=any_source))) assert len(replica['pfns']) == 2 @@ -247,7 +248,7 @@ def __delete_sources(rse_id, scope, name, *, session=None): __delete_sources(src_rse1_id, **did) __delete_sources(src_rse2_id, **did) reaper_region.invalidate() - reaper(once=True, rses=[], include_rses=any_source, exclude_rses=None) + reaper._call_daemon() replica = next(iter(replica_core.list_replicas(dids=[did], rse_expression=any_source))) assert len(replica['pfns']) == 1 diff --git a/tests/test_daemons.py b/tests/test_daemons.py index a67fdab915..e3178cbe78 100644 --- a/tests/test_daemons.py +++ b/tests/test_daemons.py @@ -30,7 +30,7 @@ from rucio.daemons.hermes import hermes from rucio.daemons.judge import cleaner, evaluator, injector, repairer from rucio.daemons.oauthmanager import oauthmanager -from rucio.daemons.reaper import dark_reaper, reaper +from rucio.daemons.reaper import dark_reaper from rucio.daemons.replicarecoverer import suspicious_replica_recoverer from rucio.daemons.tracer import kronos from rucio.daemons.transmogrifier import transmogrifier @@ -62,7 +62,6 @@ repairer, oauthmanager, dark_reaper, - reaper, suspicious_replica_recoverer, kronos, transmogrifier, @@ -85,10 +84,11 @@ def test_fail_on_old_database_parametrized(mock_is_old_db, daemon): assert mock_is_old_db.call_count > 1 -class TestDaemon(Daemon): +class DaemonTest(Daemon): def _run_once(self, heartbeat_handler, **_kwargs): pass + @mock.patch('rucio.db.sqla.util.is_old_db') def test_fail_on_old_database(mock_is_old_db): """ DAEMON: Test daemon failure on old database """ @@ -96,6 +96,6 @@ def test_fail_on_old_database(mock_is_old_db): 
assert rucio.db.sqla.util.is_old_db() is True with pytest.raises(exception.DatabaseException, match='Database was not updated, daemon won\'t start'): - TestDaemon().run() + DaemonTest().run() assert mock_is_old_db.call_count > 1 diff --git a/tests/test_reaper.py b/tests/test_reaper.py index a3674eeb79..0d5b0ad5f6 100644 --- a/tests/test_reaper.py +++ b/tests/test_reaper.py @@ -31,9 +31,8 @@ from rucio.core import replica as replica_core from rucio.core import rse as rse_core from rucio.core import rule as rule_core -from rucio.daemons.reaper.reaper import reaper +from rucio.daemons.reaper.reaper import Reaper from rucio.daemons.reaper.dark_reaper import reaper as dark_reaper -from rucio.daemons.reaper.reaper import run as run_reaper from rucio.db.sqla.models import ConstituentAssociationHistory from rucio.db.sqla.session import read_session from rucio.tests.common import rse_name_generator, skip_rse_tests_with_accounts @@ -90,14 +89,15 @@ def test_reaper(vo, caches_mock, message_mock): # Check first if the reaper does not delete anything if no space is needed cache_region.invalidate() rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=nb_files * file_size, free=323000000000) - reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None) + reaper = Reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None) + reaper._call_daemon() assert len(list(replica_core.list_replicas(dids=dids, rse_expression=rse_name))) == nb_files # Now put it over threshold and delete cache_region.invalidate() rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=nb_files * file_size, free=1) - reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None) - reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None) + reaper._call_daemon() + reaper._call_daemon() assert len(list(replica_core.list_replicas(dids, rse_expression=rse_name))) == 200 msgs = message_core.retrieve_messages() @@ -128,14 +128,15 @@ def test_reaper_bulk_delete(vo, 
caches_mock): # Check first if the reaper does not delete anything if no space is needed cache_region.invalidate() rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=nb_files * file_size, free=323000000000) - reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None, chunk_size=1000, scheme='MOCK') + reaper = Reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None, chunk_size=1000, scheme='MOCK') + reaper._call_daemon() assert len(list(replica_core.list_replicas(dids=dids, rse_expression=rse_name))) == nb_files # Now put it over threshold and delete cache_region.invalidate() rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=nb_files * file_size, free=1) - reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None, chunk_size=1000, scheme='MOCK') - reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None, chunk_size=1000, scheme='MOCK') + reaper._call_daemon() + reaper._call_daemon() assert len(list(replica_core.list_replicas(dids, rse_expression=rse_name))) == 200 @@ -167,7 +168,8 @@ def test_reaper_multi_vo_via_run(vo, second_vo, scope_factory, caches_mock): # Check we reap all VOs by default cache_region.invalidate() - run_reaper(once=True, rses=[rse_name]) + reaper = Reaper(once=True, rses=[rse_name]) + reaper.run() assert len(list(replica_api.list_replicas([{'scope': scope_name, 'name': n} for n in names], rse_expression=rse_name, vo=vo))) == 25 assert len(list(replica_api.list_replicas([{'scope': scope_name, 'name': n} for n in names], rse_expression=rse_name, vo=new_vo))) == 25 @@ -200,7 +202,8 @@ def test_reaper_affect_other_vo_via_run(vo, second_vo, scope_factory, caches_moc # Check we don't affect a second VO that isn't specified cache_region.invalidate() - run_reaper(once=True, rses=[rse_name], vos=['new']) + reaper = Reaper(once=True, rses=[rse_name], vos=['new']) + reaper.run() assert len(list(replica_api.list_replicas([{'scope': scope_name, 'name': n} for n in names], 
rse_expression=rse_name, vo=vo))) == nb_files assert len(list(replica_api.list_replicas([{'scope': scope_name, 'name': n} for n in names], rse_expression=rse_name, vo=new_vo))) == 25 @@ -229,8 +232,9 @@ def test_reaper_multi_vo(vo, second_vo, scope_factory, caches_mock): rse_core.set_rse_usage(rse_id=rse1_id, source='storage', used=nb_files * file_size, free=1) rse_core.set_rse_usage(rse_id=rse2_id, source='storage', used=nb_files * file_size, free=1) both_rses = '%s|%s' % (rse1_name, rse2_name) - reaper(once=True, rses=[], include_rses=both_rses, exclude_rses=None) - reaper(once=True, rses=[], include_rses=both_rses, exclude_rses=None) + reaper = Reaper(once=True, rses=[], include_rses=both_rses, exclude_rses=None) + reaper._call_daemon() + reaper._call_daemon() assert len(list(replica_core.list_replicas(dids=dids1, rse_expression=both_rses))) == 200 assert len(list(replica_core.list_replicas(dids=dids2, rse_expression=both_rses))) == 200 @@ -295,7 +299,8 @@ def __get_archive_contents_history_count(archive, *, session=None): cache_region.invalidate() rse_core.set_rse_limits(rse_id=rse_id, name='MinFreeSpace', value=2 * archive_size + nb_c_outside_archive * constituent_size) rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=2 * archive_size + nb_c_outside_archive * constituent_size, free=1) - reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None) + reaper = Reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None) + reaper._call_daemon() for did in constituents + [archive1, archive2]: assert did_core.get_did(**did) for did in [archive1, archive2, c_with_replica, c_with_replica_and_rule]: @@ -322,7 +327,7 @@ def __get_archive_contents_history_count(archive, *, session=None): cache_region.invalidate() rse_core.set_rse_limits(rse_id=rse_id, name='MinFreeSpace', value=2 * archive_size + nb_c_outside_archive * constituent_size) rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=2 * archive_size + nb_c_outside_archive * 
constituent_size, free=1) - reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None) + reaper._call_daemon() with pytest.raises(DataIdentifierNotFound): assert did_core.get_did(**archive1) with pytest.raises(DataIdentifierNotFound): @@ -343,7 +348,7 @@ def __get_archive_contents_history_count(archive, *, session=None): cache_region.invalidate() rse_core.set_rse_limits(rse_id=rse_id, name='MinFreeSpace', value=archive_size + nb_c_outside_archive * constituent_size) rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=archive_size + nb_c_outside_archive * constituent_size, free=1) - reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None) + reaper._call_daemon() # The archive must be removed with pytest.raises(DataIdentifierNotFound): assert did_core.get_did(**archive2) @@ -394,7 +399,8 @@ def test_archive_of_deleted_dids(vo, did_factory, root_account, core_config_mock reaper_cache_region.invalidate() rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=nb_files * file_size, free=323000000000) - reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None, greedy=True) + reaper = Reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None, greedy=True) + reaper._call_daemon() assert len(list(replica_core.list_replicas(dids=dids, rse_expression=rse_name))) == 0 file_clause = [] @@ -446,7 +452,8 @@ def test_run_on_non_existing_scheme(vo, caches_mock): # The reaper will set a flag pause_deletion_ cache_region.invalidate() rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=nb_files * file_size, free=1) - reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None, chunk_size=1000, scheme='https') + reaper = Reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None, chunk_size=1000, scheme='https') + reaper._call_daemon() assert len(list(replica_core.list_replicas(dids, rse_expression=rse_name))) == 250 assert cache_region.get('pause_deletion_%s' % rse_id) @@ -470,7 +477,8 @@ def 
test_reaper_without_rse_usage(vo, caches_mock): # Check first if the reaper does not delete anything if there's nothing obsolete cache_region.invalidate() - reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None, chunk_size=1000, scheme='MOCK') + reaper = Reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None, chunk_size=1000, scheme='MOCK') + reaper._call_daemon() assert len(list(replica_core.list_replicas(dids=dids, rse_expression=rse_name))) == nb_files # Now set Epoch tombstone for a few replicas @@ -480,7 +488,7 @@ def test_reaper_without_rse_usage(vo, caches_mock): # The reaper should delete the replica with Epoch tombstone even if the rse_usage is not set cache_region.invalidate() - reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None, chunk_size=1000, scheme='MOCK') + reaper._call_daemon() assert len(list(replica_core.list_replicas(dids, rse_expression=rse_name))) == nb_files - nb_epoch_tombstone @@ -499,4 +507,5 @@ def test_deletion_with_tokens(vo, did_factory, root_account, caches_mock, file_c for rule in list(rule_core.list_associated_rules_for_file(**did)): rule_core.delete_rule(rule['id']) - reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None, chunk_size=1000, greedy=True, scheme='davs') + reaper = Reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None, chunk_size=1000, greedy=True, scheme='davs') + reaper._call_daemon() diff --git a/tools/count_missing_type_annotations_utils.sh b/tools/count_missing_type_annotations_utils.sh index 3dea91b802..a10e0e185e 100755 --- a/tools/count_missing_type_annotations_utils.sh +++ b/tools/count_missing_type_annotations_utils.sh @@ -15,14 +15,14 @@ # limitations under the License. # Script with all tools to count the missing python type annotations in the -# project. Installes all necessary python packages temporarly if needed. To use +# project. Installs all necessary python packages temporarily if needed. 
To use # it run: `scource count_missing_annotations_utils.sh`. set -e ensure_install() { - # Checks if a python package is installed via pip. It installes the package, + # Checks if a python package is installed via pip. It installs the package, # and removes it after the script run automatically. # # All debug output is redirected to the stderr stream, to not interfear with