Skip to content

Commit

Permalink
Refactor Abacus Account to inherit from base Daemon class; #6478
Browse files Browse the repository at this point in the history
  • Loading branch information
rdimaio committed Feb 15, 2024
1 parent 5fcff6c commit d45c5ad
Show file tree
Hide file tree
Showing 14 changed files with 216 additions and 258 deletions.
15 changes: 7 additions & 8 deletions bin/rucio-abacus-account
Expand Up @@ -20,8 +20,8 @@ Abacus account is a daemon to update account counters.

import argparse
import signal

from rucio.daemons.abacus.account import run, stop
from rucio.daemons.abacus.account import AbacusAccount
from rucio.daemons.abacus.common import ABACUS_HISTORY_TABLE_INTERVAL


def get_parser():
Expand Down Expand Up @@ -57,19 +57,18 @@ Check account usage again::
''', formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("--run-once", action="store_true", default=False, help='One iteration only')
parser.add_argument("--threads", action="store", default=1, type=int, help='Concurrency control: total number of threads on this process')
parser.add_argument("--enable-history", action="store_true", default=False, help='Record account usage into history table every hour.')
parser.add_argument("--enable-history", action="store_true", default=False, help=f'Record account usage into history table every {ABACUS_HISTORY_TABLE_INTERVAL} seconds.')
parser.add_argument('--sleep-time', action="store", default=10, type=int, help='Concurrency control: thread sleep time after each chunk of work')

return parser


if __name__ == "__main__":

signal.signal(signal.SIGTERM, stop)

parser = get_parser()
args = parser.parse_args()
abacus_account = AbacusAccount(once=args.run_once, threads=args.threads, fill_history_table=args.enable_history, sleep_time=args.sleep_time)
signal.signal(signal.SIGTERM, abacus_account.stop)
try:
run(once=args.run_once, threads=args.threads, fill_history_table=args.enable_history, sleep_time=args.sleep_time)
abacus_account.run()
except KeyboardInterrupt:
stop()
abacus_account.stop()
2 changes: 2 additions & 0 deletions bin/rucio-judge-repairer
Expand Up @@ -19,6 +19,7 @@ Judge-Repairer is a daemon to repair stuck replication rules.
"""

import argparse
import signal

from rucio.daemons.judge.repairer import JudgeRepairer

Expand All @@ -38,6 +39,7 @@ if __name__ == "__main__":
parser = get_parser()
args = parser.parse_args()
judge_repairer = JudgeRepairer(once=args.run_once, threads=args.threads, sleep_time=args.sleep_time)
signal.signal(signal.SIGTERM, judge_repairer.stop)
try:
judge_repairer.run()
except KeyboardInterrupt:
Expand Down
2 changes: 1 addition & 1 deletion lib/rucio/core/rse_expression_parser.py
Expand Up @@ -42,7 +42,7 @@


@transactional_session
def parse_expression(expression: str, filter_=None, *, session: "Session"):
def parse_expression(expression, filter_=None, *, session: "Session"):
"""
Parse a RSE expression and return the list of RSE dictionaries.
Expand Down
108 changes: 28 additions & 80 deletions lib/rucio/daemons/abacus/account.py
Expand Up @@ -18,89 +18,37 @@
"""

import logging
import threading
import time
from typing import TYPE_CHECKING
from typing import Any

import rucio.db.sqla.util
from rucio.common import exception
from rucio.common.logging import setup_logging
from rucio.common.utils import get_thread_with_periodic_running_function
from rucio.core.account_counter import get_updated_account_counters, update_account_counter, fill_account_counter_history_table
from rucio.daemons.common import run_daemon
from rucio.core.account_counter import get_updated_account_counters, update_account_counter
from rucio.daemons.common import HeartbeatHandler
from rucio.daemons.abacus.common import AbacusDaemon

if TYPE_CHECKING:
from types import FrameType
from typing import Optional

graceful_stop = threading.Event()
DAEMON_NAME = 'abacus-account'
class AbacusAccount(AbacusDaemon):
def __init__(self, **_kwargs) -> None:
super().__init__(daemon_name="abacus-account", **_kwargs)


def account_update(once=False, sleep_time=10):
"""
Main loop to check and update the Account Counters.
"""
run_daemon(
once=once,
graceful_stop=graceful_stop,
executable=DAEMON_NAME,
partition_wait_time=1,
sleep_time=sleep_time,
run_once_fnc=run_once,
)


def run_once(heartbeat_handler, **_kwargs):
worker_number, total_workers, logger = heartbeat_handler.live()

start = time.time() # NOQA
account_rse_ids = get_updated_account_counters(total_workers=total_workers,
worker_number=worker_number)
logger(logging.DEBUG, 'Index query time %f size=%d' % (time.time() - start, len(account_rse_ids)))

# If the list is empty, sent the worker to sleep
if not account_rse_ids:
logger(logging.INFO, 'did not get any work')
return

for account_rse_id in account_rse_ids:
def _run_once(self, heartbeat_handler: "HeartbeatHandler", **_kwargs) -> tuple[bool, Any]:
worker_number, total_workers, logger = heartbeat_handler.live()
if graceful_stop.is_set():
break
start_time = time.time()
update_account_counter(account=account_rse_id[0], rse_id=account_rse_id[1])
logger(logging.DEBUG, 'update of account-rse counter "%s-%s" took %f' % (account_rse_id[0], account_rse_id[1], time.time() - start_time))


def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""

graceful_stop.set()


def run(once=False, threads=1, fill_history_table=False, sleep_time=10):
"""
Starts up the Abacus-Account threads.
"""
setup_logging(process_name=DAEMON_NAME)

if rucio.db.sqla.util.is_old_db():
raise exception.DatabaseException('Database was not updated, daemon won\'t start')

if once:
logging.info('main: executing one iteration only')
account_update(once)
else:
logging.info('main: starting threads')
threads = [threading.Thread(target=account_update, kwargs={'once': once, 'sleep_time': sleep_time}) for i in
range(0, threads)]
if fill_history_table:
threads.append(get_thread_with_periodic_running_function(3600, fill_account_counter_history_table, graceful_stop))
[t.start() for t in threads]
logging.info('main: waiting for interrupts')
# Interruptible joins require a timeout.
while threads[0].is_alive():
[t.join(timeout=3.14) for t in threads]
must_sleep = False

start = time.time() # NOQA
account_rse_ids = get_updated_account_counters(total_workers=total_workers,
worker_number=worker_number)
logger(logging.DEBUG, 'Index query time %f size=%d' % (time.time() - start, len(account_rse_ids)))

# If the list is empty, sent the worker to sleep
if not account_rse_ids:
logger(logging.INFO, 'did not get any work')
return must_sleep, None

for account_rse_id in account_rse_ids:
worker_number, total_workers, logger = heartbeat_handler.live()
if self.graceful_stop.is_set():
break
start_time = time.time()
update_account_counter(account=account_rse_id[0], rse_id=account_rse_id[1])
logger(logging.DEBUG, 'update of account-rse counter "%s-%s" took %f' % (account_rse_id[0], account_rse_id[1], time.time() - start_time))
return must_sleep, None
64 changes: 64 additions & 0 deletions lib/rucio/daemons/abacus/common.py
@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from rucio.daemons.common import Daemon, HeartbeatHandler
from rucio.common.utils import get_thread_with_periodic_running_function
from rucio.core.account_counter import fill_account_counter_history_table
import logging
import threading
from typing import Any
from abc import abstractmethod

ABACUS_HISTORY_TABLE_INTERVAL = 3600


class AbacusDaemon(Daemon):
"""
Common daemon logic for multiple Abacus daemons.
"""
def __init__(self, fill_history_table: bool = False, **_kwargs) -> None:
f"""
:param fill_history_table: Set to True to record account usage into history table every {ABACUS_HISTORY_TABLE_INTERVAL} seconds.
"""
super().__init__(**_kwargs)
self.fill_history_table = fill_history_table
self.paused_dids = {}

@abstractmethod
def _run_once(
self, heartbeat_handler: "HeartbeatHandler", **_kwargs
) -> tuple[bool, Any]:
pass

def run(self) -> None:
self._pre_run_checks()

if self.once:
logging.info("%s: executing one iteration only", self.daemon_name)
self._call_daemon()
else:
logging.info("%s: starting threads", self.daemon_name)
thread_list = [threading.Thread(target=self._call_daemon) for _ in
range(0, self.total_workers)]
if self.fill_history_table:
thread_list.append(get_thread_with_periodic_running_function(ABACUS_HISTORY_TABLE_INTERVAL, fill_account_counter_history_table, self.graceful_stop))
[t.start() for t in thread_list]
logging.info("%s: waiting for interrupts", self.daemon_name)
# Interruptible joins require a timeout.
while thread_list:
thread_list = [
thread.join(timeout=3.14)
for thread in thread_list
if thread and thread.is_alive()
]

0 comments on commit d45c5ad

Please sign in to comment.