refs(alert_rules): Remove bulk methods for fetching incident stats. (#18842)

We no longer use these methods since we moved this parallelization to the frontend. They just make
it harder to understand how these functions work, so we're simplifying them back down to single fetches.
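
In practice the change to callers looks like the sketch below (variable names are illustrative; the two call signatures are taken from this diff):

    # Before: one bulk call fetched stats for a whole list of incidents.
    stats_list = bulk_get_incident_stats(incidents, windowed_stats=True)

    # After: callers fetch one incident at a time; any fan-out/parallelism
    # now lives in the frontend, which issues one stats request per incident.
    stats = get_incident_stats(incident, windowed_stats=True)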
wedamija committed May 19, 2020
1 parent 6fa01ac commit 927ac4b
Showing 3 changed files with 96 additions and 224 deletions.
4 changes: 2 additions & 2 deletions src/sentry/incidents/endpoints/organization_incident_stats.py
@@ -4,7 +4,7 @@

from sentry.api.bases.incident import IncidentEndpoint, IncidentPermission
from sentry.api.serializers.snuba import SnubaTSResultSerializer
-from sentry.incidents.logic import bulk_get_incident_stats
+from sentry.incidents.logic import get_incident_stats


class OrganizationIncidentStatsEndpoint(IncidentEndpoint):
@@ -16,7 +16,7 @@ def get(self, request, organization, incident):
``````````````````
:auth: required
"""
-        stats = bulk_get_incident_stats([incident], windowed_stats=True)[0]
+        stats = get_incident_stats(incident, windowed_stats=True)
event_stats_serializer = SnubaTSResultSerializer(organization, None, request.user)
results = {
"eventStats": event_stats_serializer.serialize(stats["event_stats"]),
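The hunk is truncated at "results = {" above. For context, a sketch of how the endpoint consumes the dict that get_incident_stats returns; the camelCase response keys besides "eventStats" are assumptions, while the "total_events"/"unique_users" dict keys come from the logic.py diff below:

        results = {
            "eventStats": event_stats_serializer.serialize(stats["event_stats"]),
            "totalEvents": stats["total_events"],    # assumed response key name
            "uniqueUsers": stats["unique_users"],    # assumed response key name
        }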
184 changes: 67 additions & 117 deletions src/sentry/incidents/logic.py
@@ -1,6 +1,5 @@
from __future__ import absolute_import

-from collections import defaultdict
from copy import deepcopy
from datetime import timedelta

@@ -43,9 +42,7 @@
update_snuba_query,
)
from sentry.snuba.tasks import apply_dataset_conditions
-from sentry.utils.db import attach_foreignkey
from sentry.utils.snuba import bulk_raw_query, SnubaQueryParams, SnubaTSResult
-from sentry.utils.compat import zip

# We can return an incident as "windowed" which returns a range of points around the start of the incident
# It attempts to center the start of the incident, only showing earlier data if there isn't enough time
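
As a rough illustration of the "windowed" idea described in that comment (a hypothetical helper; the real logic lives in calculate_incident_time_range, referenced further down):

    from datetime import timedelta

    def illustrative_windowed_range(incident_start, window_seconds, points=100):
        # Center the window on the incident start: half the points fall
        # before the start and half after it.
        half = timedelta(seconds=window_seconds * points / 2)
        return incident_start - half, incident_start + half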
@@ -320,56 +317,35 @@ def create_event_stat_snapshot(incident, windowed_stats=False):


def build_incident_query_params(incident, start=None, end=None, windowed_stats=False):
-    return bulk_build_incident_query_params(
-        [incident], start=start, end=end, windowed_stats=windowed_stats
-    )[0]
-
-
-def bulk_build_incident_query_params(incidents, start=None, end=None, windowed_stats=False):
-    incident_groups = defaultdict(list)
-    for incident_id, group_id in IncidentGroup.objects.filter(incident__in=incidents).values_list(
-        "incident_id", "group_id"
-    ):
-        incident_groups[incident_id].append(group_id)
-    incident_projects = defaultdict(list)
-    for incident_id, project_id in IncidentProject.objects.filter(
-        incident__in=incidents
-    ).values_list("incident_id", "project_id"):
-        incident_projects[incident_id].append(project_id)
-
-    attach_foreignkey(incidents, Incident.alert_rule)
-
-    query_args_list = []
-    for incident in incidents:
-        params = {}
-        params["start"], params["end"] = calculate_incident_time_range(
-            incident, start, end, windowed_stats=windowed_stats
-        )
-
-        group_ids = incident_groups[incident.id]
-        if group_ids:
-            params["group_ids"] = group_ids
-        project_ids = incident_projects[incident.id]
-        if project_ids:
-            params["project_id"] = project_ids
-
-        snuba_filter = get_filter(incident.alert_rule.snuba_query.query, params)
-        conditions = resolve_discover_aliases(snuba_filter)[0].conditions
-        if incident.alert_rule:
-            conditions = apply_dataset_conditions(
-                QueryDatasets(incident.alert_rule.snuba_query.dataset), conditions
-            )
-        snuba_args = {
-            "start": snuba_filter.start,
-            "end": snuba_filter.end,
-            "conditions": conditions,
-            "filter_keys": snuba_filter.filter_keys,
-            "having": [],
-        }
-        query_args_list.append(snuba_args)
-
-    return query_args_list
+    params = {}
+    params["start"], params["end"] = calculate_incident_time_range(
+        incident, start, end, windowed_stats=windowed_stats
+    )
+
+    group_ids = list(
+        IncidentGroup.objects.filter(incident=incident).values_list("group_id", flat=True)
+    )
+    if group_ids:
+        params["group_ids"] = group_ids
+    project_ids = list(
+        IncidentProject.objects.filter(incident=incident).values_list("project_id", flat=True)
+    )
+    if project_ids:
+        params["project_id"] = project_ids
+
+    snuba_filter = get_filter(incident.alert_rule.snuba_query.query, params)
+    conditions = resolve_discover_aliases(snuba_filter)[0].conditions
+    if incident.alert_rule:
+        conditions = apply_dataset_conditions(
+            QueryDatasets(incident.alert_rule.snuba_query.dataset), conditions
+        )
+
+    return {
+        "start": snuba_filter.start,
+        "end": snuba_filter.end,
+        "conditions": conditions,
+        "filter_keys": snuba_filter.filter_keys,
+        "having": [],
+    }
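
The dict returned above plugs straight into SnubaQueryParams as keyword arguments, which is how both callers below use it. A minimal sketch, with the aggregation tuple shortened for illustration:

    query_params = build_incident_query_params(incident, windowed_stats=True)
    snuba_params = SnubaQueryParams(
        aggregations=[("count()", "", "count")],
        limit=10000,
        **query_params  # start, end, conditions, filter_keys, having
    )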


def calculate_incident_time_range(incident, start=None, end=None, windowed_stats=False):
@@ -416,39 +392,30 @@ def get_incident_event_stats(incident, start=None, end=None, windowed_stats=False):
Gets event stats for an incident. If start/end are provided, uses that time
period, otherwise uses the incident start/current_end.
"""
-    query_params = bulk_build_incident_query_params(
-        [incident], start=start, end=end, windowed_stats=windowed_stats
+    query_params = build_incident_query_params(
+        incident, start=start, end=end, windowed_stats=windowed_stats
    )
-    return bulk_get_incident_event_stats([incident], query_params)[0]
-
-
-def bulk_get_incident_event_stats(incidents, query_params_list):
-    snuba_params_list = [
-        SnubaQueryParams(
-            aggregations=[
-                (
-                    query_aggregation_to_snuba[
-                        aggregate_to_query_aggregation[incident.alert_rule.snuba_query.aggregate]
-                    ][0],
-                    query_aggregation_to_snuba[
-                        aggregate_to_query_aggregation[incident.alert_rule.snuba_query.aggregate]
-                    ][1],
-                    "count",
-                )
-            ],
-            orderby="time",
-            groupby=["time"],
-            rollup=incident.alert_rule.snuba_query.time_window,
-            limit=10000,
-            **query_param
-        )
-        for incident, query_param in zip(incidents, query_params_list)
-    ]
-    results = bulk_raw_query(snuba_params_list, referrer="incidents.get_incident_event_stats")
-    return [
-        SnubaTSResult(result, snuba_params.start, snuba_params.end, snuba_params.rollup)
-        for snuba_params, result in zip(snuba_params_list, results)
-    ]
+    snuba_params = SnubaQueryParams(
+        aggregations=[
+            (
+                query_aggregation_to_snuba[
+                    aggregate_to_query_aggregation[incident.alert_rule.snuba_query.aggregate]
+                ][0],
+                query_aggregation_to_snuba[
+                    aggregate_to_query_aggregation[incident.alert_rule.snuba_query.aggregate]
+                ][1],
+                "count",
+            )
+        ],
+        orderby="time",
+        groupby=["time"],
+        rollup=incident.alert_rule.snuba_query.time_window,
+        limit=10000,
+        **query_params
+    )
+    results = bulk_raw_query([snuba_params], referrer="incidents.get_incident_event_stats")
+    return SnubaTSResult(results[0], snuba_params.start, snuba_params.end, snuba_params.rollup)
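
A usage sketch (assuming SnubaTSResult exposes its rows under result.data["data"], as Sentry's serializers elsewhere expect):

    result = get_incident_event_stats(incident, windowed_stats=True)
    for point in result.data["data"]:
        # "time" is the rollup bucket; "count" is the aggregation alias above.
        print(point["time"], point.get("count"))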


def get_incident_aggregates(incident, start=None, end=None, windowed_stats=False):
@@ -458,64 +425,47 @@ def get_incident_aggregates(incident, start=None, end=None, windowed_stats=False):
- unique_users: Total number of unique users
"""
query_params = build_incident_query_params(incident, start, end, windowed_stats)
-    return bulk_get_incident_aggregates([query_params])[0]
-
-
-def bulk_get_incident_aggregates(query_params_list):
    snuba_params_list = [
        SnubaQueryParams(
            aggregations=[("count()", "", "count"), ("uniq", "tags[sentry:user]", "unique_users")],
            limit=10000,
-            **query_param
+            **query_params
        )
-        for query_param in query_params_list
    ]
    results = bulk_raw_query(snuba_params_list, referrer="incidents.get_incident_aggregates")
-    return [result["data"][0] for result in results]
+    return [result["data"][0] for result in results][0]
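
A usage sketch, reading the aliases defined in the aggregations above; this mirrors how get_incident_stats below consumes the returned row:

    aggregates = get_incident_aggregates(incident)
    total_events = aggregates["count"]         # alias of ("count()", "", "count")
    unique_users = aggregates["unique_users"]  # alias of ("uniq", "tags[sentry:user]", ...)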


-def bulk_get_incident_stats(incidents, windowed_stats=False):
+def get_incident_stats(incident, windowed_stats=False):
    """
-    Returns bulk stats for a list of incidents. This includes unique user count,
-    total event count and event stats.
+    Returns stats for an incident. This includes unique user count, total event count
+    and event stats.
    Note that even though this function accepts a windowed_stats parameter, it does not
    affect the snapshots. Only the live fetched stats.
    """
-    incident_stats = {}
-    if windowed_stats:
+    if windowed_stats and incident.status == IncidentStatus.CLOSED.value:
        # At the moment, snapshots are only ever created with windowed_stats as True
        # so if they send False, we need to do a live calculation below.
-        closed = [i for i in incidents if i.status == IncidentStatus.CLOSED.value]
-        snapshots = IncidentSnapshot.objects.filter(incident__in=closed)
-        for snapshot in snapshots:
+        try:
+            snapshot = IncidentSnapshot.objects.get(incident=incident)
            event_stats = snapshot.event_stats_snapshot
-            incident_stats[snapshot.incident_id] = {
+            return {
                "event_stats": SnubaTSResult(
                    event_stats.snuba_values, event_stats.start, event_stats.end, event_stats.period
                ),
                "total_events": snapshot.total_events,
                "unique_users": snapshot.unique_users,
            }
+        except IncidentSnapshot.DoesNotExist:
+            pass

-    to_fetch = [i for i in incidents if i.id not in incident_stats]
-    if to_fetch:
-        query_params_list = bulk_build_incident_query_params(to_fetch, windowed_stats=False)
-        if windowed_stats:
-            windowed_query_params_list = bulk_build_incident_query_params(
-                to_fetch, windowed_stats=True
-            )
-            all_event_stats = bulk_get_incident_event_stats(to_fetch, windowed_query_params_list)
-        else:
-            all_event_stats = bulk_get_incident_event_stats(to_fetch, query_params_list)
-        all_aggregates = bulk_get_incident_aggregates(query_params_list)
-        for incident, event_stats, aggregates in zip(to_fetch, all_event_stats, all_aggregates):
-            incident_stats[incident.id] = {
-                "event_stats": event_stats,
-                "total_events": aggregates["count"],
-                "unique_users": aggregates["unique_users"],
-            }
-
-    return [incident_stats[incident.id] for incident in incidents]
+    event_stats = get_incident_event_stats(incident, windowed_stats=windowed_stats)
+    aggregates = get_incident_aggregates(incident)
+    return {
+        "event_stats": event_stats,
+        "total_events": aggregates["count"],
+        "unique_users": aggregates["unique_users"],
+    }
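
A behavior sketch of the simplified function: a CLOSED incident with a stored snapshot is served from the snapshot; anything else triggers two live Snuba queries (event stats plus aggregates):

    stats = get_incident_stats(incident, windowed_stats=True)
    series = stats["event_stats"]   # SnubaTSResult time series
    total = stats["total_events"]   # from the "count" aggregate
    users = stats["unique_users"]   # from the "uniq" aggregate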


def subscribe_to_incident(incident, user):
