Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create base Daemon class and refactor Undertaker, Reaper, Abacus Account and Judge Repairer daemons to inherit from base class #6489

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 8 additions & 8 deletions bin/rucio-abacus-account
Expand Up @@ -20,8 +20,8 @@ Abacus account is a daemon to update account counters.

import argparse
import signal

from rucio.daemons.abacus.account import run, stop
from rucio.daemons.abacus.account import AbacusAccount
from rucio.daemons.abacus.common import ABACUS_HISTORY_TABLE_INTERVAL


def get_parser():
Expand Down Expand Up @@ -57,19 +57,19 @@ Check account usage again::
''', formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("--run-once", action="store_true", default=False, help='One iteration only')
parser.add_argument("--threads", action="store", default=1, type=int, help='Concurrency control: total number of threads on this process')
parser.add_argument("--enable-history", action="store_true", default=False, help='Record account usage into history table every hour.')
parser.add_argument("--enable-history", action="store_true", default=False, help=f'Record account usage into history table every {ABACUS_HISTORY_TABLE_INTERVAL} seconds.')
parser.add_argument('--sleep-time', action="store", default=10, type=int, help='Concurrency control: thread sleep time after each chunk of work')

return parser


if __name__ == "__main__":

signal.signal(signal.SIGTERM, stop)

parser = get_parser()
args = parser.parse_args()
abacus_account = AbacusAccount(once=args.run_once, threads=args.threads, fill_history_table=args.enable_history, sleep_time=args.sleep_time)
signal.signal(signal.SIGTERM, abacus_account.stop)
signal.signal(signal.SIGINT, abacus_account.stop)
try:
run(once=args.run_once, threads=args.threads, fill_history_table=args.enable_history, sleep_time=args.sleep_time)
abacus_account.run()
except KeyboardInterrupt:
stop()
abacus_account.stop()
10 changes: 6 additions & 4 deletions bin/rucio-judge-repairer
Expand Up @@ -21,7 +21,7 @@ Judge-Repairer is a daemon to repair stuck replication rules.
import argparse
import signal

from rucio.daemons.judge.repairer import run, stop
from rucio.daemons.judge.repairer import JudgeRepairer


def get_parser():
Expand All @@ -36,10 +36,12 @@ def get_parser():


if __name__ == "__main__":
signal.signal(signal.SIGTERM, stop)
parser = get_parser()
args = parser.parse_args()
judge_repairer = JudgeRepairer(once=args.run_once, threads=args.threads, sleep_time=args.sleep_time)
signal.signal(signal.SIGTERM, judge_repairer.stop)
signal.signal(signal.SIGINT, judge_repairer.stop)
try:
run(once=args.run_once, threads=args.threads, sleep_time=args.sleep_time)
judge_repairer.run()
except KeyboardInterrupt:
stop()
judge_repairer.stop()
36 changes: 18 additions & 18 deletions bin/rucio-reaper
Expand Up @@ -21,7 +21,7 @@ import argparse
import signal

from rucio.common.utils import StoreAndDeprecateWarningAction
from rucio.daemons.reaper.reaper import run, stop
from rucio.daemons.reaper.reaper import Reaper


def get_parser():
Expand Down Expand Up @@ -61,24 +61,24 @@ def get_parser():

if __name__ == "__main__":

# Bind our callback to the SIGTERM signal and run the daemon:
signal.signal(signal.SIGTERM, stop)

parser = get_parser()
args = parser.parse_args()
reaper = Reaper(threads=args.threads,
chunk_size=args.chunk_size,
include_rses=args.include_rses,
exclude_rses=args.exclude_rses,
rses=args.rses,
vos=args.vos,
once=args.run_once,
greedy=args.greedy,
scheme=args.scheme,
delay_seconds=args.delay_seconds,
sleep_time=args.sleep_time,
auto_exclude_threshold=args.auto_exclude_threshold,
auto_exclude_timeout=args.auto_exclude_timeout)
signal.signal(signal.SIGTERM, reaper.stop)
signal.signal(signal.SIGINT, reaper.stop)
try:
run(threads=args.threads,
chunk_size=args.chunk_size,
include_rses=args.include_rses,
exclude_rses=args.exclude_rses,
rses=args.rses,
vos=args.vos,
once=args.run_once,
greedy=args.greedy,
scheme=args.scheme,
delay_seconds=args.delay_seconds,
sleep_time=args.sleep_time,
auto_exclude_threshold=args.auto_exclude_threshold,
auto_exclude_timeout=args.auto_exclude_timeout)
reaper.run()
except KeyboardInterrupt:
stop()
reaper.stop()
12 changes: 6 additions & 6 deletions bin/rucio-undertaker
Expand Up @@ -21,7 +21,7 @@ Undertaker is a daemon to manage expired did.
import argparse
import signal

from rucio.daemons.undertaker.undertaker import run, stop
from rucio.daemons.undertaker.undertaker import Undertaker


def get_parser():
Expand Down Expand Up @@ -66,12 +66,12 @@ Check if the DID exists::


if __name__ == "__main__":

signal.signal(signal.SIGTERM, stop)
parser = get_parser()
args = parser.parse_args()
undertaker = Undertaker(total_workers=args.total_workers, chunk_size=args.chunk_size, once=args.run_once, sleep_time=args.sleep_time)
signal.signal(signal.SIGTERM, undertaker.stop)
signal.signal(signal.SIGINT, undertaker.stop)
try:
run(total_workers=args.total_workers, chunk_size=args.chunk_size, once=args.run_once,
sleep_time=args.sleep_time)
undertaker.run()
except KeyboardInterrupt:
stop()
undertaker.stop()
Comment on lines 74 to +77
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this try except is not needed

10 changes: 10 additions & 0 deletions lib/rucio/common/exception.py
Expand Up @@ -1090,3 +1090,13 @@ def __init__(self, *args, **kwargs):
super(DeprecationError, self).__init__(*args, **kwargs)
self._message = 'Command or function has been deprecated.'
self.error_code = 105


class ReaperNoRSEsToProcess(RucioException):
"""
Reaper found no RSEs to process.
"""
def __init__(self, *args, **kwargs):
super(ReaperNoRSEsToProcess, self).__init__(*args, **kwargs)
self._message = 'Reaper: No RSEs to process found.'
self.error_code = 106
108 changes: 28 additions & 80 deletions lib/rucio/daemons/abacus/account.py
Expand Up @@ -18,89 +18,37 @@
"""

import logging
import threading
import time
from typing import TYPE_CHECKING
from typing import Any

import rucio.db.sqla.util
from rucio.common import exception
from rucio.common.logging import setup_logging
from rucio.common.utils import get_thread_with_periodic_running_function
from rucio.core.account_counter import get_updated_account_counters, update_account_counter, fill_account_counter_history_table
from rucio.daemons.common import run_daemon
from rucio.core.account_counter import get_updated_account_counters, update_account_counter
from rucio.daemons.common import HeartbeatHandler
from rucio.daemons.abacus.common import AbacusDaemon

if TYPE_CHECKING:
from types import FrameType
from typing import Optional

graceful_stop = threading.Event()
DAEMON_NAME = 'abacus-account'
class AbacusAccount(AbacusDaemon):
def __init__(self, **_kwargs) -> None:
super().__init__(daemon_name="abacus-account", **_kwargs)


def account_update(once=False, sleep_time=10):
"""
Main loop to check and update the Account Counters.
"""
run_daemon(
once=once,
graceful_stop=graceful_stop,
executable=DAEMON_NAME,
partition_wait_time=1,
sleep_time=sleep_time,
run_once_fnc=run_once,
)


def run_once(heartbeat_handler, **_kwargs):
worker_number, total_workers, logger = heartbeat_handler.live()

start = time.time() # NOQA
account_rse_ids = get_updated_account_counters(total_workers=total_workers,
worker_number=worker_number)
logger(logging.DEBUG, 'Index query time %f size=%d' % (time.time() - start, len(account_rse_ids)))

# If the list is empty, sent the worker to sleep
if not account_rse_ids:
logger(logging.INFO, 'did not get any work')
return

for account_rse_id in account_rse_ids:
def _run_once(self, heartbeat_handler: "HeartbeatHandler", **_kwargs) -> tuple[bool, Any]:
worker_number, total_workers, logger = heartbeat_handler.live()
if graceful_stop.is_set():
break
start_time = time.time()
update_account_counter(account=account_rse_id[0], rse_id=account_rse_id[1])
logger(logging.DEBUG, 'update of account-rse counter "%s-%s" took %f' % (account_rse_id[0], account_rse_id[1], time.time() - start_time))


def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""

graceful_stop.set()


def run(once=False, threads=1, fill_history_table=False, sleep_time=10):
"""
Starts up the Abacus-Account threads.
"""
setup_logging(process_name=DAEMON_NAME)

if rucio.db.sqla.util.is_old_db():
raise exception.DatabaseException('Database was not updated, daemon won\'t start')

if once:
logging.info('main: executing one iteration only')
account_update(once)
else:
logging.info('main: starting threads')
threads = [threading.Thread(target=account_update, kwargs={'once': once, 'sleep_time': sleep_time}) for i in
range(0, threads)]
if fill_history_table:
threads.append(get_thread_with_periodic_running_function(3600, fill_account_counter_history_table, graceful_stop))
[t.start() for t in threads]
logging.info('main: waiting for interrupts')
# Interruptible joins require a timeout.
while threads[0].is_alive():
[t.join(timeout=3.14) for t in threads]
must_sleep = False

start = time.time() # NOQA
account_rse_ids = get_updated_account_counters(total_workers=total_workers,
worker_number=worker_number)
logger(logging.DEBUG, 'Index query time %f size=%d' % (time.time() - start, len(account_rse_ids)))

# If the list is empty, sent the worker to sleep
if not account_rse_ids:
logger(logging.INFO, 'did not get any work')
return must_sleep, None

for account_rse_id in account_rse_ids:
worker_number, total_workers, logger = heartbeat_handler.live()
if self.graceful_stop.is_set():
break
start_time = time.time()
update_account_counter(account=account_rse_id[0], rse_id=account_rse_id[1])
logger(logging.DEBUG, 'update of account-rse counter "%s-%s" took %f' % (account_rse_id[0], account_rse_id[1], time.time() - start_time))
return must_sleep, None
64 changes: 64 additions & 0 deletions lib/rucio/daemons/abacus/common.py
@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from rucio.daemons.common import Daemon, HeartbeatHandler
from rucio.common.utils import get_thread_with_periodic_running_function
from rucio.core.account_counter import fill_account_counter_history_table
import logging
import threading
from typing import Any
from abc import abstractmethod

ABACUS_HISTORY_TABLE_INTERVAL = 3600


class AbacusDaemon(Daemon):
"""
Common daemon logic for multiple Abacus daemons.
"""
def __init__(self, fill_history_table: bool = False, **_kwargs) -> None:
f"""
:param fill_history_table: Set to True to record account usage into history table every {ABACUS_HISTORY_TABLE_INTERVAL} seconds.
"""
super().__init__(**_kwargs)
self.fill_history_table = fill_history_table
self.paused_dids = {}

@abstractmethod
def _run_once(
self, heartbeat_handler: "HeartbeatHandler", **_kwargs
) -> tuple[bool, Any]:
pass

def run(self) -> None:
self._pre_run_checks()

if self.once:
logging.info("%s: executing one iteration only", self.daemon_name)
self._call_daemon()
else:
logging.info("%s: starting threads", self.daemon_name)
thread_list = [threading.Thread(target=self._call_daemon) for _ in
range(0, self.total_workers)]
if self.fill_history_table:
thread_list.append(get_thread_with_periodic_running_function(ABACUS_HISTORY_TABLE_INTERVAL, fill_account_counter_history_table, self.graceful_stop))
[t.start() for t in thread_list]
logging.info("%s: waiting for interrupts", self.daemon_name)
# Interruptible joins require a timeout.
while thread_list:
thread_list = [
thread.join(timeout=3.14)
for thread in thread_list
if thread and thread.is_alive()
]