Skip to content

Commit

Permalink
Better extension support
Browse files Browse the repository at this point in the history
  • Loading branch information
trask committed Jan 10, 2024
1 parent 147c366 commit 23b6e10
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 141 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.microsoft.applicationinsights.agent.internal.init;

import com.microsoft.applicationinsights.agent.internal.telemetry.TelemetryClient;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.spi.internal.AutoConfigureListener;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.util.concurrent.TimeUnit;

public class FlushOnShutdown implements AutoConfigureListener {
@Override
public void afterAutoConfigure(OpenTelemetrySdk sdk) {
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> flushAll(sdk, TelemetryClient.getActive()).join(10, TimeUnit.SECONDS)));
}

private static CompletableResultCode flushAll(
OpenTelemetrySdk sdk, TelemetryClient telemetryClient) {
CompletableResultCode sdkShutdownResult = sdk.shutdown();
CompletableResultCode overallResult = new CompletableResultCode();
sdkShutdownResult.whenComplete(
() -> {
// IMPORTANT: the metric reader flush will fail if the periodic metric reader is already
// mid-exporter
CompletableResultCode telemetryClientResult = telemetryClient.forceFlush();
telemetryClientResult.whenComplete(
() -> {
if (sdkShutdownResult.isSuccess() && telemetryClientResult.isSuccess()) {
overallResult.succeed();
} else {
overallResult.fail();
}
});
});
return overallResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
import static java.util.concurrent.TimeUnit.MINUTES;

import com.azure.core.util.logging.ClientLogger;
import com.azure.monitor.opentelemetry.exporter.implementation.AzureMonitorExporterProviderKeys;
import com.azure.monitor.opentelemetry.exporter.implementation.AzureMonitorLogRecordExporterProvider;
import com.azure.monitor.opentelemetry.exporter.implementation.AzureMonitorMetricExporterProvider;
import com.azure.monitor.opentelemetry.exporter.implementation.AzureMonitorSpanExporterProvider;
import com.azure.monitor.opentelemetry.exporter.implementation.LogDataMapper;
import com.azure.monitor.opentelemetry.exporter.implementation.MetricDataMapper;
import com.azure.monitor.opentelemetry.exporter.implementation.SpanDataMapper;
Expand Down Expand Up @@ -52,30 +56,25 @@
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizer;
import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizerProvider;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.SdkLoggerProviderBuilder;
import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReaderBuilder;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.internal.view.AiViewRegistry;
import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.semconv.SemanticAttributes;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand All @@ -89,10 +88,6 @@ public class SecondEntryPoint implements AutoConfigurationCustomizerProvider {

@Nullable private static AgentLogExporter agentLogExporter;

@Nullable private static BatchLogRecordProcessor batchLogProcessor;
@Nullable private static BatchSpanProcessor batchSpanProcessor;
@Nullable private static MetricReader metricReader;

static File getTempDir() {
return tempDir;
}
Expand Down Expand Up @@ -239,42 +234,96 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
telemetryClient.setQuickPulse(quickPulse);

autoConfiguration
.addPropertiesSupplier(
() -> {
Map<String, String> props = new HashMap<>();
props.put("otel.traces.exporter", AzureMonitorExporterProviderKeys.EXPORTER_NAME);
props.put("otel.metrics.exporter", AzureMonitorExporterProviderKeys.EXPORTER_NAME);
props.put("otel.logs.exporter", AzureMonitorExporterProviderKeys.EXPORTER_NAME);
props.put(
AzureMonitorExporterProviderKeys.INTERNAL_USING_AZURE_MONITOR_EXPORTER_BUILDER,
"true");
props.put(
"otel.metric.export.interval",
Integer.toString(configuration.metricIntervalSeconds * 1000));
// using small interval because need to convert to TelemetryItem as soon as possible
// to grab data for live metrics. the real batching is done at a lower level
// (not using batch size 1 because that seems to cause poor performance on small
// containers)
props.put("otel.bsp.schedule.delay", "100");
return props;
})
.addPropertiesCustomizer(new AiConfigCustomizer())
.addSpanExporterCustomizer(
(spanExporter, otelConfig) -> {
if ("none".equals(otelConfig.getString("otel.traces.exporter"))) {
// in this case the spanExporter here is the noop spanExporter
return spanExporter;
(spanExporter, configProperties) -> {
if (spanExporter instanceof AzureMonitorSpanExporterProvider.MarkerSpanExporter) {
return buildTraceExporter(configuration, telemetryClient, quickPulse);
}
return wrapSpanExporter(spanExporter, configuration);
})
.addMetricExporterCustomizer(
(metricExporter, configProperties) -> {
if (metricExporter
instanceof AzureMonitorMetricExporterProvider.MarkerMetricExporter) {
return buildMetricExporter(configuration, telemetryClient, metricFilters);
} else {
return wrapSpanExporter(spanExporter, configuration);
return metricExporter;
}
})
.addTracerProviderCustomizer(
(builder, otelConfig) ->
configureTracing(builder, telemetryClient, quickPulse, otelConfig, configuration))
.addLogRecordExporterCustomizer(
(logExporter, otelConfig) -> {
if ("none".equals(otelConfig.getString("otel.logs.exporter"))) {
// in this case the logExporter here is the noop spanExporter
return logExporter;
(logRecordExporter, configProperties) -> {
if (logRecordExporter
instanceof AzureMonitorLogRecordExporterProvider.MarkerLogRecordExporter) {
return buildLogRecordExporter(configuration, telemetryClient, quickPulse);
} else {
return wrapLogExporter(logExporter, configuration);
return wrapLogExporter(logRecordExporter, configuration);
}
})
.addLoggerProviderCustomizer(
(builder, otelConfig) ->
configureLogging(builder, telemetryClient, quickPulse, otelConfig, configuration))
.addTracerProviderCustomizer(
(builder, otelConfig) -> configureTracing(builder, configuration))
.addMeterProviderCustomizer(
(builder, otelConfig) ->
configureMetrics(metricFilters, builder, telemetryClient, configuration));
(builder, otelConfig) -> configureMetrics(builder, configuration))
.addLoggerProviderCustomizer(
(builder, otelConfig) -> configureLogging(builder, configuration));

AiContextCustomizerHolder.setInstance(
new AiContextCustomizer<>(
configuration.preview.connectionStringOverrides,
configuration.preview.roleNameOverrides));
}

Runtime.getRuntime()
.addShutdownHook(new Thread(() -> flushAll(telemetryClient).join(10, TimeUnit.SECONDS)));
private static SpanExporter buildTraceExporter(
Configuration configuration, TelemetryClient telemetryClient, QuickPulse quickPulse) {
List<Configuration.SamplingOverride> exceptionSamplingOverrides =
configuration.preview.sampling.overrides.stream()
.filter(override -> override.telemetryType == SamplingTelemetryType.EXCEPTION)
.collect(Collectors.toList());
SpanExporter spanExporter =
createSpanExporter(
telemetryClient,
quickPulse,
configuration.preview.captureHttpServer4xxAsError,
new SamplingOverrides(exceptionSamplingOverrides));

return wrapSpanExporter(spanExporter, configuration);
}

private static MetricExporter buildMetricExporter(
Configuration configuration,
TelemetryClient telemetryClient,
List<MetricFilter> metricFilters) {
MetricDataMapper mapper =
new MetricDataMapper(
telemetryClient::populateDefaults, configuration.preview.captureHttpServer4xxAsError);
return new AgentMetricExporter(
metricFilters, mapper, telemetryClient.getMetricsBatchItemProcessor());
}

private static LogRecordExporter buildLogRecordExporter(
Configuration configuration, TelemetryClient telemetryClient, QuickPulse quickPulse) {
LogRecordExporter logExporter = createLogExporter(telemetryClient, quickPulse, configuration);

return wrapLogExporter(logExporter, configuration);
}

private static Set<Feature> initStatsbeatFeatureSet(Configuration config) {
Expand Down Expand Up @@ -439,42 +488,8 @@ private static Set<Feature> initStatsbeatFeatureSet(Configuration config) {
return featureList;
}

private static CompletableResultCode flushAll(TelemetryClient telemetryClient) {
List<CompletableResultCode> results = new ArrayList<>();
if (batchSpanProcessor != null) {
results.add(batchSpanProcessor.forceFlush());
}
if (metricReader != null) {
results.add(metricReader.forceFlush());
}
if (batchLogProcessor != null) {
results.add(batchLogProcessor.forceFlush());
}
CompletableResultCode overallResult = new CompletableResultCode();
CompletableResultCode initialResult = CompletableResultCode.ofAll(results);
initialResult.whenComplete(
() -> {
// IMPORTANT: the metric reader flush will fail if the periodic metric reader is already
// mid-exporter
CompletableResultCode telemetryClientResult = telemetryClient.forceFlush();
telemetryClientResult.whenComplete(
() -> {
if (initialResult.isSuccess() && telemetryClientResult.isSuccess()) {
overallResult.succeed();
} else {
overallResult.fail();
}
});
});
return overallResult;
}

private static SdkTracerProviderBuilder configureTracing(
SdkTracerProviderBuilder tracerProvider,
TelemetryClient telemetryClient,
@Nullable QuickPulse quickPulse,
ConfigProperties otelConfig,
Configuration configuration) {
SdkTracerProviderBuilder tracerProvider, Configuration configuration) {

boolean enabled = !Strings.isNullOrEmpty(configuration.connectionString);
RuntimeConfigurator.updatePropagation(
Expand Down Expand Up @@ -506,30 +521,6 @@ private static SdkTracerProviderBuilder configureTracing(
tracerProvider.addSpanProcessor(new AiLegacyHeaderSpanProcessor());
}

String tracesExporter = otelConfig.getString("otel.traces.exporter");
if ("none".equals(tracesExporter)) { // "none" is the default set in AiConfigCustomizer
List<Configuration.SamplingOverride> exceptionSamplingOverrides =
configuration.preview.sampling.overrides.stream()
.filter(override -> override.telemetryType == SamplingTelemetryType.EXCEPTION)
.collect(Collectors.toList());
SpanExporter spanExporter =
createSpanExporter(
telemetryClient,
quickPulse,
configuration.preview.captureHttpServer4xxAsError,
new SamplingOverrides(exceptionSamplingOverrides));

spanExporter = wrapSpanExporter(spanExporter, configuration);

// using BatchSpanProcessor in order to get off of the application thread as soon as possible
batchSpanProcessor =
BatchSpanProcessor.builder(spanExporter)
.setScheduleDelay(getBatchProcessorDelay())
.build();

tracerProvider.addSpanProcessor(batchSpanProcessor);
}

return tracerProvider;
}

Expand Down Expand Up @@ -617,11 +608,7 @@ private static List<ProcessorConfig> getSpanProcessorConfigs(Configuration confi
// telemetry and exception telemetry can be reported as either span events or as log records with
// an exception stack traces
private static SdkLoggerProviderBuilder configureLogging(
SdkLoggerProviderBuilder builder,
TelemetryClient telemetryClient,
@Nullable QuickPulse quickPulse,
ConfigProperties otelConfig,
Configuration configuration) {
SdkLoggerProviderBuilder builder, Configuration configuration) {

builder.addLogRecordProcessor(new AzureMonitorLogProcessor());

Expand All @@ -641,21 +628,6 @@ private static SdkLoggerProviderBuilder configureLogging(
// "ai.preview.service_name" being set programmatically on CONSUMER spans
builder.addLogRecordProcessor(new InheritedRoleNameLogProcessor());

String logsExporter = otelConfig.getString("otel.logs.exporter");
if ("none".equals(logsExporter)) { // "none" is the default set in AiConfigCustomizer
LogRecordExporter logExporter = createLogExporter(telemetryClient, quickPulse, configuration);

logExporter = wrapLogExporter(logExporter, configuration);

// using BatchLogProcessor in order to get off of the application thread as soon as possible
batchLogProcessor =
BatchLogRecordProcessor.builder(logExporter)
.setScheduleDelay(getBatchProcessorDelay())
.build();

builder.addLogRecordProcessor(batchLogProcessor);
}

return builder;
}

Expand Down Expand Up @@ -724,39 +696,16 @@ private static List<ProcessorConfig> getLogProcessorConfigs(Configuration config
.collect(Collectors.toCollection(ArrayList::new));
}

private static Duration getBatchProcessorDelay() {
String delayMillisStr = System.getenv("APPLICATIONINSIGHTS_PREVIEW_BSP_SCHEDULE_DELAY");
if (delayMillisStr != null) {
return Duration.ofMillis(Integer.parseInt(delayMillisStr));
} else {
// using small interval because need to convert to TelemetryItem as soon as possible to grab
// data for live metrics. the real batching is done at a lower level
// (not using batch size 1 because that seems to cause poor performance on small containers)
return Duration.ofMillis(100);
}
}
// TODO (trask)
// APPLICATIONINSIGHTS_PREVIEW_BSP_SCHEDULE_DELAY renamed to OTEL_BSP_SCHEDULE_DELAY

private static SdkMeterProviderBuilder configureMetrics(
List<MetricFilter> metricFilters,
SdkMeterProviderBuilder builder,
TelemetryClient telemetryClient,
Configuration configuration) {
MetricDataMapper mapper =
new MetricDataMapper(
telemetryClient::populateDefaults, configuration.preview.captureHttpServer4xxAsError);
PeriodicMetricReaderBuilder readerBuilder =
PeriodicMetricReader.builder(
new AgentMetricExporter(
metricFilters, mapper, telemetryClient.getMetricsBatchItemProcessor()));
int intervalMillis =
Integer.getInteger(
"applicationinsights.testing.metric-reader-interval-millis",
configuration.metricIntervalSeconds * 1000);
metricReader = readerBuilder.setInterval(Duration.ofMillis(intervalMillis)).build();
SdkMeterProviderBuilder builder, Configuration configuration) {

if (configuration.internal.preAggregatedStandardMetrics.enabled) {
AiViewRegistry.registerViews(builder);
}
return builder.registerMetricReader(metricReader);
return builder;
}

private static class BackCompatHttpUrlProcessor implements SpanExporter {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.microsoft.applicationinsights.agent.internal.init.FlushOnShutdown
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.azure.monitor.opentelemetry.exporter.implementation.AzureMonitorLogRecordExporterProvider
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.azure.monitor.opentelemetry.exporter.implementation.AzureMonitorMetricExporterProvider
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.azure.monitor.opentelemetry.exporter.implementation.AzureMonitorSpanExporterProvider
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ private void startTestApplicationContainer() throws Exception {
.withEnv("APPLICATIONINSIGHTS_CONNECTION_STRING", connectionString)
.withEnv("APPLICATIONINSIGHTS_SELF_DIAGNOSTICS_LEVEL", selfDiagnosticsLevel)
.withEnv("OTEL_RESOURCE_ATTRIBUTES", otelResourceAttributesEnvVar)
.withEnv("APPLICATIONINSIGHTS_METRIC_INTERVAL_SECONDS", "1")
.withNetwork(network)
.withExposedPorts(8080)
.withFileSystemBind(
Expand All @@ -409,7 +410,6 @@ private void startTestApplicationContainer() throws Exception {

List<String> javaToolOptions = new ArrayList<>();
javaToolOptions.add("-Dapplicationinsights.testing.batch-schedule-delay-millis=500");
javaToolOptions.add("-Dapplicationinsights.testing.metric-reader-interval-millis=500");
if (agentExtensionFile != null) {
javaToolOptions.add("-Dotel.javaagent.extensions=/" + agentExtensionFile.getName());
}
Expand All @@ -422,7 +422,6 @@ private void startTestApplicationContainer() throws Exception {
javaToolOptions.add("-Dotel.metrics.exporter=otlp");
javaToolOptions.add("-Dotel.exporter.otlp.metrics.endpoint=" + FAKE_OTLP_INGESTION_ENDPOINT);
javaToolOptions.add("-Dotel.exporter.otlp.protocol=http/protobuf");
javaToolOptions.add("-Dotel.metric.export.interval=5000");
}
if (REMOTE_DEBUG) {
javaToolOptions.add(
Expand Down

0 comments on commit 23b6e10

Please sign in to comment.