Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(profiling): Race condition spawning multiple profiling threads #1676

Merged
171 changes: 97 additions & 74 deletions sentry_sdk/profiler.py
Expand Up @@ -25,12 +25,14 @@

import sentry_sdk
from sentry_sdk._compat import PY33
from sentry_sdk._queue import Queue
from sentry_sdk._types import MYPY
from sentry_sdk.utils import nanosecond_time

if MYPY:
from types import FrameType
from typing import Any
from typing import Callable
from typing import Deque
from typing import Dict
from typing import Generator
Expand All @@ -43,8 +45,8 @@
FrameData = namedtuple("FrameData", ["name", "file", "line"])


_sample_buffer = None # type: Optional[_SampleBuffer]
_scheduler = None # type: Optional[_Scheduler]
_sample_buffer = None # type: Optional[SampleBuffer]
_scheduler = None # type: Optional[Scheduler]


def setup_profiler(options):
Expand All @@ -70,17 +72,18 @@ def setup_profiler(options):

# To buffer samples for `buffer_secs` at `frequency` Hz, we need
# a capacity of `buffer_secs * frequency`.
_sample_buffer = _SampleBuffer(capacity=buffer_secs * frequency)

profiler_mode = options["_experiments"].get("profiler_mode", _SigprofScheduler.mode)
if profiler_mode == _SigprofScheduler.mode:
_scheduler = _SigprofScheduler(frequency=frequency)
elif profiler_mode == _SigalrmScheduler.mode:
_scheduler = _SigalrmScheduler(frequency=frequency)
elif profiler_mode == _SleepScheduler.mode:
_scheduler = _SleepScheduler(frequency=frequency)
elif profiler_mode == _EventScheduler.mode:
_scheduler = _EventScheduler(frequency=frequency)
_sample_buffer = SampleBuffer(capacity=buffer_secs * frequency)
_sampler = _init_sample_stack_fn(_sample_buffer)

profiler_mode = options["_experiments"].get("profiler_mode", SigprofScheduler.mode)
if profiler_mode == SigprofScheduler.mode:
_scheduler = SigprofScheduler(sampler=_sampler, frequency=frequency)
elif profiler_mode == SigalrmScheduler.mode:
_scheduler = SigalrmScheduler(sampler=_sampler, frequency=frequency)
elif profiler_mode == SleepScheduler.mode:
_scheduler = SleepScheduler(sampler=_sampler, frequency=frequency)
elif profiler_mode == EventScheduler.mode:
_scheduler = EventScheduler(sampler=_sampler, frequency=frequency)
else:
raise ValueError("Unknown profiler mode: {}".format(profiler_mode))
_scheduler.setup()
Expand All @@ -101,23 +104,27 @@ def teardown_profiler():
_scheduler = None


def _sample_stack(*args, **kwargs):
# type: (*Any, **Any) -> None
"""
Take a sample of the stack on all the threads in the process.
This should be called at a regular interval to collect samples.
"""
def _init_sample_stack_fn(buffer):
# type: (SampleBuffer) -> Callable[..., None]

assert _sample_buffer is not None
_sample_buffer.write(
(
nanosecond_time(),
[
(tid, extract_stack(frame))
for tid, frame in sys._current_frames().items()
],
def _sample_stack(*args, **kwargs):
# type: (*Any, **Any) -> None
"""
Take a sample of the stack on all the threads in the process.
This should be called at a regular interval to collect samples.
"""

buffer.write(
(
nanosecond_time(),
[
(tid, extract_stack(frame))
for tid, frame in sys._current_frames().items()
],
)
)
)

return _sample_stack


# We want to impose a stack depth limit so that samples aren't too large.
Expand Down Expand Up @@ -248,7 +255,7 @@ def to_json(self, event_opt):
}


