Skip to content

Commit

Permalink
Added TransactionalGatherer, which allows cached solutions (#989)
Browse files Browse the repository at this point in the history
* Added cached collector.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

update.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

Attempt 2

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

Added blocking registry, with raw collector and transactional handler.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

Added fast path to normal (empty) registry to save 8 allocs and 3K5B per Gather.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

Simplified API, added tests.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

Fix.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

Simplified implementation.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

Added benchmark.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

Optimized.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Optimization attempt.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Revert "Optimization attempt."

This reverts commit 2fcaf51.

Optimization was not worth it:

 benchstat v1.txt v2.txt
name                                                           old time/op    new time/op    delta
CachedTGatherer_Update/Update_of_one_element_without_reset-12    2.64µs ± 0%    4.05µs ± 0%   ~     (p=1.000 n=1+1)
CachedTGatherer_Update/Update_of_all_elements_with_reset-12       701ms ± 0%     358ms ± 0%   ~     (p=1.000 n=1+1)
CachedTGatherer_Update/Gather-12                                  535µs ± 0%  703934µs ± 0%   ~     (p=1.000 n=1+1)

name                                                           old alloc/op   new alloc/op   delta
CachedTGatherer_Update/Update_of_one_element_without_reset-12      208B ± 0%      208B ± 0%   ~     (all equal)
CachedTGatherer_Update/Update_of_all_elements_with_reset-12      40.2MB ± 0%    41.1MB ± 0%   ~     (p=1.000 n=1+1)
CachedTGatherer_Update/Gather-12                                 48.6kB ± 0%    84.3kB ± 0%   ~     (p=1.000 n=1+1)

name                                                           old allocs/op  new allocs/op  delta
CachedTGatherer_Update/Update_of_one_element_without_reset-12      3.00 ± 0%      3.00 ± 0%   ~     (all equal)
CachedTGatherer_Update/Update_of_all_elements_with_reset-12        6.00 ± 0%   4003.00 ± 0%   ~     (p=1.000 n=1+1)
CachedTGatherer_Update/Gather-12                                  1.00k ± 0%     2.01k ± 0%   ~     (p=1.000 n=1+1)

* nit.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Another optimization attempt.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* rename and further optimization.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Hopefully final optimization.

benchstat -delta-test=none v6.txt v9.txt
name                                                           old time/op    new time/op    delta
CachedTGatherer_Update/Update_of_one_element_without_reset-12    13.1ms ± 0%     0.0ms ± 0%  -99.81%
CachedTGatherer_Update/Update_of_all_elements_with_reset-12       309ms ± 0%     282ms ± 0%   -8.77%
CachedTGatherer_Update/Gather-12                                  422ms ± 0%       0ms ± 0%  -99.95%

name                                                           old alloc/op   new alloc/op   delta
CachedTGatherer_Update/Update_of_one_element_without_reset-12      208B ± 0%      208B ± 0%    0.00%
CachedTGatherer_Update/Update_of_all_elements_with_reset-12      2.47kB ± 0%    1.67kB ± 0%  -32.56%
CachedTGatherer_Update/Gather-12                                 52.8kB ± 0%    24.6kB ± 0%  -53.34%

name                                                           old allocs/op  new allocs/op  delta
CachedTGatherer_Update/Update_of_one_element_without_reset-12      3.00 ± 0%      3.00 ± 0%    0.00%
CachedTGatherer_Update/Update_of_all_elements_with_reset-12        0.00           0.00         0.00%
CachedTGatherer_Update/Gather-12                                  1.00k ± 0%     0.00k ± 0%  -99.60%

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Removed obsolete comment

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Fixed tests.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Removed cache.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Fixed tests.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Re-add cache.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Removed cache.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Feb 23, 2022
1 parent f3021b0 commit 1f81b3e
Show file tree
Hide file tree
Showing 10 changed files with 340 additions and 64 deletions.
4 changes: 3 additions & 1 deletion prometheus/desc.go
Expand Up @@ -20,6 +20,8 @@ import (
"strings"

"github.com/cespare/xxhash/v2"
"github.com/prometheus/client_golang/prometheus/internal"

//nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
"github.com/golang/protobuf/proto"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -154,7 +156,7 @@ func NewDesc(fqName, help string, variableLabels []string, constLabels Labels) *
Value: proto.String(v),
})
}
sort.Sort(labelPairSorter(d.constLabelPairs))
sort.Sort(internal.LabelPairSorter(d.constLabelPairs))
return d
}

