Skip to content

Commit

Permalink
histograms: Add timer to reset ASAP after bucket limiting has happened (
Browse files Browse the repository at this point in the history
#1367)

Fixes #1248. See issue description for all the details.

Signed-off-by: beorn7 <beorn@grafana.com>
  • Loading branch information
beorn7 committed Oct 19, 2023
1 parent c3e797e commit cd8cba2
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 16 deletions.
56 changes: 51 additions & 5 deletions prometheus/histogram.go
Expand Up @@ -475,6 +475,9 @@ type HistogramOpts struct {

// now is for testing purposes, by default it's time.Now.
now func() time.Time

// afterFunc is for testing purposes, by default it's time.AfterFunc.
afterFunc func(time.Duration, func()) *time.Timer
}

// HistogramVecOpts bundles the options to create a HistogramVec metric.
Expand Down Expand Up @@ -526,7 +529,9 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
if opts.now == nil {
opts.now = time.Now
}

if opts.afterFunc == nil {
opts.afterFunc = time.AfterFunc
}
h := &histogram{
desc: desc,
upperBounds: opts.Buckets,
Expand All @@ -536,6 +541,7 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
nativeHistogramMinResetDuration: opts.NativeHistogramMinResetDuration,
lastResetTime: opts.now(),
now: opts.now,
afterFunc: opts.afterFunc,
}
if len(h.upperBounds) == 0 && opts.NativeHistogramBucketFactor <= 1 {
h.upperBounds = DefBuckets
Expand Down Expand Up @@ -716,9 +722,16 @@ type histogram struct {
nativeHistogramMinResetDuration time.Duration
// lastResetTime is protected by mtx. It is also used as created timestamp.
lastResetTime time.Time
// resetScheduled is protected by mtx. It is true if a reset is
// scheduled for a later time (when nativeHistogramMinResetDuration has
// passed).
resetScheduled bool

// now is for testing purposes, by default it's time.Now.
now func() time.Time

// afterFunc is for testing purposes, by default it's time.AfterFunc.
afterFunc func(time.Duration, func()) *time.Timer
}

func (h *histogram) Desc() *Desc {
Expand Down Expand Up @@ -874,21 +887,31 @@ func (h *histogram) limitBuckets(counts *histogramCounts, value float64, bucket
if h.maybeReset(hotCounts, coldCounts, coldIdx, value, bucket) {
return
}
// One of the other strategies will happen. To undo what they will do as
// soon as enough time has passed to satisfy
// h.nativeHistogramMinResetDuration, schedule a reset at the right time
// if we haven't done so already.
if h.nativeHistogramMinResetDuration > 0 && !h.resetScheduled {
h.resetScheduled = true
h.afterFunc(h.nativeHistogramMinResetDuration-h.now().Sub(h.lastResetTime), h.reset)
}

if h.maybeWidenZeroBucket(hotCounts, coldCounts) {
return
}
h.doubleBucketWidth(hotCounts, coldCounts)
}

// maybeReset resets the whole histogram if at least h.nativeHistogramMinResetDuration
// has been passed. It returns true if the histogram has been reset. The caller
// must have locked h.mtx.
// maybeReset resets the whole histogram if at least
// h.nativeHistogramMinResetDuration has been passed. It returns true if the
// histogram has been reset. The caller must have locked h.mtx.
func (h *histogram) maybeReset(
hot, cold *histogramCounts, coldIdx uint64, value float64, bucket int,
) bool {
// We are using the possibly mocked h.now() rather than
// time.Since(h.lastResetTime) to enable testing.
if h.nativeHistogramMinResetDuration == 0 ||
if h.nativeHistogramMinResetDuration == 0 || // No reset configured.
h.resetScheduled || // Do not interefere if a reset is already scheduled.
h.now().Sub(h.lastResetTime) < h.nativeHistogramMinResetDuration {
return false
}
Expand All @@ -906,6 +929,29 @@ func (h *histogram) maybeReset(
return true
}

// reset resets the whole histogram. It locks h.mtx itself, i.e. it has to be
// called without having locked h.mtx.
func (h *histogram) reset() {
h.mtx.Lock()
defer h.mtx.Unlock()

n := atomic.LoadUint64(&h.countAndHotIdx)
hotIdx := n >> 63
coldIdx := (^n) >> 63
hot := h.counts[hotIdx]
cold := h.counts[coldIdx]
// Completely reset coldCounts.
h.resetCounts(cold)
// Make coldCounts the new hot counts while resetting countAndHotIdx.
n = atomic.SwapUint64(&h.countAndHotIdx, coldIdx<<63)
count := n & ((1 << 63) - 1)
waitForCooldown(count, hot)
// Finally, reset the formerly hot counts, too.
h.resetCounts(hot)
h.lastResetTime = h.now()
h.resetScheduled = false
}

// maybeWidenZeroBucket widens the zero bucket until it includes the existing
// buckets closest to the zero bucket (which could be two, if an equidistant
// negative and a positive bucket exists, but usually it's only one bucket to be
Expand Down
36 changes: 25 additions & 11 deletions prometheus/histogram_test.go
Expand Up @@ -925,16 +925,16 @@ func TestNativeHistogram(t *testing.T) {
maxBuckets: 4,
minResetDuration: 9 * time.Minute,
want: &dto.Histogram{
SampleCount: proto.Uint64(2),
SampleSum: proto.Float64(7),
SampleCount: proto.Uint64(3),
SampleSum: proto.Float64(12.1),
Schema: proto.Int32(2),
ZeroThreshold: proto.Float64(2.938735877055719e-39),
ZeroCount: proto.Uint64(0),
PositiveSpan: []*dto.BucketSpan{
{Offset: proto.Int32(7), Length: proto.Uint32(2)},
{Offset: proto.Int32(7), Length: proto.Uint32(4)},
},
PositiveDelta: []int64{1, 0},
CreatedTimestamp: timestamppb.New(now.Add(10 * time.Minute)), // We expect reset to happen after 9 minutes.
PositiveDelta: []int64{1, 0, -1, 1},
CreatedTimestamp: timestamppb.New(now.Add(9 * time.Minute)), // We expect reset to happen after 8 minutes.
},
},
{
Expand All @@ -945,23 +945,27 @@ func TestNativeHistogram(t *testing.T) {
maxZeroThreshold: 1.2,
minResetDuration: 9 * time.Minute,
want: &dto.Histogram{
SampleCount: proto.Uint64(2),
SampleSum: proto.Float64(7),
SampleCount: proto.Uint64(3),
SampleSum: proto.Float64(12.1),
Schema: proto.Int32(2),
ZeroThreshold: proto.Float64(2.938735877055719e-39),
ZeroCount: proto.Uint64(0),
PositiveSpan: []*dto.BucketSpan{
{Offset: proto.Int32(7), Length: proto.Uint32(2)},
{Offset: proto.Int32(7), Length: proto.Uint32(4)},
},
PositiveDelta: []int64{1, 0},
CreatedTimestamp: timestamppb.New(now.Add(10 * time.Minute)), // We expect reset to happen after 9 minutes.
PositiveDelta: []int64{1, 0, -1, 1},
CreatedTimestamp: timestamppb.New(now.Add(9 * time.Minute)), // We expect reset to happen after 8 minutes.
},
},
}

for _, s := range scenarios {
t.Run(s.name, func(t *testing.T) {
ts := now
var (
ts = now
funcToCall func()
whenToCall time.Duration
)

his := NewHistogram(HistogramOpts{
Name: "name",
Expand All @@ -972,12 +976,22 @@ func TestNativeHistogram(t *testing.T) {
NativeHistogramMinResetDuration: s.minResetDuration,
NativeHistogramMaxZeroThreshold: s.maxZeroThreshold,
now: func() time.Time { return ts },
afterFunc: func(d time.Duration, f func()) *time.Timer {
funcToCall = f
whenToCall = d
return nil
},
})

ts = ts.Add(time.Minute)
for _, o := range s.observations {
his.Observe(o)
ts = ts.Add(time.Minute)
whenToCall -= time.Minute
if funcToCall != nil && whenToCall <= 0 {
funcToCall()
funcToCall = nil
}
}
m := &dto.Metric{}
if err := his.Write(m); err != nil {
Expand Down

0 comments on commit cd8cba2

Please sign in to comment.