Skip to content

Commit

Permalink
Refactor Abacus Account to inherit from base Daemon class; #6478
Browse files Browse the repository at this point in the history
  • Loading branch information
rdimaio committed Feb 2, 2024
1 parent 48fc910 commit 8eb51c8
Show file tree
Hide file tree
Showing 14 changed files with 132 additions and 118 deletions.
15 changes: 7 additions & 8 deletions bin/rucio-abacus-account
Expand Up @@ -20,8 +20,8 @@ Abacus account is a daemon to update account counters.

import argparse
import signal

from rucio.daemons.abacus.account import run, stop
from rucio.daemons.abacus.account import AbacusAccount
from rucio.daemons.abacus.common import ABACUS_HISTORY_TABLE_INTERVAL


def get_parser():
Expand Down Expand Up @@ -57,19 +57,18 @@ Check account usage again::
''', formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("--run-once", action="store_true", default=False, help='One iteration only')
parser.add_argument("--threads", action="store", default=1, type=int, help='Concurrency control: total number of threads on this process')
parser.add_argument("--enable-history", action="store_true", default=False, help='Record account usage into history table every hour.')
parser.add_argument("--enable-history", action="store_true", default=False, help=f'Record account usage into history table every {ABACUS_HISTORY_TABLE_INTERVAL} seconds.')
parser.add_argument('--sleep-time', action="store", default=10, type=int, help='Concurrency control: thread sleep time after each chunk of work')

return parser


if __name__ == "__main__":

signal.signal(signal.SIGTERM, stop)

parser = get_parser()
args = parser.parse_args()
abacus_account = AbacusAccount(once=args.run_once, threads=args.threads, fill_history_table=args.enable_history, sleep_time=args.sleep_time)
signal.signal(signal.SIGTERM, abacus_account.stop)
try:
run(once=args.run_once, threads=args.threads, fill_history_table=args.enable_history, sleep_time=args.sleep_time)
abacus_account.run()
except KeyboardInterrupt:
stop()
abacus_account.stop()
2 changes: 2 additions & 0 deletions bin/rucio-judge-repairer
Expand Up @@ -19,6 +19,7 @@ Judge-Repairer is a daemon to repair stuck replication rules.
"""

import argparse
import signal

from rucio.daemons.judge.repairer import JudgeRepairer

Expand All @@ -38,6 +39,7 @@ if __name__ == "__main__":
parser = get_parser()
args = parser.parse_args()
judge_repairer = JudgeRepairer(once=args.run_once, threads=args.threads, sleep_time=args.sleep_time)
signal.signal(signal.SIGTERM, judge_repairer.stop)
try:
judge_repairer.run()
except KeyboardInterrupt:
Expand Down
2 changes: 2 additions & 0 deletions bin/rucio-reaper
Expand Up @@ -18,6 +18,7 @@
'''

import argparse
import signal

from rucio.common.utils import StoreAndDeprecateWarningAction
from rucio.daemons.reaper.reaper import Reaper
Expand Down Expand Up @@ -75,6 +76,7 @@ if __name__ == "__main__":
sleep_time=args.sleep_time,
auto_exclude_threshold=args.auto_exclude_threshold,
auto_exclude_timeout=args.auto_exclude_timeout)
signal.signal(signal.SIGTERM, reaper.stop)
try:
reaper.run()
except KeyboardInterrupt:
Expand Down
2 changes: 2 additions & 0 deletions bin/rucio-undertaker
Expand Up @@ -19,6 +19,7 @@ Undertaker is a daemon to manage expired did.
"""

import argparse
import signal

from rucio.daemons.undertaker.undertaker import Undertaker

Expand Down Expand Up @@ -68,6 +69,7 @@ if __name__ == "__main__":
parser = get_parser()
args = parser.parse_args()
undertaker = Undertaker(total_workers=args.total_workers, chunk_size=args.chunk_size, once=args.run_once, sleep_time=args.sleep_time)
signal.signal(signal.SIGTERM, undertaker.stop)
try:
undertaker.run()
except KeyboardInterrupt:
Expand Down
108 changes: 28 additions & 80 deletions lib/rucio/daemons/abacus/account.py
Expand Up @@ -18,89 +18,37 @@
"""

import logging
import threading
import time
from typing import TYPE_CHECKING
from typing import Any

import rucio.db.sqla.util
from rucio.common import exception
from rucio.common.logging import setup_logging
from rucio.common.utils import get_thread_with_periodic_running_function
from rucio.core.account_counter import get_updated_account_counters, update_account_counter, fill_account_counter_history_table
from rucio.daemons.common import run_daemon
from rucio.core.account_counter import get_updated_account_counters, update_account_counter
from rucio.daemons.common import HeartbeatHandler
from rucio.daemons.abacus.common import AbacusDaemon

