/
sync.go
425 lines (364 loc) 路 12.5 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package syncstate
import (
"context"
"runtime"
"sync"
"sync/atomic"
"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator"
"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/fprint"
"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/pipeline"
"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate"
"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number"
"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument"
"go.opentelemetry.io/otel/attribute"
)
// sortableAttributesPool recycles *attribute.Sortable values.  In this
// file acquireRecord takes one from the pool, passes it as the second
// argument to attribute.NewSetWithSortable, and returns it afterwards.
var sortableAttributesPool = sync.Pool{
// New supplies a fresh *attribute.Sortable when the pool is empty.
New: func() any {
return new(attribute.Sortable)
},
}
// Instrument maintains a mapping from attributes to an internal
// record type for a single API-level instrument.  The map is keyed by
// a 64-bit fingerprint of the attribute list; records that share a
// fingerprint are chained together through record.next.  This type is
// organized so that a single lookup is performed regardless of the
// number of reader and instrument-view behaviors.
// Entries in the map have their accumulator's SnapshotAndProcess()
// method called whenever they are removed from the map, which can
// happen when any reader collects the instrument.
type Instrument struct {
// descriptor is the API-provided descriptor for the
// instrument, unmodified by views.
descriptor sdkinstrument.Descriptor
// compiled will be a single compiled instrument or a
// multi-instrument in case of multiple view behaviors
// and/or readers; these distinctions do not matter
// for synchronous aggregation.
compiled viewstate.Instrument
// lock protects current and the next field of every record
// reachable from it.
lock sync.RWMutex
// current maps an attribute fingerprint to the head of the list
// of records having that fingerprint.  It is protected by lock.
current map[uint64]*record
}
// NewInstrument builds a new synchronous instrument from the
// per-pipeline compiled instrument-views.  The second parameter is
// unused here: it is an opaque value needed by the asyncstate package
// and is accepted so that the two packages generalize.
//
// Nil entries of compiled are skipped.  When nothing remains, no
// reader enables the instrument and nil is returned.
func NewInstrument(desc sdkinstrument.Descriptor, _ interface{}, compiled pipeline.Register[viewstate.Instrument]) *Instrument {
	var enabled []viewstate.Instrument
	for _, view := range compiled {
		if view == nil {
			continue
		}
		enabled = append(enabled, view)
	}
	if len(enabled) == 0 {
		// When no readers enable the instrument, no need for an instrument.
		return nil
	}

	// viewstate.Combine eliminates the per-pipeline distinction that
	// is useful in the asyncstate package.  In the common case there
	// is one pipeline and one view, so Combine produces a single
	// concrete viewstate.Instrument; only with multiple views or
	// multiple pipelines does it produce a viewstate.multiInstrument.
	combined := viewstate.Combine(desc, enabled...)

	return &Instrument{
		descriptor: desc,
		compiled:   combined,
		current:    map[uint64]*record{},
	}
}
// SnapshotAndProcess calls singleSnapshotAndProcess() for every record
// of this instrument while holding the write lock.  Records that call
// reports as no longer in use are unlinked from their fingerprint's
// list during this pass, and a fingerprint whose list becomes empty is
// deleted from the map.
func (inst *Instrument) SnapshotAndProcess() {
inst.lock.Lock()
defer inst.lock.Unlock()
for key, reclist := range inst.current {
// reclist is a list of records for this fingerprint.
// head and tail describe the filtered list being rebuilt in place.
var head *record
var tail *record
// Scan reclist and modify the list. We're holding the
// lock giving exclusive access to the head-of-list
// and each next field, so the process here builds a new
// linked list after filtering records that are no longer
// in use.
for rec := reclist; rec != nil; rec = rec.next {
if inst.singleSnapshotAndProcess(key, rec) {
if head == nil {
// The first time a record will be kept,
// it becomes the head and tail.
head = rec
tail = rec
} else {
// Subsequently, update the tail of the
// list. Note that this creates a
// temporarily invalid list that will be
// repaired outside the loop, below.
tail.next = rec
tail = rec
}
}
}
// When no records are kept, delete the map entry.
// (Deleting the current key while ranging over a map is safe.)
if head == nil {
delete(inst.current, key)
continue
}
// Otherwise, terminate the list that was built.
tail.next = nil
if head != reclist {
// If the head changes, update the map.
inst.current[key] = head
}
}
}
// singleSnapshotAndProcess collects one record and reports whether the
// record should stay in the map.  SnapshotAndProcess calls it while
// holding inst.lock.  The fp parameter is not used in the body.
//
// If the record was updated since the last collection it is collected
// and kept (returns true).  Otherwise an attempt is made to unmap it
// with refMapped.tryUnmap(); a second collection follows in either
// case, and the record is kept only when the unmap attempt failed.
func (inst *Instrument) singleSnapshotAndProcess(fp uint64, rec *record) bool {
if rec.conditionalSnapshotAndProcess(false) {
return true
}
// Having no updates since last collection, try to unmap:
unmapped := rec.refMapped.tryUnmap()
// The first rec.conditionalSnapshotAndProcess
// returned false indicating no change, except:
// (a) it's now possible there was a race, the collector needs to
// see it.
// (b) if this is indeed the last reference, the collector needs the
// release signal.
// The result is deliberately ignored: keeping or dropping the record
// depends only on `unmapped`.
_ = rec.conditionalSnapshotAndProcess(unmapped)
// When `unmapped` is true, any other goroutines are now
// trying to re-insert this entry in the map, they are busy
// calling Gosched() waiting for this record to disappear.
return !unmapped
}
// record consists of an accumulator, a reference count, the number of
// updates, and the number of collected updates.  Records that share an
// attribute fingerprint form a singly linked list through next.
type record struct {
// refMapped tracks concurrent references to the record in
// order to keep the record mapped as long as it is active or
// uncollected.
refMapped refcountMapped
// updateCount is incremented on every Update.  It is read and
// written with sync/atomic operations.
updateCount int64
// collectedCount is set to updateCount on collection,
// supports checking for no updates during a round.  It is read
// and written with sync/atomic operations.
collectedCount int64
// accumulator can be a multi-accumulator if there
// are multiple behaviors or multiple readers, but
// these distinctions are not relevant for synchronous
// instruments.
accumulator viewstate.Accumulator
// attributeSet is ordered and deduplicated
attributeSet attribute.Set
// attributeList is intended to hold the attributes in
// user-specified order and may contain duplicates.  Lookups
// compare incoming attribute lists against it with
// attributesEqual.
attributeList []attribute.KeyValue
// next links to the following record with the same fingerprint,
// or is nil.  It is protected by the instrument's RWMutex.
next *record
}
// conditionalSnapshotAndProcess collects the record's accumulator when
// there is something to collect and reports whether it did so.
//
// With release == false, the record is collected only if updateCount
// has moved since the last collection (by any reader); when no updates
// happened the accumulator is left alone and false is returned.
//
// With release == true, the accumulator is always collected, and the
// release flag is forwarded to it.
//
// Whenever a collection happens, collectedCount is set to the update
// count observed on entry and true is returned.
func (rec *record) conditionalSnapshotAndProcess(release bool) bool {
	// Sample the update count first, so that updates racing with the
	// snapshot below are still seen as uncollected next time.
	observed := atomic.LoadInt64(&rec.updateCount)

	if !release && observed == atomic.LoadInt64(&rec.collectedCount) {
		// No updates in this interval and no release requested.
		return false
	}

	rec.accumulator.SnapshotAndProcess(release)

	// Remember how many updates have been accounted for.
	atomic.StoreInt64(&rec.collectedCount, observed)
	return true
}
// capture performs a single update for any synchronous instrument.
//
// Nothing is recorded when inst is nil (the instrument was completely
// disabled by the view) or when num fails aggregator.RangeTest for the
// instrument's descriptor.  Otherwise the record for attrs is acquired,
// updated with num, its update count is incremented, and the reference
// is dropped on return.
func capture[N number.Any, Traits number.Traits[N]](_ context.Context, inst *Instrument, num N, attrs []attribute.KeyValue) {
	// Note: Here, this is the place to use context, e.g., extract baggage.
	if inst == nil || !aggregator.RangeTest[N, Traits](num, inst.descriptor) {
		return
	}

	rec := acquireRecord[N](inst, attrs)
	defer rec.refMapped.unref()

	updater := rec.accumulator.(viewstate.Updater[N])
	updater.Update(num)

	// Mark the record as modified so the next collection picks it up.
	atomic.AddInt64(&rec.updateCount, 1)
}
// fingerprintAttributes computes a 64-bit fingerprint for an attribute
// list.  Each key/value pair contributes fprint.Mix of the key's string
// fingerprint and the value's fingerprint, and the contributions are
// summed (with wrap-around).  Because addition is commutative, the
// result does not depend on the order of attrs.  An empty list
// fingerprints to 0.
func fingerprintAttributes(attrs []attribute.KeyValue) uint64 {
	sum := uint64(0)
	for i := range attrs {
		kv := &attrs[i]
		keyPrint := fprint.FingerprintString(string(kv.Key))
		valuePrint := fingerprintValue(kv.Value)
		sum += fprint.Mix(keyPrint, valuePrint)
	}
	return sum
}
// fingerprintSlice applies f to every element of slice and returns the
// wrap-around sum of the results.  A nil or empty slice yields 0.
func fingerprintSlice[T any](slice []T, f func(T) uint64) uint64 {
	total := uint64(0)
	for i := 0; i < len(slice); i++ {
		total += f(slice[i])
	}
	return total
}
// fingerprintValue computes a 64-bit fingerprint for one attribute
// value.  Scalar values use the fprint function matching their type;
// slice values are fingerprinted element by element through
// fingerprintSlice.  Values of any other type fingerprint to 0.
func fingerprintValue(value attribute.Value) uint64 {
	switch value.Type() {
	// Slice-valued attributes.
	case attribute.STRINGSLICE:
		return fingerprintSlice(value.AsStringSlice(), fprint.FingerprintString)
	case attribute.FLOAT64SLICE:
		return fingerprintSlice(value.AsFloat64Slice(), fprint.FingerprintFloat64)
	case attribute.INT64SLICE:
		return fingerprintSlice(value.AsInt64Slice(), fprint.FingerprintInt64)
	case attribute.BOOLSLICE:
		return fingerprintSlice(value.AsBoolSlice(), fprint.FingerprintBool)

	// Scalar attributes.
	case attribute.STRING:
		return fprint.FingerprintString(value.AsString())
	case attribute.FLOAT64:
		return fprint.FingerprintFloat64(value.AsFloat64())
	case attribute.INT64:
		return fprint.FingerprintInt64(value.AsInt64())
	case attribute.BOOL:
		return fprint.FingerprintBool(value.AsBool())

	default:
		return 0
	}
}
// sliceEqual reports whether a and b have the same length and equal
// elements at every index, using ==.  A nil slice and an empty slice
// compare equal.
func sliceEqual[T comparable](a, b []T) bool {
	if len(a) != len(b) {
		return false
	}
	for i, item := range a {
		if item != b[i] {
			return false
		}
	}
	return true
}
// attributesEqual returns true if two slices are exactly equal: they
// have the same length and, at every position, the same key, the same
// value type and the same value.  Order is significant.  Scalar (and
// INVALID) values are compared with ==, slice values element by
// element with sliceEqual.  A value of any other type is treated as
// equal once its key and type match.
func attributesEqual(a, b []attribute.KeyValue) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		left, right := &a[i], &b[i]

		kind := left.Value.Type()
		if kind != right.Value.Type() || left.Key != right.Key {
			return false
		}

		same := true
		switch kind {
		case attribute.BOOLSLICE:
			same = sliceEqual(left.Value.AsBoolSlice(), right.Value.AsBoolSlice())
		case attribute.INT64SLICE:
			same = sliceEqual(left.Value.AsInt64Slice(), right.Value.AsInt64Slice())
		case attribute.FLOAT64SLICE:
			same = sliceEqual(left.Value.AsFloat64Slice(), right.Value.AsFloat64Slice())
		case attribute.STRINGSLICE:
			same = sliceEqual(left.Value.AsStringSlice(), right.Value.AsStringSlice())
		case attribute.INVALID, attribute.BOOL, attribute.INT64, attribute.FLOAT64, attribute.STRING:
			same = left.Value == right.Value
		}
		if !same {
			return false
		}
	}
	return true
}
// acquireRead acquires the read lock and searches the list stored
// under fp for a `*record` whose attributeList equals attrs.  It
// returns that record only if a reference could be taken on it with
// refMapped.ref(); it returns nil when there is no matching record or
// when the reference could not be taken.
func acquireRead(inst *Instrument, fp uint64, attrs []attribute.KeyValue) *record {
	inst.lock.RLock()
	defer inst.lock.RUnlock()

	// Note: we could (optionally) allow collisions and not scan this list.
	// The copied `attributeList` can be avoided in this case, as well.
	for candidate := inst.current[fp]; candidate != nil; candidate = candidate.next {
		if !attributesEqual(attrs, candidate.attributeList) {
			continue
		}
		// Existing record case.  Only the first match is considered.
		if candidate.refMapped.ref() {
			// At this moment it is guaranteed that the
			// record is in the map and will not be removed.
			return candidate
		}
		return nil
	}
	return nil
}
// acquireRecord gets or creates a `*record` corresponding to `attrs`,
// the input attributes.  It never returns nil.  The caller is expected
// to call refMapped.unref() on the result when done, as capture does.
//
// The lookup first runs under the read lock (acquireRead).  On a miss
// a new record is built speculatively and offered to acquireWrite,
// which either installs it or hands back an equal record that another
// goroutine installed first.
func acquireRecord[N number.Any](inst *Instrument, attrs []attribute.KeyValue) *record {
	fp := fingerprintAttributes(attrs)

	if rec := acquireRead(inst, fp, attrs); rec != nil {
		return rec
	}

	// Keep a private copy of the attribute list, in the caller's
	// order, for the record.  acquireRead compares incoming lists
	// against this copy, so it must not be re-ordered.
	acpy := make([]attribute.KeyValue, len(attrs))
	copy(acpy, attrs)

	// attribute.NewSetWithSortable re-orders the slice it is given in
	// place (sorting and moving de-duplicated entries).  Build the set
	// from a scratch copy so that neither the caller's slice nor the
	// record's attributeList is disturbed.
	scratch := make([]attribute.KeyValue, len(attrs))
	copy(scratch, attrs)

	tmp := sortableAttributesPool.Get().(*attribute.Sortable)
	defer sortableAttributesPool.Put(tmp)

	aset := attribute.NewSetWithSortable(scratch, tmp)

	// Note: the accumulator set below is created speculatively;
	// it will be released if it is never returned.
	newRec := &record{
		refMapped:     newRefcountMapped(),
		accumulator:   inst.compiled.NewAccumulator(aset),
		attributeList: acpy,
		attributeSet:  aset,
	}

	for {
		acquired, loaded := acquireWrite(inst, fp, newRec)
		if !loaded {
			// An equal record is still in the map but can no
			// longer be referenced: it is being removed.  Wait
			// for the call to delete() inside
			// SnapshotAndProcess() to complete before inserting
			// a new record, yielding the processor between
			// attempts instead of spinning tightly.
			runtime.Gosched()
			continue
		}

		if acquired != newRec {
			// Release the speculative accumulator, since it was not used.
			newRec.accumulator.SnapshotAndProcess(true)
		}
		return acquired
	}
}
// acquireWrite acquires the write lock and gets or sets a `*record`.
//
// If the list stored under fp already holds a record whose
// attributeList equals newRec's, that record is returned with true
// when a reference could be taken on it, and (nil, false) is returned
// when it could not; the caller is expected to retry in that case.
// If no equal record exists, newRec is pushed onto the front of the
// list and returned with true.
func acquireWrite(inst *Instrument, fp uint64, newRec *record) (*record, bool) {
	inst.lock.Lock()
	defer inst.lock.Unlock()

	head := inst.current[fp]

	for existing := head; existing != nil; existing = existing.next {
		if !attributesEqual(existing.attributeList, newRec.attributeList) {
			continue
		}
		if !existing.refMapped.ref() {
			// in which case, there's been a race
			return nil, false
		}
		return existing, true
	}

	// No equal record: newRec becomes the new head of the list.
	newRec.next = head
	inst.current[fp] = newRec
	return newRec, true
}