Skip to content

Commit

Permalink
fix(profiling): Race condition spawning multiple profiling threads (#…
Browse files Browse the repository at this point in the history
…1676)

There is a race condition where multiple profiling threads may be spawned.
Specifically, it occurs if `start_profiling` is called immediately after
`stop_profiling`. This happens because `stop_profiling` does not immediately
terminate the thread; instead, the thread checks whether the event was set and
exits at the end of its current iteration. If `start_profiling` is called during
that iteration, the shared event is cleared again and the old thread keeps
running alongside the new one. To fix this, a new event is created each time a
profiling thread starts, so that threads can be terminated independently.
  • Loading branch information
Zylphrex committed Oct 13, 2022
1 parent ed0d4db commit 40993fe
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 75 deletions.
171 changes: 97 additions & 74 deletions sentry_sdk/profiler.py
Expand Up @@ -25,12 +25,14 @@

import sentry_sdk
from sentry_sdk._compat import PY33
from sentry_sdk._queue import Queue
from sentry_sdk._types import MYPY
from sentry_sdk.utils import nanosecond_time

if MYPY:
from types import FrameType
from typing import Any
from typing import Callable
from typing import Deque
from typing import Dict
from typing import Generator
Expand All @@ -43,8 +45,8 @@
FrameData = namedtuple("FrameData", ["name", "file", "line"])


_sample_buffer = None # type: Optional[_SampleBuffer]
_scheduler = None # type: Optional[_Scheduler]
_sample_buffer = None # type: Optional[SampleBuffer]
_scheduler = None # type: Optional[Scheduler]


def setup_profiler(options):
Expand All @@ -70,17 +72,18 @@ def setup_profiler(options):

# To buffer samples for `buffer_secs` at `frequency` Hz, we need
# a capacity of `buffer_secs * frequency`.
_sample_buffer = _SampleBuffer(capacity=buffer_secs * frequency)

profiler_mode = options["_experiments"].get("profiler_mode", _SigprofScheduler.mode)
if profiler_mode == _SigprofScheduler.mode:
_scheduler = _SigprofScheduler(frequency=frequency)
elif profiler_mode == _SigalrmScheduler.mode:
_scheduler = _SigalrmScheduler(frequency=frequency)
elif profiler_mode == _SleepScheduler.mode:
_scheduler = _SleepScheduler(frequency=frequency)
elif profiler_mode == _EventScheduler.mode:
_scheduler = _EventScheduler(frequency=frequency)
_sample_buffer = SampleBuffer(capacity=buffer_secs * frequency)
_sampler = _init_sample_stack_fn(_sample_buffer)

profiler_mode = options["_experiments"].get("profiler_mode", SigprofScheduler.mode)
if profiler_mode == SigprofScheduler.mode:
_scheduler = SigprofScheduler(sampler=_sampler, frequency=frequency)
elif profiler_mode == SigalrmScheduler.mode:
_scheduler = SigalrmScheduler(sampler=_sampler, frequency=frequency)
elif profiler_mode == SleepScheduler.mode:
_scheduler = SleepScheduler(sampler=_sampler, frequency=frequency)
elif profiler_mode == EventScheduler.mode:
_scheduler = EventScheduler(sampler=_sampler, frequency=frequency)
else:
raise ValueError("Unknown profiler mode: {}".format(profiler_mode))
_scheduler.setup()
Expand All @@ -101,23 +104,27 @@ def teardown_profiler():
_scheduler = None


def _sample_stack(*args, **kwargs):
# type: (*Any, **Any) -> None
"""
Take a sample of the stack on all the threads in the process.
This should be called at a regular interval to collect samples.
"""
def _init_sample_stack_fn(buffer):
# type: (SampleBuffer) -> Callable[..., None]

assert _sample_buffer is not None
_sample_buffer.write(
(
nanosecond_time(),
[
(tid, extract_stack(frame))
for tid, frame in sys._current_frames().items()
],
def _sample_stack(*args, **kwargs):
# type: (*Any, **Any) -> None
"""
Take a sample of the stack on all the threads in the process.
This should be called at a regular interval to collect samples.
"""

buffer.write(
(
nanosecond_time(),
[
(tid, extract_stack(frame))
for tid, frame in sys._current_frames().items()
],
)
)
)

return _sample_stack


# We want to impose a stack depth limit so that samples aren't too large.
Expand Down Expand Up @@ -248,7 +255,7 @@ def to_json(self, event_opt):
}


