
Commit

Redis: Add support for redis.asyncio (#1933)
---------

Co-authored-by: Anton Pirker <anton.pirker@sentry.io>
Zhenay and antonpirker committed Jul 11, 2023
1 parent b89fa8d commit 994a45b
Showing 8 changed files with 310 additions and 83 deletions.
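
For orientation (not part of the commit itself), here is a minimal sketch of how the new async support would be exercised once the integration is active, assuming a local Redis at redis://localhost:6379 and a placeholder DSN:

import asyncio

import redis.asyncio
import sentry_sdk
from sentry_sdk.integrations.redis import RedisIntegration

# Tracing must be enabled for db.redis spans to be recorded.
sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    traces_sample_rate=1.0,
    integrations=[RedisIntegration()],
)


async def main():
    # Assumed local Redis instance; any redis.asyncio client gets patched.
    connection = redis.asyncio.Redis.from_url("redis://localhost:6379")

    with sentry_sdk.start_transaction(name="redis-asyncio-demo"):
        # Each command goes through the patched execute_command and yields
        # a db.redis span tagged with redis.command / db.operation.
        await connection.set("greeting", "hello")
        await connection.get("greeting")

        # Pipelines go through the patched Pipeline.execute and yield one
        # "redis.pipeline.execute" span listing the first ten commands.
        async with connection.pipeline(transaction=False) as pipeline:
            pipeline.set("counter", 1)
            pipeline.incr("counter")
            await pipeline.execute()


asyncio.run(main())
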
2 changes: 1 addition & 1 deletion .github/workflows/test-integration-redis.yml
@@ -31,7 +31,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.7","3.8","3.9"]
python-version: ["3.7","3.8","3.9","3.10","3.11"]
# python3.6 reached EOL and is no longer being supported on
# new versions of hosted runners on Github Actions
# ubuntu-20.04 is the last version that supported python3.6
sentry_sdk/integrations/redis/__init__.py
@@ -14,6 +14,7 @@

if TYPE_CHECKING:
from typing import Any, Sequence
from sentry_sdk.tracing import Span

_SINGLE_KEY_COMMANDS = frozenset(
["decr", "decrby", "get", "incr", "incrby", "pttl", "set", "setex", "setnx", "ttl"]
@@ -25,10 +26,64 @@
]

_MAX_NUM_ARGS = 10 # Trim argument lists to this many values
_MAX_NUM_COMMANDS = 10 # Trim command lists to this many values

_DEFAULT_MAX_DATA_SIZE = 1024


def _get_safe_command(name, args):
# type: (str, Sequence[Any]) -> str
command_parts = [name]

for i, arg in enumerate(args):
if i > _MAX_NUM_ARGS:
break

name_low = name.lower()

if name_low in _COMMANDS_INCLUDING_SENSITIVE_DATA:
command_parts.append(SENSITIVE_DATA_SUBSTITUTE)
continue

arg_is_the_key = i == 0
if arg_is_the_key:
command_parts.append(repr(arg))

else:
if _should_send_default_pii():
command_parts.append(repr(arg))
else:
command_parts.append(SENSITIVE_DATA_SUBSTITUTE)

command = " ".join(command_parts)
return command


def _set_pipeline_data(
span, is_cluster, get_command_args_fn, is_transaction, command_stack
):
# type: (Span, bool, Any, bool, Sequence[Any]) -> None
span.set_tag("redis.is_cluster", is_cluster)
transaction = is_transaction if not is_cluster else False
span.set_tag("redis.transaction", transaction)

commands = []
for i, arg in enumerate(command_stack):
if i >= _MAX_NUM_COMMANDS:
break

command = get_command_args_fn(arg)
commands.append(_get_safe_command(command[0], command[1:]))

span.set_data(
"redis.commands",
{
"count": len(command_stack),
"first_ten": commands,
},
)


def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn):
# type: (Any, bool, Any) -> None
old_execute = pipeline_cls.execute
@@ -44,24 +99,12 @@ def sentry_patched_execute(self, *args, **kwargs):
op=OP.DB_REDIS, description="redis.pipeline.execute"
) as span:
with capture_internal_exceptions():
span.set_tag("redis.is_cluster", is_cluster)
transaction = self.transaction if not is_cluster else False
span.set_tag("redis.transaction", transaction)

commands = []
for i, arg in enumerate(self.command_stack):
if i > _MAX_NUM_ARGS:
break
command_args = []
for j, command_arg in enumerate(get_command_args_fn(arg)):
if j > 0:
command_arg = repr(command_arg)
command_args.append(command_arg)
commands.append(" ".join(command_args))

span.set_data(
"redis.commands",
{"count": len(self.command_stack), "first_ten": commands},
_set_pipeline_data(
span,
is_cluster,
get_command_args_fn,
self.transaction,
self.command_stack,
)
span.set_data(SPANDATA.DB_SYSTEM, "redis")

@@ -80,6 +123,43 @@ def _parse_rediscluster_command(command):
return command.args


def _patch_redis(StrictRedis, client): # noqa: N803
# type: (Any, Any) -> None
patch_redis_client(StrictRedis, is_cluster=False)
patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args)
try:
strict_pipeline = client.StrictPipeline
except AttributeError:
pass
else:
patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args)

try:
import redis.asyncio
except ImportError:
pass
else:
from sentry_sdk.integrations.redis.asyncio import (
patch_redis_async_client,
patch_redis_async_pipeline,
)

patch_redis_async_client(redis.asyncio.client.StrictRedis)
patch_redis_async_pipeline(redis.asyncio.client.Pipeline)


def _patch_rb():
# type: () -> None
try:
import rb.clients # type: ignore
except ImportError:
pass
else:
patch_redis_client(rb.clients.FanoutClient, is_cluster=False)
patch_redis_client(rb.clients.MappingClient, is_cluster=False)
patch_redis_client(rb.clients.RoutingClient, is_cluster=False)


def _patch_rediscluster():
# type: () -> None
try:
@@ -119,30 +199,40 @@ def setup_once():
except ImportError:
raise DidNotEnable("Redis client not installed")

patch_redis_client(StrictRedis, is_cluster=False)
patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args)
try:
strict_pipeline = client.StrictPipeline # type: ignore
except AttributeError:
pass
else:
patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args)

try:
import rb.clients # type: ignore
except ImportError:
pass
else:
patch_redis_client(rb.clients.FanoutClient, is_cluster=False)
patch_redis_client(rb.clients.MappingClient, is_cluster=False)
patch_redis_client(rb.clients.RoutingClient, is_cluster=False)
_patch_redis(StrictRedis, client)
_patch_rb()

try:
_patch_rediscluster()
except Exception:
logger.exception("Error occurred while patching `rediscluster` library")


def _get_span_description(name, *args):
# type: (str, *Any) -> str
description = name

with capture_internal_exceptions():
description = _get_safe_command(name, args)

return description


def _set_client_data(span, is_cluster, name, *args):
# type: (Span, bool, str, *Any) -> None
span.set_tag("redis.is_cluster", is_cluster)
if name:
span.set_tag("redis.command", name)
span.set_tag(SPANDATA.DB_OPERATION, name)

if name and args:
name_low = name.lower()
if (name_low in _SINGLE_KEY_COMMANDS) or (
name_low in _MULTI_KEY_COMMANDS and len(args) == 1
):
span.set_tag("redis.key", args[0])


def patch_redis_client(cls, is_cluster):
# type: (Any, bool) -> None
"""
@@ -159,31 +249,7 @@ def sentry_patched_execute_command(self, name, *args, **kwargs):
if integration is None:
return old_execute_command(self, name, *args, **kwargs)

description = name

with capture_internal_exceptions():
description_parts = [name]
for i, arg in enumerate(args):
if i > _MAX_NUM_ARGS:
break

name_low = name.lower()

if name_low in _COMMANDS_INCLUDING_SENSITIVE_DATA:
description_parts.append(SENSITIVE_DATA_SUBSTITUTE)
continue

arg_is_the_key = i == 0
if arg_is_the_key:
description_parts.append(repr(arg))

else:
if _should_send_default_pii():
description_parts.append(repr(arg))
else:
description_parts.append(SENSITIVE_DATA_SUBSTITUTE)

description = " ".join(description_parts)
description = _get_span_description(name, *args)

data_should_be_truncated = (
integration.max_data_size and len(description) > integration.max_data_size
@@ -192,18 +258,7 @@ def sentry_patched_execute_command(self, name, *args, **kwargs):
description = description[: integration.max_data_size - len("...")] + "..."

with hub.start_span(op=OP.DB_REDIS, description=description) as span:
span.set_tag("redis.is_cluster", is_cluster)

if name:
span.set_tag("redis.command", name)
span.set_tag(SPANDATA.DB_OPERATION, name)

if name and args:
name_low = name.lower()
if (name_low in _SINGLE_KEY_COMMANDS) or (
name_low in _MULTI_KEY_COMMANDS and len(args) == 1
):
span.set_tag("redis.key", args[0])
_set_client_data(span, is_cluster, name, *args)

return old_execute_command(self, name, *args, **kwargs)

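
The helpers extracted above (_get_safe_command, _set_pipeline_data, _get_span_description, _set_client_data) are shared with the new async module below. As a small illustration, not part of the diff and calling a private helper purely for demonstration, with send_default_pii left off only the key argument is kept verbatim:

from sentry_sdk.integrations.redis import _get_safe_command

# With send_default_pii off (the default), the key (first argument) is kept
# and the value is replaced by the SDK's sensitive-data substitute.
print(_get_safe_command("SET", ("session:123", "secret-token")))
# Expected output along the lines of: SET 'session:123' [Filtered]
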
67 changes: 67 additions & 0 deletions sentry_sdk/integrations/redis/asyncio.py
@@ -0,0 +1,67 @@
from __future__ import absolute_import

from sentry_sdk import Hub
from sentry_sdk.consts import OP
from sentry_sdk.utils import capture_internal_exceptions
from sentry_sdk.integrations.redis import (
RedisIntegration,
_get_redis_command_args,
_get_span_description,
_set_client_data,
_set_pipeline_data,
)


from sentry_sdk._types import MYPY

if MYPY:
from typing import Any


def patch_redis_async_pipeline(pipeline_cls):
# type: (Any) -> None
old_execute = pipeline_cls.execute

async def _sentry_execute(self, *args, **kwargs):
# type: (Any, *Any, **Any) -> Any
hub = Hub.current

if hub.get_integration(RedisIntegration) is None:
return await old_execute(self, *args, **kwargs)

with hub.start_span(
op=OP.DB_REDIS, description="redis.pipeline.execute"
) as span:
with capture_internal_exceptions():
_set_pipeline_data(
span,
False,
_get_redis_command_args,
self.is_transaction,
self.command_stack,
)

return await old_execute(self, *args, **kwargs)

pipeline_cls.execute = _sentry_execute


def patch_redis_async_client(cls):
# type: (Any) -> None
old_execute_command = cls.execute_command

async def _sentry_execute_command(self, name, *args, **kwargs):
# type: (Any, str, *Any, **Any) -> Any
hub = Hub.current

if hub.get_integration(RedisIntegration) is None:
return await old_execute_command(self, name, *args, **kwargs)

description = _get_span_description(name, *args)

with hub.start_span(op=OP.DB_REDIS, description=description) as span:
_set_client_data(span, False, name, *args)

return await old_execute_command(self, name, *args, **kwargs)

cls.execute_command = _sentry_execute_command
3 changes: 3 additions & 0 deletions tests/integrations/redis/asyncio/__init__.py
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("fakeredis.aioredis")

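The test package above only guards on fakeredis.aioredis being importable; the actual test module is not part of this excerpt. A rough sketch of the kind of test it could contain, assuming the repository's sentry_init and capture_events fixtures and pytest-asyncio:

import pytest
from fakeredis.aioredis import FakeRedis

from sentry_sdk import start_transaction
from sentry_sdk.integrations.redis import RedisIntegration


@pytest.mark.asyncio
async def test_async_basic(sentry_init, capture_events):
    sentry_init(traces_sample_rate=1.0, integrations=[RedisIntegration()])
    events = capture_events()

    connection = FakeRedis()
    with start_transaction(name="redis-asyncio"):
        await connection.set("foo", "bar")

    (event,) = events
    (span,) = event["spans"]
    assert span["op"] == "db.redis"
    assert span["tags"]["redis.command"] == "SET"
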