if TYPE_CHECKING:
from types import FrameType
from typing import Optional

graceful_stop = threading.Event()
DAEMON_NAME = 'abacus-account'
class AbacusAccount(AbacusDaemon):
def __init__(self, **_kwargs) -> None:
super().__init__(daemon_name="abacus-account", **_kwargs)


def account_update(once=False, sleep_time=10):
"""
Main loop to check and update the Account Counters.
"""
run_daemon(
once=once,
graceful_stop=graceful_stop,
executable=DAEMON_NAME,
partition_wait_time=1,
sleep_time=sleep_time,
run_once_fnc=run_once,
)


def run_once(heartbeat_handler, **_kwargs):
worker_number, total_workers, logger = heartbeat_handler.live()

start = time.time() # NOQA
account_rse_ids = get_updated_account_counters(total_workers=total_workers,
worker_number=worker_number)
logger(logging.DEBUG, 'Index query time %f size=%d' % (time.time() - start, len(account_rse_ids)))

# If the list is empty, send the worker to sleep
if not account_rse_ids:
logger(logging.INFO, 'did not get any work')
return

for account_rse_id in account_rse_ids:
def _run_once(self, heartbeat_handler: "HeartbeatHandler", **_kwargs) -> tuple[bool, Any]:
worker_number, total_workers, logger = heartbeat_handler.live()
if graceful_stop.is_set():
break
start_time = time.time()
update_account_counter(account=account_rse_id[0], rse_id=account_rse_id[1])
logger(logging.DEBUG, 'update of account-rse counter "%s-%s" took %f' % (account_rse_id[0], account_rse_id[1], time.time() - start_time))


def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""

graceful_stop.set()


def run(once=False, threads=1, fill_history_table=False, sleep_time=10):
"""
Starts up the Abacus-Account threads.
"""
setup_logging(process_name=DAEMON_NAME)

if rucio.db.sqla.util.is_old_db():
raise exception.DatabaseException('Database was not updated, daemon won\'t start')

if once:
logging.info('main: executing one iteration only')
account_update(once)
else:
logging.info('main: starting threads')
threads = [threading.Thread(target=account_update, kwargs={'once': once, 'sleep_time': sleep_time}) for i in
range(0, threads)]
if fill_history_table:
threads.append(get_thread_with_periodic_running_function(3600, fill_account_counter_history_table, graceful_stop))
[t.start() for t in threads]
logging.info('main: waiting for interrupts')
# Interruptible joins require a timeout.
while threads[0].is_alive():
[t.join(timeout=3.14) for t in threads]
must_sleep = False

start = time.time() # NOQA
account_rse_ids = get_updated_account_counters(total_workers=total_workers,
worker_number=worker_number)
logger(logging.DEBUG, 'Index query time %f size=%d' % (time.time() - start, len(account_rse_ids)))

# If the list is empty, send the worker to sleep
if not account_rse_ids:
logger(logging.INFO, 'did not get any work')
return must_sleep, None

for account_rse_id in account_rse_ids:
worker_number, total_workers, logger = heartbeat_handler.live()
if self.graceful_stop.is_set():
break
start_time = time.time()
update_account_counter(account=account_rse_id[0], rse_id=account_rse_id[1])
logger(logging.DEBUG, 'update of account-rse counter "%s-%s" took %f' % (account_rse_id[0], account_rse_id[1], time.time() - start_time))
return must_sleep, None
64 changes: 64 additions & 0 deletions lib/rucio/daemons/abacus/common.py
@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from rucio.daemons.common import Daemon, HeartbeatHandler
from rucio.common.utils import get_thread_with_periodic_running_function
from rucio.core.account_counter import fill_account_counter_history_table
import logging
import threading
from typing import Any
from abc import abstractmethod

ABACUS_HISTORY_TABLE_INTERVAL = 3600


