diff --git a/bin/rucio-abacus-account b/bin/rucio-abacus-account index cba5105b3a..52a16a231e 100755 --- a/bin/rucio-abacus-account +++ b/bin/rucio-abacus-account @@ -68,6 +68,7 @@ if __name__ == "__main__": args = parser.parse_args() abacus_account = AbacusAccount(once=args.run_once, threads=args.threads, fill_history_table=args.enable_history, sleep_time=args.sleep_time) signal.signal(signal.SIGTERM, abacus_account.stop) + signal.signal(signal.SIGINT, abacus_account.stop) try: abacus_account.run() except KeyboardInterrupt: diff --git a/bin/rucio-judge-repairer b/bin/rucio-judge-repairer index 07eaa6b61f..a2b6f1e75b 100755 --- a/bin/rucio-judge-repairer +++ b/bin/rucio-judge-repairer @@ -40,6 +40,7 @@ if __name__ == "__main__": args = parser.parse_args() judge_repairer = JudgeRepairer(once=args.run_once, threads=args.threads, sleep_time=args.sleep_time) signal.signal(signal.SIGTERM, judge_repairer.stop) + signal.signal(signal.SIGINT, judge_repairer.stop) try: judge_repairer.run() except KeyboardInterrupt: diff --git a/bin/rucio-reaper b/bin/rucio-reaper index bc6ef8c270..be69dbc961 100755 --- a/bin/rucio-reaper +++ b/bin/rucio-reaper @@ -77,6 +77,7 @@ if __name__ == "__main__": auto_exclude_threshold=args.auto_exclude_threshold, auto_exclude_timeout=args.auto_exclude_timeout) signal.signal(signal.SIGTERM, reaper.stop) + signal.signal(signal.SIGINT, reaper.stop) try: reaper.run() except KeyboardInterrupt: diff --git a/bin/rucio-undertaker b/bin/rucio-undertaker index 3141074a77..dfc5d133f4 100755 --- a/bin/rucio-undertaker +++ b/bin/rucio-undertaker @@ -70,6 +70,7 @@ if __name__ == "__main__": args = parser.parse_args() undertaker = Undertaker(total_workers=args.total_workers, chunk_size=args.chunk_size, once=args.run_once, sleep_time=args.sleep_time) signal.signal(signal.SIGTERM, undertaker.stop) + signal.signal(signal.SIGINT, undertaker.stop) try: undertaker.run() except KeyboardInterrupt: diff --git a/lib/rucio/daemons/common.py b/lib/rucio/daemons/common.py index b88663a3df..12d2485bc9 100644 --- a/lib/rucio/daemons/common.py +++ b/lib/rucio/daemons/common.py @@ -18,6 +18,7 @@ import logging import os import queue +import signal import socket import threading import time @@ -36,6 +37,7 @@ T = TypeVar('T') METRICS = MetricManager(module=__name__) + class Daemon(ABC): """ Base daemon class @@ -92,19 +94,20 @@ def _call_daemon(self, activities: Optional[list[str]] = None) -> None: Run the daemon loop and call the function _run_once at each iteration """ - run_once_fnc = functools.partial(self._run_once) + while not self.graceful_stop.is_set(): + run_once_fnc = functools.partial(self._run_once) - daemon = db_workqueue( - once=self.once, - graceful_stop=self.graceful_stop, - executable=self.daemon_name, - partition_wait_time=self.partition_wait_time, - sleep_time=self.sleep_time, - activities=activities, - )(run_once_fnc) + daemon = db_workqueue( + once=self.once, + graceful_stop=self.graceful_stop, + executable=self.daemon_name, + partition_wait_time=self.partition_wait_time, + sleep_time=self.sleep_time, + activities=activities, + )(run_once_fnc) - for _ in daemon(): - pass + for _ in daemon(): + pass def run(self) -> None: """ @@ -121,17 +124,12 @@ def run(self) -> None: for t in thread_list: t.start() + for t in thread_list: + t.join() + if not self.once: logging.info("%s: waiting for interrupts", self.daemon_name) - # Interruptible joins require a timeout. - while thread_list: - thread_list = [ - thread.join(timeout=3.14) - for thread in thread_list - if thread and thread.is_alive() - ] - def stop( self, signum: Optional[int] = None, frame: Optional[FrameType] = None ) -> None: @@ -143,6 +141,13 @@ def stop( :param frame: stack frame """ self.graceful_stop.set() + logging.info("%s: terminating", self.daemon_name) + if signum == signal.SIGINT or signum == signal.SIGTERM: + if hasattr(self.stop, 'received_sigint_or_sigterm'): + exit(1) + else: + self.stop.received_sigint_or_sigterm = True + class HeartbeatHandler: """ diff --git a/lib/rucio/daemons/reaper/reaper.py b/lib/rucio/daemons/reaper/reaper.py index 4a561a2bda..f8a63e37a1 100644 --- a/lib/rucio/daemons/reaper/reaper.py +++ b/lib/rucio/daemons/reaper/reaper.py @@ -58,6 +58,7 @@ METRICS = MetricManager(module=__name__) REGION = make_region_memcached(expiration_time=600) +EXCLUDED_RSE_GAUGE = METRICS.gauge('excluded_rses.{rse}', documentation='Temporarly excluded RSEs') class Reaper(Daemon):