Skip to content

Commit

Permalink
Simplified API, added tests.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Jan 20, 2022
1 parent abaeafe commit 9d5935e
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 57 deletions.
154 changes: 97 additions & 57 deletions prometheus/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,67 +26,84 @@ import (
dto "github.com/prometheus/client_model/go"
)

var _ rawCollector = &CachedCollector{}
var _ TransactionalGatherer = &CachedTGatherer{}

// CachedCollector allows creating allocation friendly metrics which change less frequently than scrape time, yet
// label values can are changing over time. This collector
// CachedTGatherer is a transactional gatherer that allows maintaining set of metrics which
// change less frequently than scrape time, yet label values and values change over time.
//
// If you happen to use NewDesc, NewConstMetric or MustNewConstMetric inside Collector.Collect routine, consider
// using CachedCollector instead.
type CachedCollector struct {
// using CachedTGatherer instead.
//
// Use CachedTGatherer with classic Registry using NewMultiTRegistry and ToTransactionalGatherer helpers.
// TODO(bwplotka): Add non-session update API if useful for watcher-like mechanic.
type CachedTGatherer struct {
metrics map[uint64]*dto.Metric
metricFamilyByName map[string]*dto.MetricFamily
mMu sync.RWMutex

pendingSession bool
psMu sync.Mutex
}

func NewCachedCollector() *CachedCollector {
return &CachedCollector{
func NewCachedTGatherer() *CachedTGatherer {
return &CachedTGatherer{
metrics: make(map[uint64]*dto.Metric),
metricFamilyByName: map[string]*dto.MetricFamily{},
}
}

func (c *CachedCollector) Collect() []*dto.MetricFamily {
// TODO(bwplotka): Optimize potential penalty here.
return internal.NormalizeMetricFamilies(c.metricFamilyByName)
// Gather implements TransactionalGatherer interface.
func (c *CachedTGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
c.mMu.RLock()
// TODO(bwplotka): Consider caching slice and normalizing on write.
return internal.NormalizeMetricFamilies(c.metricFamilyByName), c.mMu.RUnlock, nil
}

// NewSession allows to collect all metrics in one go and update cache as much in-place
// as possible to save allocations.
// NOTE: Not concurrency safe and only one allowed at the time (until commit).
func (c *CachedCollector) NewSession() *CollectSession {
// NewSession allows to recreate state of all metrics in CachedTGatherer in
// one go and update cache in-place to save allocations.
// Only one session is allowed at the time.
//
// Session is not concurrency safe.
func (c *CachedTGatherer) NewSession() (*CollectSession, error) {
c.psMu.Lock()
if c.pendingSession {
c.psMu.Unlock()
return nil, errors.New("only one session allowed, one already pending")
}
c.pendingSession = true
c.psMu.Unlock()

return &CollectSession{
c: c,
currentMetrics: make(map[uint64]*dto.Metric, len(c.metrics)),
currentByName: make(map[string]*dto.MetricFamily, len(c.metricFamilyByName)),
}
}, nil
}

type CollectSession struct {
closed bool

c *CachedCollector
c *CachedTGatherer
currentMetrics map[uint64]*dto.Metric
currentByName map[string]*dto.MetricFamily
}

// MustAddMetric is an AddMetric that panics on error.
func (s *CollectSession) MustAddMetric(fqName, help string, labelNames, labelValues []string, valueType ValueType, value float64, ts *time.Time) {
if err := s.AddMetric(fqName, help, labelNames, labelValues, valueType, value, ts); err != nil {
panic(err)
}
}

// AddMetric ...
// AddMetric adds metrics to current session. No changes will be updated in CachedTGatherer until Commit.
// TODO(bwplotka): Add validation.
func (s *CollectSession) AddMetric(fqName, help string, labelNames, labelValues []string, valueType ValueType, value float64, ts *time.Time) error {
if s.closed {
return errors.New("new metric: collect session is closed, but was attempted to be used")
}

// Label names can be unsorted, will be sorting them later. The only implication is cachability if
// consumer provide non-deterministic order of those (unlikely since label values has to be matched).
// Label names can be unsorted, we will be sorting them later. The only implication is cachability if
// consumer provide non-deterministic order of those.

if len(labelNames) != len(labelValues) {
return errors.New("new metric: label name has different len than values")
Expand Down Expand Up @@ -116,7 +133,8 @@ func (s *CollectSession) AddMetric(fqName, help string, labelNames, labelValues
h := xxhash.New()
h.WriteString(fqName)
h.Write(separatorByteSlice)
for i := range labelNames { // Ofc not in the same order...

for i := range labelNames {
h.WriteString(labelNames[i])
h.Write(separatorByteSlice)
h.WriteString(labelValues[i])
Expand Down Expand Up @@ -178,75 +196,96 @@ func (s *CollectSession) AddMetric(fqName, help string, labelNames, labelValues
}
s.currentMetrics[hSum] = m

// Will be sorted later.
// Will be sorted later anyway, skip for now.
d.Metric = append(d.Metric, m)
return nil
}

func (s *CollectSession) Commit() {
// TODO(bwplotka): Sort metrics within family.
s.c.mMu.Lock()
// TODO(bwplotka): Sort metrics within family?
s.c.metricFamilyByName = s.currentByName
s.c.metrics = s.currentMetrics
s.c.mMu.Unlock()

s.c.psMu.Lock()
s.closed = true
s.c.pendingSession = false
s.c.psMu.Unlock()
}

type BlockingRegistry struct {
*Registry

// rawCollector represents special collectors which requires blocking collect for the whole duration
// of returned dto.MetricFamily usage.
rawCollectors []rawCollector
mu sync.Mutex
}

func NewBlockingRegistry() *BlockingRegistry {
return &BlockingRegistry{
Registry: NewRegistry(),
}
}
var _ TransactionalGatherer = &MultiTRegistry{}

type rawCollector interface {
Collect() []*dto.MetricFamily
}

func (b *BlockingRegistry) RegisterRaw(r rawCollector) error {
// TODO(bwplotka): Register, I guess for dups/check purposes?
b.rawCollectors = append(b.rawCollectors, r)
return nil
// MultiTRegistry is a TransactionalGatherer that joins gathered metrics from multiple
// transactional gatherers.
//
// It is caller responsibility to ensure two registries have mutually exclusive metric families,
// no deduplication will happen.
type MultiTRegistry struct {
tGatherers []TransactionalGatherer
}

func (b *BlockingRegistry) MustRegisterRaw(r rawCollector) {
if err := b.RegisterRaw(r); err != nil {
panic(err)
// NewMultiTRegistry creates MultiTRegistry.
func NewMultiTRegistry(tGatherers ...TransactionalGatherer) *MultiTRegistry {
return &MultiTRegistry{
tGatherers: tGatherers,
}
}

func (b *BlockingRegistry) Gather() (_ []*dto.MetricFamily, done func(), err error) {
b.mu.Lock()
mfs, err := b.Registry.Gather()
// Gather implements TransactionalGatherer interface.
func (r *MultiTRegistry) Gather() (mfs []*dto.MetricFamily, done func(), err error) {
errs := MultiError{}

// TODO(bwplotka): Returned mfs are sorted, so sort raw ones and inject?
dFns := make([]func(), 0, len(r.tGatherers))
// TODO(bwplotka): Implement concurrency for those?
for _, r := range b.rawCollectors {
// TODO(bwplotka): Check for duplicates.
mfs = append(mfs, r.Collect()...)
for _, g := range r.tGatherers {
// TODO(bwplotka): Check for duplicates?
m, d, err := g.Gather()
errs.Append(err)

mfs = append(mfs, m...)
dFns = append(dFns, d)
}

// TODO(bwplotka): Consider sort in place, given metric family in gather is sorted already.
sort.Slice(mfs, func(i, j int) bool {
return *mfs[i].Name < *mfs[j].Name
})
return mfs, func() { b.mu.Unlock() }, err
return mfs, func() {
for _, d := range dFns {
d()
}
}, errs.MaybeUnwrap()
}

// TransactionalGatherer ...
// TransactionalGatherer represents transactional gatherer that can be triggered to notify gatherer that memory
// used by metric family is no longer used by a caller. This allows implementations with cache.
type TransactionalGatherer interface {
// Gather ...
// Gather returns metrics in a lexicographically sorted slice
// of uniquely named MetricFamily protobufs. Gather ensures that the
// returned slice is valid and self-consistent so that it can be used
// for valid exposition. As an exception to the strict consistency
// requirements described for metric.Desc, Gather will tolerate
// different sets of label names for metrics of the same metric family.
//
// Even if an error occurs, Gather attempts to gather as many metrics as
// possible. Hence, if a non-nil error is returned, the returned
// MetricFamily slice could be nil (in case of a fatal error that
// prevented any meaningful metric collection) or contain a number of
// MetricFamily protobufs, some of which might be incomplete, and some
// might be missing altogether. The returned error (which might be a
// MultiError) explains the details. Note that this is mostly useful for
// debugging purposes. If the gathered protobufs are to be used for
// exposition in actual monitoring, it is almost always better to not
// expose an incomplete result and instead disregard the returned
// MetricFamily protobufs in case the returned error is non-nil.
//
// Important: done is expected to be triggered (even if the error occurs!)
// once caller does not need returned slice of dto.MetricFamily.
Gather() (_ []*dto.MetricFamily, done func(), err error)
}

// ToTransactionalGatherer transforms Gatherer to transactional one with noop as done function.
func ToTransactionalGatherer(g Gatherer) TransactionalGatherer {
return &noTransactionGatherer{g: g}
}
Expand All @@ -255,6 +294,7 @@ type noTransactionGatherer struct {
g Gatherer
}

// Gather implements TransactionalGatherer interface.
func (g *noTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
mfs, err := g.g.Gather()
return mfs, func() {}, err
Expand Down
125 changes: 125 additions & 0 deletions prometheus/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package prometheus

import (
"errors"
"strings"
"testing"
"time"

dto "github.com/prometheus/client_model/go"
)

func TestCachedTGatherer(t *testing.T) {
c := NewCachedTGatherer()
mfs, done, err := c.Gather()
if err != nil {
t.Error("gather failed:", err)
}
done()
if got := mfsToString(mfs); got != "" {
t.Error("unexpected metric family", got)
}

s, err := c.NewSession()
if err != nil {
t.Error("session failed:", err)
}

_, err = c.NewSession()
if err == nil {
t.Error("second session expected to fail, got nil")
}

// WIP.
time.Parse()

s.AddMetric("a", "help a", []string{"b", "c"}, []string{"valb", "valc"}, GaugeValue, 1)

}

func mfsToString(mfs []*dto.MetricFamily) string {
ret := make([]string, 0, len(mfs))
for _, m := range mfs {
ret = append(ret, m.String())
}
return strings.Join(ret, ",")
}

type tGatherer struct {
done bool
err error
}

func (g *tGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
name := "g1"
val := 1.0
return []*dto.MetricFamily{
{Name: &name, Metric: []*dto.Metric{{Gauge: &dto.Gauge{Value: &val}}}},
}, func() { g.done = true }, g.err
}

func TestNewMultiTRegistry(t *testing.T) {
treg := &tGatherer{}

t.Run("one registry", func(t *testing.T) {
m := NewMultiTRegistry(treg)
ret, done, err := m.Gather()
if err != nil {
t.Error("gather failed:", err)
}
done()
if len(ret) != 1 {
t.Error("unexpected number of metric families, expected 1, got", ret)
}
if !treg.done {
t.Error("inner transactional registry not marked as done")
}
})

reg := NewRegistry()
if err := reg.Register(NewCounter(CounterOpts{Name: "c1", Help: "help c1"})); err != nil {
t.Error("registration failed:", err)
}

// Note on purpose two registries will have exactly same metric family name (but with different string).
// This behaviour is undefined at the moment.
if err := reg.Register(NewGauge(GaugeOpts{Name: "g1", Help: "help g1"})); err != nil {
t.Error("registration failed:", err)
}
treg.done = false

t.Run("two registries", func(t *testing.T) {
m := NewMultiTRegistry(ToTransactionalGatherer(reg), treg)
ret, done, err := m.Gather()
if err != nil {
t.Error("gather failed:", err)
}
done()
if len(ret) != 3 {
t.Error("unexpected number of metric families, expected 3, got", ret)
}
if !treg.done {
t.Error("inner transactional registry not marked as done")
}
})

treg.done = false
// Inject error.
treg.err = errors.New("test err")

t.Run("two registries, one with error", func(t *testing.T) {
m := NewMultiTRegistry(ToTransactionalGatherer(reg), treg)
ret, done, err := m.Gather()
if err != treg.err {
t.Error("unexpected error:", err)
}
done()
if len(ret) != 3 {
t.Error("unexpected number of metric families, expected 3, got", ret)
}
// Still on error, we expect done to be triggered.
if !treg.done {
t.Error("inner transactional registry not marked as done")
}
})
}

0 comments on commit 9d5935e

Please sign in to comment.