/
common.py
507 lines (423 loc) · 18.5 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
# -*- coding: utf-8 -*-
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import functools
import logging
import os
import queue
import signal
import socket
import threading
import time
from abc import ABC, abstractmethod
from collections.abc import Callable, Generator, Iterator, Sequence
from types import FrameType, TracebackType
from typing import Any, Final, Generic, Optional, TypeVar, Union
from rucio.common.exception import DatabaseException
from rucio.common.logging import formatted_logger, setup_logging
from rucio.common.utils import PriorityQueue
import rucio.db.sqla.util
from rucio.core import heartbeat as heartbeat_core
from rucio.core.monitor import MetricManager
T = TypeVar('T')
METRICS = MetricManager(module=__name__)
class Daemon(ABC):
"""
Base daemon class
"""
def __init__(
self,
once: bool = False,
total_workers: int = 1,
sleep_time: int = 60,
partition_wait_time: int = 1,
daemon_name: str = "undefined_daemon",
) -> None:
"""
:param once: Only execute one iteration.
:param total_workers: Number of total workers.
:param sleep_time: Sleep time between daemon iterations.
:param partition_wait_time: Time to wait for database partition rebalancing before starting the actual daemon loop.
:param daemon_name: Name of daemon that is constructed.
"""
self.once: Final[bool] = once
self.total_workers: Final[int] = total_workers
self.sleep_time: Final[int] = sleep_time
self.partition_wait_time: Final[int] = partition_wait_time
self.daemon_name = daemon_name
self.graceful_stop = threading.Event()
setup_logging(process_name=daemon_name)
@staticmethod
def _pre_run_checks() -> None:
"""
Checks to run before daemon execution
"""
if rucio.db.sqla.util.is_old_db():
raise DatabaseException("Database was not updated, daemon won't start")
@abstractmethod
def _run_once(
self, heartbeat_handler: "HeartbeatHandler", **_kwargs
) -> tuple[bool, Any]:
"""
Daemon-specific logic (to be defined in child classes) for a single iteration
:param heartbeat_handler: Handler to set and manage the heartbeat for this execution.
:returns: Tuple of (must_sleep, ret_value).
must_sleep: set to True to signal to the calling context that it must sleep before next execution, False otherwise
ret_value: Daemon-specific return value
"""
pass
def _call_daemon(self, activities: Optional[list[str]] = None) -> None:
"""
Run the daemon loop and call the function _run_once at each iteration
"""
while not self.graceful_stop.is_set():
run_once_fnc = functools.partial(self._run_once)
daemon = db_workqueue(
once=self.once,
graceful_stop=self.graceful_stop,
executable=self.daemon_name,
partition_wait_time=self.partition_wait_time,
sleep_time=self.sleep_time,
activities=activities,
)(run_once_fnc)
for _ in daemon():
pass
def run(self) -> None:
"""
Run the daemon.
"""
self._pre_run_checks()
logging.info("%s: starting threads", self.daemon_name)
thread_list = [
threading.Thread(target=self._call_daemon)
for _ in range(self.total_workers)
]
for t in thread_list:
t.start()
for t in thread_list:
t.join()
if not self.once:
logging.info("%s: waiting for interrupts", self.daemon_name)
def stop(
self, signum: Optional[int] = None, frame: Optional[FrameType] = None
) -> None:
"""
Graceful exit the daemon. Used as handler for SIGTERM.
The unused parameters are needed for the method to be accepted as handler for signal.signal().
:param signum: signal number
:param frame: stack frame
"""
self.graceful_stop.set()
logging.info("%s: terminating", self.daemon_name)
if signum == signal.SIGINT or signum == signal.SIGTERM:
if hasattr(self.stop, 'received_sigint_or_sigterm'):
exit(1)
else:
self.stop.received_sigint_or_sigterm = True
class HeartbeatHandler:
"""
Simple contextmanager which sets a heartbeat and associated logger on entry and cleans up the heartbeat on exit.
"""
def __init__(self, executable: str, renewal_interval: int) -> None:
"""
:param executable: the executable name which will be set in heartbeats
:param renewal_interval: the interval at which the heartbeat will be renewed in the database.
Calls to live() in-between intervals will re-use the locally cached heartbeat.
"""
self.executable = executable
self._hash_executable = None
self.renewal_interval = renewal_interval
self.older_than = renewal_interval * 10 if renewal_interval > 0 else None # 10 was chosen without any particular reason
self.hostname = socket.getfqdn()
self.pid = os.getpid()
self.hb_thread = threading.current_thread()
self.logger = logging.log
self.last_heart_beat = None
self.last_time = None
self.last_payload = None
def __enter__(self) -> "HeartbeatHandler":
heartbeat_core.sanity_check(executable=self.executable, hostname=self.hostname)
self.live()
return self
def __exit__(self, exc_type: type[BaseException], exc_val: BaseException, exc_tb: TracebackType):
if self.last_heart_beat:
heartbeat_core.die(self.executable, self.hostname, self.pid, self.hb_thread)
if self.logger:
self.logger(logging.INFO, 'Heartbeat cleaned up')
@property
def hash_executable(self) -> str:
if not self._hash_executable:
self._hash_executable = heartbeat_core.calc_hash(self.executable)
return self._hash_executable
@property
def short_executable(self) -> str:
return min(self.executable, self.hash_executable, key=len)
def live(self, force_renew: bool = False, payload: Optional[str] = None):
"""
:return: a tuple: <the number of the current worker>, <total number of workers>, <decorated logger>
"""
if force_renew \
or not self.last_time \
or not self.last_heart_beat \
or self.last_time < datetime.datetime.now() - datetime.timedelta(seconds=self.renewal_interval) \
or self.last_payload != payload:
if self.older_than:
self.last_heart_beat = heartbeat_core.live(self.executable, self.hostname, self.pid, self.hb_thread, payload=payload, older_than=self.older_than)
else:
self.last_heart_beat = heartbeat_core.live(self.executable, self.hostname, self.pid, self.hb_thread, payload=payload)
prefix = '[%i/%i]: ' % (self.last_heart_beat['assign_thread'], self.last_heart_beat['nr_threads'])
self.logger = formatted_logger(logging.log, prefix + '%s')
if not self.last_time:
self.logger(logging.DEBUG, 'First heartbeat set')
else:
self.logger(logging.DEBUG, 'Heartbeat renewed')
self.last_time = datetime.datetime.now()
self.last_payload = payload
return self.last_heart_beat['assign_thread'], self.last_heart_beat['nr_threads'], self.logger
def _activity_looper(
once: bool,
sleep_time: int,
activities: Optional[Sequence[str]],
heartbeat_handler: HeartbeatHandler,
) -> Generator[tuple[str, float], tuple[float, bool], None]:
"""
Generator which loops (either once, or indefinitely) over all activities while ensuring that `sleep_time`
passes between handling twice the same activity.
Returns an activity and how much time the calling context must sleep before handling that activity
and expects to get in return the time when the activity started to be executed and whether next
execution must be immediate.
"""
# For each activity, the priority queue will keep the next absolute time when that
# activity must be handled.
activity_next_exe_time = PriorityQueue()
# On startup, we schedule to immediately handle all activities.
now = time.time()
for activity in activities or [None]:
activity_next_exe_time[activity] = now
while activity_next_exe_time:
activity = activity_next_exe_time.top()
desired_exe_time = activity_next_exe_time[activity]
if once:
time_to_sleep = 0
activity_next_exe_time.pop()
else:
time_to_sleep = desired_exe_time - time.time()
logger = heartbeat_handler.logger
if time_to_sleep > 0:
if activity:
logger(logging.DEBUG, 'Switching to activity %s and sleeping %s seconds', activity, time_to_sleep)
else:
logger(logging.DEBUG, 'Sleeping %s seconds', time_to_sleep)
else:
if activity:
logger(logging.DEBUG, 'Switching to activity %s', activity)
else:
logger(logging.DEBUG, 'Starting next iteration')
# The calling context notifies us when the activity actually got handled. And if sleeping is desired.
actual_exe_time, must_sleep = yield activity, time_to_sleep
if not once:
if must_sleep:
time_diff = time.time() - actual_exe_time
time_to_sleep = max(1, sleep_time - time_diff)
activity_next_exe_time[activity] = time.time() + time_to_sleep
else:
activity_next_exe_time[activity] = time.time() + 1
def db_workqueue(
once: bool,
graceful_stop: threading.Event,
executable: str,
partition_wait_time: int,
sleep_time: int,
activities: Optional[Sequence[str]] = None,
):
"""
Used to wrap a function for interacting with the database as a work queue: i.e. to select
a set of rows and perform some work on those rows while ensuring that two instances running in parallel don't
work on the same set of rows. The last condition is ensured by using heartbeats to keep track of currently
active workers.
:param once: Whether to stop after one iteration
:param graceful_stop: the threading.Event() object used for graceful stop of the daemon
:param executable: the name of the executable used for hearbeats
:param partition_wait_time: time to wait for database partition rebalancing before starting the actual daemon loop
:param sleep_time: time to sleep between the iterations of the daemon
:param activities: optional list of activities on which to work. The run_once_fnc will be called on activities one by one.
"""
def _decorate(run_once_fnc: Callable[..., Optional[Union[bool, tuple[bool, T]]]]) -> Callable[[], Iterator[Optional[T]]]:
@functools.wraps(run_once_fnc)
def _generator():
with HeartbeatHandler(executable=executable, renewal_interval=sleep_time - 1) as heartbeat_handler:
logger = heartbeat_handler.logger
logger(logging.INFO, 'started')
if partition_wait_time:
graceful_stop.wait(partition_wait_time)
_, _, logger = heartbeat_handler.live(force_renew=True)
activity_loop = _activity_looper(once=once, sleep_time=sleep_time, activities=activities, heartbeat_handler=heartbeat_handler)
activity, time_to_sleep = next(activity_loop, (None, None))
while time_to_sleep is not None:
if graceful_stop.is_set():
break
if time_to_sleep > 0:
graceful_stop.wait(time_to_sleep)
_, _, logger = heartbeat_handler.live()
must_sleep = True
start_time = time.time()
try:
result = run_once_fnc(heartbeat_handler=heartbeat_handler, activity=activity)
# Handle return values already existing in the code
# TODO: update all existing daemons to always explicitly return (must_sleep, ret_value)
if result is None:
must_sleep = True
ret_value = None
elif isinstance(result, bool):
must_sleep = result
ret_value = None
else:
must_sleep, ret_value = result
if ret_value is not None:
yield ret_value
except Exception as e:
METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc()
logger(logging.CRITICAL, "Exception", exc_info=True)
if once:
raise
try:
activity, time_to_sleep = activity_loop.send((start_time, must_sleep))
except StopIteration:
break
if not once:
logger(logging.INFO, 'Graceful stop requested')
return _generator
return _decorate
def run_daemon(
once: bool,
graceful_stop: threading.Event,
executable: str,
partition_wait_time: int,
sleep_time: int,
run_once_fnc: Callable[..., Optional[Union[bool, tuple[bool, Any]]]],
activities: Optional[list[str]] = None):
"""
Run the daemon loop and call the function run_once_fnc at each iteration
"""
daemon = db_workqueue(
once=once,
graceful_stop=graceful_stop,
executable=executable,
partition_wait_time=partition_wait_time,
sleep_time=sleep_time,
activities=activities,
)(run_once_fnc)
for _ in daemon():
pass
class ProducerConsumerDaemon(Generic[T]):
"""
Daemon which connects N producers with M consumers via a queue.
"""
def __init__(self, producers, consumers, graceful_stop, logger=logging.log):
self.producers = producers
self.consumers = consumers
self.queue = queue.Queue()
self.lock = threading.Lock()
self.graceful_stop = graceful_stop
self.active_producers = 0
self.producers_done_event = threading.Event()
self.logger = logger
def _produce(
self,
it: Callable[[], Iterator[T]],
wait_for_consumers: bool = False
):
"""
Iterate over the generator function and put the extracted elements into the queue.
Perform a graceful shutdown when graceful_stop is set.
"""
i = it()
with self.lock:
self.active_producers += 1
try:
while not self.graceful_stop.is_set():
if self.queue.qsize() > len(self.consumers):
self.graceful_stop.wait(1)
continue
try:
product = next(i)
self.queue.put(product)
except StopIteration:
break
except Exception as e:
METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc()
self.logger(logging.CRITICAL, "Exception", exc_info=True)
finally:
with self.lock:
self.active_producers -= 1
if not self.active_producers > 0:
self.producers_done_event.set()
if wait_for_consumers:
self.queue.join()
def _consume(
self,
fnc: Callable[[T], Any]
):
"""
Wait for elements to arrive via the queue and call the given function on each element.
If producers_done_event is set, handle all remaining elements from the queue and exit gracefully.
"""
while not self.producers_done_event.is_set() or self.queue.unfinished_tasks:
try:
product = self.queue.get_nowait()
except queue.Empty:
self.producers_done_event.wait(1)
continue
try:
fnc(product)
except Exception as e:
METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc()
self.logger(logging.CRITICAL, "Exception", exc_info=True)
finally:
self.queue.task_done()
def run(self):
producer_threads = []
for i, producer in enumerate(self.producers):
thread = threading.Thread(
target=self._produce,
name=f'producer-{i}-{producer.__name__}',
kwargs={
'it': producer,
'wait_for_consumers': True
}
)
thread.start()
producer_threads.append(thread)
consumer_threads = []
for i, consumer in enumerate(self.consumers):
thread = threading.Thread(
target=self._consume,
name=f'consumer-{i}-{consumer.__name__}',
kwargs={
'fnc': consumer,
}
)
thread.start()
consumer_threads.append(thread)
logging.info('waiting for interrupts')
while producer_threads:
for thread in producer_threads:
thread.join(timeout=3.14)
producer_threads = [thread for thread in producer_threads if thread.is_alive()]
self.producers_done_event.set()
while consumer_threads:
for thread in consumer_threads:
thread.join(timeout=3.14)
consumer_threads = [thread for thread in consumer_threads if thread.is_alive()]