Skip to content

Commit

Permalink
Refactor Judge Repairer to inherit from Daemon base class; #6478
Browse files Browse the repository at this point in the history
  • Loading branch information
rdimaio committed Feb 15, 2024
1 parent ee265e6 commit 5fcff6c
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 140 deletions.
9 changes: 4 additions & 5 deletions bin/rucio-judge-repairer
Expand Up @@ -19,9 +19,8 @@ Judge-Repairer is a daemon to repair stuck replication rules.
"""

import argparse
import signal

from rucio.daemons.judge.repairer import run, stop
from rucio.daemons.judge.repairer import JudgeRepairer


def get_parser():
Expand All @@ -36,10 +35,10 @@ def get_parser():


if __name__ == "__main__":
signal.signal(signal.SIGTERM, stop)
parser = get_parser()
args = parser.parse_args()
judge_repairer = JudgeRepairer(once=args.run_once, threads=args.threads, sleep_time=args.sleep_time)
try:
run(once=args.run_once, threads=args.threads, sleep_time=args.sleep_time)
judge_repairer.run()
except KeyboardInterrupt:
stop()
judge_repairer.stop()
167 changes: 58 additions & 109 deletions lib/rucio/daemons/judge/repairer.py
Expand Up @@ -16,130 +16,79 @@
"""
Judge-Repairer is a daemon to repair stuck replication rules.
"""
import functools
import logging
import threading
import time
import traceback
from copy import deepcopy
from datetime import datetime, timedelta
from random import randint
from re import match
from typing import TYPE_CHECKING
from typing import Any
from rucio.db.sqla.constants import ORACLE_CONNECTION_LOST_CONTACT_REGEX, ORACLE_RESOURCE_BUSY_REGEX

from sqlalchemy.exc import DatabaseError

import rucio.db.sqla.util
from rucio.common import exception
from rucio.common.exception import DatabaseException
from rucio.common.logging import setup_logging
from rucio.core.monitor import MetricManager
from rucio.core.rule import repair_rule, get_stuck_rules
from rucio.daemons.common import run_daemon

if TYPE_CHECKING:
from types import FrameType
from typing import Optional
from rucio.daemons.common import Daemon, HeartbeatHandler

METRICS = MetricManager(module=__name__)
graceful_stop = threading.Event()
DAEMON_NAME = 'judge-repairer'


def rule_repairer(once=False, sleep_time=60):
"""
Main loop to check for STUCK replication rules
"""
paused_rules = {} # {rule_id: datetime}
run_daemon(
once=once,
graceful_stop=graceful_stop,
executable=DAEMON_NAME,
partition_wait_time=1,
sleep_time=sleep_time,
run_once_fnc=functools.partial(
run_once,
paused_rules=paused_rules,
delta=-1 if once else 1800,
)
)


def run_once(paused_rules, delta, heartbeat_handler, **_kwargs):
worker_number, total_workers, logger = heartbeat_handler.live()

start = time.time()

# Refresh paused rules
iter_paused_rules = deepcopy(paused_rules)
for key in iter_paused_rules:
if datetime.utcnow() > paused_rules[key]:
del paused_rules[key]

# Select a bunch of rules for this worker to repair
rules = get_stuck_rules(total_workers=total_workers,
worker_number=worker_number,
delta=delta,
limit=100,
blocked_rules=[key for key in paused_rules])

logger(logging.DEBUG, 'index query time %f fetch size is %d' % (time.time() - start, len(rules)))

if not rules:
logger(logging.DEBUG, 'did not get any work (paused_rules=%s)' % (str(len(paused_rules))))
return

for rule_id in rules:
_, _, logger = heartbeat_handler.live()
rule_id = rule_id[0]
logger(logging.INFO, 'Repairing rule %s' % (rule_id))
if graceful_stop.is_set():
break
try:
start = time.time()
repair_rule(rule_id=rule_id)
logger(logging.DEBUG, 'repairing of %s took %f' % (rule_id, time.time() - start))
except (DatabaseException, DatabaseError) as e:
if match(ORACLE_RESOURCE_BUSY_REGEX, str(e.args[0])):
paused_rules[rule_id] = datetime.utcnow() + timedelta(seconds=randint(600, 2400))
logger(logging.WARNING, 'Locks detected for %s' % (rule_id))
METRICS.counter('exceptions.{exception}').labels(exception='LocksDetected').inc()
elif match('.*QueuePool.*', str(e.args[0])):
logger(logging.WARNING, traceback.format_exc())
METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc()
elif match(ORACLE_CONNECTION_LOST_CONTACT_REGEX, str(e.args[0])):
logger(logging.WARNING, traceback.format_exc())
METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc()
else:
logger(logging.ERROR, traceback.format_exc())
METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc()


def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""

graceful_stop.set()


def run(once=False, threads=1, sleep_time=60):
"""
Starts up the Judge-Repairer threads.
"""
setup_logging(process_name=DAEMON_NAME)

if rucio.db.sqla.util.is_old_db():
raise exception.DatabaseException('Database was not updated, daemon won\'t start')

if once:
rule_repairer(once)
else:
logging.info('Repairer starting %s threads' % str(threads))
threads = [threading.Thread(target=rule_repairer, kwargs={'once': once,
'sleep_time': sleep_time}) for i in range(0, threads)]
[t.start() for t in threads]
# Interruptible joins require a timeout.
while threads[0].is_alive():
[t.join(timeout=3.14) for t in threads]
class JudgeRepairer(Daemon):
def __init__(self, **_kwargs) -> None:
super().__init__(daemon_name="judge-repairer", **_kwargs)
self.delta = -1 if self.once else 1800
self.paused_rules = {} # {rule_id: datetime}

def _run_once(self, heartbeat_handler: "HeartbeatHandler", **_kwargs) -> tuple[bool, Any]:
worker_number, total_workers, logger = heartbeat_handler.live()
must_sleep = False

start = time.time()

# Refresh paused rules
iter_paused_rules = deepcopy(self.paused_rules)
for key in iter_paused_rules:
if datetime.utcnow() > self.paused_rules[key]:
del self.paused_rules[key]

# Select a bunch of rules for this worker to repair
rules = get_stuck_rules(total_workers=total_workers,
worker_number=worker_number,
delta=self.delta,
limit=100,
blocked_rules=[key for key in self.paused_rules])

logger(logging.DEBUG, 'index query time %f fetch size is %d' % (time.time() - start, len(rules)))

if not rules:
logger(logging.DEBUG, 'did not get any work (paused_rules=%s)' % (str(len(self.paused_rules))))
return must_sleep, None

for rule_id in rules:
_, _, logger = heartbeat_handler.live()
rule_id = rule_id[0]
logger(logging.INFO, 'Repairing rule %s' % (rule_id))
if self.graceful_stop.is_set():
break
try:
start = time.time()
repair_rule(rule_id=rule_id)
logger(logging.DEBUG, 'repairing of %s took %f' % (rule_id, time.time() - start))
except (DatabaseException, DatabaseError) as e:
if match(ORACLE_RESOURCE_BUSY_REGEX, str(e.args[0])):
self.paused_rules[rule_id] = datetime.utcnow() + timedelta(seconds=randint(600, 2400))
logger(logging.WARNING, 'Locks detected for %s' % (rule_id))
METRICS.counter('exceptions.{exception}').labels(exception='LocksDetected').inc()
elif match('.*QueuePool.*', str(e.args[0])):
logger(logging.WARNING, traceback.format_exc())
METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc()
elif match(ORACLE_CONNECTION_LOST_CONTACT_REGEX, str(e.args[0])):
logger(logging.WARNING, traceback.format_exc())
METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc()
else:
logger(logging.ERROR, traceback.format_exc())
METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc()
return must_sleep, None
3 changes: 1 addition & 2 deletions lib/rucio/daemons/reaper/reaper.py
Expand Up @@ -19,7 +19,6 @@

import logging
import random
import threading
import time
import traceback
from configparser import NoOptionError, NoSectionError
Expand Down Expand Up @@ -56,7 +55,7 @@
if TYPE_CHECKING:
from collections.abc import Callable

GRACEFUL_STOP = threading.Event()

METRICS = MetricManager(module=__name__)
REGION = make_region_memcached(expiration_time=600)

Expand Down
2 changes: 0 additions & 2 deletions lib/rucio/daemons/undertaker/undertaker.py
Expand Up @@ -18,7 +18,6 @@
'''

import logging
import threading
import traceback
from copy import deepcopy
from datetime import datetime, timedelta
Expand All @@ -39,7 +38,6 @@
logging.getLogger("requests").setLevel(logging.CRITICAL)

METRICS = MetricManager(module=__name__)
graceful_stop = threading.Event()


class Undertaker(Daemon):
Expand Down
3 changes: 1 addition & 2 deletions tests/test_daemons.py
Expand Up @@ -28,7 +28,7 @@
from rucio.daemons.conveyor import finisher, poller, receiver, stager, submitter, throttler, preparer
from rucio.daemons.follower import follower
from rucio.daemons.hermes import hermes
from rucio.daemons.judge import cleaner, evaluator, injector, repairer
from rucio.daemons.judge import cleaner, evaluator, injector
from rucio.daemons.oauthmanager import oauthmanager
from rucio.daemons.reaper import dark_reaper
from rucio.daemons.replicarecoverer import suspicious_replica_recoverer
Expand Down Expand Up @@ -59,7 +59,6 @@
cleaner,
evaluator,
injector,
repairer,
oauthmanager,
dark_reaper,
suspicious_replica_recoverer,
Expand Down

0 comments on commit 5fcff6c

Please sign in to comment.