diff --git a/bin/rucio-abacus-account b/bin/rucio-abacus-account index 5070087e24..cba5105b3a 100755 --- a/bin/rucio-abacus-account +++ b/bin/rucio-abacus-account @@ -20,8 +20,8 @@ Abacus account is a daemon to update account counters. import argparse import signal - -from rucio.daemons.abacus.account import run, stop +from rucio.daemons.abacus.account import AbacusAccount +from rucio.daemons.abacus.common import ABACUS_HISTORY_TABLE_INTERVAL def get_parser(): @@ -57,19 +57,18 @@ Check account usage again:: ''', formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument("--run-once", action="store_true", default=False, help='One iteration only') parser.add_argument("--threads", action="store", default=1, type=int, help='Concurrency control: total number of threads on this process') - parser.add_argument("--enable-history", action="store_true", default=False, help='Record account usage into history table every hour.') + parser.add_argument("--enable-history", action="store_true", default=False, help=f'Record account usage into history table every {ABACUS_HISTORY_TABLE_INTERVAL} seconds.') parser.add_argument('--sleep-time', action="store", default=10, type=int, help='Concurrency control: thread sleep time after each chunk of work') return parser if __name__ == "__main__": - - signal.signal(signal.SIGTERM, stop) - parser = get_parser() args = parser.parse_args() + abacus_account = AbacusAccount(once=args.run_once, threads=args.threads, fill_history_table=args.enable_history, sleep_time=args.sleep_time) + signal.signal(signal.SIGTERM, abacus_account.stop) try: - run(once=args.run_once, threads=args.threads, fill_history_table=args.enable_history, sleep_time=args.sleep_time) + abacus_account.run() except KeyboardInterrupt: - stop() + abacus_account.stop() diff --git a/bin/rucio-judge-repairer b/bin/rucio-judge-repairer index 7066eb9f0d..07eaa6b61f 100755 --- a/bin/rucio-judge-repairer +++ b/bin/rucio-judge-repairer @@ -19,6 +19,7 @@ Judge-Repairer is a daemon to repair stuck replication rules. """ import argparse +import signal from rucio.daemons.judge.repairer import JudgeRepairer @@ -38,6 +39,7 @@ if __name__ == "__main__": parser = get_parser() args = parser.parse_args() judge_repairer = JudgeRepairer(once=args.run_once, threads=args.threads, sleep_time=args.sleep_time) + signal.signal(signal.SIGTERM, judge_repairer.stop) try: judge_repairer.run() except KeyboardInterrupt: diff --git a/lib/rucio/core/rse_expression_parser.py b/lib/rucio/core/rse_expression_parser.py index a739d4e678..5a87d937fd 100644 --- a/lib/rucio/core/rse_expression_parser.py +++ b/lib/rucio/core/rse_expression_parser.py @@ -42,7 +42,7 @@ @transactional_session -def parse_expression(expression: str, filter_=None, *, session: "Session"): +def parse_expression(expression, filter_=None, *, session: "Session"): """ Parse a RSE expression and return the list of RSE dictionaries. diff --git a/lib/rucio/daemons/abacus/account.py b/lib/rucio/daemons/abacus/account.py index 78de5d45c4..e19bf98bb5 100644 --- a/lib/rucio/daemons/abacus/account.py +++ b/lib/rucio/daemons/abacus/account.py @@ -18,89 +18,37 @@ """ import logging -import threading import time -from typing import TYPE_CHECKING +from typing import Any -import rucio.db.sqla.util -from rucio.common import exception -from rucio.common.logging import setup_logging -from rucio.common.utils import get_thread_with_periodic_running_function -from rucio.core.account_counter import get_updated_account_counters, update_account_counter, fill_account_counter_history_table -from rucio.daemons.common import run_daemon +from rucio.core.account_counter import get_updated_account_counters, update_account_counter +from rucio.daemons.common import HeartbeatHandler +from rucio.daemons.abacus.common import AbacusDaemon -if TYPE_CHECKING: - from types import FrameType - from typing import Optional -graceful_stop = threading.Event() -DAEMON_NAME = 'abacus-account' +class AbacusAccount(AbacusDaemon): + def __init__(self, **_kwargs) -> None: + super().__init__(daemon_name="abacus-account", **_kwargs) - -def account_update(once=False, sleep_time=10): - """ - Main loop to check and update the Account Counters. - """ - run_daemon( - once=once, - graceful_stop=graceful_stop, - executable=DAEMON_NAME, - partition_wait_time=1, - sleep_time=sleep_time, - run_once_fnc=run_once, - ) - - -def run_once(heartbeat_handler, **_kwargs): - worker_number, total_workers, logger = heartbeat_handler.live() - - start = time.time() # NOQA - account_rse_ids = get_updated_account_counters(total_workers=total_workers, - worker_number=worker_number) - logger(logging.DEBUG, 'Index query time %f size=%d' % (time.time() - start, len(account_rse_ids))) - - # If the list is empty, sent the worker to sleep - if not account_rse_ids: - logger(logging.INFO, 'did not get any work') - return - - for account_rse_id in account_rse_ids: + def _run_once(self, heartbeat_handler: "HeartbeatHandler", **_kwargs) -> tuple[bool, Any]: worker_number, total_workers, logger = heartbeat_handler.live() - if graceful_stop.is_set(): - break - start_time = time.time() - update_account_counter(account=account_rse_id[0], rse_id=account_rse_id[1]) - logger(logging.DEBUG, 'update of account-rse counter "%s-%s" took %f' % (account_rse_id[0], account_rse_id[1], time.time() - start_time)) - - -def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None: - """ - Graceful exit. - """ - - graceful_stop.set() - - -def run(once=False, threads=1, fill_history_table=False, sleep_time=10): - """ - Starts up the Abacus-Account threads. - """ - setup_logging(process_name=DAEMON_NAME) - - if rucio.db.sqla.util.is_old_db(): - raise exception.DatabaseException('Database was not updated, daemon won\'t start') - - if once: - logging.info('main: executing one iteration only') - account_update(once) - else: - logging.info('main: starting threads') - threads = [threading.Thread(target=account_update, kwargs={'once': once, 'sleep_time': sleep_time}) for i in - range(0, threads)] - if fill_history_table: - threads.append(get_thread_with_periodic_running_function(3600, fill_account_counter_history_table, graceful_stop)) - [t.start() for t in threads] - logging.info('main: waiting for interrupts') - # Interruptible joins require a timeout. - while threads[0].is_alive(): - [t.join(timeout=3.14) for t in threads] + must_sleep = False + + start = time.time() # NOQA + account_rse_ids = get_updated_account_counters(total_workers=total_workers, + worker_number=worker_number) + logger(logging.DEBUG, 'Index query time %f size=%d' % (time.time() - start, len(account_rse_ids))) + + # If the list is empty, sent the worker to sleep + if not account_rse_ids: + logger(logging.INFO, 'did not get any work') + return must_sleep, None + + for account_rse_id in account_rse_ids: + worker_number, total_workers, logger = heartbeat_handler.live() + if self.graceful_stop.is_set(): + break + start_time = time.time() + update_account_counter(account=account_rse_id[0], rse_id=account_rse_id[1]) + logger(logging.DEBUG, 'update of account-rse counter "%s-%s" took %f' % (account_rse_id[0], account_rse_id[1], time.time() - start_time)) + return must_sleep, None diff --git a/lib/rucio/daemons/abacus/common.py b/lib/rucio/daemons/abacus/common.py new file mode 100644 index 0000000000..08bfad5ca6 --- /dev/null +++ b/lib/rucio/daemons/abacus/common.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from rucio.daemons.common import Daemon, HeartbeatHandler +from rucio.common.utils import get_thread_with_periodic_running_function +from rucio.core.account_counter import fill_account_counter_history_table +import logging +import threading +from typing import Any +from abc import abstractmethod + +ABACUS_HISTORY_TABLE_INTERVAL = 3600 + + +class AbacusDaemon(Daemon): + """ + Common daemon logic for multiple Abacus daemons. + """ + def __init__(self, fill_history_table: bool = False, **_kwargs) -> None: + f""" + :param fill_history_table: Set to True to record account usage into history table every {ABACUS_HISTORY_TABLE_INTERVAL} seconds. + """ + super().__init__(**_kwargs) + self.fill_history_table = fill_history_table + self.paused_dids = {} + + @abstractmethod + def _run_once( + self, heartbeat_handler: "HeartbeatHandler", **_kwargs + ) -> tuple[bool, Any]: + pass + + def run(self) -> None: + self._pre_run_checks() + + if self.once: + logging.info(f'{self.daemon_name}: executing one iteration only') + self._call_daemon() + else: + logging.info(f'{self.daemon_name}: starting threads') + thread_list = [threading.Thread(target=self._call_daemon) for _ in + range(0, self.total_workers)] + if self.fill_history_table: + thread_list.append(get_thread_with_periodic_running_function(ABACUS_HISTORY_TABLE_INTERVAL, fill_account_counter_history_table, self.graceful_stop)) + [t.start() for t in thread_list] + logging.info(f'{self.daemon_name}: waiting for interrupts') + # Interruptible joins require a timeout. + while thread_list: + thread_list = [ + thread.join(timeout=3.14) + for thread in thread_list + if thread and thread.is_alive() + ] diff --git a/lib/rucio/daemons/common.py b/lib/rucio/daemons/common.py index d4478c8810..6a11efaa54 100644 --- a/lib/rucio/daemons/common.py +++ b/lib/rucio/daemons/common.py @@ -19,25 +19,23 @@ import os import queue import socket -import signal import threading import time -from collections.abc import Callable, Generator, Iterator, Sequence -from typing import Any, Generic, Optional, TypeVar, Union -from types import TracebackType, FrameType from abc import ABC, abstractmethod +from collections.abc import Callable, Generator, Iterator, Sequence +from types import FrameType, TracebackType +from typing import Any, Final, Generic, Optional, TypeVar, Union -import rucio.db.sqla.util +from rucio.common.exception import DatabaseException from rucio.common.logging import formatted_logger, setup_logging from rucio.common.utils import PriorityQueue +import rucio.db.sqla.util from rucio.core import heartbeat as heartbeat_core from rucio.core.monitor import MetricManager -from rucio.common.exception import DatabaseException -T = TypeVar("T") +T = TypeVar('T') METRICS = MetricManager(module=__name__) - class Daemon(ABC): """ Base daemon class @@ -58,13 +56,12 @@ def __init__( :param partition_wait_time: Time to wait for database partition rebalancing before starting the actual daemon loop. :param daemon_name: Name of daemon that is constructed. """ - self.once = once - self.total_workers = total_workers - self.sleep_time = sleep_time - self.partition_wait_time = partition_wait_time + self.once: Final[bool] = once + self.total_workers: Final[int] = total_workers + self.sleep_time: Final[int] = sleep_time + self.partition_wait_time: Final[int] = partition_wait_time self.daemon_name = daemon_name self.graceful_stop = threading.Event() - signal.signal(signal.SIGTERM, self.stop) setup_logging(process_name=daemon_name) @staticmethod @@ -118,9 +115,11 @@ def run(self) -> None: logging.info(f"{self.daemon_name}: starting threads") thread_list = [ threading.Thread(target=self._call_daemon) - for _ in range(0, self.total_workers) + for _ in range(self.total_workers) ] - [t.start() for t in thread_list] + + for t in thread_list: + t.start() if not self.once: logging.info(f"{self.daemon_name}: waiting for interrupts") @@ -134,7 +133,7 @@ def run(self) -> None: ] def stop( - self, signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None + self, signum: Optional[int] = None, frame: Optional[FrameType] = None ) -> None: """ Graceful exit the daemon. Used as handler for SIGTERM. @@ -145,7 +144,6 @@ def stop( """ self.graceful_stop.set() - class HeartbeatHandler: """ Simple contextmanager which sets a heartbeat and associated logger on entry and cleans up the heartbeat on exit. @@ -160,9 +158,7 @@ def __init__(self, executable: str, renewal_interval: int) -> None: self.executable = executable self._hash_executable = None self.renewal_interval = renewal_interval - self.older_than = ( - renewal_interval * 10 if renewal_interval and renewal_interval > 0 else None - ) # 10 was chosen without any particular reason + self.older_than = renewal_interval * 10 if renewal_interval > 0 else None # 10 was chosen without any particular reason self.hostname = socket.getfqdn() self.pid = os.getpid() @@ -178,16 +174,11 @@ def __enter__(self) -> "HeartbeatHandler": self.live() return self - def __exit__( - self, - exc_type: type[BaseException], - exc_val: BaseException, - exc_tb: TracebackType, - ) -> None: + def __exit__(self, exc_type: type[BaseException], exc_val: BaseException, exc_tb: TracebackType): if self.last_heart_beat: heartbeat_core.die(self.executable, self.hostname, self.pid, self.hb_thread) if self.logger: - self.logger(logging.INFO, "Heartbeat cleaned up") + self.logger(logging.INFO, 'Heartbeat cleaned up') @property def hash_executable(self) -> str: @@ -199,64 +190,38 @@ def hash_executable(self) -> str: def short_executable(self) -> str: return min(self.executable, self.hash_executable, key=len) - def live( - self, force_renew: bool = False, payload: Optional[str] = None - ) -> tuple[int, int, Callable]: + def live(self, force_renew: bool = False, payload: Optional[str] = None): """ :return: a tuple: , , """ - if ( - force_renew - or not self.last_time - or not self.last_heart_beat - or self.last_time - < datetime.datetime.now() - - datetime.timedelta(seconds=self.renewal_interval) - or self.last_payload != payload - ): + if force_renew \ + or not self.last_time \ + or not self.last_heart_beat \ + or self.last_time < datetime.datetime.now() - datetime.timedelta(seconds=self.renewal_interval) \ + or self.last_payload != payload: if self.older_than: - self.last_heart_beat = heartbeat_core.live( - self.executable, - self.hostname, - self.pid, - self.hb_thread, - payload=payload, - older_than=self.older_than, - ) + self.last_heart_beat = heartbeat_core.live(self.executable, self.hostname, self.pid, self.hb_thread, payload=payload, older_than=self.older_than) else: - self.last_heart_beat = heartbeat_core.live( - self.executable, - self.hostname, - self.pid, - self.hb_thread, - payload=payload, - ) - - prefix = "[%i/%i]: " % ( - self.last_heart_beat["assign_thread"], - self.last_heart_beat["nr_threads"], - ) - self.logger = formatted_logger(logging.log, prefix + "%s") + self.last_heart_beat = heartbeat_core.live(self.executable, self.hostname, self.pid, self.hb_thread, payload=payload) + + prefix = '[%i/%i]: ' % (self.last_heart_beat['assign_thread'], self.last_heart_beat['nr_threads']) + self.logger = formatted_logger(logging.log, prefix + '%s') if not self.last_time: - self.logger(logging.DEBUG, "First heartbeat set") + self.logger(logging.DEBUG, 'First heartbeat set') else: - self.logger(logging.DEBUG, "Heartbeat renewed") + self.logger(logging.DEBUG, 'Heartbeat renewed') self.last_time = datetime.datetime.now() self.last_payload = payload - return ( - self.last_heart_beat["assign_thread"], - self.last_heart_beat["nr_threads"], - self.logger, - ) + return self.last_heart_beat['assign_thread'], self.last_heart_beat['nr_threads'], self.logger def _activity_looper( - once: bool, - sleep_time: int, - activities: Optional[Sequence[str]], - heartbeat_handler: HeartbeatHandler, + once: bool, + sleep_time: int, + activities: Optional[Sequence[str]], + heartbeat_handler: HeartbeatHandler, ) -> Generator[tuple[str, float], tuple[float, bool], None]: """ Generator which loops (either once, or indefinitely) over all activities while ensuring that `sleep_time` @@ -289,19 +254,14 @@ def _activity_looper( logger = heartbeat_handler.logger if time_to_sleep > 0: if activity: - logger( - logging.DEBUG, - "Switching to activity %s and sleeping %s seconds", - activity, - time_to_sleep, - ) + logger(logging.DEBUG, 'Switching to activity %s and sleeping %s seconds', activity, time_to_sleep) else: - logger(logging.DEBUG, "Sleeping %s seconds", time_to_sleep) + logger(logging.DEBUG, 'Sleeping %s seconds', time_to_sleep) else: if activity: - logger(logging.DEBUG, "Switching to activity %s", activity) + logger(logging.DEBUG, 'Switching to activity %s', activity) else: - logger(logging.DEBUG, "Starting next iteration") + logger(logging.DEBUG, 'Starting next iteration') # The calling context notifies us when the activity actually got handled. And if sleeping is desired. actual_exe_time, must_sleep = yield activity, time_to_sleep @@ -316,12 +276,12 @@ def _activity_looper( def db_workqueue( - once: bool, - graceful_stop: threading.Event, - executable: str, - partition_wait_time: int, - sleep_time: int, - activities: Optional[Sequence[str]] = None, + once: bool, + graceful_stop: threading.Event, + executable: str, + partition_wait_time: int, + sleep_time: int, + activities: Optional[Sequence[str]] = None, ): """ Used to wrap a function for interacting with the database as a work queue: i.e. to select @@ -337,29 +297,20 @@ def db_workqueue( :param activities: optional list of activities on which to work. The run_once_fnc will be called on activities one by one. """ - def _decorate( - run_once_fnc: Callable[..., Optional[Union[bool, tuple[bool, T]]]] - ) -> Callable[[], Iterator[Optional[T]]]: + def _decorate(run_once_fnc: Callable[..., Optional[Union[bool, tuple[bool, T]]]]) -> Callable[[], Iterator[Optional[T]]]: @functools.wraps(run_once_fnc) def _generator(): - with HeartbeatHandler( - executable=executable, renewal_interval=sleep_time - 1 - ) as heartbeat_handler: + with HeartbeatHandler(executable=executable, renewal_interval=sleep_time - 1) as heartbeat_handler: logger = heartbeat_handler.logger - logger(logging.INFO, "started") + logger(logging.INFO, 'started') if partition_wait_time: graceful_stop.wait(partition_wait_time) _, _, logger = heartbeat_handler.live(force_renew=True) - activity_loop = _activity_looper( - once=once, - sleep_time=sleep_time, - activities=activities, - heartbeat_handler=heartbeat_handler, - ) + activity_loop = _activity_looper(once=once, sleep_time=sleep_time, activities=activities, heartbeat_handler=heartbeat_handler) activity, time_to_sleep = next(activity_loop, (None, None)) while time_to_sleep is not None: if graceful_stop.is_set(): @@ -373,9 +324,7 @@ def _generator(): must_sleep = True start_time = time.time() try: - result = run_once_fnc( - heartbeat_handler=heartbeat_handler, activity=activity - ) + result = run_once_fnc(heartbeat_handler=heartbeat_handler, activity=activity) # Handle return values already existing in the code # TODO: update all existing daemons to always explicitly return (must_sleep, ret_value) @@ -391,22 +340,18 @@ def _generator(): if ret_value is not None: yield ret_value except Exception as e: - METRICS.counter("exceptions.{exception}").labels( - exception=e.__class__.__name__ - ).inc() + METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc() logger(logging.CRITICAL, "Exception", exc_info=True) if once: raise try: - activity, time_to_sleep = activity_loop.send( - (start_time, must_sleep) - ) + activity, time_to_sleep = activity_loop.send((start_time, must_sleep)) except StopIteration: break if not once: - logger(logging.INFO, "Graceful stop requested") + logger(logging.INFO, 'Graceful stop requested') return _generator @@ -414,14 +359,13 @@ def _generator(): def run_daemon( - once: bool, - graceful_stop: threading.Event, - executable: str, - partition_wait_time: int, - sleep_time: int, - run_once_fnc: Callable[..., Optional[Union[bool, tuple[bool, Any]]]], - activities: Optional[list[str]] = None, -) -> None: + once: bool, + graceful_stop: threading.Event, + executable: str, + partition_wait_time: int, + sleep_time: int, + run_once_fnc: Callable[..., Optional[Union[bool, tuple[bool, Any]]]], + activities: Optional[list[str]] = None): """ Run the daemon loop and call the function run_once_fnc at each iteration """ @@ -455,7 +399,11 @@ def __init__(self, producers, consumers, graceful_stop, logger=logging.log): self.producers_done_event = threading.Event() self.logger = logger - def _produce(self, it: Callable[[], Iterator[T]], wait_for_consumers: bool = False): + def _produce( + self, + it: Callable[[], Iterator[T]], + wait_for_consumers: bool = False + ): """ Iterate over the generator function and put the extracted elements into the queue. @@ -477,9 +425,7 @@ def _produce(self, it: Callable[[], Iterator[T]], wait_for_consumers: bool = Fal except StopIteration: break except Exception as e: - METRICS.counter("exceptions.{exception}").labels( - exception=e.__class__.__name__ - ).inc() + METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc() self.logger(logging.CRITICAL, "Exception", exc_info=True) finally: with self.lock: @@ -490,7 +436,10 @@ def _produce(self, it: Callable[[], Iterator[T]], wait_for_consumers: bool = Fal if wait_for_consumers: self.queue.join() - def _consume(self, fnc: Callable[[T], Any]): + def _consume( + self, + fnc: Callable[[T], Any] + ): """ Wait for elements to arrive via the queue and call the given function on each element. @@ -506,9 +455,7 @@ def _consume(self, fnc: Callable[[T], Any]): try: fnc(product) except Exception as e: - METRICS.counter("exceptions.{exception}").labels( - exception=e.__class__.__name__ - ).inc() + METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc() self.logger(logging.CRITICAL, "Exception", exc_info=True) finally: self.queue.task_done() @@ -519,8 +466,11 @@ def run(self): for i, producer in enumerate(self.producers): thread = threading.Thread( target=self._produce, - name=f"producer-{i}-{producer.__name__}", - kwargs={"it": producer, "wait_for_consumers": True}, + name=f'producer-{i}-{producer.__name__}', + kwargs={ + 'it': producer, + 'wait_for_consumers': True + } ) thread.start() producer_threads.append(thread) @@ -529,28 +479,24 @@ def run(self): for i, consumer in enumerate(self.consumers): thread = threading.Thread( target=self._consume, - name=f"consumer-{i}-{consumer.__name__}", + name=f'consumer-{i}-{consumer.__name__}', kwargs={ - "fnc": consumer, - }, + 'fnc': consumer, + } ) thread.start() consumer_threads.append(thread) - logging.info("waiting for interrupts") + logging.info('waiting for interrupts') while producer_threads: for thread in producer_threads: thread.join(timeout=3.14) - producer_threads = [ - thread for thread in producer_threads if thread.is_alive() - ] + producer_threads = [thread for thread in producer_threads if thread.is_alive()] self.producers_done_event.set() while consumer_threads: for thread in consumer_threads: thread.join(timeout=3.14) - consumer_threads = [ - thread for thread in consumer_threads if thread.is_alive() - ] + consumer_threads = [thread for thread in consumer_threads if thread.is_alive()] diff --git a/tests/test_abacus_account.py b/tests/test_abacus_account.py index 5dd29d06e3..f313487c24 100644 --- a/tests/test_abacus_account.py +++ b/tests/test_abacus_account.py @@ -20,7 +20,7 @@ from rucio.core.account import get_usage_history from rucio.core.account_counter import update_account_counter_history from rucio.core.account_limit import get_local_account_usage, set_local_account_limit -from rucio.daemons.abacus.account import account_update +from rucio.daemons.abacus.account import AbacusAccount from rucio.daemons.judge import cleaner from rucio.daemons.reaper.reaper import Reaper from rucio.db.sqla import models @@ -45,7 +45,7 @@ def test_abacus_account(self, vo, root_account, mock_scope, rse_factory, did_fac dataset = dids[0]['dataset_name'] activity = get_schema_value('ACTIVITY')['enum'][0] rucio_client.add_replication_rule([{'scope': mock_scope.external, 'name': dataset}], 1, rse, lifetime=-1, activity=activity) - account_update(once=True) + AbacusAccount(once=True).run() account_usage = get_local_account_usage(account=root_account, rse_id=rse_id)[0] assert account_usage['bytes'] == nfiles * file_sizes assert account_usage['files'] == nfiles @@ -63,7 +63,7 @@ def test_abacus_account(self, vo, root_account, mock_scope, rse_factory, did_fac # Delete rules -> account usage should decrease cleaner.run(once=True) - account_update(once=True) + AbacusAccount(once=True).run() # set account limit because return value of get_local_account_usage differs if a limit is set or not set_local_account_limit(account=root_account, rse_id=rse_id, bytes_=10) account_usages = get_local_account_usage(account=root_account, rse_id=rse_id)[0] diff --git a/tests/test_bin_rucio.py b/tests/test_bin_rucio.py index 663ba912ca..acd80eb56b 100755 --- a/tests/test_bin_rucio.py +++ b/tests/test_bin_rucio.py @@ -1834,7 +1834,7 @@ def test_list_account_usage(self): """ CLIENT (USER): list account usage. """ from rucio.db.sqla import session, models from rucio.core.account_counter import increase - from rucio.daemons.abacus import account as abacus_account + from rucio.daemons.abacus.account import AbacusAccount db_session = session.get_session() db_session.query(models.AccountUsage).delete() @@ -1854,7 +1854,7 @@ def test_list_account_usage(self): self.account_client.set_local_account_limit(account, rse, local_limit) self.account_client.set_global_account_limit(account, rse_exp, global_limit) increase(rse_id, InternalAccount(account, **self.vo), 1, usage) - abacus_account.run(once=True) + AbacusAccount(once=True).run() cmd = 'rucio list-account-usage {0}'.format(account) exitcode, out, err = execute(cmd) assert re.search('.*{0}.*{1}.*{2}.*{3}'.format(rse, usage, local_limit, local_left), out) is not None diff --git a/tests/test_counter.py b/tests/test_counter.py index 0501247d17..19887ae532 100644 --- a/tests/test_counter.py +++ b/tests/test_counter.py @@ -19,7 +19,7 @@ from rucio.core import account_counter, rse_counter from rucio.core.account import get_usage -from rucio.daemons.abacus.account import account_update +from rucio.daemons.abacus.account import AbacusAccount from rucio.daemons.abacus.rse import rse_update from rucio.db.sqla import models @@ -91,7 +91,7 @@ class TestCoreAccountCounter: def test_inc_dec_get_counter(self, jdoe_account, rse_factory, db_session): """ACCOUNT COUNTER (CORE): Increase, decrease and get counter """ db_session.commit() - account_update(once=True) + AbacusAccount(once=True).run() _, rse_id = rse_factory.make_mock_rse(session=db_session) db_session.commit() account = jdoe_account @@ -104,7 +104,7 @@ def test_inc_dec_get_counter(self, jdoe_account, rse_factory, db_session): count, sum_ = 0, 0 for i in range(10): account_counter.increase(rse_id=rse_id, account=account, files=1, bytes_=2.147e+9) - account_update(once=True) + AbacusAccount(once=True).run() count += 1 sum_ += 2.147e+9 cnt = get_usage(rse_id=rse_id, account=account) @@ -113,7 +113,7 @@ def test_inc_dec_get_counter(self, jdoe_account, rse_factory, db_session): for i in range(4): account_counter.decrease(rse_id=rse_id, account=account, files=1, bytes_=2.147e+9) - account_update(once=True) + AbacusAccount(once=True).run() count -= 1 sum_ -= 2.147e+9 cnt = get_usage(rse_id=rse_id, account=account) @@ -122,7 +122,7 @@ def test_inc_dec_get_counter(self, jdoe_account, rse_factory, db_session): for i in range(5): account_counter.increase(rse_id=rse_id, account=account, files=1, bytes_=2.147e+9) - account_update(once=True) + AbacusAccount(once=True).run() count += 1 sum_ += 2.147e+9 cnt = get_usage(rse_id=rse_id, account=account) @@ -131,7 +131,7 @@ def test_inc_dec_get_counter(self, jdoe_account, rse_factory, db_session): for i in range(8): account_counter.decrease(rse_id=rse_id, account=account, files=1, bytes_=2.147e+9) - account_update(once=True) + AbacusAccount(once=True).run() count -= 1 sum_ -= 2.147e+9 cnt = get_usage(rse_id=rse_id, account=account) diff --git a/tests/test_daemons.py b/tests/test_daemons.py index 370bb923c6..d33254204b 100644 --- a/tests/test_daemons.py +++ b/tests/test_daemons.py @@ -19,7 +19,7 @@ import rucio.db.sqla.util from rucio.common import exception -from rucio.daemons.abacus import account, collection_replica, rse +from rucio.daemons.abacus import collection_replica, rse from rucio.daemons.atropos import atropos from rucio.daemons.automatix import automatix from rucio.daemons.badreplicas import minos, minos_temporary_expiration, necromancer @@ -37,7 +37,6 @@ from rucio.daemons.common import Daemon DAEMONS = [ - account, collection_replica, rse, atropos, diff --git a/tests/test_judge_evaluator.py b/tests/test_judge_evaluator.py index e40352c829..6e88a5e071 100644 --- a/tests/test_judge_evaluator.py +++ b/tests/test_judge_evaluator.py @@ -25,7 +25,7 @@ from rucio.core.replica import add_replica from rucio.core.rse import add_rse_attribute from rucio.core.rule import add_rule, get_rule -from rucio.daemons.abacus.account import account_update +from rucio.daemons.abacus.account import AbacusAccount from rucio.daemons.judge.evaluator import re_evaluator from rucio.db.sqla.constants import DIDType, LockState from rucio.db.sqla.models import UpdatedDID @@ -198,7 +198,7 @@ def test_judge_dataset_grouping_all(self): def test_account_counter_judge_evaluate_attach(self): """ JUDGE EVALUATOR: Test if the account counter is updated correctly when a file is added to a DS""" re_evaluator(once=True, did_limit=None) - account_update(once=True) + AbacusAccount(once=True).run() scope = InternalScope('mock', **self.vo) files = create_files(3, scope, self.rse1_id, bytes_=100) @@ -213,7 +213,7 @@ def test_account_counter_judge_evaluate_attach(self): # Fake judge re_evaluator(once=True, did_limit=None) - account_update(once=True) + AbacusAccount(once=True).run() account_counter_after = get_usage(self.rse1_id, self.jdoe) assert account_counter_before['bytes'] + 3 * 100 == account_counter_after['bytes'] @@ -223,7 +223,7 @@ def test_account_counter_judge_evaluate_attach(self): def test_account_counter_judge_evaluate_detach(self): """ JUDGE EVALUATOR: Test if the account counter is updated correctly when a file is removed from a DS""" re_evaluator(once=True, did_limit=None) - account_update(once=True) + AbacusAccount(once=True).run() scope = InternalScope('mock', **self.vo) files = create_files(3, scope, self.rse1_id, bytes_=100) @@ -234,7 +234,7 @@ def test_account_counter_judge_evaluate_detach(self): # Add a first rule to the DS add_rule(dids=[{'scope': scope, 'name': dataset}], account=self.jdoe, copies=1, rse_expression=self.rse1, grouping='ALL', weight=None, lifetime=None, locked=False, subscription_id=None) - account_update(once=True) + AbacusAccount(once=True).run() account_counter_before = get_usage(self.rse1_id, self.jdoe) @@ -242,7 +242,7 @@ def test_account_counter_judge_evaluate_detach(self): # Fake judge re_evaluator(once=True, did_limit=None) - account_update(once=True) + AbacusAccount(once=True).run() account_counter_after = get_usage(self.rse1_id, self.jdoe) assert account_counter_before['bytes'] - 100 == account_counter_after['bytes'] diff --git a/tests/test_rse.py b/tests/test_rse.py index d8a7fe52df..36a3a8644e 100644 --- a/tests/test_rse.py +++ b/tests/test_rse.py @@ -36,7 +36,7 @@ parse_checksum_support_attribute, get_rse_supported_checksums_from_attributes, update_rse) -from rucio.daemons.abacus.account import account_update +from rucio.daemons.abacus.account import AbacusAccount from rucio.db.sqla import session, models from rucio.db.sqla.constants import RSEType, DIDType from rucio.rse import rsemanager as mgr @@ -1372,7 +1372,7 @@ def test_get_rse_usage(self, vo, rucio_client, rse_factory, jdoe_account, root_a attach_dids(tmp_scope, dataset, files, jdoe_account) rules = add_rule(dids=[{'scope': tmp_scope, 'name': dataset}], account=jdoe_account, copies=1, rse_expression=rse, grouping='DATASET', weight=None, lifetime=None, locked=False, subscription_id=None, activity=activity) assert rules - account_update(once=True) + AbacusAccount(once=True).run() usages = rucio_client.get_rse_usage(rse=rse, filters={'per_account': True}) for usage in usages: assert usage['account_usages'] diff --git a/tests/test_rule.py b/tests/test_rule.py index e62093d43e..e1a32ef4fc 100644 --- a/tests/test_rule.py +++ b/tests/test_rule.py @@ -42,7 +42,7 @@ from rucio.core.rse_counter import get_counter as get_rse_counter from rucio.core.rule import add_rule, get_rule, delete_rule, add_rules, update_rule, reduce_rule, move_rule, list_rules from rucio.core.scope import add_scope -from rucio.daemons.abacus.account import account_update +from rucio.daemons.abacus.account import AbacusAccount from rucio.daemons.abacus.rse import rse_update from rucio.daemons.judge.evaluator import re_evaluator from rucio.db.sqla import models @@ -606,7 +606,7 @@ def test_locked_rule(self, mock_scope, did_factory, jdoe_account): def test_account_counter_rule_create(self, mock_scope, did_factory, jdoe_account): """ REPLICATION RULE (CORE): Test if the account counter is updated correctly when new rule is created""" - account_update(once=True) + AbacusAccount(once=True).run() account_counter_before = get_usage(self.rse1_id, jdoe_account) files = create_files(3, mock_scope, self.rse1_id, bytes_=100) @@ -617,7 +617,7 @@ def test_account_counter_rule_create(self, mock_scope, did_factory, jdoe_account add_rule(dids=[dataset], account=jdoe_account, copies=1, rse_expression=self.rse1, grouping='ALL', weight=None, lifetime=None, locked=False, subscription_id=None) # Check if the counter has been updated correctly - account_update(once=True) + AbacusAccount(once=True).run() account_counter_after = get_usage(self.rse1_id, jdoe_account) assert (account_counter_before['bytes'] + 3 * 100 == account_counter_after['bytes']) assert (account_counter_before['files'] + 3 == account_counter_after['files']) @@ -633,11 +633,11 @@ def test_account_counter_rule_delete(self, mock_scope, did_factory, jdoe_account rule_id = add_rule(dids=[dataset], account=jdoe_account, copies=1, rse_expression=self.rse1, grouping='ALL', weight=None, lifetime=None, locked=False, subscription_id=None)[0] - account_update(once=True) + AbacusAccount(once=True).run() account_counter_before = get_usage(self.rse1_id, jdoe_account) delete_rule(rule_id) - account_update(once=True) + AbacusAccount(once=True).run() # Check if the counter has been updated correctly account_counter_after = get_usage(self.rse1_id, jdoe_account) @@ -655,12 +655,12 @@ def test_account_counter_rule_update(self, vo, mock_scope, did_factory, jdoe_acc rule_id = add_rule(dids=[dataset], account=jdoe_account, copies=1, rse_expression=self.rse1, grouping='ALL', weight=None, lifetime=None, locked=False, subscription_id=None)[0] - account_update(once=True) + AbacusAccount(once=True).run() account_counter_before_1 = get_usage(self.rse1_id, jdoe_account) account_counter_before_2 = get_usage(self.rse1_id, root_account) rucio.api.rule.update_replication_rule(rule_id, {'account': 'root'}, issuer='root', vo=vo) - account_update(once=True) + AbacusAccount(once=True).run() # Check if the counter has been updated correctly account_counter_after_1 = get_usage(self.rse1_id, jdoe_account) diff --git a/tools/count_missing_type_annotations_utils.sh b/tools/count_missing_type_annotations_utils.sh index a10e0e185e..3dea91b802 100755 --- a/tools/count_missing_type_annotations_utils.sh +++ b/tools/count_missing_type_annotations_utils.sh @@ -15,14 +15,14 @@ # limitations under the License. # Script with all tools to count the missing python type annotations in the -# project. Installs all necessary python packages temporarily if needed. To use +# project. Installes all necessary python packages temporarly if needed. To use # it run: `scource count_missing_annotations_utils.sh`. set -e ensure_install() { - # Checks if a python package is installed via pip. It installs the package, + # Checks if a python package is installed via pip. It installes the package, # and removes it after the script run automatically. # # All debug output is redirected to the stderr stream, to not interfear with