From c4642e98c34bdbc1fadd325703f6874eb08e0d5e Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 12 Oct 2022 15:29:14 -0400 Subject: [PATCH 01/10] fix(profiling): Race condition spawning multiple profiling threads There is a race condition where multiple profiling threads may be spawned, specifically when `start_profiling` is called immediately after `stop_profiling`. This happens because `stop_profiling` does not immediately terminate the thread; instead, the thread checks whether the event was set and exits at the end of the current iteration. If `start_profiling` is called during that iteration, the event gets cleared again and the old thread will continue running. To fix this, a new event is created when a profiling thread starts so that each thread can be terminated independently. --- sentry_sdk/profiler.py | 175 +++++++++++++++++++++++------------------ tests/test_profiler.py | 48 +++++++++++ 2 files changed, 146 insertions(+), 77 deletions(-) create mode 100644 tests/test_profiler.py diff --git a/sentry_sdk/profiler.py b/sentry_sdk/profiler.py index 86cf1bf91d..c108e1f5d0 100644 --- a/sentry_sdk/profiler.py +++ b/sentry_sdk/profiler.py @@ -16,22 +16,22 @@ import platform import random import signal +import sys import threading import time -import sys import uuid - from collections import deque from contextlib import contextmanager import sentry_sdk from sentry_sdk._compat import PY33 - +from sentry_sdk._queue import Queue from sentry_sdk._types import MYPY from sentry_sdk.utils import nanosecond_time if MYPY: from typing import Any + from typing import Callable from typing import Deque from typing import Dict from typing import Generator @@ -45,8 +45,8 @@ FrameData = Tuple[str, str, int] -_sample_buffer = None # type: Optional[_SampleBuffer] -_scheduler = None # type: Optional[_Scheduler] +_sample_buffer = None # type: Optional[SampleBuffer] +_scheduler = None # type: Optional[Scheduler] def setup_profiler(options): @@ -72,17 +72,18 @@ def setup_profiler(options): # To buffer
samples for `buffer_secs` at `frequency` Hz, we need # a capcity of `buffer_secs * frequency`. - _sample_buffer = _SampleBuffer(capacity=buffer_secs * frequency) - - profiler_mode = options["_experiments"].get("profiler_mode", _SigprofScheduler.mode) - if profiler_mode == _SigprofScheduler.mode: - _scheduler = _SigprofScheduler(frequency=frequency) - elif profiler_mode == _SigalrmScheduler.mode: - _scheduler = _SigalrmScheduler(frequency=frequency) - elif profiler_mode == _SleepScheduler.mode: - _scheduler = _SleepScheduler(frequency=frequency) - elif profiler_mode == _EventScheduler.mode: - _scheduler = _EventScheduler(frequency=frequency) + _sample_buffer = SampleBuffer(capacity=buffer_secs * frequency) + _sampler = _init_sample_stack_fn(_sample_buffer) + + profiler_mode = options["_experiments"].get("profiler_mode", SigprofScheduler.mode) + if profiler_mode == SigprofScheduler.mode: + _scheduler = SigprofScheduler(sampler=_sampler, frequency=frequency) + elif profiler_mode == SigalrmScheduler.mode: + _scheduler = SigalrmScheduler(sampler=_sampler, frequency=frequency) + elif profiler_mode == SleepScheduler.mode: + _scheduler = SleepScheduler(sampler=_sampler, frequency=frequency) + elif profiler_mode == EventScheduler.mode: + _scheduler = EventScheduler(sampler=_sampler, frequency=frequency) else: raise ValueError("Unknown profiler mode: {}".format(profiler_mode)) _scheduler.setup() @@ -103,23 +104,27 @@ def teardown_profiler(): _scheduler = None -def _sample_stack(*args, **kwargs): - # type: (*Any, **Any) -> None - """ - Take a sample of the stack on all the threads in the process. - This should be called at a regular interval to collect samples. 
- """ +def _init_sample_stack_fn(buffer): + # type: (SampleBuffer) -> Callable[..., None] - assert _sample_buffer is not None - _sample_buffer.write( - ( - nanosecond_time(), - [ - (tid, _extract_stack(frame)) - for tid, frame in sys._current_frames().items() - ], + def _sample_stack(*args, **kwargs): + # type: (*Any, **Any) -> None + """ + Take a sample of the stack on all the threads in the process. + This should be called at a regular interval to collect samples. + """ + + buffer.write( + ( + nanosecond_time(), + [ + (tid, _extract_stack(frame)) + for tid, frame in sys._current_frames().items() + ], + ) ) - ) + + return _sample_stack # We want to impose a stack depth limit so that samples aren't too large. @@ -220,7 +225,7 @@ def to_json(self, event_opt): } -class _SampleBuffer(object): +class SampleBuffer(object): """ A simple implementation of a ring buffer to buffer the samples taken. @@ -320,11 +325,12 @@ def slice_profile(self, start_ns, stop_ns): } -class _Scheduler(object): +class Scheduler(object): mode = "unknown" - def __init__(self, frequency): - # type: (int) -> None + def __init__(self, sampler, frequency): + # type: (Callable[..., None], int) -> None + self.sampler = sampler self._lock = threading.Lock() self._count = 0 self._interval = 1.0 / frequency @@ -350,7 +356,7 @@ def stop_profiling(self): return self._count == 0 -class _ThreadScheduler(_Scheduler): +class ThreadScheduler(Scheduler): """ This abstract scheduler is based on running a daemon thread that will call the sampler at a regular interval. 
@@ -359,10 +365,10 @@ class _ThreadScheduler(_Scheduler): mode = "thread" name = None # type: Optional[str] - def __init__(self, frequency): - # type: (int) -> None - super(_ThreadScheduler, self).__init__(frequency) - self.event = threading.Event() + def __init__(self, sampler, frequency): + # type: (Callable[..., None], int) -> None + super(ThreadScheduler, self).__init__(sampler=sampler, frequency=frequency) + self.threads = Queue() def setup(self): # type: () -> None @@ -374,34 +380,37 @@ def teardown(self): def start_profiling(self): # type: () -> bool - if super(_ThreadScheduler, self).start_profiling(): + if super(ThreadScheduler, self).start_profiling(): # make sure to clear the event as we reuse the same event # over the lifetime of the scheduler - self.event.clear() + event = threading.Event() + self.threads.put_nowait(event) + run = self.make_run(event) # make sure the thread is a daemon here otherwise this # can keep the application running after other threads # have exited - thread = threading.Thread(name=self.name, target=self.run, daemon=True) + thread = threading.Thread(name=self.name, target=run, daemon=True) thread.start() return True return False def stop_profiling(self): # type: () -> bool - if super(_ThreadScheduler, self).stop_profiling(): + if super(ThreadScheduler, self).stop_profiling(): # make sure the set the event here so that the thread # can check to see if it should keep running - self.event.set() + event = self.threads.get_nowait() + event.set() return True return False - def run(self): - # type: () -> None + def make_run(self, event): + # type: (threading.Event) -> Callable[..., None] raise NotImplementedError -class _SleepScheduler(_ThreadScheduler): +class SleepScheduler(ThreadScheduler): """ This scheduler uses time.sleep to wait the required interval before calling the sampling function. 
@@ -410,29 +419,34 @@ class _SleepScheduler(_ThreadScheduler): mode = "sleep" name = "sentry.profiler.SleepScheduler" - def run(self): - # type: () -> None - last = time.perf_counter() + def make_run(self, event): + # type: (threading.Event) -> Callable[..., None] - while True: - # some time may have elapsed since the last time - # we sampled, so we need to account for that and - # not sleep for too long - now = time.perf_counter() - elapsed = max(now - last, 0) + def run(): + # type: () -> None + last = time.perf_counter() - if elapsed < self._interval: - time.sleep(self._interval - elapsed) + while True: + # some time may have elapsed since the last time + # we sampled, so we need to account for that and + # not sleep for too long + now = time.perf_counter() + elapsed = max(now - last, 0) - last = time.perf_counter() + if elapsed < self._interval: + time.sleep(self._interval - elapsed) - if self.event.is_set(): - break + last = time.perf_counter() - _sample_stack() + if event.is_set(): + break + self.sampler() -class _EventScheduler(_ThreadScheduler): + return run + + +class EventScheduler(ThreadScheduler): """ This scheduler uses threading.Event to wait the required interval before calling the sampling function. @@ -441,18 +455,25 @@ class _EventScheduler(_ThreadScheduler): mode = "event" name = "sentry.profiler.EventScheduler" - def run(self): - # type: () -> None - while True: - self.event.wait(timeout=self._interval) + def make_run(self, event): + # type: (threading.Event) -> Callable[..., None] + + def run(): + # type: () -> None + while True: + event.wait(timeout=self._interval) + + if event.is_set(): + break + + self.sampler() - if self.event.is_set(): - break + self.sampler() - _sample_stack() + return run -class _SignalScheduler(_Scheduler): +class SignalScheduler(Scheduler): """ This abstract scheduler is based on UNIX signals. 
It sets up a signal handler for the specified signal, and the matching itimer in order @@ -485,7 +506,7 @@ def setup(self): # This setups a process wide signal handler that will be called # at an interval to record samples. try: - signal.signal(self.signal_num, _sample_stack) + signal.signal(self.signal_num, self.sampler) except ValueError: raise ValueError( "Signal based profiling can only be enabled from the main thread." @@ -507,20 +528,20 @@ def teardown(self): def start_profiling(self): # type: () -> bool - if super(_SignalScheduler, self).start_profiling(): + if super(SignalScheduler, self).start_profiling(): signal.setitimer(self.signal_timer, self._interval, self._interval) return True return False def stop_profiling(self): # type: () -> bool - if super(_SignalScheduler, self).stop_profiling(): + if super(SignalScheduler, self).stop_profiling(): signal.setitimer(self.signal_timer, 0) return True return False -class _SigprofScheduler(_SignalScheduler): +class SigprofScheduler(SignalScheduler): """ This scheduler uses SIGPROF to regularly call a signal handler where the samples will be taken. @@ -553,7 +574,7 @@ def signal_timer(self): return signal.ITIMER_PROF -class _SigalrmScheduler(_SignalScheduler): +class SigalrmScheduler(SignalScheduler): """ This scheduler uses SIGALRM to regularly call a signal handler where the samples will be taken. 
diff --git a/tests/test_profiler.py b/tests/test_profiler.py new file mode 100644 index 0000000000..6d6686064d --- /dev/null +++ b/tests/test_profiler.py @@ -0,0 +1,48 @@ +import threading +import time + +from sentry_sdk.profiler import SleepScheduler + + +def test_sleep_scheduler_single_background_thread(): + def sampler(): + pass + + scheduler = SleepScheduler(sampler=sampler, frequency=100) + + assert scheduler.start_profiling() + + # the scheduler thread does not immediately exit + # but it should exit after the next time it samples + assert scheduler.stop_profiling() + + assert scheduler.start_profiling() + + # because the scheduler thread does not immediately exit + # after stop_profiling is called, we have to wait a little + # otherwise, we'll see an extra scheduler thread in the + # following assertion + time.sleep(0.1) + + # there should be 1 scheduler thread now because the first + # one should be stopped and a new one started + assert len([ + thread + for thread in threading.enumerate() + if thread.name == scheduler.name + ]) == 1 + + assert scheduler.stop_profiling() + + # because the scheduler thread does not immediately exit + # after stop_profiling is called, we have to wait a little + # otherwise, we'll see an extra scheduler thread in the + # following assertion + time.sleep(0.1) + + # there should be 0 scheduler threads now because they stopped + assert len([ + thread + for thread in threading.enumerate() + if thread.name == scheduler.name + ]) == 0 From 8840f72c09a35b492272f77e6334429de4444cdf Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 12 Oct 2022 15:36:49 -0400 Subject: [PATCH 02/10] run black --- tests/test_profiler.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/tests/test_profiler.py b/tests/test_profiler.py index 6d6686064d..0bce207af4 100644 --- a/tests/test_profiler.py +++ b/tests/test_profiler.py @@ -4,6 +4,13 @@ from sentry_sdk.profiler import SleepScheduler +def 
get_scheduler_threads(scheduler): + return [ + thread + for thread in threading.enumerate() + if thread.name == scheduler.name + ] + def test_sleep_scheduler_single_background_thread(): def sampler(): pass @@ -26,11 +33,7 @@ def sampler(): # there should be 1 scheduler thread now because the first # one should be stopped and a new one started - assert len([ - thread - for thread in threading.enumerate() - if thread.name == scheduler.name - ]) == 1 + assert len(get_scheduler_threads(scheduler)) == 1 assert scheduler.stop_profiling() @@ -41,8 +44,4 @@ def sampler(): time.sleep(0.1) # there should be 0 scheduler threads now because they stopped - assert len([ - thread - for thread in threading.enumerate() - if thread.name == scheduler.name - ]) == 0 + assert len(get_scheduler_threads(scheduler)) == 0 From 32770b663b8441325721e074e230c91cc854733d Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 12 Oct 2022 15:40:30 -0400 Subject: [PATCH 03/10] run flake8 --- tests/test_profiler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_profiler.py b/tests/test_profiler.py index 0bce207af4..b74ca32d33 100644 --- a/tests/test_profiler.py +++ b/tests/test_profiler.py @@ -11,6 +11,7 @@ def get_scheduler_threads(scheduler): if thread.name == scheduler.name ] + def test_sleep_scheduler_single_background_thread(): def sampler(): pass From 6364826dbbc11a65871e5e1de88f94854bad8388 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 12 Oct 2022 15:44:44 -0400 Subject: [PATCH 04/10] run black --- tests/test_profiler.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/test_profiler.py b/tests/test_profiler.py index b74ca32d33..3d016ccdd6 100644 --- a/tests/test_profiler.py +++ b/tests/test_profiler.py @@ -5,11 +5,7 @@ def get_scheduler_threads(scheduler): - return [ - thread - for thread in threading.enumerate() - if thread.name == scheduler.name - ] + return [thread for thread in threading.enumerate() if thread.name == scheduler.name] def 
test_sleep_scheduler_single_background_thread(): From 72abea4e9f5dbee0d99bf56fd9cea5a6f9a76cdd Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 12 Oct 2022 15:49:41 -0400 Subject: [PATCH 05/10] fix thread init --- sentry_sdk/profiler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/profiler.py b/sentry_sdk/profiler.py index c108e1f5d0..81e76ff259 100644 --- a/sentry_sdk/profiler.py +++ b/sentry_sdk/profiler.py @@ -390,7 +390,8 @@ def start_profiling(self): # make sure the thread is a daemon here otherwise this # can keep the application running after other threads # have exited - thread = threading.Thread(name=self.name, target=run, daemon=True) + thread = threading.Thread(target=run, name=self.name) + thread.daemon = True thread.start() return True return False From 4da13479b736b411759106cffbbae7878aac2cd9 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 12 Oct 2022 15:57:03 -0400 Subject: [PATCH 06/10] only run on python3.3+ --- sentry_sdk/profiler.py | 3 +-- tests/test_profiler.py | 4 ++++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/profiler.py b/sentry_sdk/profiler.py index 81e76ff259..c108e1f5d0 100644 --- a/sentry_sdk/profiler.py +++ b/sentry_sdk/profiler.py @@ -390,8 +390,7 @@ def start_profiling(self): # make sure the thread is a daemon here otherwise this # can keep the application running after other threads # have exited - thread = threading.Thread(target=run, name=self.name) - thread.daemon = True + thread = threading.Thread(name=self.name, target=run, daemon=True) thread.start() return True return False diff --git a/tests/test_profiler.py b/tests/test_profiler.py index 3d016ccdd6..9a976f49da 100644 --- a/tests/test_profiler.py +++ b/tests/test_profiler.py @@ -1,13 +1,17 @@ import threading import time +import pytest + from sentry_sdk.profiler import SleepScheduler +from sentry_sdk.utils import PY33 def get_scheduler_threads(scheduler): return [thread for thread in threading.enumerate() 
if thread.name == scheduler.name] +@pytest.mark.skipif(not PY33, reason="requires >=python3.3") def test_sleep_scheduler_single_background_thread(): def sampler(): pass From 2e4d899d532836f8ed4b4052ca72a02649160dce Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 12 Oct 2022 16:05:09 -0400 Subject: [PATCH 07/10] cleanup decorator --- tests/test_profiler.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/test_profiler.py b/tests/test_profiler.py index 9a976f49da..38a49ac451 100644 --- a/tests/test_profiler.py +++ b/tests/test_profiler.py @@ -1,17 +1,22 @@ +import sys import threading import time import pytest from sentry_sdk.profiler import SleepScheduler -from sentry_sdk.utils import PY33 + + +minimum_python_33 = pytest.mark.skipif( + sys.version_info < (3, 3), reason="ASGI is only supported in Python >= 3.6" +) def get_scheduler_threads(scheduler): return [thread for thread in threading.enumerate() if thread.name == scheduler.name] -@pytest.mark.skipif(not PY33, reason="requires >=python3.3") +@minimum_python_33 def test_sleep_scheduler_single_background_thread(): def sampler(): pass From 348a642ad8698c91e9a58d04c1e4c208c0da89f5 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 12 Oct 2022 16:09:01 -0400 Subject: [PATCH 08/10] update minimum python version message --- tests/test_profiler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_profiler.py b/tests/test_profiler.py index 38a49ac451..bb186bc132 100644 --- a/tests/test_profiler.py +++ b/tests/test_profiler.py @@ -8,7 +8,7 @@ minimum_python_33 = pytest.mark.skipif( - sys.version_info < (3, 3), reason="ASGI is only supported in Python >= 3.6" + sys.version_info < (3, 3), reason="Profiling is only supported in Python >= 3.3" ) From 14366314cec147d1e5b2d083034f87edfe38a856 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 12 Oct 2022 16:48:50 -0400 Subject: [PATCH 09/10] sleep less --- tests/test_profiler.py | 12 +++++++++--- 1 file changed, 9 
insertions(+), 3 deletions(-) diff --git a/tests/test_profiler.py b/tests/test_profiler.py index bb186bc132..0c85e9624e 100644 --- a/tests/test_profiler.py +++ b/tests/test_profiler.py @@ -21,7 +21,7 @@ def test_sleep_scheduler_single_background_thread(): def sampler(): pass - scheduler = SleepScheduler(sampler=sampler, frequency=100) + scheduler = SleepScheduler(sampler=sampler, frequency=1000) assert scheduler.start_profiling() @@ -35,7 +35,10 @@ def sampler(): # after stop_profiling is called, we have to wait a little # otherwise, we'll see an extra scheduler thread in the # following assertion - time.sleep(0.1) + # + # one iteration of the scheduler takes 1.0 / frequency seconds + # so make sure this sleeps for longer than that to avoid flakes + time.sleep(0.002) # there should be 1 scheduler thread now because the first # one should be stopped and a new one started @@ -47,7 +50,10 @@ def sampler(): # after stop_profiling is called, we have to wait a little # otherwise, we'll see an extra scheduler thread in the # following assertion - time.sleep(0.1) + # + # one iteration of the scheduler takes 1.0 / frequency seconds + # so make sure this sleeps for longer than that to avoid flakes + time.sleep(0.002) # there should be 0 scheduler threads now because they stopped assert len(get_scheduler_threads(scheduler)) == 0 From a833a78e42adbf9eb7480b322614884e43448aa1 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Thu, 13 Oct 2022 11:08:03 -0400 Subject: [PATCH 10/10] rename threads to stop_events --- sentry_sdk/profiler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sentry_sdk/profiler.py b/sentry_sdk/profiler.py index a3fa4a1ce8..38e54b8c5b 100644 --- a/sentry_sdk/profiler.py +++ b/sentry_sdk/profiler.py @@ -398,7 +398,7 @@ class ThreadScheduler(Scheduler): def __init__(self, sampler, frequency): # type: (Callable[..., None], int) -> None super(ThreadScheduler, self).__init__(sampler=sampler, frequency=frequency) - self.threads = Queue() + 
self.stop_events = Queue() def setup(self): # type: () -> None @@ -414,7 +414,7 @@ def start_profiling(self): # make sure to clear the event as we reuse the same event # over the lifetime of the scheduler event = threading.Event() - self.threads.put_nowait(event) + self.stop_events.put_nowait(event) run = self.make_run(event) # make sure the thread is a daemon here otherwise this @@ -430,7 +430,7 @@ def stop_profiling(self): if super(ThreadScheduler, self).stop_profiling(): # make sure the set the event here so that the thread # can check to see if it should keep running - event = self.threads.get_nowait() + event = self.stop_events.get_nowait() event.set() return True return False