Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TODO, use sync.Map to avoid blocking calls #2381

Merged
merged 3 commits into from Nov 13, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 15 additions & 26 deletions sdk/metric/controller/basic/controller.go
Expand Up @@ -57,13 +57,8 @@ var ErrControllerStarted = fmt.Errorf("controller already started")
// be blocked by a pull request in the basic controller.
type Controller struct {
// lock protects libraries and synchronizes Start() and Stop().
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
lock sync.Mutex
// TODO: libraries is synchronized by lock, but could be
// accomplished using a sync.Map. The SDK specification will
// probably require this, as the draft already states that
// Stop() and MeterProvider.Meter() should not block each
// other.
libraries map[instrumentation.Library]*registry.UniqueInstrumentMeterImpl
lock sync.Mutex
libraries sync.Map
checkpointerFactory export.CheckpointerFactory

resource *resource.Resource
Expand Down Expand Up @@ -93,21 +88,18 @@ func (c *Controller) Meter(instrumentationName string, opts ...metric.MeterOptio
SchemaURL: cfg.SchemaURL(),
}

c.lock.Lock()
defer c.lock.Unlock()
m, ok := c.libraries[library]
m, ok := c.libraries.Load(library)
if !ok {
checkpointer := c.checkpointerFactory.NewCheckpointer()
accumulator := sdk.NewAccumulator(checkpointer)
m = registry.NewUniqueInstrumentMeterImpl(&accumulatorCheckpointer{
Accumulator: accumulator,
checkpointer: checkpointer,
library: library,
})

c.libraries[library] = m
m, _ = c.libraries.LoadOrStore(
library,
registry.NewUniqueInstrumentMeterImpl(&accumulatorCheckpointer{
Accumulator: sdk.NewAccumulator(checkpointer),
checkpointer: checkpointer,
library: library,
}))
}
return metric.WrapMeterImpl(m)
return metric.WrapMeterImpl(m.(*registry.UniqueInstrumentMeterImpl))
}

type accumulatorCheckpointer struct {
Expand Down Expand Up @@ -138,7 +130,6 @@ func New(checkpointerFactory export.CheckpointerFactory, opts ...Option) *Contro
}
}
return &Controller{
libraries: map[instrumentation.Library]*registry.UniqueInstrumentMeterImpl{},
checkpointerFactory: checkpointerFactory,
exporter: c.Exporter,
resource: c.Resource,
Expand Down Expand Up @@ -251,16 +242,14 @@ func (c *Controller) collect(ctx context.Context) error {
// accumulatorList returns a snapshot of current accumulators
// registered to this controller. This briefly locks the controller.
func (c *Controller) accumulatorList() []*accumulatorCheckpointer {
c.lock.Lock()
defer c.lock.Unlock()

var r []*accumulatorCheckpointer
for _, entry := range c.libraries {
acc, ok := entry.MeterImpl().(*accumulatorCheckpointer)
c.libraries.Range(func(key, value interface{}) bool {
acc, ok := value.(*registry.UniqueInstrumentMeterImpl).MeterImpl().(*accumulatorCheckpointer)
if ok {
r = append(r, acc)
}
}
return true
})
return r
}

Expand Down