Skip to content

Commit

Permalink
feat(sdk-metrics-base): bootstrap metrics exemplars (#2641)
Browse files Browse the repository at this point in the history
Co-authored-by: Daniel Dyla <dyladan@users.noreply.github.com>
  • Loading branch information
srikanthccv and dyladan committed Dec 28, 2021
1 parent ddede49 commit bef840b
Show file tree
Hide file tree
Showing 11 changed files with 582 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


import { Context, HrTime } from '@opentelemetry/api';
import { Attributes } from '@opentelemetry/api-metrics-wip';
import { FixedSizeExemplarReservoirBase } from './ExemplarReservoir';


/**
* AlignedHistogramBucketExemplarReservoir takes the same boundaries
* configuration of a Histogram. This alogorithm keeps the last seen measurement
* that falls within a histogram bucket.
* https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplar-defaults
*/
export class AlignedHistogramBucketExemplarReservoir extends FixedSizeExemplarReservoirBase {
private _boundaries: number[];
constructor(boundaries: number[]) {
super(boundaries.length+1);
this._boundaries = boundaries;
}

private _findBucketIndex(value: number, _timestamp: HrTime, _attributes: Attributes, _ctx: Context) {
for(let i = 0; i < this._boundaries.length; i++) {
if (value <= this._boundaries[i]) {
return i;
}
}
return this._boundaries.length;
}

offer(value: number, timestamp: HrTime, attributes: Attributes, ctx: Context): void {
const index = this._findBucketIndex(value, timestamp, attributes, ctx);
this._reservoirStorage[index].offer(value, timestamp, attributes, ctx);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Attributes } from '@opentelemetry/api-metrics-wip'
import { Context, HrTime } from '@opentelemetry/api'
import { ExemplarFilter } from './ExemplarFilter';


export class AlwaysSampleExemplarFilter implements ExemplarFilter {

shouldSample(
_value: number,
_timestamp: HrTime,
_attributes: Attributes,
_ctx: Context
): boolean {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { HrTime } from '@opentelemetry/api';
import { Attributes } from '@opentelemetry/api-metrics-wip';

/**
* A representation of an exemplar, which is a sample input measurement.
* Exemplars also hold information about the environment when the measurement
* was recorded, for example the span and trace ID of the active span when the
* exemplar was recorded.
*/
export type Exemplar = {
// The set of key/value pairs that were filtered out by the aggregator, but
// recorded alongside the original measurement. Only key/value pairs that were
// filtered out by the aggregator should be included
filteredAttributes: Attributes;

// The value of the measurement that was recorded.
value: number;

// timestamp is the exact time when this exemplar was recorded
timestamp: HrTime;

// (Optional) Span ID of the exemplar trace.
// span_id may be missing if the measurement is not recorded inside a trace
// or if the trace is not sampled.
spanId?: string;

// (Optional) Trace ID of the exemplar trace.
// trace_id may be missing if the measurement is not recorded inside a trace
// or if the trace is not sampled.
traceId?: string;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Attributes } from '@opentelemetry/api-metrics-wip'
import { Context, HrTime } from '@opentelemetry/api'

/**
* This interface represents a ExemplarFilter. Exemplar filters are
* used to filter measurements before attempting to store them in a
* reservoir.
*/
export interface ExemplarFilter {
/**
* Returns whether or not a reservoir should attempt to filter a measurement.
*
* @param value The value of the measurement
* @param timestamp A timestamp that best represents when the measurement was taken
* @param attributes The complete set of Attributes of the measurement
* @param context The Context of the measurement
*/
shouldSample(
value: number,
timestamp: HrTime,
attributes: Attributes,
ctx: Context
): boolean;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Attributes } from '@opentelemetry/api-metrics-wip'
import { Context, HrTime, isSpanContextValid, trace } from '@opentelemetry/api'
import { Exemplar } from './Exemplar'


/**
* An interface for an exemplar reservoir of samples.
*/
export interface ExemplarReservoir {

/** Offers a measurement to be sampled. */
offer(
value: number,
timestamp: HrTime,
attributes: Attributes,
ctx: Context
): void;
/**
* Returns accumulated Exemplars and also resets the reservoir
* for the next sampling period
*
* @param pointAttributes The attributes associated with metric point.
*
* @returns a list of {@link Exemplar}s. Retuned exemplars contain the attributes that were filtered out by the
* aggregator, but recorded alongside the original measurement.
*/
collect(pointAttributes: Attributes): Exemplar[];
}


class ExemplarBucket {
private value: number = 0;
private attributes: Attributes = {};
private timestamp: HrTime = [0, 0];
private spanId?: string;
private traceId?: string;
private _offered: boolean = false;

offer(value: number, timestamp: HrTime, attributes: Attributes, ctx: Context) {
this.value = value;
this.timestamp = timestamp;
this.attributes = attributes;
const spanContext = trace.getSpanContext(ctx);
if (spanContext && isSpanContextValid(spanContext)) {
this.spanId = spanContext.spanId;
this.traceId = spanContext.traceId;
}
this._offered = true;
}

collect(pointAttributes: Attributes): Exemplar | null {
if (!this._offered) return null;
const currentAttributes = this.attributes;
// filter attributes
Object.keys(pointAttributes).forEach(key => {
if (pointAttributes[key] === currentAttributes[key]) {
delete currentAttributes[key];
}
});
const retVal: Exemplar = {
filteredAttributes: currentAttributes,
value: this.value,
timestamp: this.timestamp,
spanId: this.spanId,
traceId: this.traceId
};
this.attributes = {};
this.value = 0;
this.timestamp = [0, 0];
this.spanId = undefined;
this.traceId = undefined;
this._offered = false;
return retVal;
}
}


export abstract class FixedSizeExemplarReservoirBase implements ExemplarReservoir {
protected _reservoirStorage: ExemplarBucket[];
protected _size: number;

constructor(size: number) {
this._size = size;
this._reservoirStorage = new Array<ExemplarBucket>(size);
for(let i = 0; i < this._size; i++) {
this._reservoirStorage[i] = new ExemplarBucket();
}
}

abstract offer(value: number, timestamp: HrTime, attributes: Attributes, ctx: Context): void;

maxSize(): number {
return this._size;
}

/**
* Resets the reservoir
*/
protected reset(): void {}

collect(pointAttributes: Attributes): Exemplar[] {
const exemplars: Exemplar[] = [];
this._reservoirStorage.forEach(storageItem => {
const res = storageItem.collect(pointAttributes);
if (res !== null) {
exemplars.push(res);
}
});
this.reset();
return exemplars;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Attributes } from '@opentelemetry/api-metrics-wip'
import { Context, HrTime } from '@opentelemetry/api'
import { ExemplarFilter } from './ExemplarFilter';

export class NeverSampleExemplarFilter implements ExemplarFilter {

shouldSample(
_value: number,
_timestamp: HrTime,
_attributes: Attributes,
_ctx: Context
): boolean {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Context, HrTime } from '@opentelemetry/api';
import { Attributes } from '@opentelemetry/api-metrics-wip';
import { FixedSizeExemplarReservoirBase } from './ExemplarReservoir';

/**
* Fixed size reservoir that uses equivalent of naive reservoir sampling
* algorithm to accept measurements.
*
*/
export class SimpleFixedSizeExemplarReservoir extends FixedSizeExemplarReservoirBase {
private _numMeasurementsSeen: number;
constructor(size: number) {
super(size);
this._numMeasurementsSeen = 0;
}

private getRandomInt(min: number, max: number) { //[min, max)
return Math.floor(Math.random() * (max - min) + min);
}

private _findBucketIndex(_value: number, _timestamp: HrTime, _attributes: Attributes, _ctx: Context) {
if (this._numMeasurementsSeen < this._size ) return this._numMeasurementsSeen++;
const index = this.getRandomInt(0, ++this._numMeasurementsSeen);
return index < this._size ? index: -1;
}

offer(value: number, timestamp: HrTime, attributes: Attributes, ctx: Context): void {
const index = this._findBucketIndex(value, timestamp, attributes, ctx);
if (index !== -1) {
this._reservoirStorage[index].offer(value, timestamp, attributes, ctx)
}
}

override reset() {
this._numMeasurementsSeen = 0;
}
}

0 comments on commit bef840b

Please sign in to comment.