Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Testing: Add type annotations to monitor.py #6595

Merged
merged 1 commit into from Apr 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 17 additions & 9 deletions lib/rucio/core/monitor.py
Expand Up @@ -26,7 +26,7 @@
from functools import wraps
from pathlib import Path
from threading import Lock
from typing import Any, Optional, TypeVar, Union
from typing import TYPE_CHECKING, Any, Optional, TypeVar, Union

from prometheus_client import REGISTRY, CollectorRegistry, Counter, Gauge, Histogram, generate_latest, multiprocess, push_to_gateway, start_http_server, values
from statsd import StatsClient
Expand All @@ -36,13 +36,16 @@
from rucio.common.stopwatch import Stopwatch
from rucio.common.utils import retrying

if TYPE_CHECKING:
from rucio.common.types import LoggerFunction

_T = TypeVar('_T')
_M = TypeVar('_M', bound="_MultiMetric")

PROMETHEUS_MULTIPROC_DIR = os.environ.get('PROMETHEUS_MULTIPROC_DIR', os.environ.get('prometheus_multiproc_dir', None))


def cleanup_prometheus_files_at_exit():
def cleanup_prometheus_files_at_exit() -> None:
if PROMETHEUS_MULTIPROC_DIR:
multiprocess.mark_process_dead(os.getpid())

Expand Down Expand Up @@ -99,7 +102,12 @@ def get(self):
_HISTOGRAM_DEFAULT_BUCKETS = Histogram.DEFAULT_BUCKETS


def _cleanup_old_prometheus_files(path, file_pattern, cleanup_delay, logger):
def _cleanup_old_prometheus_files(
path: str,
file_pattern: str,
cleanup_delay: float,
logger: "LoggerFunction"
) -> None:
"""cleanup behind processes which didn't finish gracefully."""

oldest_accepted_mtime = datetime.now() - timedelta(seconds=cleanup_delay)
Expand All @@ -118,7 +126,7 @@ def _cleanup_old_prometheus_files(path, file_pattern, cleanup_delay, logger):
pass


def cleanup_old_prometheus_files(logger=logging.log):
def cleanup_old_prometheus_files(logger: "LoggerFunction" = logging.log) -> None:
path = PROMETHEUS_MULTIPROC_DIR
if path:
_cleanup_old_prometheus_files(path, file_pattern='gauge_live*.db', cleanup_delay=timedelta(hours=1).total_seconds(), logger=logger)
Expand All @@ -128,7 +136,7 @@ def cleanup_old_prometheus_files(logger=logging.log):
@retrying(retry_on_exception=lambda _: True,
wait_fixed=500,
stop_max_attempt_number=2)
def generate_prometheus_metrics():
def generate_prometheus_metrics() -> bytes:
cleanup_old_prometheus_files()

registry = CollectorRegistry()
Expand Down Expand Up @@ -207,7 +215,7 @@ def inc(self, delta=1):
if STATSD_CLIENT:
STATSD_CLIENT.incr(self._statsd, delta)

def init_prometheus_metric(self, name: str, documentation: Optional[str], labelnames: Sequence[str] = ()):
def init_prometheus_metric(self, name: str, documentation: str, labelnames: Sequence[str] = ()) -> Counter:
return Counter(name, documentation, labelnames=labelnames, registry=self._registry)


Expand All @@ -218,7 +226,7 @@ def set(self, value):
if STATSD_CLIENT:
STATSD_CLIENT.gauge(self._statsd, value)

def init_prometheus_metric(self, name: str, documentation: Optional[str], labelnames: Sequence[str] = ()):
def init_prometheus_metric(self, name: str, documentation: str, labelnames: Sequence[str] = ()) -> Gauge:
return Gauge(name, documentation, labelnames=labelnames, registry=self._registry)


Expand All @@ -242,7 +250,7 @@ def observe(self, value: float):
if STATSD_CLIENT:
STATSD_CLIENT.timing(self._statsd, value * 1000)

def init_prometheus_metric(self, name: str, documentation: Optional[str], labelnames: Sequence[str] = ()):
def init_prometheus_metric(self, name: str, documentation: str, labelnames: Sequence[str] = ()) -> Histogram:
return Histogram(name, documentation, labelnames=labelnames, registry=self._registry, buckets=self._histogram_buckets)

def __enter__(self):
Expand Down Expand Up @@ -334,7 +342,7 @@ def __init__(self, prefix: Optional[str] = None, module: Optional[str] = None,
self.registry = registry or REGISTRY
self.push_gateways = push_gateways or []

def full_name(self, name: str):
def full_name(self, name: str) -> str:
if self.prefix:
return f'{self.prefix}.{name}'
return name
Expand Down