class _SampleBuffer(object):
class SampleBuffer(object):
"""
A simple implementation of a ring buffer to buffer the samples taken.

Expand Down Expand Up @@ -348,11 +355,12 @@ def slice_profile(self, start_ns, stop_ns):
}


class _Scheduler(object):
class Scheduler(object):
mode = "unknown"

def __init__(self, frequency):
# type: (int) -> None
def __init__(self, sampler, frequency):
# type: (Callable[..., None], int) -> None
self.sampler = sampler
self._lock = threading.Lock()
self._count = 0
self._interval = 1.0 / frequency
Expand All @@ -378,7 +386,7 @@ def stop_profiling(self):
return self._count == 0


class _ThreadScheduler(_Scheduler):
class ThreadScheduler(Scheduler):
"""
This abstract scheduler is based on running a daemon thread that will call
the sampler at a regular interval.
Expand All @@ -387,10 +395,10 @@ class _ThreadScheduler(_Scheduler):
mode = "thread"
name = None # type: Optional[str]

def __init__(self, frequency):
# type: (int) -> None
super(_ThreadScheduler, self).__init__(frequency)
self.event = threading.Event()
def __init__(self, sampler, frequency):
# type: (Callable[..., None], int) -> None
super(ThreadScheduler, self).__init__(sampler=sampler, frequency=frequency)
self.stop_events = Queue()

def setup(self):
# type: () -> None
Expand All @@ -402,34 +410,37 @@ def teardown(self):

def start_profiling(self):
# type: () -> bool
if super(_ThreadScheduler, self).start_profiling():
if super(ThreadScheduler, self).start_profiling():
# make sure to clear the event as we reuse the same event
# over the lifetime of the scheduler
self.event.clear()
event = threading.Event()
self.stop_events.put_nowait(event)
run = self.make_run(event)

# make sure the thread is a daemon here otherwise this
# can keep the application running after other threads
# have exited
thread = threading.Thread(name=self.name, target=self.run, daemon=True)
thread = threading.Thread(name=self.name, target=run, daemon=True)
thread.start()
return True
return False

def stop_profiling(self):
# type: () -> bool
if super(_ThreadScheduler, self).stop_profiling():
if super(ThreadScheduler, self).stop_profiling():
# make sure to set the event here so that the thread
# can check to see if it should keep running
self.event.set()
event = self.stop_events.get_nowait()
event.set()
return True
return False

def run(self):
# type: () -> None
def make_run(self, event):
# type: (threading.Event) -> Callable[..., None]
raise NotImplementedError


class _SleepScheduler(_ThreadScheduler):
class SleepScheduler(ThreadScheduler):
"""
This scheduler uses time.sleep to wait the required interval before calling
the sampling function.
Expand All @@ -438,29 +449,34 @@ class _SleepScheduler(_ThreadScheduler):
mode = "sleep"
name = "sentry.profiler.SleepScheduler"

def run(self):
# type: () -> None
last = time.perf_counter()
def make_run(self, event):
# type: (threading.Event) -> Callable[..., None]

while True:
# some time may have elapsed since the last time
# we sampled, so we need to account for that and
# not sleep for too long
now = time.perf_counter()
elapsed = max(now - last, 0)
def run():
# type: () -> None
last = time.perf_counter()

if elapsed < self._interval:
time.sleep(self._interval - elapsed)
while True:
# some time may have elapsed since the last time
# we sampled, so we need to account for that and
# not sleep for too long
now = time.perf_counter()
elapsed = max(now - last, 0)

last = time.perf_counter()
if elapsed < self._interval:
time.sleep(self._interval - elapsed)

last = time.perf_counter()

if self.event.is_set():
break
if event.is_set():
break

_sample_stack()
self.sampler()

return run

class _EventScheduler(_ThreadScheduler):

class EventScheduler(ThreadScheduler):
"""
This scheduler uses threading.Event to wait the required interval before
calling the sampling function.
Expand All @@ -469,18 +485,25 @@ class _EventScheduler(_ThreadScheduler):
mode = "event"
name = "sentry.profiler.EventScheduler"

def run(self):
# type: () -> None
while True:
self.event.wait(timeout=self._interval)
def make_run(self, event):
# type: (threading.Event) -> Callable[..., None]

def run():
# type: () -> None
while True:
event.wait(timeout=self._interval)

if event.is_set():
break

self.sampler()

if self.event.is_set():
break
self.sampler()

_sample_stack()
return run


