Skip to content

Commit

Permalink
Daemons: Fix graceful stop logic
Browse files Browse the repository at this point in the history
  • Loading branch information
rdimaio committed Mar 12, 2024
1 parent fe86b52 commit 91599a5
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 19 deletions.
1 change: 1 addition & 0 deletions bin/rucio-abacus-account
Expand Up @@ -68,6 +68,7 @@ if __name__ == "__main__":
args = parser.parse_args()
abacus_account = AbacusAccount(once=args.run_once, threads=args.threads, fill_history_table=args.enable_history, sleep_time=args.sleep_time)
signal.signal(signal.SIGTERM, abacus_account.stop)
signal.signal(signal.SIGINT, abacus_account.stop)
try:
abacus_account.run()
except KeyboardInterrupt:
Expand Down
1 change: 1 addition & 0 deletions bin/rucio-judge-repairer
Expand Up @@ -40,6 +40,7 @@ if __name__ == "__main__":
args = parser.parse_args()
judge_repairer = JudgeRepairer(once=args.run_once, threads=args.threads, sleep_time=args.sleep_time)
signal.signal(signal.SIGTERM, judge_repairer.stop)
signal.signal(signal.SIGINT, judge_repairer.stop)
try:
judge_repairer.run()
except KeyboardInterrupt:
Expand Down
1 change: 1 addition & 0 deletions bin/rucio-reaper
Expand Up @@ -77,6 +77,7 @@ if __name__ == "__main__":
auto_exclude_threshold=args.auto_exclude_threshold,
auto_exclude_timeout=args.auto_exclude_timeout)
signal.signal(signal.SIGTERM, reaper.stop)
signal.signal(signal.SIGINT, reaper.stop)
try:
reaper.run()
except KeyboardInterrupt:
Expand Down
1 change: 1 addition & 0 deletions bin/rucio-undertaker
Expand Up @@ -70,6 +70,7 @@ if __name__ == "__main__":
args = parser.parse_args()
undertaker = Undertaker(total_workers=args.total_workers, chunk_size=args.chunk_size, once=args.run_once, sleep_time=args.sleep_time)
signal.signal(signal.SIGTERM, undertaker.stop)
signal.signal(signal.SIGINT, undertaker.stop)
try:
undertaker.run()
except KeyboardInterrupt:
Expand Down
43 changes: 24 additions & 19 deletions lib/rucio/daemons/common.py
Expand Up @@ -18,6 +18,7 @@
import logging
import os
import queue
import signal
import socket
import threading
import time
Expand All @@ -36,6 +37,7 @@
T = TypeVar('T')
METRICS = MetricManager(module=__name__)


class Daemon(ABC):
"""
Base daemon class
Expand Down Expand Up @@ -92,19 +94,20 @@ def _call_daemon(self, activities: Optional[list[str]] = None) -> None:
Run the daemon loop and call the function _run_once at each iteration
"""

run_once_fnc = functools.partial(self._run_once)
while not self.graceful_stop.is_set():
run_once_fnc = functools.partial(self._run_once)

daemon = db_workqueue(
once=self.once,
graceful_stop=self.graceful_stop,
executable=self.daemon_name,
partition_wait_time=self.partition_wait_time,
sleep_time=self.sleep_time,
activities=activities,
)(run_once_fnc)
daemon = db_workqueue(
once=self.once,
graceful_stop=self.graceful_stop,
executable=self.daemon_name,
partition_wait_time=self.partition_wait_time,
sleep_time=self.sleep_time,
activities=activities,
)(run_once_fnc)

for _ in daemon():
pass
for _ in daemon():
pass

def run(self) -> None:
"""
Expand All @@ -121,17 +124,12 @@ def run(self) -> None:
for t in thread_list:
t.start()

for t in thread_list:
t.join()

if not self.once:
logging.info("%s: waiting for interrupts", self.daemon_name)

# Interruptible joins require a timeout.
while thread_list:
thread_list = [
thread.join(timeout=3.14)
for thread in thread_list
if thread and thread.is_alive()
]

def stop(
self, signum: Optional[int] = None, frame: Optional[FrameType] = None
) -> None:
Expand All @@ -143,6 +141,13 @@ def stop(
:param frame: stack frame
"""
self.graceful_stop.set()
logging.info("%s: terminating", self.daemon_name)
if signum == signal.SIGINT or signum == signal.SIGTERM:
if hasattr(self.stop, 'received_sigint_or_sigterm'):
exit(1)
else:
self.stop.received_sigint_or_sigterm = True


class HeartbeatHandler:
"""
Expand Down
1 change: 1 addition & 0 deletions lib/rucio/daemons/reaper/reaper.py
Expand Up @@ -58,6 +58,7 @@

METRICS = MetricManager(module=__name__)
REGION = make_region_memcached(expiration_time=600)
EXCLUDED_RSE_GAUGE = METRICS.gauge('excluded_rses.{rse}', documentation='Temporarly excluded RSEs')


class Reaper(Daemon):
Expand Down

0 comments on commit 91599a5

Please sign in to comment.