Change the boundary inclusivity of exponential histogram #254

Merged (2 commits, Aug 26, 2022)
72 changes: 42 additions & 30 deletions lightstep/sdk/metric/aggregator/histogram/structure/README.md
@@ -36,19 +36,34 @@ bucketsNeeded(minValue, maxValue, scale) <= maxSize`. This
implementation maintains the best scale by rescaling as needed to stay
within the maximum size.
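
To make the size/scale trade-off concrete, here is a rough sketch (not
the library's internal code) of how many buckets are needed to cover a
range at a given scale, assuming the exponential-histogram base
`2**(2**-scale)` from the data model:

```go
package main

import (
	"fmt"
	"math"
)

// bucketsNeeded approximates the number of exponential buckets needed
// to cover [minValue, maxValue] at the given scale, where the bucket
// base is 2**(2**-scale). Illustrative only; the library derives this
// from bucket indices rather than from this closed form.
func bucketsNeeded(minValue, maxValue float64, scale int32) int {
	base := math.Exp2(math.Exp2(float64(-scale)))
	return int(math.Ceil(math.Log(maxValue/minValue)/math.Log(base))) + 1
}

func main() {
	// A contrast of 10**5 (e.g., 1ms to 100s) at scale 3 needs roughly
	// 134 buckets, which fits within the default maximum size of 160.
	fmt.Println(bucketsNeeded(0.001, 100, 3))
}
```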

## Layout

### Mapping function

The `mapping` sub-package contains the equations specified in the [data
model for Exponential Histogram data
points](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#exponentialhistogram).

There are two mapping functions used, depending on the sign of the
scale. Negative and zero scales use the `mapping/exponent` mapping
function, which computes the bucket index directly from the bits of
the `float64` exponent. This mapping function is used with scale `-10
<= scale <= 0`. Scales smaller than -10 map the entire normal
`float64` number range into a single bucket, thus are not considered
useful.
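
As an illustration of the exponent mapping (simplified; the real
`mapping/exponent` code also handles subnormal values and the boundary
rule for exact powers of two), the index for `scale <= 0` can be
computed by shifting the unbiased base-2 exponent:

```go
package main

import (
	"fmt"
	"math"
)

// exponentIndex sketches the scale <= 0 mapping: extract the unbiased
// base-2 exponent from the float64 bits and shift it right by -scale.
// Simplified: subnormals and exact powers of two need extra handling.
func exponentIndex(value float64, scale int32) int32 {
	exp := int32((math.Float64bits(value)>>52)&0x7ff) - 1023
	return exp >> uint(-scale)
}

func main() {
	fmt.Println(exponentIndex(6.0, 0))  // exponent 2, scale 0  -> index 2
	fmt.Println(exponentIndex(6.0, -2)) // exponent 2, scale -2 -> index 0
}
```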

The `mapping/logarithm` mapping function uses `math.Log(value)` times
the scaling factor `math.Ldexp(math.Log2E, scale)`. This mapping
function is used with `0 < scale <= 20`. The maximum scale is
selected because, at scale 21, it becomes difficult to test
correctness: at that point `math.MaxFloat64` maps to index
`math.MaxInt32` and the `math/big` logic used in testing breaks down.
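
A simplified sketch of the logarithm mapping follows; the real
`mapping/logarithm` code adds corrections so that values lying on
exact bucket boundaries land in the correct bucket:

```go
package main

import (
	"fmt"
	"math"
)

// logarithmIndex sketches the 0 < scale <= 20 mapping: multiply the
// natural logarithm of the value by the scale factor
// math.Ldexp(math.Log2E, scale), i.e., 2**scale / ln(2).
func logarithmIndex(value float64, scale int32) int32 {
	scaleFactor := math.Ldexp(math.Log2E, int(scale))
	return int32(math.Floor(math.Log(value) * scaleFactor))
}

func main() {
	// At scale 1 the base is sqrt(2): 1.414**3 < 3.0 <= 1.414**4,
	// so 3.0 falls in the bucket with index 3.
	fmt.Println(logarithmIndex(3.0, 1))
}
```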

### Data structure

The `structure` sub-package contains a Histogram aggregator for use by
the OpenTelemetry-Go Metrics SDK as well as OpenTelemetry Collector
receivers, processors, and exporters.

## Implementation

@@ -83,7 +98,7 @@ histogram to maintain the ideal scale without shifting values inside
the array.

The `indexStart` and `indexEnd` fields store the current minimum and
maximum bucket number. The initial condition is `indexBase ==
indexStart == indexEnd`, representing a single bucket.

Following the first observation, new observations may fall into a
@@ -92,31 +107,14 @@ adjusting either `indexEnd` or `indexStart` as long as the constraint
`indexEnd-indexStart < size` remains true.

Bucket numbers in the range `[indexBase, indexEnd]` are stored in the
interval `[0, indexEnd-indexBase]` of the backing array. Buckets in
the range `[indexStart, indexBase-1]` are stored in the interval
`[size+indexStart-indexBase, size-1]` of the backing array.

Considering the `aggregation.Buckets` interface, `Offset()` returns
`indexStart`, `Len()` returns `indexEnd-indexStart+1`, and `At()`
locates the correct bucket in the circular array.
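
The following sketch shows how those accessors can be written over the
circular layout described above (the field names and method shapes are
illustrative, not the library's exact internals):

```go
package sketch

// buckets is an illustrative stand-in for the real type, which stores
// counts behind a variable-width backing interface.
type buckets struct {
	counts     []uint64 // circular backing array, len(counts) == size
	indexBase  int32
	indexStart int32
	indexEnd   int32
}

// Offset returns the smallest populated bucket index.
func (b *buckets) Offset() int32 { return b.indexStart }

// Len returns the number of buckets in use.
func (b *buckets) Len() uint32 { return uint32(b.indexEnd - b.indexStart + 1) }

// At returns the count at position i, translating the logical bucket
// number into its position in the circular backing array.
func (b *buckets) At(i uint32) uint64 {
	index := b.indexStart + int32(i) // logical bucket number
	pos := index - b.indexBase
	if pos < 0 {
		pos += int32(len(b.counts)) // wraps into [size+indexStart-indexBase, size-1]
	}
	return b.counts[pos]
}
```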

### Determining change of scale

The algorithm used to determine the (best) change of scale when a new
@@ -192,6 +190,20 @@ greater maximum scale. Supporting numbers smaller than 0x1p-1022
would mean changing the valid scale interval to [-11,19] compared with
[-10,20].

### UpdateByIncr interface

The OpenTelemetry metrics SDK `Aggregator` type supports an `Update()`
interface which implies updating the histogram by a count of 1. This
implementation also supports `UpdateByIncr()`, which makes it possible
to support counting multiple observations in a single API call. This
extension is useful in applying `Histogram` aggregation to _sampled_
metric events (e.g. in the [OpenTelemetry statsd
receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/statsdreceiver)).
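
For example, a receiver decoding a sampled measurement could record the
value once with its adjusted count. This is a minimal sketch:
`NewConfig` is assumed here as the package's configuration constructor,
so adjust to however a `Config` is actually obtained:

```go
package main

import (
	"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/histogram/structure"
)

func main() {
	var h structure.Float64
	// NewConfig with WithMaxSize is assumed; adjust to however a
	// Config is constructed in your version of the package.
	h.Init(structure.NewConfig(structure.WithMaxSize(160)))

	// A value sampled at 10% probability represents roughly ten
	// original events: record it once with an increment of 10.
	h.UpdateByIncr(3.5, 10)
}
```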

Another use for `UpdateByIncr` is in a Span-to-metrics pipeline
following [probability sampling in OpenTelemetry tracing
(WIP)](https://github.com/open-telemetry/opentelemetry-specification/pull/2047).

## Acknowledgements

This implementation is based on work by [Yuke
10 changes: 6 additions & 4 deletions lightstep/sdk/metric/aggregator/histogram/structure/config.go
@@ -21,15 +21,16 @@ import "fmt"
// OpenTelemetry--yields a maximum relative error of less than 5% for
// data with contrast 10**5 (e.g., latencies in the range 1ms to 100s).
// See the derivation here:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exponential-bucket-histogram-aggregation
const DefaultMaxSize int32 = 160

// MinSize is the smallest reasonable configuration, which is small
// enough to contain the entire normal floating point range at
// MinScale.
const MinSize = 2

// MaximumMaxSize is an arbitrary limit meant to prevent accidental
// use of giant histograms.
const MaximumMaxSize = 16384

// Config contains configuration for exponential histogram creation.
@@ -43,7 +44,8 @@ type Option interface {
apply(Config) Config
}

// WithMaxSize sets the maximum size of each range (positive and/or
// negative) in the histogram.
func WithMaxSize(size int32) Option {
return maxSize(size)
}
@@ -73,7 +75,7 @@ func (c Config) Valid() bool {
return err == nil
}

// Validate returns the nearest valid Config object to the input and
// an error indicating whether the input was a valid configuration.
func (c Config) Validate() (Config, error) {
81 changes: 63 additions & 18 deletions lightstep/sdk/metric/aggregator/histogram/structure/exponential.go
@@ -16,7 +16,6 @@ package structure // import "github.com/lightstep/otel-launcher-go/lightstep/sdk

import (
"fmt"

"go.opentelemetry.io/otel/sdk/metric/aggregator/exponential/mapping"
"go.opentelemetry.io/otel/sdk/metric/aggregator/exponential/mapping/exponent"
@@ -28,6 +27,10 @@ type (
// buckets. It is configured with a maximum scale factor
// which determines resolution. Scale is automatically
// adjusted to accommodate the range of input data.
//
// Note that the generic type `N` determines the type of the
// Sum, Min, and Max fields. Bucket boundaries are handled in
// floating point regardless of the type of N.
Histogram[N ValueType] struct {
// maxSize is the maximum capacity of the positive and
// negative ranges. It is set by Init(), preserved by
@@ -63,9 +66,22 @@
// backing is a slice of nil, []uint8, []uint16, []uint32, or []uint64
backing bucketsBacking

// The term "index" refers to the number of the
// histogram bucket used to determine its boundaries.
// The lower boundary of a bucket is determined by the
// formula base**index and the upper boundary of a
// bucket is base**(index+1). Index values are signed
// to account for values less than or equal to 1.
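// For example, with base 2 (scale 0) the bucket at index 3 has
// boundaries 2**3 = 8 and 2**4 = 16, while values between 0.25
// and 0.5 fall in the bucket at index -2.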
//
// Note that the width of this field is determined by
// its being declared as int32 in the OTLP protocol.
// The meaning of this field could be extended to wider
// types, however that would imply an extremely
// high-resolution histogram.

// indexBase is the index of the 0th position in the
// backing array, i.e., backing[0] is the count
// in the bucket with index `indexBase`.
indexBase int32

// indexStart is the smallest index value represented
@@ -97,16 +113,23 @@
bucketsBacking interface {
// size returns the physical size of the backing
// array, which is >= buckets.Len(), the number of buckets in use.
//
// Note this is logically an unsigned quantity;
// however, keeping it as int32 avoids type
// conversions in the code, because: (a) it is not
// allowed to grow beyond the range of a signed
// int32, and (b) it is frequently involved in
// arithmetic with signed index values.
size() int32
// growTo grows a backing array and copies old entries
// into their correct new positions.
growTo(newSize, oldPositiveLimit, newPositiveLimit int32)
// reverse reverses the items in a backing array in the
// range [from, limit).
reverse(from, limit int32)
// emptyBucket empties the count from a bucket, for
// moving into another.
emptyBucket(src int32) uint64
// tryIncrement increments a bucket by `incr`, returns
// false if the result would overflow the current
// backing width.
@@ -125,10 +148,10 @@
high int32
}

// Int64 is an integer-valued histogram.
Int64 = Histogram[int64]

// Float64 is a float64-valued histogram.
Float64 = Histogram[float64]
)

@@ -138,8 +161,8 @@ func (h *Histogram[N]) Init(cfg Config) {

h.maxSize = cfg.maxSize

m, _ := newMapping(logarithm.MaxScale)
h.mapping = m
}

// Sum implements aggregation.Histogram.
@@ -257,8 +280,7 @@ func (h *Histogram[N]) CopyInto(dest *Histogram[N]) {
dest.MergeFrom(h)
}

// Update supports updating a histogram with a single count.
func (h *Histogram[N]) Update(number N) {
h.UpdateByIncr(number, 1)
}
@@ -305,6 +327,9 @@ func (h *Histogram[N]) UpdateByIncr(number N, incr uint64) {

// downscale subtracts `change` from the current mapping scale.
func (h *Histogram[N]) downscale(change int32) {
if change == 0 {
return
}
if change < 0 {
panic(fmt.Sprint("impossible change of scale", change))
}
@@ -349,7 +374,7 @@ func (h *Histogram[N]) update(b *Buckets, value float64, incr uint64) {
}
}

// incrementIndexBy determines if the index lies inside the current range
// [indexStart, indexEnd] and, if not, returns the minimum size (up to
// maxSize) that will satisfy the new value.
func (h *Histogram[N]) incrementIndexBy(b *Buckets, index int32, incr uint64) (highLow, bool) {
Expand Down Expand Up @@ -401,22 +426,42 @@ func (h *Histogram[N]) incrementIndexBy(b *Buckets, index int32, incr uint64) (h
return highLow{}, true
}

// powTwoRoundedUp computes the least power of two that is >= v,
// which ensures power-of-two slices are allocated.
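// For example, powTwoRoundedUp(5) == 8 and powTwoRoundedUp(8) == 8.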
func powTwoRoundedUp(v int32) int32 {
// The following expression computes the least power-of-two
// that is >= v. There are a number of tricky ways to
// do this, see https://stackoverflow.com/questions/466204/rounding-up-to-next-power-of-2
//
// One equivalent expression:
//
// v = int32(1) << (32 - bits.LeadingZeros32(uint32(v-1)))
v--
v |= v >> 1
v |= v >> 2
v |= v >> 4
v |= v >> 8
v |= v >> 16
v++
return v
}

// grow resizes the backing array by doubling in size up to maxSize.
// It extends the array with zeros and copies the existing
// counts into their correct new positions.
func (h *Histogram[N]) grow(b *Buckets, needed int32) {
size := b.backing.size()
bias := b.indexBase - b.indexStart
oldPositiveLimit := size - bias
newSize := powTwoRoundedUp(needed)
if newSize > h.maxSize {
newSize = h.maxSize
}
newPositiveLimit := newSize - bias
b.backing.growTo(newSize, oldPositiveLimit, newPositiveLimit)
}

// downscale first rotates, then collapses 2**`by`-to-1 buckets.
func (b *Buckets) downscale(by int32) {
b.rotate()

@@ -467,7 +512,7 @@ func (b *Buckets) relocateBucket(dest, src int32) {
return
}

b.incrementBucket(dest, b.backing.emptyBucket(src))
}

// incrementBucket increments the backing array index by `incr`.
@@ -578,7 +623,7 @@ func (h *highLow) with(o highLow) highLow {
}
}

// empty indicates whether there are any values in a highLow.
func (h *highLow) empty() bool {
return h.low > h.high
}
@@ -630,7 +675,7 @@ func (b *bucketsVarwidth[N]) reverse(from, limit int32) {
}
}

func (b *bucketsVarwidth[N]) emptyBucket(src int32) uint64 {
tmp := b.counts[src]
b.counts[src] = 0
return uint64(tmp)