class _SignalScheduler(_Scheduler):
class SignalScheduler(Scheduler):
"""
This abstract scheduler is based on UNIX signals. It sets up a
signal handler for the specified signal, and the matching itimer in order
Expand Down Expand Up @@ -513,7 +536,7 @@ def setup(self):
# This sets up a process-wide signal handler that will be called
# at an interval to record samples.
try:
signal.signal(self.signal_num, _sample_stack)
signal.signal(self.signal_num, self.sampler)
except ValueError:
raise ValueError(
"Signal based profiling can only be enabled from the main thread."
Expand All @@ -535,20 +558,20 @@ def teardown(self):

def start_profiling(self):
# type: () -> bool
if super(_SignalScheduler, self).start_profiling():
if super(SignalScheduler, self).start_profiling():
signal.setitimer(self.signal_timer, self._interval, self._interval)
return True
return False

def stop_profiling(self):
# type: () -> bool
if super(_SignalScheduler, self).stop_profiling():
if super(SignalScheduler, self).stop_profiling():
signal.setitimer(self.signal_timer, 0)
return True
return False


class _SigprofScheduler(_SignalScheduler):
class SigprofScheduler(SignalScheduler):
"""
This scheduler uses SIGPROF to regularly call a signal handler where the
samples will be taken.
Expand Down Expand Up @@ -581,7 +604,7 @@ def signal_timer(self):
return signal.ITIMER_PROF


class _SigalrmScheduler(_SignalScheduler):
class SigalrmScheduler(SignalScheduler):
"""
This scheduler uses SIGALRM to regularly call a signal handler where the
samples will be taken.
Expand Down
55 changes: 54 additions & 1 deletion tests/test_profiler.py
Expand Up @@ -2,10 +2,16 @@
import platform
import sys
import threading
import time

import pytest

from sentry_sdk.profiler import extract_stack, get_frame_name, setup_profiler
from sentry_sdk.profiler import (
SleepScheduler,
extract_stack,
get_frame_name,
setup_profiler,
)


minimum_python_33 = pytest.mark.skipif(
Expand Down Expand Up @@ -148,3 +154,50 @@ def test_extract_stack_with_max_depth(depth, max_stack_depth, actual_depth):
# index 0 contains the innermost frame on the stack, so the lambda
# should be at index `actual_depth`
assert stack[actual_depth].name == "<lambda>", actual_depth


def get_scheduler_threads(scheduler):
    """
    Collect the currently running threads that carry the scheduler's name.

    Only threads reported by `threading.enumerate()` are considered, and a
    thread is included when its `name` equals `scheduler.name`.
    """
    wanted_name = scheduler.name
    matching = []
    for candidate in threading.enumerate():
        if candidate.name == wanted_name:
            matching.append(candidate)
    return matching


@minimum_python_33
def test_sleep_scheduler_single_background_thread():
    """
    Restarting the sleep scheduler must not leave extra sampling threads.

    The scheduler is started, stopped, and started again; afterwards exactly
    one scheduler thread should remain. After the final stop, none should
    remain.
    """

    def sampler():
        pass

    def wait_for_thread_count(expected, timeout=1.0):
        # The scheduler thread does not exit immediately when
        # stop_profiling is called; it only notices on its next iteration.
        # A fixed sleep of a couple of intervals is flaky on a loaded
        # machine, so poll until the expected count is observed or the
        # timeout expires. If the count never converges, the last observed
        # value is returned and the caller's assertion fails as before.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if len(get_scheduler_threads(scheduler)) == expected:
                break
            time.sleep(0.001)
        return len(get_scheduler_threads(scheduler))

    scheduler = SleepScheduler(sampler=sampler, frequency=1000)

    assert scheduler.start_profiling()

    # the scheduler thread does not immediately exit
    # but it should exit after the next time it samples
    assert scheduler.stop_profiling()

    assert scheduler.start_profiling()

    # there should be 1 scheduler thread now because the first
    # one should be stopped and a new one started
    assert wait_for_thread_count(1) == 1

    assert scheduler.stop_profiling()

    # there should be 0 scheduler threads now because they stopped
    assert wait_for_thread_count(0) == 0