Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better gunicorn timeout handling. #497

Merged
merged 3 commits into from Dec 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Expand Up @@ -59,8 +59,9 @@ export DEVEL=1
WORKER ?= sync
PORT ?= 8000
TALISKER = $(BIN)/talisker.gunicorn --bind 0.0.0.0:$(PORT) --reload --worker-class $(WORKER) $(ARGS)
APP ?= application
run wsgi:
$(TALISKER) tests.wsgi_app:application
$(TALISKER) tests.wsgi_app:$(APP)

run_multiprocess: ARGS=-w4
run_multiprocess: run
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Expand Up @@ -53,7 +53,7 @@ install_requires =
gunicorn = gunicorn>=19.7.0,<20.0
raven = raven>=6.4.0,<7.0
celery =
celery>=3.1.25.0,<5.0
celery>=3.1.25.0,<4.4
kombu>=3.0.37,<4.5
django = django>=1.10,<2.0
prometheus = prometheus-client>=0.2.0,<0.5.0,!=0.4.0,!=0.4.1
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -139,7 +139,7 @@
'aiocontextvars==0.2.2;python_version>="3.5" and python_version<"3.7"',
],
celery=[
'celery>=3.1.25.0,<5.0',
'celery>=3.1.25.0,<4.4',
'kombu>=3.0.37,<4.5',
],
dev=[
Expand Down
30 changes: 27 additions & 3 deletions talisker/gunicorn.py
Expand Up @@ -31,6 +31,7 @@

from collections import deque
import logging
import sys

from gunicorn.glogging import Logger
from gunicorn.app.wsgiapp import WSGIApplication
Expand Down Expand Up @@ -92,7 +93,7 @@ def handle_custom():


def gunicorn_on_starting(arbiter):
"""Gunicorn on_starging server hook.
"""Gunicorn on_starting server hook.

Sets up the fake signal and handler on the arbiter instance.
"""
Expand All @@ -112,11 +113,33 @@ def gunicorn_child_exit(server, worker):
server.SIG_QUEUE.append('SIGCUSTOM')


def gunicorn_worker_abort(worker):
"""Worker SIGABRT handler function.

SIGABRT is only used by gunicorn on worker timeout. We raise a custom
exception, rather than falling through to the default of SystemExit, which
is easier to catch and handle as a normal error by our WSGI middleware.
"""
raise talisker.wsgi.RequestTimeout(
'gunicorn worker timeout (pid: {})'.format(worker.pid)
)


def gunicorn_worker_exit(server, worker):
"""Logs any requests that are still in flight."""
"""Worker exit function.

Last chance to try log any outstanding requests before we die.
"""
for rid in list(talisker.wsgi.REQUESTS):
request = talisker.wsgi.REQUESTS[rid]
request.finish_request(timeout=True)
try:
raise talisker.wsgi.RequestTimeout(
'finish processing ongoing requests on worker exit '
'(pid: {})'.format(worker.pid)
)
except talisker.wsgi.RequestTimeout:
request.exc_info = sys.exc_info()
request.finish_request(timeout=True)


class GunicornLogger(Logger):
Expand Down Expand Up @@ -153,6 +176,7 @@ def init(self, parser, opts, args):

cfg['logger_class'] = GunicornLogger
cfg['worker_exit'] = gunicorn_worker_exit
cfg['worker_abort'] = gunicorn_worker_abort

# only enable these if we are doing multiproc cleanup
if talisker.prometheus_multiproc_cleanup:
Expand Down
19 changes: 11 additions & 8 deletions talisker/testing.py
Expand Up @@ -172,17 +172,20 @@ def assert_log(self, **kwargs):
if not self.exists(**kwargs):
# evaluate each term independently to narrow down culprit
terms = []
extra = kwargs.pop('extra', {})
for kw, value in kwargs.items():
num = len(self.filter(**{kw: value}))
terms.append((num, kw, value))
terms.sort() # 0 matches go first, as likely to be the issue
if len(self.filter(**{kw: value})) == 0:
terms.append((kw, value))
for kw, value in extra.items():
if len(self.filter(**{'extra': {kw: value}})) == 0:
terms.append(('extra["' + kw + '"]', value))

desc = '\n '.join(
'{1}={2!r} ({0} matches)'.format(*t) for t in terms
)
desc = '\n'.join(' {}={}'.format(k, v) for k, v in terms)
raise AssertionError(
'Could not find log out of {} logs:\n {}'.format(
len(self), desc))
'Could not find log out of {} logs.\n'
'Search terms that could not be found:\n'
'{}'.format(len(self), desc)
)

def assert_not_log(self, **kwargs):
if self.exists(**kwargs):
Expand Down
49 changes: 29 additions & 20 deletions talisker/wsgi.py
Expand Up @@ -65,6 +65,10 @@
REQUESTS = {}


class RequestTimeout(Exception):
pass


def talisker_error_response(environ, headers, exc_info):
"""Returns WSGI iterable to be returned as an error response.

Expand All @@ -89,11 +93,11 @@ def talisker_error_response(environ, headers, exc_info):
wsgi_environ.append((k, v))

if config.devel:
title = 'Request {}: {}'.format(rid, exc)
title = '{}: {}'.format(exc_type.__name__, exc)
lines = traceback.format_exception(*exc_info)
tb = PreformattedText(''.join(lines), id='traceback')
else:
title = 'Request {}: {}'.format(rid, exc_type.__name__)
title = 'Server Error: {}'.format(exc_type.__name__)

content = [
Content(title, tag='h1', id='title'),
Expand Down Expand Up @@ -302,8 +306,10 @@ def __next__(self):
if not self.start_response_called:
self.call_start_response()
raise
except Exception:
except Exception as e:
self.exc_info = sys.exc_info()
if isinstance(e, RequestTimeout):
self.timedout = True
# switch to generating an error response
self.iter = iter(self.error(self.exc_info))
chunk = next(self.iter)
Expand Down Expand Up @@ -378,22 +384,23 @@ def finish_request(self, timeout=False):
# b) soft timeout
# c) manual debugging (TODO)

soft_timeout = Context.current.soft_timeout
try:
if self.exc_info:
self.send_sentry(metadata)
elif soft_timeout > 0 and response_latency > soft_timeout:
self.send_sentry(
metadata,
msg='start_response latency exceeded soft timeout',
level='warning',
extra={
'start_response_latency': response_latency,
'soft_timeout': soft_timeout,
},
)
except Exception:
logger.exception('failed to send soft timeout report')
if talisker.sentry.enabled:
soft_timeout = Context.current.soft_timeout
try:
if self.exc_info:
self.send_sentry(metadata)
elif soft_timeout > 0 and response_latency > soft_timeout:
self.send_sentry(
metadata,
msg='start_response latency exceeded soft timeout',
level='warning',
extra={
'start_response_latency': response_latency,
'soft_timeout': soft_timeout,
},
)
except Exception:
logger.exception('failed to send soft timeout report')

talisker.clear_context()
rid = self.environ.get('REQUEST_ID')
Expand Down Expand Up @@ -583,9 +590,11 @@ def __call__(self, environ, start_response):

try:
response_iter = self.app(environ, request.start_response)
except Exception:
except Exception as e:
# store details for later
request.exc_info = sys.exc_info()
if isinstance(e, RequestTimeout):
request.timedout = True
# switch to generating an error response
response_iter = request.error(request.exc_info)
except SystemExit as e:
Expand Down
30 changes: 30 additions & 0 deletions tests/test_functional.py
Expand Up @@ -116,6 +116,36 @@ def test_django_app(django):
assert response.headers['X-View-Name'] == 'django_app.views.index'


@require_module('gunicorn')
def test_gunicorn_timeout(tmpdir):

def test_app():
import time

def app(environ, start_response):
start_response('200 OK', [('content-type', 'text/plain')])
time.sleep(100)
return []

app_module = str(tmpdir / 'app.py')
with open(app_module, 'w') as f:
f.write(get_function_body(test_app))

# ensure devel mode
env = os.environ.copy()
env['DEVEL'] = '1'
p = GunicornProcess('app:app', args=['-t1'], cwd=str(tmpdir), env=env)
with p:
response = requests.get(
p.url('/'),
headers={'Accept': 'application/json'},
).json()

assert response['title'].startswith(
'RequestTimeout: gunicorn worker timeout (pid:'
)


@require_module('celery')
def test_celery_basic(celery_signals):
from tests.celery_app import basic_task, error_task, propagate_task
Expand Down
7 changes: 6 additions & 1 deletion tests/test_gunicorn.py
Expand Up @@ -264,7 +264,10 @@ def test_gunicorn_worker_exit(wsgi_env, context):
request = talisker.wsgi.TaliskerWSGIRequest(wsgi_env, None, [])
talisker.wsgi.REQUESTS['ID'] = request

gunicorn.gunicorn_worker_exit(None, None)
class worker:
pid = 100

gunicorn.gunicorn_worker_exit(None, worker())

context.assert_log(
name='talisker.wsgi',
Expand All @@ -274,3 +277,5 @@ def test_gunicorn_worker_exit(wsgi_env, context):
'timeout': True,
},
)

assert len(context.sentry) == 1
11 changes: 6 additions & 5 deletions tests/test_testing.py
Expand Up @@ -115,13 +115,14 @@ def test_test_context():
name=logger.name, msg='bar', level='warning', extra={'b': 2})

with pytest.raises(AssertionError) as exc:
ctx.assert_log(name=logger.name, msg='XXX', level='info')
ctx.assert_log(
name=logger.name, msg='XXX', level='info', extra={"baz": 1})

assert str(exc.value) == textwrap.dedent("""
Could not find log out of {0} logs:
msg={1}'XXX' (0 matches)
level={1}'info' (2 matches)
name='test_test_context' (2 matches)
Could not find log out of {0} logs.
Search terms that could not be found:
msg=XXX
extra["baz"]=1
""").strip().format(len(ctx.logs), 'u' if sys.version_info[0] == 2 else '')

with pytest.raises(AssertionError) as exc:
Expand Down