Skip to content

Commit

Permalink
Better gunicorn timeout handling.
Browse files Browse the repository at this point in the history
By raising our own exception in Gunicorn's worker_abort handler, we can
catch and handle it just like any other error. This means we get an
error response, logging, and sentry report for the timeout, which is
a massive improvement.
  • Loading branch information
Simon Davy committed Dec 13, 2019
1 parent 7860dae commit bdac91a
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 58 deletions.
3 changes: 2 additions & 1 deletion Makefile
Expand Up @@ -59,8 +59,9 @@ export DEVEL=1
WORKER ?= sync
PORT ?= 8000
TALISKER = $(BIN)/talisker.gunicorn --bind 0.0.0.0:$(PORT) --reload --worker-class $(WORKER) $(ARGS)
APP ?= application
run wsgi:
$(TALISKER) tests.wsgi_app:application
$(TALISKER) tests.wsgi_app:$(APP)

run_multiprocess: ARGS=-w4
run_multiprocess: run
Expand Down
30 changes: 27 additions & 3 deletions talisker/gunicorn.py
Expand Up @@ -31,6 +31,7 @@

from collections import deque
import logging
import sys

from gunicorn.glogging import Logger
from gunicorn.app.wsgiapp import WSGIApplication
Expand Down Expand Up @@ -92,7 +93,7 @@ def handle_custom():


def gunicorn_on_starting(arbiter):
"""Gunicorn on_starging server hook.
"""Gunicorn on_starting server hook.
Sets up the fake signal and handler on the arbiter instance.
"""
Expand All @@ -112,11 +113,33 @@ def gunicorn_child_exit(server, worker):
server.SIG_QUEUE.append('SIGCUSTOM')


def gunicorn_worker_abort(worker):
"""Worker SIGABRT handler function.
SIGABRT is only used by gunicorn on worker timeout. We raise a custom
exception, rather than falling through to the default of SystemExit, which
is easier to catch and handle as a normal error by our WSGI middleware.
"""
raise talisker.wsgi.RequestTimeout(
'gunicorn worker timeout (pid: {})'.format(worker.pid)
)


def gunicorn_worker_exit(server, worker):
"""Logs any requests that are still in flight."""
"""Worker exit function.
Last chance to try log any outstanding requests before we die.
"""
for rid in list(talisker.wsgi.REQUESTS):
request = talisker.wsgi.REQUESTS[rid]
request.finish_request(timeout=True)
try:
raise talisker.wsgi.RequestTimeout(
'finish processing ongoing requests on worker exit '
'(pid: {})'.format(worker.pid)
)
except talisker.wsgi.RequestTimeout:
request.exc_info = sys.exc_info()
request.finish_request(timeout=True)


class GunicornLogger(Logger):
Expand Down Expand Up @@ -153,6 +176,7 @@ def init(self, parser, opts, args):

cfg['logger_class'] = GunicornLogger
cfg['worker_exit'] = gunicorn_worker_exit
cfg['worker_abort'] = gunicorn_worker_abort

# only enable these if we are doing multiproc cleanup
if talisker.prometheus_multiproc_cleanup:
Expand Down
19 changes: 11 additions & 8 deletions talisker/testing.py
Expand Up @@ -172,17 +172,20 @@ def assert_log(self, **kwargs):
if not self.exists(**kwargs):
# evaluate each term independently to narrow down culprit
terms = []
extra = kwargs.pop('extra', {})
for kw, value in kwargs.items():
num = len(self.filter(**{kw: value}))
terms.append((num, kw, value))
terms.sort() # 0 matches go first, as likely to be the issue
if len(self.filter(**{kw: value})) == 0:
terms.append((kw, value))
for kw, value in extra.items():
if len(self.filter(**{'extra': {kw: value}})) == 0:
terms.append(('extra["' + kw + '"]', value))

desc = '\n '.join(
'{1}={2!r} ({0} matches)'.format(*t) for t in terms
)
desc = '\n'.join(' {}={}'.format(k, v) for k, v in terms)
raise AssertionError(
'Could not find log out of {} logs:\n {}'.format(
len(self), desc))
'Could not find log out of {} logs.\n'
'Search terms that could not be found:\n'
'{}'.format(len(self), desc)
)

def assert_not_log(self, **kwargs):
if self.exists(**kwargs):
Expand Down
16 changes: 12 additions & 4 deletions talisker/wsgi.py
Expand Up @@ -65,6 +65,10 @@
REQUESTS = {}


class RequestTimeout(Exception):
pass


def talisker_error_response(environ, headers, exc_info):
"""Returns WSGI iterable to be returned as an error response.
Expand All @@ -89,11 +93,11 @@ def talisker_error_response(environ, headers, exc_info):
wsgi_environ.append((k, v))

