Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send Graphite metrics with tags #668

Merged
merged 2 commits into from Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
61 changes: 46 additions & 15 deletions prometheus/graphite/bridge.go
Expand Up @@ -54,6 +54,9 @@ const (

// Config defines the Graphite bridge config.
type Config struct {
// Whether to use Graphite tags or not. Defaults to false.
UseTags bool

// The url to push data to. Required.
URL string

Expand All @@ -80,6 +83,7 @@ type Config struct {

// Bridge pushes metrics to the configured Graphite server.
type Bridge struct {
useTags bool
url string
prefix string
interval time.Duration
Expand All @@ -102,6 +106,8 @@ type Logger interface {
func NewBridge(c *Config) (*Bridge, error) {
b := &Bridge{}

b.useTags = c.UseTags

if c.URL == "" {
return nil, errors.New("missing URL")
}
Expand Down Expand Up @@ -178,10 +184,10 @@ func (b *Bridge) Push() error {
}
defer conn.Close()

return writeMetrics(conn, mfs, b.prefix, model.Now())
return writeMetrics(conn, mfs, b.useTags, b.prefix, model.Now())
}

func writeMetrics(w io.Writer, mfs []*dto.MetricFamily, prefix string, now model.Time) error {
func writeMetrics(w io.Writer, mfs []*dto.MetricFamily, useTags bool, prefix string, now model.Time) error {
vec, err := expfmt.ExtractSamples(&expfmt.DecodeOptions{
Timestamp: now,
}, mfs...)
Expand All @@ -199,7 +205,7 @@ func writeMetrics(w io.Writer, mfs []*dto.MetricFamily, prefix string, now model
if err := buf.WriteByte('.'); err != nil {
return err
}
if err := writeMetric(buf, s.Metric); err != nil {
if err := writeMetric(buf, s.Metric, useTags); err != nil {
return err
}
if _, err := fmt.Fprintf(buf, " %g %d\n", s.Value, int64(s.Timestamp)/millisecondsPerSecond); err != nil {
Expand All @@ -213,43 +219,68 @@ func writeMetrics(w io.Writer, mfs []*dto.MetricFamily, prefix string, now model
return nil
}

func writeMetric(buf *bufio.Writer, m model.Metric) error {
func writeMetric(buf *bufio.Writer, m model.Metric, useTags bool) error {
metricName, hasName := m[model.MetricNameLabel]
numLabels := len(m) - 1
if !hasName {
numLabels = len(m)
}

labelStrings := make([]string, 0, numLabels)
for label, value := range m {
if label != model.MetricNameLabel {
labelStrings = append(labelStrings, fmt.Sprintf("%s %s", string(label), string(value)))
}
}

var err error
switch numLabels {
case 0:
if hasName {
return writeSanitized(buf, string(metricName))
}
default:
sort.Strings(labelStrings)
if err = writeSanitized(buf, string(metricName)); err != nil {
return err
}
for _, s := range labelStrings {
if err = buf.WriteByte('.'); err != nil {
if useTags {
return writeTags(buf, m)
} else {
return writeLabels(buf, m, numLabels)
}
}
return nil
}

func writeTags(buf *bufio.Writer, m model.Metric) error {
for label, value := range m {
if label != model.MetricNameLabel {
buf.WriteRune(';')
if _, err := buf.WriteString(string(label)); err != nil {
return err
}
if err = writeSanitized(buf, s); err != nil {
buf.WriteRune('=')
if _, err := buf.WriteString(string(value)); err != nil {
return err
}
}
}
return nil
}

func writeLabels(buf *bufio.Writer, m model.Metric, numLabels int) error {
labelStrings := make([]string, 0, numLabels)
for label, value := range m {
if label != model.MetricNameLabel {
labelString := string(label) + " " + string(value)
labelStrings = append(labelStrings, labelString)
}
}
sort.Strings(labelStrings)
for _, s := range labelStrings {
if err := buf.WriteByte('.'); err != nil {
return err
}
if err := writeSanitized(buf, s); err != nil {
return err
}
}
return nil
}

func writeSanitized(buf *bufio.Writer, s string) error {
prevUnderscore := false

Expand Down
149 changes: 137 additions & 12 deletions prometheus/graphite/bridge_test.go
Expand Up @@ -22,7 +22,10 @@ import (
"log"
"net"
"os"
"reflect"
"regexp"
"sort"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -62,6 +65,11 @@ func TestSanitize(t *testing.T) {
}

func TestWriteSummary(t *testing.T) {
testWriteSummary(t, false)
testWriteSummary(t, true)
}

func testWriteSummary(t *testing.T, useTags bool) {
sumVec := prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "name",
Expand Down Expand Up @@ -95,7 +103,8 @@ func TestWriteSummary(t *testing.T) {
{prefix: "pre.fix"},
}

const want = `%s.name.constname.constvalue.labelname.val1.quantile.0_5 20 1477043
var (
want = `%s.name.constname.constvalue.labelname.val1.quantile.0_5 20 1477043
%s.name.constname.constvalue.labelname.val1.quantile.0_9 30 1477043
%s.name.constname.constvalue.labelname.val1.quantile.0_99 30 1477043
%s.name_sum.constname.constvalue.labelname.val1 60 1477043
Expand All @@ -106,11 +115,28 @@ func TestWriteSummary(t *testing.T) {
%s.name_sum.constname.constvalue.labelname.val2 90 1477043
%s.name_count.constname.constvalue.labelname.val2 3 1477043
`
wantTagged = `%s.name;constname=constvalue;labelname=val1;quantile=0.5 20 1477043
%s.name;constname=constvalue;labelname=val1;quantile=0.9 30 1477043
%s.name;constname=constvalue;labelname=val1;quantile=0.99 30 1477043
%s.name_sum;constname=constvalue;labelname=val1 60 1477043
%s.name_count;constname=constvalue;labelname=val1 3 1477043
%s.name;constname=constvalue;labelname=val2;quantile=0.5 30 1477043
%s.name;constname=constvalue;labelname=val2;quantile=0.9 40 1477043
%s.name;constname=constvalue;labelname=val2;quantile=0.99 40 1477043
%s.name_sum;constname=constvalue;labelname=val2 90 1477043
%s.name_count;constname=constvalue;labelname=val2 3 1477043
`
)

if useTags {
want = wantTagged
}

for i, tc := range testCases {

now := model.Time(1477043083)
var buf bytes.Buffer
err = writeMetrics(&buf, mfs, tc.prefix, now)
err = writeMetrics(&buf, mfs, useTags, tc.prefix, now)
if err != nil {
t.Fatalf("error: %v", err)
}
Expand All @@ -119,13 +145,21 @@ func TestWriteSummary(t *testing.T) {
tc.prefix, tc.prefix, tc.prefix, tc.prefix, tc.prefix,
tc.prefix, tc.prefix, tc.prefix, tc.prefix, tc.prefix,
)
if got := buf.String(); wantWithPrefix != got {
t.Fatalf("test case index %d: wanted \n%s\n, got \n%s\n", i, wantWithPrefix, got)

got := buf.String()

if err := checkLinesAreEqual(wantWithPrefix, got, useTags); err != nil {
t.Fatalf("test case index %d:\n%s", i, err.Error())
}
}
}

func TestWriteHistogram(t *testing.T) {
testWriteHistogram(t, false)
testWriteHistogram(t, true)
}

func testWriteHistogram(t *testing.T, useTags bool) {
histVec := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "name",
Expand Down Expand Up @@ -153,12 +187,13 @@ func TestWriteHistogram(t *testing.T) {

now := model.Time(1477043083)
var buf bytes.Buffer
err = writeMetrics(&buf, mfs, "prefix", now)
err = writeMetrics(&buf, mfs, useTags, "prefix", now)
if err != nil {
t.Fatalf("error: %v", err)
}

want := `prefix.name_bucket.constname.constvalue.labelname.val1.le.0_01 0 1477043
var (
want = `prefix.name_bucket.constname.constvalue.labelname.val1.le.0_01 0 1477043
prefix.name_bucket.constname.constvalue.labelname.val1.le.0_02 0 1477043
prefix.name_bucket.constname.constvalue.labelname.val1.le.0_05 0 1477043
prefix.name_bucket.constname.constvalue.labelname.val1.le.0_1 0 1477043
Expand All @@ -173,12 +208,40 @@ prefix.name_sum.constname.constvalue.labelname.val2 90 1477043
prefix.name_count.constname.constvalue.labelname.val2 3 1477043
prefix.name_bucket.constname.constvalue.labelname.val2.le._Inf 3 1477043
`
if got := buf.String(); want != got {
t.Fatalf("wanted \n%s\n, got \n%s\n", want, got)
wantTagged = `prefix.name_bucket;constname=constvalue;labelname=val1;le=0.01 0 1477043
prefix.name_bucket;constname=constvalue;labelname=val1;le=0.02 0 1477043
prefix.name_bucket;constname=constvalue;labelname=val1;le=0.05 0 1477043
prefix.name_bucket;constname=constvalue;labelname=val1;le=0.1 0 1477043
prefix.name_sum;constname=constvalue;labelname=val1 60 1477043
prefix.name_count;constname=constvalue;labelname=val1 3 1477043
prefix.name_bucket;constname=constvalue;labelname=val1;le=+Inf 3 1477043
prefix.name_bucket;constname=constvalue;labelname=val2;le=0.01 0 1477043
prefix.name_bucket;constname=constvalue;labelname=val2;le=0.02 0 1477043
prefix.name_bucket;constname=constvalue;labelname=val2;le=0.05 0 1477043
prefix.name_bucket;constname=constvalue;labelname=val2;le=0.1 0 1477043
prefix.name_sum;constname=constvalue;labelname=val2 90 1477043
prefix.name_count;constname=constvalue;labelname=val2 3 1477043
prefix.name_bucket;constname=constvalue;labelname=val2;le=+Inf 3 1477043
`
)

if useTags {
want = wantTagged
}

got := buf.String()

if err := checkLinesAreEqual(want, got, useTags); err != nil {
t.Fatalf(err.Error())
}
}

func TestToReader(t *testing.T) {
testToReader(t, false)
testToReader(t, true)
}

func testToReader(t *testing.T, useTags bool) {
cntVec := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "name",
Expand All @@ -193,24 +256,86 @@ func TestToReader(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(cntVec)

want := `prefix.name.constname.constvalue.labelname.val1 1 1477043
var (
want = `prefix.name.constname.constvalue.labelname.val1 1 1477043
prefix.name.constname.constvalue.labelname.val2 1 1477043
`
wantTagged = `prefix.name;constname=constvalue;labelname=val1 1 1477043
prefix.name;constname=constvalue;labelname=val2 1 1477043
`
)

if useTags {
want = wantTagged
}

mfs, err := reg.Gather()
if err != nil {
t.Fatalf("error: %v", err)
}

now := model.Time(1477043083)
var buf bytes.Buffer
err = writeMetrics(&buf, mfs, "prefix", now)
err = writeMetrics(&buf, mfs, useTags, "prefix", now)
if err != nil {
t.Fatalf("error: %v", err)
}

if got := buf.String(); want != got {
t.Fatalf("wanted \n%s\n, got \n%s\n", want, got)
got := buf.String()

if err := checkLinesAreEqual(want, got, useTags); err != nil {
t.Fatalf(err.Error())
}
}

func checkLinesAreEqual(w, g string, useTags bool) error {
if useTags {
taggedLineRegexp := regexp.MustCompile(`;| `)

wantLines, err := stringToLines(w)
if err != nil {
return err
}

gotLines, err := stringToLines(g)
if err != nil {
return err
}

for lineInd := range gotLines {
var log string
// Tagged metric, order of tags doesn't matter
// m1 := "prefix.name;tag1=val1;tag2=val2 3 1477043"
// m2 := "prefix.name;tag2=val2;tag1=val1 3 1477043"
// m1 should be equal to m2
wantSplit := taggedLineRegexp.Split(wantLines[lineInd], -1)
gotSplit := taggedLineRegexp.Split(gotLines[lineInd], -1)
sort.Strings(wantSplit)
sort.Strings(gotSplit)

log += fmt.Sprintf("want: %v\ngot: %v\n\n", wantSplit, gotSplit)

if !reflect.DeepEqual(wantSplit, gotSplit) {
return fmt.Errorf(log)
}
}
return nil
}

if w != g {
return fmt.Errorf("wanted:\n\n%s\ngot:\n\n%s", w, g)
}

return nil
}

func stringToLines(s string) (lines []string, err error) {
scanner := bufio.NewScanner(strings.NewReader(s))
for scanner.Scan() {
lines = append(lines, scanner.Text())
}
err = scanner.Err()
return
}

func TestPush(t *testing.T) {
Expand Down