Skip to content

Commit 288821c

Browse files
paivagustavojmacd
andauthoredMar 11, 2020
Make histogram aggregator checkpoint consistent (#438)
* change the histogram aggregator to have a consistent but blocking Checkpoint() * docs * wrapping docs * remove currentIdx from the 8bit alignment check * stress test * add export and move lockfreewrite algorithm to an external struct. * move state locker to another package. * add todos * minimal tests * renaming and docs * change to context.Background() * add link to algorithm and grammars Co-authored-by: Joshua MacDonald <jmacd@users.noreply.github.com>
1 parent ae9033e commit 288821c

File tree

5 files changed

+338
-57
lines changed

5 files changed

+338
-57
lines changed
 

‎sdk/internal/state_locker.go

+110
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Copyright 2020, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package internal
16+
17+
import (
18+
"runtime"
19+
"sync"
20+
"sync/atomic"
21+
)
22+
23+
// StateLocker implements a two state lock algorithm that enabled lock free operations inside a state
24+
// and a global lock for switching between states. At every time, only one state is active and one cold state.
25+
// States are represented by int numbers 0 and 1.
26+
//
27+
// This was inspired by the algorithm used on the prometheus client library that can be found at:
28+
// https://github.com/prometheus/client_golang/blob/e7776d2c54305c1b62fdb113b5a7e9b944c5c27e/prometheus/histogram.go#L227
29+
//
30+
// To execute operations within the same state, call `Start()` before the operation and call `End(idx)`
31+
// to end this operation. The `idx` argument of `End()` is the index of the active state when the operation
32+
// started and it is returned by the `Start()` method. It is recommended to defer the call to `End(idx)`.
33+
//
34+
// One can change the active state by calling `SwapActiveState(fn)`. `fn` is a function that will be executed *before*
35+
// switching the active state. Operations such as preparing the new state shall be called by this function. This will
36+
// wait in-flight operations to end.
37+
//
38+
// Example workflow:
39+
// 1. State 0 is active.
40+
// 1.1 Operations to the active state can happen with `Start()` and `End(idx)` methods.
41+
// 2. Call to `SwitchState(fn)`
42+
// 2.1 run `fn` function to prepare the new state
43+
// 2.2 make state 1 active
44+
// 2.3 wait in-flight operations of the state 0 to end.
45+
// 3. State 1 is now active and every new operation are executed in it.
46+
//
47+
// `SwitchState(fn)` are synchronized with a mutex that can be access with the `Lock()` and `Unlock()` methods.
48+
// Access to the cold state must also be synchronized to ensure the cold state is not in the middle of state switch
49+
// since that could represent an invalid state.
50+
//
51+
type StateLocker struct {
52+
countsAndActiveIdx uint64
53+
finishedOperations [2]uint64
54+
55+
sync.Mutex
56+
}
57+
58+
// Start an operation that will happen on a state. The current active state is returned.
59+
// A call to `End(idx int)` must happens for every `Start()` call.
60+
func (c *StateLocker) Start() int {
61+
n := atomic.AddUint64(&c.countsAndActiveIdx, 1)
62+
return int(n >> 63)
63+
}
64+
65+
// End an operation that happened to the idx state.
66+
func (c *StateLocker) End(idx int) {
67+
atomic.AddUint64(&c.finishedOperations[idx], 1)
68+
}
69+
70+
// ColdIdx returns the index of the cold state.
71+
func (c *StateLocker) ColdIdx() int {
72+
return int((^c.countsAndActiveIdx) >> 63)
73+
}
74+
75+
// SwapActiveState swaps the cold and active states.
76+
//
77+
// This will wait all for in-flight operations that are happening to the current
78+
// active state to end, this ensure that all access to this state will be consistent.
79+
//
80+
// This is synchronized by a mutex.
81+
func (c *StateLocker) SwapActiveState(beforeFn func()) {
82+
c.Lock()
83+
defer c.Unlock()
84+
85+
if beforeFn != nil {
86+
// prepare the state change
87+
beforeFn()
88+
}
89+
90+
// Adding 1<<63 switches the active index (from 0 to 1 or from 1 to 0)
91+
// without touching the count bits.
92+
n := atomic.AddUint64(&c.countsAndActiveIdx, 1<<63)
93+
94+
// count represents how many operations have started *before* the state change.
95+
count := n & ((1 << 63) - 1)
96+
97+
activeFinishedOperations := &c.finishedOperations[n>>63]
98+
// coldFinishedOperations are the number of operations that have *ended* on the previous state.
99+
coldFinishedOperations := &c.finishedOperations[(^n)>>63]
100+
101+
// Await all cold writers to finish writing, when coldFinishedOperations == count, all in-flight operations
102+
// have finished and we can cleanly end the state change.
103+
for count != atomic.LoadUint64(coldFinishedOperations) {
104+
runtime.Gosched() // Let observations get work done.
105+
}
106+
107+
// Make sure that the new state keeps the same count of *ended* operations.
108+
atomic.AddUint64(activeFinishedOperations, count)
109+
atomic.StoreUint64(coldFinishedOperations, 0)
110+
}

‎sdk/internal/state_locker_test.go

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package internal
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestInflightOperationMustEndBeforeSwap(t *testing.T) {
11+
var swapped bool
12+
ch := make(chan struct{})
13+
14+
l := StateLocker{}
15+
op1 := l.Start()
16+
17+
go func() {
18+
l.SwapActiveState(func() {})
19+
swapped = true
20+
ch <- struct{}{}
21+
}()
22+
23+
require.False(t, swapped, "Swap should wait the end of the in-flight operation.")
24+
25+
l.End(op1)
26+
27+
select {
28+
case <-ch:
29+
require.True(t, swapped, "Swap should've been completed. ")
30+
case <-time.After(50 * time.Millisecond):
31+
t.Fatal("Swap was not concluded after 50 milliseconds.")
32+
}
33+
}
34+
35+
func TestEnsureIndexIsConsistent(t *testing.T) {
36+
l := StateLocker{}
37+
op1 := l.Start()
38+
l.End(op1)
39+
40+
l.SwapActiveState(func() {})
41+
42+
op2 := l.Start()
43+
l.End(op2)
44+
45+
op3 := l.Start()
46+
l.End(op3)
47+
48+
l.SwapActiveState(func() {})
49+
50+
op4 := l.Start()
51+
l.End(op4)
52+
53+
require.Equal(t, op1, op4, "two operations separated by two swaps should have the same index.")
54+
require.Equal(t, op2, op3, "two operations with no swap in between should have the same index.")
55+
56+
require.Equal(t, 0, op1, "first index should be 0")
57+
require.Equal(t, 1, op2, "second index should be 1")
58+
}
59+
60+
func TestTwoSwapsCanHappenWithoutOperationsInBetween(t *testing.T) {
61+
l := StateLocker{}
62+
63+
require.Equal(t, 1, l.ColdIdx(), "first cold index should be 1")
64+
l.SwapActiveState(func() {})
65+
require.Equal(t, 0, l.ColdIdx(), "second cold index should be 0")
66+
l.SwapActiveState(func() {})
67+
require.Equal(t, 1, l.ColdIdx(), "third cold index should be 1")
68+
}
69+
70+
func BenchmarkStateLocker_StartEnd(b *testing.B) {
71+
l := StateLocker{}
72+
73+
b.ReportAllocs()
74+
75+
for i := 0; i < b.N; i++ {
76+
l.End(l.Start())
77+
}
78+
}
79+
80+
func BenchmarkStateLocker_SwapActiveState(b *testing.B) {
81+
82+
b.ReportAllocs()
83+
84+
for i := 0; i < b.N; i++ {
85+
l := StateLocker{}
86+
l.SwapActiveState(func() {})
87+
}
88+
}

‎sdk/metric/aggregator/histogram/histogram.go

+70-41
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,25 @@ import (
2121
"go.opentelemetry.io/otel/api/core"
2222
export "go.opentelemetry.io/otel/sdk/export/metric"
2323
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
24+
"go.opentelemetry.io/otel/sdk/internal"
2425
)
2526

2627
type (
2728
// Aggregator observe events and counts them in pre-determined buckets.
2829
// It also calculates the sum and count of all events.
2930
Aggregator struct {
30-
// state needs to be aligned for 64-bit atomic operations.
31-
current state
32-
// checkpoint needs to be aligned for 64-bit atomic operations.
33-
checkpoint state
31+
// This aggregator uses the StateLocker that enables a lock-free Update()
32+
// in exchange of a blocking and consistent Checkpoint(). Since Checkpoint()
33+
// is called by the sdk itself and it is not part of a hot path,
34+
// the user is not impacted by these blocking calls.
35+
//
36+
// The algorithm keeps two states. At every instance of time there exist one current state,
37+
// in which new updates are aggregated, and one checkpoint state, that represents the state
38+
// since the last Checkpoint(). These states are swapped when a `Checkpoint()` occur.
39+
40+
// states needs to be aligned for 64-bit atomic operations.
41+
states [2]state
42+
lock internal.StateLocker
3443
boundaries []core.Number
3544
kind core.NumberKind
3645
}
@@ -74,16 +83,18 @@ func New(desc *export.Descriptor, boundaries []core.Number) *Aggregator {
7483
agg := Aggregator{
7584
kind: desc.NumberKind(),
7685
boundaries: boundaries,
77-
current: state{
78-
buckets: aggregator.Buckets{
79-
Boundaries: boundaries,
80-
Counts: make([]core.Number, len(boundaries)+1),
86+
states: [2]state{
87+
{
88+
buckets: aggregator.Buckets{
89+
Boundaries: boundaries,
90+
Counts: make([]core.Number, len(boundaries)+1),
91+
},
8192
},
82-
},
83-
checkpoint: state{
84-
buckets: aggregator.Buckets{
85-
Boundaries: boundaries,
86-
Counts: make([]core.Number, len(boundaries)+1),
93+
{
94+
buckets: aggregator.Buckets{
95+
Boundaries: boundaries,
96+
Counts: make([]core.Number, len(boundaries)+1),
97+
},
8798
},
8899
},
89100
}
@@ -92,73 +103,91 @@ func New(desc *export.Descriptor, boundaries []core.Number) *Aggregator {
92103

93104
// Sum returns the sum of all values in the checkpoint.
94105
func (c *Aggregator) Sum() (core.Number, error) {
95-
return c.checkpoint.sum, nil
106+
c.lock.Lock()
107+
defer c.lock.Unlock()
108+
return c.checkpoint().sum, nil
96109
}
97110

98111
// Count returns the number of values in the checkpoint.
99112
func (c *Aggregator) Count() (int64, error) {
100-
return int64(c.checkpoint.count.AsUint64()), nil
113+
c.lock.Lock()
114+
defer c.lock.Unlock()
115+
return int64(c.checkpoint().count), nil
101116
}
102117

103118
// Histogram returns the count of events in pre-determined buckets.
104119
func (c *Aggregator) Histogram() (aggregator.Buckets, error) {
105-
return c.checkpoint.buckets, nil
120+
c.lock.Lock()
121+
defer c.lock.Unlock()
122+
return c.checkpoint().buckets, nil
106123
}
107124

108125
// Checkpoint saves the current state and resets the current state to
109126
// the empty set. Since no locks are taken, there is a chance that
110127
// the independent Sum, Count and Bucket Count are not consistent with each
111128
// other.
112129
func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) {
113-
// N.B. There is no atomic operation that can update all three
114-
// values at once without a memory allocation.
115-
//
116-
// This aggregator is intended to trade this correctness for
117-
// speed.
118-
//
119-
// Therefore, atomically swap fields independently, knowing
120-
// that individually the three parts of this aggregation could
121-
// be spread across multiple collections in rare cases.
122-
123-
c.checkpoint.count.SetUint64(c.current.count.SwapUint64Atomic(0))
124-
c.checkpoint.sum = c.current.sum.SwapNumberAtomic(core.Number(0))
125-
126-
for i := 0; i < len(c.checkpoint.buckets.Counts); i++ {
127-
c.checkpoint.buckets.Counts[i].SetUint64(c.current.buckets.Counts[i].SwapUint64Atomic(0))
128-
}
130+
c.lock.SwapActiveState(c.resetCheckpoint)
131+
}
132+
133+
// checkpoint returns the checkpoint state by inverting the lower bit of generationAndHotIdx.
134+
func (c *Aggregator) checkpoint() *state {
135+
return &c.states[c.lock.ColdIdx()]
136+
}
137+
138+
func (c *Aggregator) resetCheckpoint() {
139+
checkpoint := c.checkpoint()
140+
141+
checkpoint.count.SetUint64(0)
142+
checkpoint.sum.SetNumber(core.Number(0))
143+
checkpoint.buckets.Counts = make([]core.Number, len(checkpoint.buckets.Counts))
129144
}
130145

131146
// Update adds the recorded measurement to the current data set.
132147
func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error {
133148
kind := desc.NumberKind()
134149

135-
c.current.count.AddUint64Atomic(1)
136-
c.current.sum.AddNumberAtomic(kind, number)
150+
cIdx := c.lock.Start()
151+
defer c.lock.End(cIdx)
152+
153+
current := &c.states[cIdx]
154+
current.count.AddUint64Atomic(1)
155+
current.sum.AddNumberAtomic(kind, number)
137156

138157
for i, boundary := range c.boundaries {
139158
if number.CompareNumber(kind, boundary) < 0 {
140-
c.current.buckets.Counts[i].AddUint64Atomic(1)
159+
current.buckets.Counts[i].AddUint64Atomic(1)
141160
return nil
142161
}
143162
}
144163

145164
// Observed event is bigger than all defined boundaries.
146-
c.current.buckets.Counts[len(c.boundaries)].AddUint64Atomic(1)
165+
current.buckets.Counts[len(c.boundaries)].AddUint64Atomic(1)
166+
147167
return nil
148168
}
149169

150-
// Merge combines two data sets into one.
170+
// Merge combines two histograms that have the same buckets into a single one.
151171
func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error {
152172
o, _ := oa.(*Aggregator)
153173
if o == nil {
154174
return aggregator.NewInconsistentMergeError(c, oa)
155175
}
156176

157-
c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum)
158-
c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count)
177+
// Lock() synchronize Merge() and Checkpoint() to make sure all operations of
178+
// Merge() is done to the same state.
179+
c.lock.Lock()
180+
defer c.lock.Unlock()
181+
182+
current := c.checkpoint()
183+
// We assume that the aggregator being merged is not being updated nor checkpointed or this could be inconsistent.
184+
ocheckpoint := o.checkpoint()
185+
186+
current.sum.AddNumber(desc.NumberKind(), ocheckpoint.sum)
187+
current.count.AddNumber(core.Uint64NumberKind, ocheckpoint.count)
159188

160-
for i := 0; i < len(c.current.buckets.Counts); i++ {
161-
c.checkpoint.buckets.Counts[i].AddNumber(core.Uint64NumberKind, o.checkpoint.buckets.Counts[i])
189+
for i := 0; i < len(current.buckets.Counts); i++ {
190+
current.buckets.Counts[i].AddNumber(core.Uint64NumberKind, ocheckpoint.buckets.Counts[i])
162191
}
163192
return nil
164193
}

‎sdk/metric/aggregator/histogram/histogram_test.go

+10-16
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package histogram
1616

1717
import (
1818
"context"
19-
"fmt"
2019
"math"
2120
"math/rand"
2221
"os"
@@ -72,12 +71,8 @@ var (
7271
func TestMain(m *testing.M) {
7372
fields := []ottest.FieldOffset{
7473
{
75-
Name: "Aggregator.current",
76-
Offset: unsafe.Offsetof(Aggregator{}.current),
77-
},
78-
{
79-
Name: "Aggregator.checkpoint",
80-
Offset: unsafe.Offsetof(Aggregator{}.checkpoint),
74+
Name: "Aggregator.states",
75+
Offset: unsafe.Offsetof(Aggregator{}.states),
8176
},
8277
{
8378
Name: "state.buckets",
@@ -92,7 +87,6 @@ func TestMain(m *testing.M) {
9287
Offset: unsafe.Offsetof(state{}.count),
9388
},
9489
}
95-
fmt.Println(fields)
9690

9791
if !ottest.Aligned8Byte(fields, os.Stderr) {
9892
os.Exit(1)
@@ -151,12 +145,12 @@ func histogram(t *testing.T, profile test.Profile, policy policy) {
151145
require.Equal(t, all.Count(), count, "Same count -"+policy.name)
152146
require.Nil(t, err)
153147

154-
require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
148+
require.Equal(t, len(agg.checkpoint().buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
155149

156150
counts := calcBuckets(all.Points(), profile)
157151
for i, v := range counts {
158-
bCount := agg.checkpoint.buckets.Counts[i].AsUint64()
159-
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint.buckets.Counts)
152+
bCount := agg.checkpoint().buckets.Counts[i].AsUint64()
153+
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint().buckets.Counts)
160154
}
161155
}
162156

@@ -202,12 +196,12 @@ func TestHistogramMerge(t *testing.T) {
202196
require.Equal(t, all.Count(), count, "Same count - absolute")
203197
require.Nil(t, err)
204198

205-
require.Equal(t, len(agg1.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
199+
require.Equal(t, len(agg1.checkpoint().buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
206200

207201
counts := calcBuckets(all.Points(), profile)
208202
for i, v := range counts {
209-
bCount := agg1.checkpoint.buckets.Counts[i].AsUint64()
210-
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint.buckets.Counts)
203+
bCount := agg1.checkpoint().buckets.Counts[i].AsUint64()
204+
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint().buckets.Counts)
211205
}
212206
})
213207
}
@@ -229,8 +223,8 @@ func TestHistogramNotSet(t *testing.T) {
229223
require.Equal(t, int64(0), count, "Empty checkpoint count = 0")
230224
require.Nil(t, err)
231225

232-
require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
233-
for i, bCount := range agg.checkpoint.buckets.Counts {
226+
require.Equal(t, len(agg.checkpoint().buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
227+
for i, bCount := range agg.checkpoint().buckets.Counts {
234228
require.Equal(t, uint64(0), bCount.AsUint64(), "Bucket #%d must have 0 observed values", i)
235229
}
236230
})

‎sdk/metric/histogram_stress_test.go

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright 2019, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// This test is too large for the race detector. This SDK uses no locks
16+
// that the race detector would help with, anyway.
17+
// +build !race
18+
19+
package metric_test
20+
21+
import (
22+
"context"
23+
"math/rand"
24+
"testing"
25+
"time"
26+
27+
"go.opentelemetry.io/otel/api/core"
28+
"go.opentelemetry.io/otel/sdk/export/metric"
29+
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
30+
)
31+
32+
func TestStressInt64Histogram(t *testing.T) {
33+
desc := metric.NewDescriptor("some_metric", metric.MeasureKind, nil, "", "", core.Int64NumberKind, false)
34+
h := histogram.New(desc, []core.Number{core.NewInt64Number(25), core.NewInt64Number(50), core.NewInt64Number(75)})
35+
36+
go func() {
37+
rnd := rand.New(rand.NewSource(time.Now().Unix()))
38+
for {
39+
_ = h.Update(context.Background(), core.NewInt64Number(rnd.Int63()), desc)
40+
}
41+
}()
42+
43+
startTime := time.Now()
44+
for time.Since(startTime) < time.Second {
45+
h.Checkpoint(context.Background(), desc)
46+
47+
b, _ := h.Histogram()
48+
c, _ := h.Count()
49+
50+
var realCount int64
51+
for _, c := range b.Counts {
52+
v := c.AsInt64()
53+
realCount += v
54+
}
55+
56+
if realCount != c {
57+
t.Fail()
58+
}
59+
}
60+
}

0 commit comments

Comments
 (0)
Please sign in to comment.