Skip to content

Commit

Permalink
Add a configurable max_export_batch_size to the gRPC metrics exporter (#2809)
Browse files Browse the repository at this point in the history
  • Loading branch information
overmeulen committed Sep 9, 2022
1 parent 438ca5b commit 41b9e26
Show file tree
Hide file tree
Showing 3 changed files with 429 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- Add a configurable max_export_batch_size to the gRPC metrics exporter
([#2809](https://github.com/open-telemetry/opentelemetry-python/pull/2809))
- Remove support for 3.6
([#2763](https://github.com/open-telemetry/opentelemetry-python/pull/2763))
- Update PeriodicExportingMetricReader to never call export() concurrently
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import dataclasses
from logging import getLogger
from os import environ
from typing import Dict, Optional, Sequence
from typing import Dict, Iterable, List, Optional, Sequence
from grpc import ChannelCredentials, Compression
from opentelemetry.sdk.metrics._internal.aggregation import Aggregation
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
Expand Down Expand Up @@ -42,12 +43,15 @@
)
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
DataPointT,
Gauge,
Histogram as HistogramType,
Metric,
MetricExporter,
MetricExportResult,
MetricsData,
ResourceMetrics,
ScopeMetrics,
Sum,
)

Expand All @@ -58,6 +62,14 @@ class OTLPMetricExporter(
MetricExporter,
OTLPExporterMixin[Metric, ExportMetricsServiceRequest, MetricExportResult],
):
"""OTLP metric exporter
Args:
max_export_batch_size: Maximum number of data points to export in a single request. This is to deal with
gRPC's 4MB message size limit. If not set there is no limit to the number of data points in a request.
If it is set and the number of data points exceeds the max, the request will be split.
"""

_result = MetricExportResult
_stub = MetricsServiceStub

Expand All @@ -71,6 +83,7 @@ def __init__(
compression: Optional[Compression] = None,
preferred_temporality: Dict[type, AggregationTemporality] = None,
preferred_aggregation: Dict[type, Aggregation] = None,
max_export_batch_size: Optional[int] = None,
):

if insecure is None:
Expand Down Expand Up @@ -122,6 +135,8 @@ def __init__(
compression=compression,
)

self._max_export_batch_size: Optional[int] = max_export_batch_size

def _translate_data(
self, data: MetricsData
) -> ExportMetricsServiceRequest:
Expand Down Expand Up @@ -223,8 +238,9 @@ def _translate_data(
)
pb2_metric.sum.data_points.append(pt)
else:
_logger.warn(
"unsupported datapoint type %s", metric.point
_logger.warning(
"unsupported data type %s",
metric.data.__class__.__name__,
)
continue

Expand All @@ -245,7 +261,101 @@ def export(
**kwargs,
) -> MetricExportResult:
# TODO(#2663): OTLPExporterMixin should pass timeout to gRPC
return self._export(metrics_data)
if self._max_export_batch_size is None:
return self._export(data=metrics_data)

export_result = MetricExportResult.SUCCESS

for split_metrics_data in self._split_metrics_data(metrics_data):
split_export_result = self._export(data=split_metrics_data)

if split_export_result is MetricExportResult.FAILURE:
export_result = MetricExportResult.FAILURE

return export_result

def _split_metrics_data(
    self,
    metrics_data: MetricsData,
) -> Iterable[MetricsData]:
    """Lazily split ``metrics_data`` into batches of bounded size.

    The resource -> scope -> metric -> data point hierarchy is walked in
    order. Each container on the current path is mirrored with
    ``dataclasses.replace`` so that it points at a fresh list; the data
    point objects themselves are reused, not copied. Every time the number
    of collected data points reaches ``self._max_export_batch_size`` the
    mirrored hierarchy is yielded and a new one, holding only the current
    resource/scope/metric path, is started.

    Containers that end up without any data point in the batch being built
    are removed from it, and whatever is left once the input is exhausted
    is yielded as a final, smaller batch.

    Args:
        metrics_data: The metrics to split.

    Yields:
        ``MetricsData`` objects built from the collected data points.
    """

    def _metric_with_points(source: Metric, points: List[DataPointT]):
        # Mirror of ``source`` whose data refers to ``points``.
        return dataclasses.replace(
            source,
            data=dataclasses.replace(source.data, data_points=points),
        )

    collected: int = 0
    out_resources: List[ResourceMetrics] = []

    for resource in metrics_data.resource_metrics:
        out_scopes: List[ScopeMetrics] = []
        out_resources.append(
            dataclasses.replace(resource, scope_metrics=out_scopes)
        )

        for scope in resource.scope_metrics:
            out_metrics: List[Metric] = []
            out_scopes.append(
                dataclasses.replace(scope, metrics=out_metrics)
            )

            for metric in scope.metrics:
                out_points: List[DataPointT] = []
                out_metrics.append(_metric_with_points(metric, out_points))

                for data_point in metric.data.data_points:
                    out_points.append(data_point)
                    collected += 1

                    if collected < self._max_export_batch_size:
                        continue

                    # The batch is full: hand it out, then rebuild the
                    # current path on brand-new lists so the batch that
                    # was just yielded is never touched again.
                    yield MetricsData(resource_metrics=out_resources)

                    collected = 0
                    out_points = []
                    out_metrics = [
                        _metric_with_points(metric, out_points)
                    ]
                    out_scopes = [
                        dataclasses.replace(scope, metrics=out_metrics)
                    ]
                    out_resources = [
                        dataclasses.replace(
                            resource, scope_metrics=out_scopes
                        )
                    ]

                # A metric that contributed no data point to the batch
                # under construction is dropped from it.
                if not out_points:
                    out_metrics.pop()

            # Likewise for a scope left without metrics ...
            if not out_metrics:
                out_scopes.pop()

        # ... and for a resource left without scopes.
        if not out_scopes:
            out_resources.pop()

    # Emit the trailing, partially filled batch (if any).
    if collected > 0:
        yield MetricsData(resource_metrics=out_resources)

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
    """Do nothing.

    Args:
        timeout_millis: Accepted but not used.
        **kwargs: Accepted but not used.
    """
    return None
Expand Down

0 comments on commit 41b9e26

Please sign in to comment.