Skip to content

Commit

Permalink
Refactor Reaper to inherit from base Daemon class
Browse files Browse the repository at this point in the history
  • Loading branch information
rdimaio committed Jan 30, 2024
1 parent 33b8daa commit c2ad227
Show file tree
Hide file tree
Showing 15 changed files with 1,082 additions and 713 deletions.
35 changes: 16 additions & 19 deletions bin/rucio-reaper
Expand Up @@ -18,10 +18,9 @@
'''

import argparse
import signal

from rucio.common.utils import StoreAndDeprecateWarningAction
from rucio.daemons.reaper.reaper import run, stop
from rucio.daemons.reaper.reaper import Reaper


def get_parser():
Expand Down Expand Up @@ -61,24 +60,22 @@ def get_parser():

if __name__ == "__main__":

# Bind our callback to the SIGTERM signal and run the daemon:
signal.signal(signal.SIGTERM, stop)

parser = get_parser()
args = parser.parse_args()
reaper = Reaper(threads=args.threads,
chunk_size=args.chunk_size,
include_rses=args.include_rses,
exclude_rses=args.exclude_rses,
rses=args.rses,
vos=args.vos,
once=args.run_once,
greedy=args.greedy,
scheme=args.scheme,
delay_seconds=args.delay_seconds,
sleep_time=args.sleep_time,
auto_exclude_threshold=args.auto_exclude_threshold,
auto_exclude_timeout=args.auto_exclude_timeout)
try:
run(threads=args.threads,
chunk_size=args.chunk_size,
include_rses=args.include_rses,
exclude_rses=args.exclude_rses,
rses=args.rses,
vos=args.vos,
once=args.run_once,
greedy=args.greedy,
scheme=args.scheme,
delay_seconds=args.delay_seconds,
sleep_time=args.sleep_time,
auto_exclude_threshold=args.auto_exclude_threshold,
auto_exclude_timeout=args.auto_exclude_timeout)
reaper.run()
except KeyboardInterrupt:
stop()
reaper.stop()
10 changes: 10 additions & 0 deletions lib/rucio/common/exception.py
Expand Up @@ -1090,3 +1090,13 @@ def __init__(self, *args, **kwargs):
super(DeprecationError, self).__init__(*args, **kwargs)
self._message = 'Command or function has been deprecated.'
self.error_code = 105


class ReaperNoRSEsToProcess(RucioException):
"""
Reaper found no RSEs to process.
"""
def __init__(self, *args, **kwargs):
super(ReaperNoRSEsToProcess, self).__init__(*args, **kwargs)
self._message = 'Reaper: No RSEs to process found.'
self.error_code = 106
2 changes: 1 addition & 1 deletion lib/rucio/core/rse_expression_parser.py
Expand Up @@ -42,7 +42,7 @@


@transactional_session
def parse_expression(expression, filter_=None, *, session: "Session"):
def parse_expression(expression: str, filter_=None, *, session: "Session"):
"""
Parse a RSE expression and return the list of RSE dictionaries.
Expand Down
39 changes: 25 additions & 14 deletions lib/rucio/daemons/common.py
Expand Up @@ -76,9 +76,17 @@ def _pre_run_checks() -> None:
raise DatabaseException("Database was not updated, daemon won't start")

@abstractmethod
def _run_once(self, heartbeat_handler: "HeartbeatHandler", **_kwargs) -> None:
def _run_once(
self, heartbeat_handler: "HeartbeatHandler", **_kwargs
) -> tuple[bool, Any]:
"""
Daemon-specific logic (to be defined in child classes) for a single iteration
:param heartbeat_handler: Handler to set and manage the heartbeat for this execution.
:returns: Tuple of (must_sleep, ret_value).
must_sleep: set to True to signal to the calling context that it must sleep before next execution, False otherwise
ret_value: Daemon-specific return value
"""
pass

Expand Down Expand Up @@ -107,20 +115,23 @@ def run(self) -> None:
"""
self._pre_run_checks()

if self.once:
self._call_daemon()
else:
logging.info("main: starting threads")
threads = [
threading.Thread(target=self._call_daemon)
for _ in range(0, self.total_workers)
]
[t.start() for t in threads]
logging.info("main: waiting for interrupts")
logging.info(f"{self.daemon_name}: starting threads")
thread_list = [
threading.Thread(target=self._call_daemon)
for _ in range(0, self.total_workers)
]
[t.start() for t in thread_list]

# Interruptible joins require a timeout.
while threads[0].is_alive():
[t.join(timeout=3.14) for t in threads]
if not self.once:
logging.info(f"{self.daemon_name}: waiting for interrupts")

# Interruptible joins require a timeout.
while thread_list:
thread_list = [
thread.join(timeout=3.14)
for thread in thread_list
if thread and thread.is_alive()
]

def stop(
self, signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None
Expand Down

0 comments on commit c2ad227

Please sign in to comment.