From 40993fe003af118947a73baa1331e6d6aeaf70d2 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Thu, 13 Oct 2022 11:54:45 -0400 Subject: [PATCH] fix(profiling): Race condition spawning multiple profiling threads (#1676) There is a race condition where multiple profiling threads may be spawned. Specifically, if `start_profiling` is called immediately after `stop_profiling`. This happens because `stop_profiling` does not immediately terminate the thread, instead the thread will check that the event was set and exit at the end of the current iteration. If `start_profiling` is called during the iteration, the event gets set again and the old thread will continue running. To fix this, a new event is created when a profiling thread starts so they can be terminated independently. --- sentry_sdk/profiler.py | 171 +++++++++++++++++++++++------------------ tests/test_profiler.py | 55 ++++++++++++- 2 files changed, 151 insertions(+), 75 deletions(-) diff --git a/sentry_sdk/profiler.py b/sentry_sdk/profiler.py index fc409abfe7..38e54b8c5b 100644 --- a/sentry_sdk/profiler.py +++ b/sentry_sdk/profiler.py @@ -25,12 +25,14 @@ import sentry_sdk from sentry_sdk._compat import PY33 +from sentry_sdk._queue import Queue from sentry_sdk._types import MYPY from sentry_sdk.utils import nanosecond_time if MYPY: from types import FrameType from typing import Any + from typing import Callable from typing import Deque from typing import Dict from typing import Generator @@ -43,8 +45,8 @@ FrameData = namedtuple("FrameData", ["name", "file", "line"]) -_sample_buffer = None # type: Optional[_SampleBuffer] -_scheduler = None # type: Optional[_Scheduler] +_sample_buffer = None # type: Optional[SampleBuffer] +_scheduler = None # type: Optional[Scheduler] def setup_profiler(options): @@ -70,17 +72,18 @@ def setup_profiler(options): # To buffer samples for `buffer_secs` at `frequency` Hz, we need # a capcity of `buffer_secs * frequency`. - _sample_buffer = _SampleBuffer(capacity=buffer_secs * frequency) - - profiler_mode = options["_experiments"].get("profiler_mode", _SigprofScheduler.mode) - if profiler_mode == _SigprofScheduler.mode: - _scheduler = _SigprofScheduler(frequency=frequency) - elif profiler_mode == _SigalrmScheduler.mode: - _scheduler = _SigalrmScheduler(frequency=frequency) - elif profiler_mode == _SleepScheduler.mode: - _scheduler = _SleepScheduler(frequency=frequency) - elif profiler_mode == _EventScheduler.mode: - _scheduler = _EventScheduler(frequency=frequency) + _sample_buffer = SampleBuffer(capacity=buffer_secs * frequency) + _sampler = _init_sample_stack_fn(_sample_buffer) + + profiler_mode = options["_experiments"].get("profiler_mode", SigprofScheduler.mode) + if profiler_mode == SigprofScheduler.mode: + _scheduler = SigprofScheduler(sampler=_sampler, frequency=frequency) + elif profiler_mode == SigalrmScheduler.mode: + _scheduler = SigalrmScheduler(sampler=_sampler, frequency=frequency) + elif profiler_mode == SleepScheduler.mode: + _scheduler = SleepScheduler(sampler=_sampler, frequency=frequency) + elif profiler_mode == EventScheduler.mode: + _scheduler = EventScheduler(sampler=_sampler, frequency=frequency) else: raise ValueError("Unknown profiler mode: {}".format(profiler_mode)) _scheduler.setup() @@ -101,23 +104,27 @@ def teardown_profiler(): _scheduler = None -def _sample_stack(*args, **kwargs): - # type: (*Any, **Any) -> None - """ - Take a sample of the stack on all the threads in the process. - This should be called at a regular interval to collect samples. - """ +def _init_sample_stack_fn(buffer): + # type: (SampleBuffer) -> Callable[..., None] - assert _sample_buffer is not None - _sample_buffer.write( - ( - nanosecond_time(), - [ - (tid, extract_stack(frame)) - for tid, frame in sys._current_frames().items() - ], + def _sample_stack(*args, **kwargs): + # type: (*Any, **Any) -> None + """ + Take a sample of the stack on all the threads in the process. + This should be called at a regular interval to collect samples. + """ + + buffer.write( + ( + nanosecond_time(), + [ + (tid, extract_stack(frame)) + for tid, frame in sys._current_frames().items() + ], + ) ) - ) + + return _sample_stack # We want to impose a stack depth limit so that samples aren't too large. @@ -248,7 +255,7 @@ def to_json(self, event_opt): } -class _SampleBuffer(object): +class SampleBuffer(object): """ A simple implementation of a ring buffer to buffer the samples taken. @@ -348,11 +355,12 @@ def slice_profile(self, start_ns, stop_ns): } -class _Scheduler(object): +class Scheduler(object): mode = "unknown" - def __init__(self, frequency): - # type: (int) -> None + def __init__(self, sampler, frequency): + # type: (Callable[..., None], int) -> None + self.sampler = sampler self._lock = threading.Lock() self._count = 0 self._interval = 1.0 / frequency @@ -378,7 +386,7 @@ def stop_profiling(self): return self._count == 0 -class _ThreadScheduler(_Scheduler): +class ThreadScheduler(Scheduler): """ This abstract scheduler is based on running a daemon thread that will call the sampler at a regular interval. @@ -387,10 +395,10 @@ class _ThreadScheduler(_Scheduler): mode = "thread" name = None # type: Optional[str] - def __init__(self, frequency): - # type: (int) -> None - super(_ThreadScheduler, self).__init__(frequency) - self.event = threading.Event() + def __init__(self, sampler, frequency): + # type: (Callable[..., None], int) -> None + super(ThreadScheduler, self).__init__(sampler=sampler, frequency=frequency) + self.stop_events = Queue() def setup(self): # type: () -> None @@ -402,34 +410,37 @@ def teardown(self): def start_profiling(self): # type: () -> bool - if super(_ThreadScheduler, self).start_profiling(): + if super(ThreadScheduler, self).start_profiling(): # make sure to clear the event as we reuse the same event # over the lifetime of the scheduler - self.event.clear() + event = threading.Event() + self.stop_events.put_nowait(event) + run = self.make_run(event) # make sure the thread is a daemon here otherwise this # can keep the application running after other threads # have exited - thread = threading.Thread(name=self.name, target=self.run, daemon=True) + thread = threading.Thread(name=self.name, target=run, daemon=True) thread.start() return True return False def stop_profiling(self): # type: () -> bool - if super(_ThreadScheduler, self).stop_profiling(): + if super(ThreadScheduler, self).stop_profiling(): # make sure the set the event here so that the thread # can check to see if it should keep running - self.event.set() + event = self.stop_events.get_nowait() + event.set() return True return False - def run(self): - # type: () -> None + def make_run(self, event): + # type: (threading.Event) -> Callable[..., None] raise NotImplementedError -class _SleepScheduler(_ThreadScheduler): +class SleepScheduler(ThreadScheduler): """ This scheduler uses time.sleep to wait the required interval before calling the sampling function. @@ -438,29 +449,34 @@ class _SleepScheduler(_ThreadScheduler): mode = "sleep" name = "sentry.profiler.SleepScheduler" - def run(self): - # type: () -> None - last = time.perf_counter() + def make_run(self, event): + # type: (threading.Event) -> Callable[..., None] - while True: - # some time may have elapsed since the last time - # we sampled, so we need to account for that and - # not sleep for too long - now = time.perf_counter() - elapsed = max(now - last, 0) + def run(): + # type: () -> None + last = time.perf_counter() - if elapsed < self._interval: - time.sleep(self._interval - elapsed) + while True: + # some time may have elapsed since the last time + # we sampled, so we need to account for that and + # not sleep for too long + now = time.perf_counter() + elapsed = max(now - last, 0) - last = time.perf_counter() + if elapsed < self._interval: + time.sleep(self._interval - elapsed) + + last = time.perf_counter() - if self.event.is_set(): - break + if event.is_set(): + break - _sample_stack() + self.sampler() + return run -class _EventScheduler(_ThreadScheduler): + +class EventScheduler(ThreadScheduler): """ This scheduler uses threading.Event to wait the required interval before calling the sampling function. @@ -469,18 +485,25 @@ class _EventScheduler(_ThreadScheduler): mode = "event" name = "sentry.profiler.EventScheduler" - def run(self): - # type: () -> None - while True: - self.event.wait(timeout=self._interval) + def make_run(self, event): + # type: (threading.Event) -> Callable[..., None] + + def run(): + # type: () -> None + while True: + event.wait(timeout=self._interval) + + if event.is_set(): + break + + self.sampler() - if self.event.is_set(): - break + self.sampler() - _sample_stack() + return run -class _SignalScheduler(_Scheduler): +class SignalScheduler(Scheduler): """ This abstract scheduler is based on UNIX signals. It sets up a signal handler for the specified signal, and the matching itimer in order @@ -513,7 +536,7 @@ def setup(self): # This setups a process wide signal handler that will be called # at an interval to record samples. try: - signal.signal(self.signal_num, _sample_stack) + signal.signal(self.signal_num, self.sampler) except ValueError: raise ValueError( "Signal based profiling can only be enabled from the main thread." @@ -535,20 +558,20 @@ def teardown(self): def start_profiling(self): # type: () -> bool - if super(_SignalScheduler, self).start_profiling(): + if super(SignalScheduler, self).start_profiling(): signal.setitimer(self.signal_timer, self._interval, self._interval) return True return False def stop_profiling(self): # type: () -> bool - if super(_SignalScheduler, self).stop_profiling(): + if super(SignalScheduler, self).stop_profiling(): signal.setitimer(self.signal_timer, 0) return True return False -class _SigprofScheduler(_SignalScheduler): +class SigprofScheduler(SignalScheduler): """ This scheduler uses SIGPROF to regularly call a signal handler where the samples will be taken. @@ -581,7 +604,7 @@ def signal_timer(self): return signal.ITIMER_PROF -class _SigalrmScheduler(_SignalScheduler): +class SigalrmScheduler(SignalScheduler): """ This scheduler uses SIGALRM to regularly call a signal handler where the samples will be taken. diff --git a/tests/test_profiler.py b/tests/test_profiler.py index 5feae5cc11..8b5d1fb5a6 100644 --- a/tests/test_profiler.py +++ b/tests/test_profiler.py @@ -2,10 +2,16 @@ import platform import sys import threading +import time import pytest -from sentry_sdk.profiler import extract_stack, get_frame_name, setup_profiler +from sentry_sdk.profiler import ( + SleepScheduler, + extract_stack, + get_frame_name, + setup_profiler, +) minimum_python_33 = pytest.mark.skipif( @@ -148,3 +154,50 @@ def test_extract_stack_with_max_depth(depth, max_stack_depth, actual_depth): # index 0 contains the inner most frame on the stack, so the lamdba # should be at index `actual_depth` assert stack[actual_depth].name == "", actual_depth + + +def get_scheduler_threads(scheduler): + return [thread for thread in threading.enumerate() if thread.name == scheduler.name] + + +@minimum_python_33 +def test_sleep_scheduler_single_background_thread(): + def sampler(): + pass + + scheduler = SleepScheduler(sampler=sampler, frequency=1000) + + assert scheduler.start_profiling() + + # the scheduler thread does not immediately exit + # but it should exit after the next time it samples + assert scheduler.stop_profiling() + + assert scheduler.start_profiling() + + # because the scheduler thread does not immediately exit + # after stop_profiling is called, we have to wait a little + # otherwise, we'll see an extra scheduler thread in the + # following assertion + # + # one iteration of the scheduler takes 1.0 / frequency seconds + # so make sure this sleeps for longer than that to avoid flakes + time.sleep(0.002) + + # there should be 1 scheduler thread now because the first + # one should be stopped and a new one started + assert len(get_scheduler_threads(scheduler)) == 1 + + assert scheduler.stop_profiling() + + # because the scheduler thread does not immediately exit + # after stop_profiling is called, we have to wait a little + # otherwise, we'll see an extra scheduler thread in the + # following assertion + # + # one iteration of the scheduler takes 1.0 / frequency seconds + # so make sure this sleeps for longer than that to avoid flakes + time.sleep(0.002) + + # there should be 0 scheduler threads now because they stopped + assert len(get_scheduler_threads(scheduler)) == 0