if config.devel:
title = 'Request {}: {}'.format(rid, exc)
title = '{}: {}'.format(exc_type.__name__, exc)
lines = traceback.format_exception(*exc_info)
tb = PreformattedText(''.join(lines), id='traceback')
else:
title = 'Request {}: {}'.format(rid, exc_type.__name__)
title = 'Server Error: {}'.format(exc_type.__name__)

content = [
Content(title, tag='h1', id='title'),
Expand Down Expand Up @@ -302,8 +306,10 @@ def __next__(self):
if not self.start_response_called:
self.call_start_response()
raise
except Exception:
except Exception as e:
self.exc_info = sys.exc_info()
if isinstance(e, RequestTimeout):
self.timedout = True
# switch to generating an error response
self.iter = iter(self.error(self.exc_info))
chunk = next(self.iter)
Expand Down Expand Up @@ -583,9 +589,11 @@ def __call__(self, environ, start_response):

try:
response_iter = self.app(environ, request.start_response)
except Exception:
except Exception as e:
# store details for later
request.exc_info = sys.exc_info()
if isinstance(e, RequestTimeout):
request.timedout = True
# switch to generating an error response
response_iter = request.error(request.exc_info)
except SystemExit as e:
Expand Down
27 changes: 27 additions & 0 deletions tests/test_functional.py
Expand Up @@ -116,6 +116,33 @@ def test_django_app(django):
assert response.headers['X-View-Name'] == 'django_app.views.index'


@require_module('gunicorn')
def test_gunicorn_timeout(tmpdir, config):
config['DEVEL'] = 0

def test_app():
import time

def app(environ, start_response):
start_response('200 OK', [('content-type', 'text/plain')])
time.sleep(100)
return []

app_module = str(tmpdir / 'app.py')
with open(app_module, 'w') as f:
f.write(get_function_body(test_app))

with GunicornProcess('app:app', args=['-t1'], cwd=str(tmpdir)) as p:
response = requests.get(
p.url('/'),
headers={'Accept': 'application/json'},
).json()

assert response['title'].startswith(
'RequestTimeout: gunicorn worker timeout (pid:'
)


@require_module('celery')
def test_celery_basic(celery_signals):
from tests.celery_app import basic_task, error_task, propagate_task
Expand Down
7 changes: 6 additions & 1 deletion tests/test_gunicorn.py
Expand Up @@ -264,7 +264,10 @@ def test_gunicorn_worker_exit(wsgi_env, context):
request = talisker.wsgi.TaliskerWSGIRequest(wsgi_env, None, [])
talisker.wsgi.REQUESTS['ID'] = request

gunicorn.gunicorn_worker_exit(None, None)
class worker:
pid = 100

gunicorn.gunicorn_worker_exit(None, worker())

context.assert_log(
name='talisker.wsgi',
Expand All @@ -274,3 +277,5 @@ def test_gunicorn_worker_exit(wsgi_env, context):
'timeout': True,
},
)

assert len(context.sentry) == 1
11 changes: 6 additions & 5 deletions tests/test_testing.py
Expand Up @@ -115,13 +115,14 @@ def test_test_context():
name=logger.name, msg='bar', level='warning', extra={'b': 2})

with pytest.raises(AssertionError) as exc:
ctx.assert_log(name=logger.name, msg='XXX', level='info')
ctx.assert_log(
name=logger.name, msg='XXX', level='info', extra={"baz": 1})

assert str(exc.value) == textwrap.dedent("""
Could not find log out of {0} logs:
msg={1}'XXX' (0 matches)
level={1}'info' (2 matches)
name='test_test_context' (2 matches)
Could not find log out of {0} logs.
Search terms that could not be found:
msg=XXX
extra["baz"]=1
""").strip().format(len(ctx.logs), 'u' if sys.version_info[0] == 2 else '')

with pytest.raises(AssertionError) as exc:
Expand Down

0 comments on commit bdac91a

Please sign in to comment.