Skip to content

Commit

Permalink
refactor: change metrics export data model to match OTLP protos (#2809)
Browse files Browse the repository at this point in the history
* refactor: match metrics export model with proto

* refactor: get rid of MetricsData

* fix: don't filter empty instrumentation library metrics
  • Loading branch information
seemk committed Mar 6, 2022
1 parent 82192b5 commit c6dab2a
Show file tree
Hide file tree
Showing 28 changed files with 91 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { Counter, Histogram, UpDownCounter } from './Instruments';
import { MeterProviderSharedState } from './state/MeterProviderSharedState';
import { MultiMetricStorage } from './state/MultiWritableMetricStorage';
import { SyncMetricStorage } from './state/SyncMetricStorage';
import { MetricData } from './export/MetricData';
import { InstrumentationLibraryMetrics } from './export/MetricData';
import { isNotNullish } from './utils';
import { MetricCollectorHandle } from './state/MetricCollector';
import { HrTime } from '@opentelemetry/api';
Expand Down Expand Up @@ -128,16 +128,18 @@ export class Meter implements metrics.Meter {
* @param collectionTime the HrTime at which the collection was initiated.
* @returns the list of {@link MetricData} collected.
*/
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<MetricData[]> {
const result = await Promise.all(this._metricStorageRegistry.getStorages().map(metricStorage => {
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<InstrumentationLibraryMetrics> {
const metricData = await Promise.all(this._metricStorageRegistry.getStorages().map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
this._meterProviderSharedState.resource,
this._instrumentationLibrary,
this._meterProviderSharedState.sdkStartTime,
collectionTime);
}));
return result.filter(isNotNullish);

return {
instrumentationLibrary: this._instrumentationLibrary,
metrics: metricData.filter(isNotNullish),
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/

import { HrTime } from '@opentelemetry/api';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../export/MetricData';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { Maybe } from '../utils';
Expand All @@ -43,8 +41,6 @@ export class DropAggregator implements Aggregator<undefined> {
}

toMetricData(
_resource: Resource,
_instrumentationLibrary: InstrumentationLibrary,
_instrumentDescriptor: InstrumentDescriptor,
_accumulationByAttributes: AccumulationRecord<undefined>[],
_startTime: HrTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import {
Histogram,
} from './types';
import { HistogramMetricData, PointDataType } from '../export/MetricData';
import { Resource } from '@opentelemetry/resources';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { HrTime } from '@opentelemetry/api';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { Maybe } from '../utils';
Expand Down Expand Up @@ -138,15 +136,11 @@ export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
}

toMetricData(
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
metricDescriptor: InstrumentDescriptor,
accumulationByAttributes: AccumulationRecord<HistogramAccumulation>[],
startTime: HrTime,
endTime: HrTime): Maybe<HistogramMetricData> {
return {
resource,
instrumentationLibrary,
instrumentDescriptor: metricDescriptor,
pointDataType: PointDataType.HISTOGRAM,
pointData: accumulationByAttributes.map(([attributes, accumulation]) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

import { LastValue, AggregatorKind, Aggregator, Accumulation, AccumulationRecord } from './types';
import { HrTime } from '@opentelemetry/api';
import { hrTime, hrTimeToMicroseconds, InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { hrTime, hrTimeToMicroseconds } from '@opentelemetry/core';
import { PointDataType, SingularMetricData } from '../export/MetricData';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { Maybe } from '../utils';
Expand Down Expand Up @@ -67,15 +66,11 @@ export class LastValueAggregator implements Aggregator<LastValueAccumulation> {
}

toMetricData(
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
instrumentDescriptor: InstrumentDescriptor,
accumulationByAttributes: AccumulationRecord<LastValueAccumulation>[],
startTime: HrTime,
endTime: HrTime): Maybe<SingularMetricData> {
return {
resource,
instrumentationLibrary,
instrumentDescriptor,
pointDataType: PointDataType.SINGULAR,
pointData: accumulationByAttributes.map(([attributes, accumulation]) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

import { Sum, AggregatorKind, Aggregator, Accumulation, AccumulationRecord } from './types';
import { HrTime } from '@opentelemetry/api';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { PointDataType, SingularMetricData } from '../export/MetricData';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { Maybe } from '../utils';
Expand Down Expand Up @@ -57,15 +55,11 @@ export class SumAggregator implements Aggregator<SumAccumulation> {
}

toMetricData(
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
instrumentDescriptor: InstrumentDescriptor,
accumulationByAttributes: AccumulationRecord<SumAccumulation>[],
startTime: HrTime,
endTime: HrTime): Maybe<SingularMetricData> {
return {
resource,
instrumentationLibrary,
instrumentDescriptor,
pointDataType: PointDataType.SINGULAR,
pointData: accumulationByAttributes.map(([attributes, accumulation]) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

import { HrTime } from '@opentelemetry/api';
import { Attributes } from '@opentelemetry/api-metrics';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../export/MetricData';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { Maybe } from '../utils';
Expand Down Expand Up @@ -116,9 +114,7 @@ export interface Aggregator<T> {
* @param endTime the end time of the metric data.
* @return the {@link MetricData} that this {@link Aggregator} will produce.
*/
toMetricData(resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
instrumentDescriptor: InstrumentDescriptor,
toMetricData(instrumentDescriptor: InstrumentDescriptor,
accumulationByAttributes: AccumulationRecord<T>[],
startTime: HrTime,
endTime: HrTime): Maybe<MetricData>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,46 +25,45 @@ import { Histogram } from '../aggregator/types';
* Basic metric data fields.
*/
export interface BaseMetricData {
/**
* Resource associated with metric telemetry.
*/
readonly resource: Resource;
/**
* InstrumentationLibrary which created the metric instrument.
*/
readonly instrumentationLibrary: InstrumentationLibrary;
/**
* InstrumentDescriptor which describes the metric instrument.
*/
readonly instrumentDescriptor: InstrumentDescriptor;
/**
* PointDataType of the metric instrument.
*/
readonly pointDataType: PointDataType,
readonly pointDataType: PointDataType;
}

/**
* Represents a metric data aggregated by either a LastValueAggregation or
* SumAggregation.
*/
export interface SingularMetricData extends BaseMetricData {
readonly pointDataType: PointDataType.SINGULAR,
readonly pointData: PointData<number>[],
readonly pointDataType: PointDataType.SINGULAR;
readonly pointData: PointData<number>[];
}

/**
* Represents a metric data aggregated by a HistogramAggregation.
*/
export interface HistogramMetricData extends BaseMetricData {
readonly pointDataType: PointDataType.HISTOGRAM,
readonly pointData: PointData<Histogram>[],
readonly pointDataType: PointDataType.HISTOGRAM;
readonly pointData: PointData<Histogram>[];
}

/**
* Represents an aggregated metric data.
*/
export type MetricData = SingularMetricData | HistogramMetricData;

export interface InstrumentationLibraryMetrics {
instrumentationLibrary: InstrumentationLibrary;
metrics: MetricData[];
}

export interface ResourceMetrics {
resource: Resource;
instrumentationLibraryMetrics: InstrumentationLibraryMetrics[];
}

/**
* The aggregated point data type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { AggregationTemporality } from './AggregationTemporality';
import { MetricData } from './MetricData';
import { ResourceMetrics } from './MetricData';
import {
ExportResult,
ExportResultCode,
Expand All @@ -26,7 +26,7 @@ import {

export interface PushMetricExporter {

export(batch: MetricData[], resultCallback: (result: ExportResult) => void): void;
export(metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): void;

forceFlush(): Promise<void>;

Expand All @@ -39,7 +39,7 @@ export interface PushMetricExporter {
export class ConsoleMetricExporter implements PushMetricExporter {
protected _shutdown = true;

export(_batch: MetricData[], resultCallback: (result: ExportResult) => void) {
export(metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void) {
return resultCallback({
code: ExportResultCode.FAILED,
error: new Error('Method not implemented')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
* limitations under the License.
*/

import { MetricData } from './MetricData';
import { ResourceMetrics } from './MetricData';

/**
* This is a public interface that represent an export state of a MetricReader.
*/
export interface MetricProducer {
collect(): Promise<MetricData[]>;
collect(): Promise<ResourceMetrics>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import * as api from '@opentelemetry/api';
import { AggregationTemporality } from './AggregationTemporality';
import { MetricProducer } from './MetricProducer';
import { MetricData } from './MetricData';
import { callWithTimeout } from '../utils';
import { ResourceMetrics } from './MetricData';
import { callWithTimeout, Maybe } from '../utils';


export type ReaderOptions = {
timeoutMillis?: number
Expand Down Expand Up @@ -91,15 +92,15 @@ export abstract class MetricReader {
/**
* Collect all metrics from the associated {@link MetricProducer}
*/
async collect(options?: ReaderCollectionOptions): Promise<MetricData[]> {
async collect(options?: ReaderCollectionOptions): Promise<Maybe<ResourceMetrics>> {
if (this._metricProducer === undefined) {
throw new Error('MetricReader is not bound to a MetricProducer');
}

// Subsequent invocations to collect are not allowed. SDKs SHOULD return some failure for these calls.
if (this._shutdown) {
api.diag.warn('Collection is not allowed after shutdown');
return [];
return undefined;
}

// No timeout if timeoutMillis is undefined or null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ export class PeriodicExportingMetricReader extends MetricReader {

private async _runOnce(): Promise<void> {
const metrics = await this.collect({});

if (metrics === undefined) {
return;
}

return new Promise((resolve, reject) => {
this._exporter.export(metrics, result => {
if (result.code !== ExportResultCode.SUCCESS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import {
} from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../export/MetricData';
import { DeltaMetricProcessor } from './DeltaMetricProcessor';
import { TemporalMetricProcessor } from './TemporalMetricProcessor';
Expand Down Expand Up @@ -72,8 +70,6 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricSto
async collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>> {
Expand All @@ -87,8 +83,6 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricSto
return this._temporalMetricStorage.buildMetrics(
collector,
collectors,
resource,
instrumentationLibrary,
this._instrumentDescriptor,
accumulations,
sdkStartTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import { hrTime } from '@opentelemetry/core';
import { AggregationTemporality } from '../export/AggregationTemporality';
import { MetricData } from '../export/MetricData';
import { ResourceMetrics } from '../export/MetricData';
import { MetricProducer } from '../export/MetricProducer';
import { MetricReader } from '../export/MetricReader';
import { MeterProviderSharedState } from './MeterProviderSharedState';
Expand All @@ -32,12 +32,15 @@ export class MetricCollector implements MetricProducer {
this.aggregatorTemporality = this._metricReader.getPreferredAggregationTemporality();
}

async collect(): Promise<MetricData[]> {
async collect(): Promise<ResourceMetrics> {
const collectionTime = hrTime();
const results = await Promise.all(this._sharedState.meters
.map(meter => meter.collect(this, collectionTime)));
const instrumentationLibraryMetrics = (await Promise.all(this._sharedState.meters
.map(meter => meter.collect(this, collectionTime))));

return results.reduce((cumulation, current) => cumulation.concat(current), []);
return {
resource: this._sharedState.resource,
instrumentationLibraryMetrics,
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/

import { HrTime } from '@opentelemetry/api';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../export/MetricData';
import { Maybe } from '../utils';
import { MetricCollectorHandle } from './MetricCollector';
Expand All @@ -40,8 +38,6 @@ export abstract class MetricStorage {
abstract collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import {
} from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../export/MetricData';
import { DeltaMetricProcessor } from './DeltaMetricProcessor';
import { TemporalMetricProcessor } from './TemporalMetricProcessor';
Expand Down Expand Up @@ -66,8 +64,6 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStor
async collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>> {
Expand All @@ -76,8 +72,6 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStor
return this._temporalMetricStorage.buildMetrics(
collector,
collectors,
resource,
instrumentationLibrary,
this._instrumentDescriptor,
accumulations,
sdkStartTime,
Expand Down

0 comments on commit c6dab2a

Please sign in to comment.