class AbacusDaemon(Daemon):
    """
    Common daemon logic for multiple Abacus daemons.

    Subclasses implement ``_run_once``; this class supplies a ``run`` method
    that starts the worker threads and, optionally, one extra thread that
    periodically calls ``fill_account_counter_history_table``.
    """

    def __init__(self, fill_history_table: bool = False, **_kwargs) -> None:
        """
        :param fill_history_table: Set to True to record account usage into the history table
                                   every ABACUS_HISTORY_TABLE_INTERVAL seconds.
        :param _kwargs: Remaining keyword arguments, forwarded unchanged to the base class constructor.
        """
        # A plain string literal is used above on purpose: an f-string in
        # docstring position is evaluated and discarded, never stored as __doc__.
        super().__init__(**_kwargs)
        self.fill_history_table = fill_history_table
        # NOTE(review): nothing in this class reads paused_dids; it is kept so
        # that any external reader of the attribute keeps working -- confirm
        # whether it can be dropped.
        self.paused_dids = {}

    @abstractmethod
    def _run_once(
        self, heartbeat_handler: "HeartbeatHandler", **_kwargs
    ) -> tuple[bool, Any]:
        """
        Perform a single iteration of the daemon's work.

        :param heartbeat_handler: Handler passed in by the daemon loop.
        :returns: A two-element tuple; presumably (must_sleep, result) as
                  defined by the base ``Daemon`` contract -- verify against
                  ``rucio.daemons.common``.
        """
        pass

    def run(self) -> None:
        """
        Start the daemon and block until all of its threads have finished.

        With ``self.once`` set, a single call to ``self._call_daemon`` is made
        in the calling thread. Otherwise ``self.total_workers`` worker threads
        are started (plus the history-table thread when
        ``self.fill_history_table`` is set) and this method waits for them.
        """
        self._pre_run_checks()

        if self.once:
            logging.info(f'{self.daemon_name}: executing one iteration only')
            self._call_daemon()
            return

        logging.info(f'{self.daemon_name}: starting threads')
        thread_list = [
            threading.Thread(target=self._call_daemon)
            for _ in range(self.total_workers)
        ]
        if self.fill_history_table:
            thread_list.append(
                get_thread_with_periodic_running_function(
                    ABACUS_HISTORY_TABLE_INTERVAL,
                    fill_account_counter_history_table,
                    self.graceful_stop,
                )
            )
        for thread in thread_list:
            thread.start()

        logging.info(f'{self.daemon_name}: waiting for interrupts')
        # Interruptible joins require a timeout.
        while thread_list:
            for thread in thread_list:
                thread.join(timeout=3.14)
            # Thread.join() always returns None, so the list of still-running
            # threads must be rebuilt from the Thread objects themselves;
            # collecting join()'s return values would empty the list after a
            # single pass and make run() return while workers are still alive.
            thread_list = [thread for thread in thread_list if thread.is_alive()]