class _SampleBuffer(object):
class SampleBuffer(object):
"""
A simple implementation of a ring buffer to buffer the samples taken.
Expand Down Expand Up @@ -348,11 +355,12 @@ def slice_profile(self, start_ns, stop_ns):
}


class _Scheduler(object):
class Scheduler(object):
mode = "unknown"

def __init__(self, frequency):
# type: (int) -> None
def __init__(self, sampler, frequency):
# type: (Callable[..., None], int) -> None
self.sampler = sampler
self._lock = threading.Lock()
self._count = 0
self._interval = 1.0 / frequency
Expand All @@ -378,7 +386,7 @@ def stop_profiling(self):
return self._count == 0


class _ThreadScheduler(_Scheduler):
class ThreadScheduler(Scheduler):
"""
This abstract scheduler is based on running a daemon thread that will call
the sampler at a regular interval.
Expand All @@ -387,10 +395,10 @@ class _ThreadScheduler(_Scheduler):
mode = "thread"
name = None # type: Optional[str]

def __init__(self, frequency):
# type: (int) -> None
super(_ThreadScheduler, self).__init__(frequency)
self.event = threading.Event()
def __init__(self, sampler, frequency):
# type: (Callable[..., None], int) -> None
super(ThreadScheduler, self).__init__(sampler=sampler, frequency=frequency)
self.stop_events = Queue()

def setup(self):
# type: () -> None
Expand All @@ -402,34 +410,37 @@ def teardown(self):

def start_profiling(self):
# type: () -> bool
if super(_ThreadScheduler, self).start_profiling():
if super(ThreadScheduler, self).start_profiling():
# make sure to clear the event as we reuse the same event
# over the lifetime of the scheduler
self.event.clear()
event = threading.Event()
self.stop_events.put_nowait(event)
run = self.make_run(event)

# make sure the thread is a daemon here otherwise this
# can keep the application running after other threads
# have exited
thread = threading.Thread(name=self.name, target=self.run, daemon=True)
thread = threading.Thread(name=self.name, target=run, daemon=True)
thread.start()
return True
return False

def stop_profiling(self):
# type: () -> bool
if super(_ThreadScheduler, self).stop_profiling():
if super(ThreadScheduler, self).stop_profiling():
# make sure to set the event here so that the thread
# can check to see if it should keep running
self.event.set()
event = self.stop_events.get_nowait()
event.set()
return True
return False

def run(self):
# type: () -> None
def make_run(self, event):
# type: (threading.Event) -> Callable[..., None]
raise NotImplementedError


class _SleepScheduler(_ThreadScheduler):
class SleepScheduler(ThreadScheduler):
"""
This scheduler uses time.sleep to wait the required interval before calling
the sampling function.
Expand All @@ -438,29 +449,34 @@ class _SleepScheduler(_ThreadScheduler):
mode = "sleep"
name = "sentry.profiler.SleepScheduler"

def run(self):
# type: () -> None
last = time.perf_counter()
def make_run(self, event):
# type: (threading.Event) -> Callable[..., None]

while True:
# some time may have elapsed since the last time
# we sampled, so we need to account for that and
# not sleep for too long
now = time.perf_counter()
elapsed = max(now - last, 0)
def run():
# type: () -> None
last = time.perf_counter()

if elapsed < self._interval:
time.sleep(self._interval - elapsed)
while True:
# some time may have elapsed since the last time
# we sampled, so we need to account for that and
# not sleep for too long
now = time.perf_counter()
elapsed = max(now - last, 0)

last = time.perf_counter()
if elapsed < self._interval:
time.sleep(self._interval - elapsed)

last = time.perf_counter()

if self.event.is_set():
break
if event.is_set():
break

_sample_stack()
self.sampler()

return run

class _EventScheduler(_ThreadScheduler):

class EventScheduler(ThreadScheduler):
"""
This scheduler uses threading.Event to wait the required interval before
calling the sampling function.
Expand All @@ -469,18 +485,25 @@ class _EventScheduler(_ThreadScheduler):
mode = "event"
name = "sentry.profiler.EventScheduler"

def run(self):
# type: () -> None
while True:
self.event.wait(timeout=self._interval)
def make_run(self, event):
# type: (threading.Event) -> Callable[..., None]

