Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply sampling override to Exceptions #3022

Merged
merged 13 commits into from
May 1, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,24 @@
package com.microsoft.applicationinsights.agent.internal.exporter;

import static com.azure.monitor.opentelemetry.exporter.implementation.utils.AzureMonitorMsgId.EXPORTER_MAPPING_ERROR;
import static com.microsoft.applicationinsights.agent.internal.exporter.ExporterUtils.shouldSample;

import com.azure.monitor.opentelemetry.exporter.implementation.LogDataMapper;
import com.azure.monitor.opentelemetry.exporter.implementation.logging.OperationLogger;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.exporter.implementation.quickpulse.QuickPulse;
import com.microsoft.applicationinsights.agent.internal.configuration.Configuration.SamplingOverride;
import com.microsoft.applicationinsights.agent.internal.sampling.AiSampler;
import com.microsoft.applicationinsights.agent.internal.sampling.SamplingOverrides;
import com.microsoft.applicationinsights.agent.internal.telemetry.BatchItemProcessor;
import com.microsoft.applicationinsights.agent.internal.telemetry.TelemetryClient;
import com.microsoft.applicationinsights.agent.internal.telemetry.TelemetryObservers;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -134,23 +132,4 @@ public CompletableResultCode flush() {
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}

@SuppressFBWarnings(
value = "SECPR", // Predictable pseudorandom number generator
justification = "Predictable random is ok for sampling decision")
private static boolean shouldSample(SpanContext spanContext, double percentage) {
if (percentage == 100) {
// optimization, no need to calculate score
return true;
}
if (percentage == 0) {
// optimization, no need to calculate score
return false;
}
if (spanContext.isValid()) {
return AiSampler.shouldRecordAndSample(spanContext.getTraceId(), percentage);
}
// this is a standalone log (not part of a trace), so randomly sample at the given percentage
return ThreadLocalRandom.current().nextDouble() < percentage / 100;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,28 @@
package com.microsoft.applicationinsights.agent.internal.exporter;

import static com.azure.monitor.opentelemetry.exporter.implementation.utils.AzureMonitorMsgId.EXPORTER_MAPPING_ERROR;
import static com.microsoft.applicationinsights.agent.internal.exporter.ExporterUtils.shouldSample;

import com.azure.monitor.opentelemetry.exporter.implementation.SemanticAttributes;
import com.azure.monitor.opentelemetry.exporter.implementation.SpanDataMapper;
import com.azure.monitor.opentelemetry.exporter.implementation.logging.OperationLogger;
import com.azure.monitor.opentelemetry.exporter.implementation.models.ContextTagKeys;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.exporter.implementation.quickpulse.QuickPulse;
import com.azure.monitor.opentelemetry.exporter.implementation.utils.Strings;
import com.microsoft.applicationinsights.agent.internal.configuration.Configuration;
import com.microsoft.applicationinsights.agent.internal.sampling.SamplingOverrides;
import com.microsoft.applicationinsights.agent.internal.telemetry.BatchItemProcessor;
import com.microsoft.applicationinsights.agent.internal.telemetry.TelemetryClient;
import com.microsoft.applicationinsights.agent.internal.telemetry.TelemetryObservers;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.EventData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.Collection;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
Expand All @@ -31,11 +40,15 @@ public final class AgentSpanExporter implements SpanExporter {

private final SpanDataMapper mapper;
private final Consumer<TelemetryItem> telemetryItemConsumer;
private final SamplingOverrides exceptionSamplingOverrides;
private final BiPredicate<EventData, String> eventSuppressor;

public AgentSpanExporter(
SpanDataMapper mapper,
@Nullable QuickPulse quickPulse,
BatchItemProcessor batchItemProcessor) {
BatchItemProcessor batchItemProcessor,
List<Configuration.SamplingOverride> samplingOverrides,
BiPredicate<EventData, String> eventSuppressor) {
this.mapper = mapper;
telemetryItemConsumer =
telemetryItem -> {
Expand All @@ -47,6 +60,8 @@ public AgentSpanExporter(
.forEach(consumer -> consumer.accept(telemetryItem));
batchItemProcessor.trackAsync(telemetryItem);
};
exceptionSamplingOverrides = new SamplingOverrides(samplingOverrides);
this.eventSuppressor = eventSuppressor;
}

@Override
Expand All @@ -58,13 +73,16 @@ public CompletableResultCode export(Collection<SpanData> spans) {
}
for (SpanData span : spans) {
logger.debug("exporting span: {}", span);
TelemetryItem telemetryItem = null;
try {
mapper.map(span, telemetryItemConsumer);
telemetryItem = mapper.map(span, telemetryItemConsumer);
exportEvents(span, telemetryItem);
exportingSpanLogger.recordSuccess();
} catch (Throwable t) {
exportingSpanLogger.recordFailure(t.getMessage(), t, EXPORTER_MAPPING_ERROR);
}
}

// always returning success, because all error handling is performed internally
return CompletableResultCode.ofSuccess();
}
Expand All @@ -78,4 +96,38 @@ public CompletableResultCode flush() {
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}

private void exportEvents(SpanData span, TelemetryItem telemetryItem) {
for (EventData event : span.getEvents()) {
String instrumentationScopeName = span.getInstrumentationScopeInfo().getName();
if (eventSuppressor.test(event, instrumentationScopeName)) {
continue;
}
event.getAttributes().forEach((k, v) -> logger.debug("event.attributes: {}:{}", k, v));
if (event.getAttributes().get(SemanticAttributes.EXCEPTION_TYPE) != null
|| event.getAttributes().get(SemanticAttributes.EXCEPTION_MESSAGE) != null) {
SpanContext parentSpanContext = span.getParentSpanContext();
// Application Insights expects exception records to be "top-level" exceptions
// not just any exception that bubbles up
if (!parentSpanContext.isValid() || parentSpanContext.isRemote()) {
// TODO (trask) map OpenTelemetry exception to Application Insights exception better
String stacktrace = event.getAttributes().get(SemanticAttributes.EXCEPTION_STACKTRACE);
if (stacktrace != null) {
Double samplingPercentage =
exceptionSamplingOverrides.getOverridePercentage(event.getAttributes());
if (samplingPercentage != null
heyams marked this conversation as resolved.
Show resolved Hide resolved
&& !shouldSample(span.getSpanContext(), samplingPercentage)) {
continue;
}
mapper.exportEvent(
span,
telemetryItem.getTags().get(ContextTagKeys.AI_OPERATION_NAME.toString()),
telemetryItemConsumer,
stacktrace,
event);
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.applicationinsights.agent.internal.exporter;

import com.microsoft.applicationinsights.agent.internal.sampling.AiSampler;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.opentelemetry.api.trace.SpanContext;
import java.util.concurrent.ThreadLocalRandom;

final class ExporterUtils {

@SuppressFBWarnings(
value = "SECPR", // Predictable pseudorandom number generator
justification = "Predictable random is ok for sampling decision")
static boolean shouldSample(SpanContext spanContext, double percentage) {
if (percentage == 100) {
// optimization, no need to calculate score
return true;
}
if (percentage == 0) {
// optimization, no need to calculate score
return false;
}
if (spanContext.isValid()) {
return AiSampler.shouldRecordAndSample(spanContext.getTraceId(), percentage);
}
// this is a standalone log (not part of a trace), so randomly sample at the given percentage
return ThreadLocalRandom.current().nextDouble() < percentage / 100;
}

private ExporterUtils() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,16 @@ private static SdkTracerProviderBuilder configureTracing(

String tracesExporter = otelConfig.getString("otel.traces.exporter");
if ("none".equals(tracesExporter)) { // "none" is the default set in AiConfigCustomizer
List<Configuration.SamplingOverride> exceptionSamplingOverrides =
configuration.preview.sampling.overrides.stream()
.filter(override -> override.telemetryType == SamplingTelemetryType.EXCEPTION)
.collect(Collectors.toList());
SpanExporter spanExporter =
createSpanExporter(
telemetryClient, quickPulse, configuration.preview.captureHttpServer4xxAsError);
telemetryClient,
quickPulse,
configuration.preview.captureHttpServer4xxAsError,
exceptionSamplingOverrides);

spanExporter = wrapSpanExporter(spanExporter, configuration);

Expand All @@ -355,12 +362,20 @@ private static SdkTracerProviderBuilder configureTracing(
private static SpanExporter createSpanExporter(
TelemetryClient telemetryClient,
@Nullable QuickPulse quickPulse,
boolean captureHttpServer4xxAsError) {
boolean captureHttpServer4xxAsError,
List<Configuration.SamplingOverride> exceptionSamplingOverrides) {

SpanDataMapper mapper =
new SpanDataMapper(
captureHttpServer4xxAsError,
telemetryClient::populateDefaults,
new SpanDataMapper(captureHttpServer4xxAsError, telemetryClient::populateDefaults);

BatchItemProcessor batchItemProcessor = telemetryClient.getGeneralBatchItemProcessor();

return new StatsbeatSpanExporter(
new AgentSpanExporter(
mapper,
quickPulse,
batchItemProcessor,
exceptionSamplingOverrides,
(event, instrumentationName) -> {
boolean lettuce51 = instrumentationName.equals("io.opentelemetry.lettuce-5.1");
if (lettuce51 && event.getName().startsWith("redis.encode.")) {
Expand All @@ -376,12 +391,7 @@ private static SpanExporter createSpanExporter(
return true;
}
return false;
});

BatchItemProcessor batchItemProcessor = telemetryClient.getGeneralBatchItemProcessor();

return new StatsbeatSpanExporter(
new AgentSpanExporter(mapper, quickPulse, batchItemProcessor),
}),
telemetryClient.getStatsbeatModule());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,7 @@ public AzureMonitorExporterBuilder credential(TokenCredential credential) {
* environment variable "APPLICATIONINSIGHTS_CONNECTION_STRING" is not set.
*/
public SpanExporter buildTraceExporter() {
SpanDataMapper mapper =
new SpanDataMapper(true, this::populateDefaults, (event, instrumentationName) -> false);
SpanDataMapper mapper = new SpanDataMapper(true, this::populateDefaults);

return new AzureMonitorTraceExporter(mapper, initExporterBuilder());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.azure.monitor.opentelemetry.exporter.implementation.utils.FormattedTime;
import com.azure.monitor.opentelemetry.exporter.implementation.utils.UrlParser;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.sdk.resources.Resource;
Expand All @@ -32,7 +31,6 @@
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import reactor.util.annotation.Nullable;

Expand Down Expand Up @@ -104,31 +102,24 @@ public final class SpanDataMapper {

private final boolean captureHttpServer4xxAsError;
private final BiConsumer<AbstractTelemetryBuilder, Resource> telemetryInitializer;
private final BiPredicate<EventData, String> eventSuppressor;

public SpanDataMapper(
boolean captureHttpServer4xxAsError,
BiConsumer<AbstractTelemetryBuilder, Resource> telemetryInitializer,
BiPredicate<EventData, String> eventSuppressor) {
BiConsumer<AbstractTelemetryBuilder, Resource> telemetryInitializer) {
this.captureHttpServer4xxAsError = captureHttpServer4xxAsError;
this.telemetryInitializer = telemetryInitializer;
this.eventSuppressor = eventSuppressor;
}

public TelemetryItem map(SpanData span) {
long itemCount = getItemCount(span);
return map(span, itemCount);
}

public void map(SpanData span, Consumer<TelemetryItem> consumer) {
public TelemetryItem map(SpanData span, Consumer<TelemetryItem> consumer) {
long itemCount = getItemCount(span);
TelemetryItem telemetryItem = map(span, itemCount);
consumer.accept(telemetryItem);
exportEvents(
span,
telemetryItem.getTags().get(ContextTagKeys.AI_OPERATION_NAME.toString()),
itemCount,
consumer);
return telemetryItem;
}

public TelemetryItem map(SpanData span, long itemCount) {
Expand Down Expand Up @@ -685,55 +676,39 @@ private static String nullAwareConcat(
return str1 + separator + str2;
}

private void exportEvents(
public void exportEvent(
SpanData span,
@Nullable String operationName,
long itemCount,
Consumer<TelemetryItem> consumer) {
for (EventData event : span.getEvents()) {
String instrumentationScopeName = span.getInstrumentationScopeInfo().getName();
if (eventSuppressor.test(event, instrumentationScopeName)) {
continue;
}

if (event.getAttributes().get(SemanticAttributes.EXCEPTION_TYPE) != null
|| event.getAttributes().get(SemanticAttributes.EXCEPTION_MESSAGE) != null) {
SpanContext parentSpanContext = span.getParentSpanContext();
// Application Insights expects exception records to be "top-level" exceptions
// not just any exception that bubbles up
if (!parentSpanContext.isValid() || parentSpanContext.isRemote()) {
// TODO (trask) map OpenTelemetry exception to Application Insights exception better
String stacktrace = event.getAttributes().get(SemanticAttributes.EXCEPTION_STACKTRACE);
if (stacktrace != null) {
consumer.accept(
createExceptionTelemetryItem(stacktrace, span, operationName, itemCount));
}
}
return;
}
Consumer<TelemetryItem> consumer,
String stacktrace,
EventData event) {
long itemCount = getItemCount(span);
if (stacktrace != null) {
consumer.accept(createExceptionTelemetryItem(stacktrace, span, operationName, itemCount));
return;
}

MessageTelemetryBuilder telemetryBuilder = MessageTelemetryBuilder.create();
telemetryInitializer.accept(telemetryBuilder, span.getResource());
MessageTelemetryBuilder telemetryBuilder = MessageTelemetryBuilder.create();
telemetryInitializer.accept(telemetryBuilder, span.getResource());

// set standard properties
setOperationId(telemetryBuilder, span.getTraceId());
setOperationParentId(telemetryBuilder, span.getSpanId());
if (operationName != null) {
setOperationName(telemetryBuilder, operationName);
} else {
setOperationName(telemetryBuilder, span.getAttributes());
}
setTime(telemetryBuilder, event.getEpochNanos());
setItemCount(telemetryBuilder, itemCount);
// set standard properties
setOperationId(telemetryBuilder, span.getTraceId());
setOperationParentId(telemetryBuilder, span.getSpanId());
if (operationName != null) {
setOperationName(telemetryBuilder, operationName);
} else {
setOperationName(telemetryBuilder, span.getAttributes());
}
setTime(telemetryBuilder, event.getEpochNanos());
setItemCount(telemetryBuilder, itemCount);

// update tags
MAPPINGS.map(event.getAttributes(), telemetryBuilder);
// update tags
MAPPINGS.map(event.getAttributes(), telemetryBuilder);

// set message-specific properties
telemetryBuilder.setMessage(event.getName());
// set message-specific properties
telemetryBuilder.setMessage(event.getName());

consumer.accept(telemetryBuilder.build());
}
consumer.accept(telemetryBuilder.build());
}

private TelemetryItem createExceptionTelemetryItem(
Expand Down