2 changes: 0 additions & 2 deletions lib/rucio/daemons/common.py
Expand Up @@ -19,7 +19,6 @@
import os
import queue
import socket
import signal
import threading
import time
from collections.abc import Callable, Generator, Iterator, Sequence
Expand Down Expand Up @@ -64,7 +63,6 @@ def __init__(
self.partition_wait_time = partition_wait_time
self.daemon_name = daemon_name
self.graceful_stop = threading.Event()
signal.signal(signal.SIGTERM, self.stop)
setup_logging(process_name=daemon_name)

@staticmethod
Expand Down
6 changes: 3 additions & 3 deletions tests/test_abacus_account.py
Expand Up @@ -20,7 +20,7 @@
from rucio.core.account import get_usage_history
from rucio.core.account_counter import update_account_counter_history
from rucio.core.account_limit import get_local_account_usage, set_local_account_limit
from rucio.daemons.abacus.account import account_update
from rucio.daemons.abacus.account import AbacusAccount
from rucio.daemons.judge import cleaner
from rucio.daemons.reaper.reaper import Reaper
from rucio.db.sqla import models
Expand All @@ -45,7 +45,7 @@ def test_abacus_account(self, vo, root_account, mock_scope, rse_factory, did_fac
dataset = dids[0]['dataset_name']
activity = get_schema_value('ACTIVITY')['enum'][0]
rucio_client.add_replication_rule([{'scope': mock_scope.external, 'name': dataset}], 1, rse, lifetime=-1, activity=activity)
account_update(once=True)
AbacusAccount(once=True).run()
account_usage = get_local_account_usage(account=root_account, rse_id=rse_id)[0]
assert account_usage['bytes'] == nfiles * file_sizes
assert account_usage['files'] == nfiles
Expand All @@ -63,7 +63,7 @@ def test_abacus_account(self, vo, root_account, mock_scope, rse_factory, did_fac

# Delete rules -> account usage should decrease
cleaner.run(once=True)
account_update(once=True)
AbacusAccount(once=True).run()
# set account limit because return value of get_local_account_usage differs if a limit is set or not
set_local_account_limit(account=root_account, rse_id=rse_id, bytes_=10)
account_usages = get_local_account_usage(account=root_account, rse_id=rse_id)[0]
Expand Down
4 changes: 2 additions & 2 deletions tests/test_bin_rucio.py
Expand Up @@ -1834,7 +1834,7 @@ def test_list_account_usage(self):
""" CLIENT (USER): list account usage. """
from rucio.db.sqla import session, models
from rucio.core.account_counter import increase
from rucio.daemons.abacus import account as abacus_account
from rucio.daemons.abacus.account import AbacusAccount

db_session = session.get_session()
db_session.query(models.AccountUsage).delete()
Expand All @@ -1854,7 +1854,7 @@ def test_list_account_usage(self):
self.account_client.set_local_account_limit(account, rse, local_limit)
self.account_client.set_global_account_limit(account, rse_exp, global_limit)
increase(rse_id, InternalAccount(account, **self.vo), 1, usage)
abacus_account.run(once=True)
AbacusAccount(once=True).run()
cmd = 'rucio list-account-usage {0}'.format(account)
exitcode, out, err = execute(cmd)
assert re.search('.*{0}.*{1}.*{2}.*{3}'.format(rse, usage, local_limit, local_left), out) is not None
Expand Down
12 changes: 6 additions & 6 deletions tests/test_counter.py
Expand Up @@ -19,7 +19,7 @@

from rucio.core import account_counter, rse_counter
from rucio.core.account import get_usage
from rucio.daemons.abacus.account import account_update
from rucio.daemons.abacus.account import AbacusAccount
from rucio.daemons.abacus.rse import rse_update
from rucio.db.sqla import models

Expand Down Expand Up @@ -91,7 +91,7 @@ class TestCoreAccountCounter:
def test_inc_dec_get_counter(self, jdoe_account, rse_factory, db_session):
"""ACCOUNT COUNTER (CORE): Increase, decrease and get counter """
db_session.commit()
account_update(once=True)
AbacusAccount(once=True).run()
_, rse_id = rse_factory.make_mock_rse(session=db_session)
db_session.commit()
account = jdoe_account
Expand All @@ -104,7 +104,7 @@ def test_inc_dec_get_counter(self, jdoe_account, rse_factory, db_session):
count, sum_ = 0, 0
for i in range(10):
account_counter.increase(rse_id=rse_id, account=account, files=1, bytes_=2.147e+9)
account_update(once=True)
AbacusAccount(once=True).run()
count += 1
sum_ += 2.147e+9
cnt = get_usage(rse_id=rse_id, account=account)
Expand All @@ -113,7 +113,7 @@ def test_inc_dec_get_counter(self, jdoe_account, rse_factory, db_session):

for i in range(4):
account_counter.decrease(rse_id=rse_id, account=account, files=1, bytes_=2.147e+9)
account_update(once=True)
AbacusAccount(once=True).run()
count -= 1
sum_ -= 2.147e+9
cnt = get_usage(rse_id=rse_id, account=account)
Expand All @@ -122,7 +122,7 @@ def test_inc_dec_get_counter(self, jdoe_account, rse_factory, db_session):

for i in range(5):
account_counter.increase(rse_id=rse_id, account=account, files=1, bytes_=2.147e+9)
account_update(once=True)
AbacusAccount(once=True).run()
count += 1
sum_ += 2.147e+9
cnt = get_usage(rse_id=rse_id, account=account)
Expand All @@ -131,7 +131,7 @@ def test_inc_dec_get_counter(self, jdoe_account, rse_factory, db_session):

for i in range(8):
account_counter.decrease(rse_id=rse_id, account=account, files=1, bytes_=2.147e+9)
account_update(once=True)
AbacusAccount(once=True).run()
count -= 1
sum_ -= 2.147e+9
cnt = get_usage(rse_id=rse_id, account=account)
Expand Down
3 changes: 1 addition & 2 deletions tests/test_daemons.py
Expand Up @@ -19,7 +19,7 @@

import rucio.db.sqla.util
from rucio.common import exception
from rucio.daemons.abacus import account, collection_replica, rse
from rucio.daemons.abacus import collection_replica, rse
from rucio.daemons.atropos import atropos
from rucio.daemons.automatix import automatix
from rucio.daemons.badreplicas import minos, minos_temporary_expiration, necromancer
Expand All @@ -37,7 +37,6 @@
from rucio.daemons.common import Daemon

DAEMONS = [
account,
collection_replica,
rse,
atropos,
Expand Down

0 comments on commit 8eb51c8

Please sign in to comment.