def run():
# type: () -> None
while True:
event.wait(timeout=self._interval)

if event.is_set():
break

self.sampler()

if self.event.is_set():
break
self.sampler()

_sample_stack()
return run


class _SignalScheduler(_Scheduler):
class SignalScheduler(Scheduler):
"""
This abstract scheduler is based on UNIX signals. It sets up a
signal handler for the specified signal, and the matching itimer in order
Expand Down Expand Up @@ -513,7 +536,7 @@ def setup(self):
# This sets up a process-wide signal handler that will be called
# at an interval to record samples.
try:
signal.signal(self.signal_num, _sample_stack)
signal.signal(self.signal_num, self.sampler)
except ValueError:
raise ValueError(
"Signal based profiling can only be enabled from the main thread."
Expand All @@ -535,20 +558,20 @@ def teardown(self):

def start_profiling(self):
# type: () -> bool
if super(_SignalScheduler, self).start_profiling():
if super(SignalScheduler, self).start_profiling():
signal.setitimer(self.signal_timer, self._interval, self._interval)
return True
return False

def stop_profiling(self):
# type: () -> bool
if super(_SignalScheduler, self).stop_profiling():
if super(SignalScheduler, self).stop_profiling():
signal.setitimer(self.signal_timer, 0)
return True
return False


class _SigprofScheduler(_SignalScheduler):
class SigprofScheduler(SignalScheduler):
"""
This scheduler uses SIGPROF to regularly call a signal handler where the
samples will be taken.
Expand Down Expand Up @@ -581,7 +604,7 @@ def signal_timer(self):
return signal.ITIMER_PROF


class _SigalrmScheduler(_SignalScheduler):
class SigalrmScheduler(SignalScheduler):
"""
This scheduler uses SIGALRM to regularly call a signal handler where the
samples will be taken.
Expand Down
55 changes: 54 additions & 1 deletion tests/test_profiler.py
Expand Up @@ -2,10 +2,16 @@
import platform
import sys
import threading
import time

import pytest

from sentry_sdk.profiler import extract_stack, get_frame_name, setup_profiler
from sentry_sdk.profiler import (
SleepScheduler,
extract_stack,
get_frame_name,
setup_profiler,
)


minimum_python_33 = pytest.mark.skipif(
Expand Down Expand Up @@ -148,3 +154,50 @@ def test_extract_stack_with_max_depth(depth, max_stack_depth, actual_depth):
# index 0 contains the innermost frame on the stack, so the lambda
# should be at index `actual_depth`
assert stack[actual_depth].name == "<lambda>", actual_depth


def get_scheduler_threads(scheduler):
    """
    Return the live threads whose name matches ``scheduler.name``.

    Only threads reported by ``threading.enumerate()`` are considered, so a
    thread that has already finished is not included in the result.
    """
    return [
        thread
        for thread in threading.enumerate()
        if thread.name == scheduler.name
    ]


@minimum_python_33
def test_sleep_scheduler_single_background_thread():
    """
    Restarting a SleepScheduler right after stopping it must not leave two
    scheduler threads running, and stopping it must leave none.
    """

    def sampler():
        pass

    def wait_for_thread_count(expected, timeout=1.0):
        # The scheduler thread does not exit immediately after
        # stop_profiling is called; it only notices the stop event on its
        # next iteration. Poll with a deadline instead of sleeping for a
        # fixed amount so a slow machine cannot cause a spurious failure,
        # while a thread that never exits still fails the assertion.
        deadline = time.monotonic() + timeout
        while True:
            count = len(get_scheduler_threads(scheduler))
            if count == expected or time.monotonic() >= deadline:
                return count
            time.sleep(0.001)

    scheduler = SleepScheduler(sampler=sampler, frequency=1000)

    assert scheduler.start_profiling()

    # the scheduler thread does not immediately exit
    # but it should exit after the next time it samples
    assert scheduler.stop_profiling()

    assert scheduler.start_profiling()

    # there should be 1 scheduler thread now because the first
    # one should be stopped and a new one started
    assert wait_for_thread_count(1) == 1

    assert scheduler.stop_profiling()

    # there should be 0 scheduler threads now because they stopped
    assert wait_for_thread_count(0) == 0

0 comments on commit 40993fe

Please sign in to comment.