Expand Down
28 changes: 22 additions & 6 deletions prometheus/internal/metric.go
Expand Up @@ -19,18 +19,34 @@ import (
dto "github.com/prometheus/client_model/go"
)

// metricSorter is a sortable slice of *dto.Metric.
type metricSorter []*dto.Metric
// LabelPairSorter implements sort.Interface. It is used to sort a slice of
// dto.LabelPair pointers.
type LabelPairSorter []*dto.LabelPair

func (s metricSorter) Len() int {
func (s LabelPairSorter) Len() int {
return len(s)
}

func (s metricSorter) Swap(i, j int) {
func (s LabelPairSorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (s metricSorter) Less(i, j int) bool {
func (s LabelPairSorter) Less(i, j int) bool {
return s[i].GetName() < s[j].GetName()
}

// MetricSorter is a []*dto.Metric carrying the Len and Swap halves of
// sort.Interface; the ordering itself is defined by its Less method.
type MetricSorter []*dto.Metric

// Len reports how many metrics the slice holds.
func (s MetricSorter) Len() int { return len(s) }

// Swap exchanges the metrics stored at indices i and j.
func (s MetricSorter) Swap(i, j int) {
	s[j], s[i] = s[i], s[j]
}

func (s MetricSorter) Less(i, j int) bool {
if len(s[i].Label) != len(s[j].Label) {
// This should not happen. The metrics are
// inconsistent. However, we have to deal with the fact, as
Expand Down Expand Up @@ -68,7 +84,7 @@ func (s metricSorter) Less(i, j int) bool {
// the slice, with the contained Metrics sorted within each MetricFamily.
func NormalizeMetricFamilies(metricFamiliesByName map[string]*dto.MetricFamily) []*dto.MetricFamily {
for _, mf := range metricFamiliesByName {
sort.Sort(metricSorter(mf.Metric))
sort.Sort(MetricSorter(mf.Metric))
}
names := make([]string, 0, len(metricFamiliesByName))
for name, mf := range metricFamiliesByName {
Expand Down
16 changes: 0 additions & 16 deletions prometheus/metric.go
Expand Up @@ -115,22 +115,6 @@ func BuildFQName(namespace, subsystem, name string) string {
return name
}

// labelPairSorter is a []*dto.LabelPair that satisfies sort.Interface,
// ordering the pairs by their label name.
type labelPairSorter []*dto.LabelPair

// Len reports how many label pairs the slice holds.
func (ps labelPairSorter) Len() int { return len(ps) }

// Swap exchanges the label pairs stored at indices i and j.
func (ps labelPairSorter) Swap(i, j int) {
	ps[j], ps[i] = ps[i], ps[j]
}

// Less reports whether the name of pair i sorts strictly before the name of
// pair j.
func (ps labelPairSorter) Less(i, j int) bool {
	left, right := ps[i].GetName(), ps[j].GetName()
	return left < right
}

type invalidMetric struct {
desc *Desc
err error
Expand Down
10 changes: 9 additions & 1 deletion prometheus/promhttp/http.go
Expand Up @@ -84,6 +84,13 @@ func Handler() http.Handler {
// instrumentation. Use the InstrumentMetricHandler function to apply the same
// kind of instrumentation as it is used by the Handler function.
func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
	// Convert reg with ToTransactionalGatherer so the transactional handler
	// can serve it; all handler behavior lives in HandlerForTransactional.
	transactional := prometheus.ToTransactionalGatherer(reg)
	return HandlerForTransactional(transactional, opts)
}

// HandlerForTransactional is like HandlerFor, but it uses a transactional gatherer, which
// can safely change the returned *dto.MetricFamily in place before the call to `Gather` and
// after the call to `done` of that `Gather`.
func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerOpts) http.Handler {
var (
inFlightSem chan struct{}
errCnt = prometheus.NewCounterVec(
Expand Down Expand Up @@ -123,7 +130,8 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
return
}
}
mfs, err := reg.Gather()
mfs, done, err := reg.Gather()
defer done()
if err != nil {
if opts.ErrorLog != nil {
opts.ErrorLog.Println("error gathering metrics:", err)
Expand Down
107 changes: 83 additions & 24 deletions prometheus/promhttp/http_test.go
Expand Up @@ -16,6 +16,7 @@ package promhttp
import (
"bytes"
"errors"
"fmt"
"log"
"net/http"
"net/http/httptest"
Expand All @@ -24,6 +25,7 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

type errorCollector struct{}
Expand Down Expand Up @@ -56,8 +58,19 @@ func (b blockingCollector) Collect(ch chan<- prometheus.Metric) {
<-b.Block
}

func TestHandlerErrorHandling(t *testing.T) {
// mockTransactionGatherer wraps a plain prometheus.Gatherer and records how
// many times Gather and the done callback it hands out have been invoked.
type mockTransactionGatherer struct {
	g             prometheus.Gatherer
	gatherInvoked int
	doneInvoked   int
}

// Gather counts the call, delegates to the wrapped Gatherer, and returns its
// metric families and error together with a done callback that increments
// doneInvoked each time it is called.
func (g *mockTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
	g.gatherInvoked++
	done = func() { g.doneInvoked++ }
	families, err := g.g.Gather()
	return families, done, err
}

func TestHandlerErrorHandling(t *testing.T) {
// Create a registry that collects a MetricFamily with two elements,
// another with one, and reports an error. Further down, we'll use the
// same registry in the HandlerOpts.
Expand Down Expand Up @@ -90,21 +103,30 @@ func TestHandlerErrorHandling(t *testing.T) {
request, _ := http.NewRequest("GET", "/", nil)
request.Header.Add("Accept", "test/plain")

errorHandler := HandlerFor(reg, HandlerOpts{
mReg := &mockTransactionGatherer{g: reg}
errorHandler := HandlerForTransactional(mReg, HandlerOpts{
ErrorLog: logger,
ErrorHandling: HTTPErrorOnError,
Registry: reg,
})
continueHandler := HandlerFor(reg, HandlerOpts{
continueHandler := HandlerForTransactional(mReg, HandlerOpts{
ErrorLog: logger,
ErrorHandling: ContinueOnError,
Registry: reg,
})
panicHandler := HandlerFor(reg, HandlerOpts{
panicHandler := HandlerForTransactional(mReg, HandlerOpts{
ErrorLog: logger,
ErrorHandling: PanicOnError,
Registry: reg,
})
// Expect gatherer not touched.
if got := mReg.gatherInvoked; got != 0 {
t.Fatalf("unexpected number of gather invokes, want 0, got %d", got)
}
if got := mReg.doneInvoked; got != 0 {
t.Fatalf("unexpected number of done invokes, want 0, got %d", got)
}

wantMsg := `error gathering metrics: error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error
`
wantErrorBody := `An error has occurred while serving metrics:
Expand Down Expand Up @@ -140,25 +162,39 @@ the_count 0
`

errorHandler.ServeHTTP(writer, request)
if got := mReg.gatherInvoked; got != 1 {
t.Fatalf("unexpected number of gather invokes, want 1, got %d", got)
}
if got := mReg.doneInvoked; got != 1 {
t.Fatalf("unexpected number of done invokes, want 1, got %d", got)
}
if got, want := writer.Code, http.StatusInternalServerError; got != want {
t.Errorf("got HTTP status code %d, want %d", got, want)
}
if got := logBuf.String(); got != wantMsg {
t.Errorf("got log message:\n%s\nwant log message:\n%s\n", got, wantMsg)
if got, want := logBuf.String(), wantMsg; got != want {
t.Errorf("got log buf %q, want %q", got, want)
}
if got := writer.Body.String(); got != wantErrorBody {
t.Errorf("got body:\n%s\nwant body:\n%s\n", got, wantErrorBody)
if got, want := writer.Body.String(), wantErrorBody; got != want {
t.Errorf("got body %q, want %q", got, want)
}

logBuf.Reset()
writer.Body.Reset()
writer.Code = http.StatusOK

continueHandler.ServeHTTP(writer, request)

if got := mReg.gatherInvoked; got != 2 {
t.Fatalf("unexpected number of gather invokes, want 2, got %d", got)
}
if got := mReg.doneInvoked; got != 2 {
t.Fatalf("unexpected number of done invokes, want 2, got %d", got)
}
if got, want := writer.Code, http.StatusOK; got != want {
t.Errorf("got HTTP status code %d, want %d", got, want)
}
if got := logBuf.String(); got != wantMsg {
t.Errorf("got log message %q, want %q", got, wantMsg)
if got, want := logBuf.String(), wantMsg; got != want {
t.Errorf("got log buf %q, want %q", got, want)
}
if got := writer.Body.String(); got != wantOKBody1 && got != wantOKBody2 {
t.Errorf("got body %q, want either %q or %q", got, wantOKBody1, wantOKBody2)
Expand All @@ -168,20 +204,34 @@ the_count 0
if err := recover(); err == nil {
t.Error("expected panic from panicHandler")
}
if got := mReg.gatherInvoked; got != 3 {
t.Fatalf("unexpected number of gather invokes, want 3, got %d", got)
}
if got := mReg.doneInvoked; got != 3 {
t.Fatalf("unexpected number of done invokes, want 3, got %d", got)
}
}()
panicHandler.ServeHTTP(writer, request)
}

func TestInstrumentMetricHandler(t *testing.T) {
reg := prometheus.NewRegistry()
handler := InstrumentMetricHandler(reg, HandlerFor(reg, HandlerOpts{}))
mReg := &mockTransactionGatherer{g: reg}
handler := InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{}))
// Do it again to test idempotency.
InstrumentMetricHandler(reg, HandlerFor(reg, HandlerOpts{}))
InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{}))
writer := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/", nil)
request.Header.Add("Accept", "test/plain")

handler.ServeHTTP(writer, request)
if got := mReg.gatherInvoked; got != 1 {
t.Fatalf("unexpected number of gather invokes, want 1, got %d", got)
}
if got := mReg.doneInvoked; got != 1 {
t.Fatalf("unexpected number of done invokes, want 1, got %d", got)
}

if got, want := writer.Code, http.StatusOK; got != want {
t.Errorf("got HTTP status code %d, want %d", got, want)
}
Expand All @@ -195,19 +245,28 @@ func TestInstrumentMetricHandler(t *testing.T) {
t.Errorf("got body %q, does not contain %q", got, want)
}

writer.Body.Reset()
handler.ServeHTTP(writer, request)
if got, want := writer.Code, http.StatusOK; got != want {
t.Errorf("got HTTP status code %d, want %d", got, want)
}
for i := 0; i < 100; i++ {
writer.Body.Reset()
handler.ServeHTTP(writer, request)

want = "promhttp_metric_handler_requests_in_flight 1\n"
if got := writer.Body.String(); !strings.Contains(got, want) {
t.Errorf("got body %q, does not contain %q", got, want)
}
want = "promhttp_metric_handler_requests_total{code=\"200\"} 1\n"
if got := writer.Body.String(); !strings.Contains(got, want) {
t.Errorf("got body %q, does not contain %q", got, want)
if got, want := mReg.gatherInvoked, i+2; got != want {
t.Fatalf("unexpected number of gather invokes, want %d, got %d", want, got)
}
if got, want := mReg.doneInvoked, i+2; got != want {
t.Fatalf("unexpected number of done invokes, want %d, got %d", want, got)
}
if got, want := writer.Code, http.StatusOK; got != want {
t.Errorf("got HTTP status code %d, want %d", got, want)
}

want := "promhttp_metric_handler_requests_in_flight 1\n"
if got := writer.Body.String(); !strings.Contains(got, want) {
t.Errorf("got body %q, does not contain %q", got, want)
}
want = fmt.Sprintf("promhttp_metric_handler_requests_total{code=\"200\"} %d\n", i+1)
if got := writer.Body.String(); !strings.Contains(got, want) {
t.Errorf("got body %q, does not contain %q", got, want)
}
}
}

Expand Down

0 comments on commit 1f81b3e

Please sign in to comment.