-
Notifications
You must be signed in to change notification settings - Fork 462
/
boto3.py
131 lines (104 loc) · 4.08 KB
/
boto3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from __future__ import absolute_import
from sentry_sdk import Hub
from sentry_sdk.consts import OP
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.tracing import Span
from sentry_sdk._functools import partial
from sentry_sdk._types import MYPY
if MYPY:
from typing import Any
from typing import Dict
from typing import Optional
from typing import Type
try:
from botocore import __version__ as BOTOCORE_VERSION # type: ignore
from botocore.client import BaseClient # type: ignore
from botocore.response import StreamingBody # type: ignore
from botocore.awsrequest import AWSRequest # type: ignore
except ImportError:
raise DidNotEnable("botocore is not installed")
class Boto3Integration(Integration):
identifier = "boto3"
@staticmethod
def setup_once():
# type: () -> None
try:
version = tuple(map(int, BOTOCORE_VERSION.split(".")[:3]))
except (ValueError, TypeError):
raise DidNotEnable(
"Unparsable botocore version: {}".format(BOTOCORE_VERSION)
)
if version < (1, 12):
raise DidNotEnable("Botocore 1.12 or newer is required.")
orig_init = BaseClient.__init__
def sentry_patched_init(self, *args, **kwargs):
# type: (Type[BaseClient], *Any, **Any) -> None
orig_init(self, *args, **kwargs)
meta = self.meta
service_id = meta.service_model.service_id.hyphenize()
meta.events.register(
"request-created",
partial(_sentry_request_created, service_id=service_id),
)
meta.events.register("after-call", _sentry_after_call)
meta.events.register("after-call-error", _sentry_after_call_error)
BaseClient.__init__ = sentry_patched_init
def _sentry_request_created(service_id, request, operation_name, **kwargs):
# type: (str, AWSRequest, str, **Any) -> None
hub = Hub.current
if hub.get_integration(Boto3Integration) is None:
return
description = "aws.%s.%s" % (service_id, operation_name)
span = hub.start_span(
hub=hub,
op=OP.HTTP_CLIENT,
description=description,
)
span.set_tag("aws.service_id", service_id)
span.set_tag("aws.operation_name", operation_name)
span.set_data("aws.request.url", request.url)
# We do it in order for subsequent http calls/retries be
# attached to this span.
span.__enter__()
# request.context is an open-ended data-structure
# where we can add anything useful in request life cycle.
request.context["_sentrysdk_span"] = span
def _sentry_after_call(context, parsed, **kwargs):
# type: (Dict[str, Any], Dict[str, Any], **Any) -> None
span = context.pop("_sentrysdk_span", None) # type: Optional[Span]
# Span could be absent if the integration is disabled.
if span is None:
return
span.__exit__(None, None, None)
body = parsed.get("Body")
if not isinstance(body, StreamingBody):
return
streaming_span = span.start_child(
op=OP.HTTP_CLIENT_STREAM,
description=span.description,
)
orig_read = body.read
orig_close = body.close
def sentry_streaming_body_read(*args, **kwargs):
# type: (*Any, **Any) -> bytes
try:
ret = orig_read(*args, **kwargs)
if not ret:
streaming_span.finish()
return ret
except Exception:
streaming_span.finish()
raise
body.read = sentry_streaming_body_read
def sentry_streaming_body_close(*args, **kwargs):
# type: (*Any, **Any) -> None
streaming_span.finish()
orig_close(*args, **kwargs)
body.close = sentry_streaming_body_close
def _sentry_after_call_error(context, exception, **kwargs):
# type: (Dict[str, Any], Type[BaseException], **Any) -> None
span = context.pop("_sentrysdk_span", None) # type: Optional[Span]
# Span could be absent if the integration is disabled.
if span is None:
return
span.__exit__(type(exception), exception, None)