metrics: remove metrics proxy (#6948)
ref #4287
GMHDBJD committed Sep 6, 2022
1 parent 6f9c00d commit 7774275
Showing 24 changed files with 155 additions and 1,016 deletions.
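The change repeated across the files below is mechanical: tiflow's internal `metricsproxy.New*Vec(factory, ...)` wrappers become direct `factory.New*Vec(...)` calls, and the proxy's `DeleteAllAboutLabels` becomes `DeletePartialMatch`, which prometheus/client_golang has provided natively since v1.12.0. A minimal sketch of the new shape, using a plain client_golang vector in place of tiflow's internal `promutil` factory (metric names and values here are illustrative, not taken from the diff):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// New shape: construct the vector directly; no proxy wrapper tracking
	// label combinations on the side.
	exitCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "dm",
		Subsystem: "dumpling",
		Name:      "exit_with_error_count",
		Help:      "illustrative help text",
	}, []string{"task", "source_id"})

	exitCounter.WithLabelValues("t1", "s1").Inc()
	exitCounter.WithLabelValues("t1", "s2").Inc()

	// DeletePartialMatch removes every series whose labels contain the
	// given subset, and returns how many series it removed.
	n := exitCounter.DeletePartialMatch(prometheus.Labels{"task": "t1"})
	fmt.Println(n) // 2
}
```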
4 changes: 1 addition & 3 deletions dm/dumpling/dumpling.go
@@ -30,7 +30,6 @@ import (
	"go.uber.org/atomic"
	"go.uber.org/zap"

-	"github.com/pingcap/tiflow/dm/pkg/metricsproxy"
"github.com/pingcap/tiflow/engine/pkg/promutil"

"github.com/pingcap/tiflow/dm/config"
@@ -81,8 +80,7 @@ func (m *Dumpling) Init(ctx context.Context) error {
// will register and deregister metrics, so we must use NoopRegistry
// to avoid duplicated registration.
m.metricProxies = &metricProxies{}
-	m.metricProxies.dumplingExitWithErrorCounter = metricsproxy.NewCounterVec(
-		m.cfg.MetricsFactory,
+	m.metricProxies.dumplingExitWithErrorCounter = m.cfg.MetricsFactory.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "dumpling",
10 changes: 5 additions & 5 deletions dm/dumpling/metrics.go
@@ -14,18 +14,18 @@
package dumpling

import (
-	"github.com/pingcap/tiflow/dm/pkg/metricsproxy"
"github.com/pingcap/tiflow/engine/pkg/promutil"
"github.com/prometheus/client_golang/prometheus"
)

type metricProxies struct {
-	dumplingExitWithErrorCounter *metricsproxy.CounterVecProxy
+	dumplingExitWithErrorCounter *prometheus.CounterVec
}

+var f = &promutil.PromFactory{}
+
var defaultMetricProxies = &metricProxies{
-	dumplingExitWithErrorCounter: metricsproxy.NewCounterVec(
-		&promutil.PromFactory{},
+	dumplingExitWithErrorCounter: f.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "dumpling",
@@ -41,5 +41,5 @@ func RegisterMetrics(registry *prometheus.Registry) {

func (m *Dumpling) removeLabelValuesWithTaskInMetrics(task, source string) {
labels := prometheus.Labels{"task": task, "source_id": source}
-	m.metricProxies.dumplingExitWithErrorCounter.DeleteAllAboutLabels(labels)
+	m.metricProxies.dumplingExitWithErrorCounter.DeletePartialMatch(labels)
}
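Why `DeletePartialMatch` is a drop-in replacement for the proxy's `DeleteAllAboutLabels`: plain `Delete` requires the complete label set and removes at most one series, while `DeletePartialMatch` takes a label subset and removes every matching series. A small self-contained comparison (values are illustrative):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	c := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "demo_total", Help: "demo",
	}, []string{"task", "source_id"})

	c.WithLabelValues("t1", "s1").Inc()
	c.WithLabelValues("t1", "s2").Inc()
	c.WithLabelValues("t2", "s1").Inc()

	// Exact match: must name every label, removes at most one series.
	fmt.Println(c.Delete(prometheus.Labels{"task": "t1", "source_id": "s1"})) // true

	// Partial match: a label subset, removes all matching series.
	fmt.Println(c.DeletePartialMatch(prometheus.Labels{"task": "t1"})) // 1 (only t1/s2 was left)
}
```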
43 changes: 21 additions & 22 deletions dm/loader/metrics.go
@@ -16,21 +16,20 @@ package loader
import (
	"github.com/pingcap/tiflow/engine/pkg/promutil"
	"github.com/prometheus/client_golang/prometheus"
-
-	"github.com/pingcap/tiflow/dm/pkg/metricsproxy"
)

var (
+	f = &promutil.PromFactory{}
	// should error.
-	tidbExecutionErrorCounter = metricsproxy.NewCounterVec(&promutil.PromFactory{},
+	tidbExecutionErrorCounter = f.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "tidb_execution_error",
Help: "Total count of tidb execution errors",
}, []string{"task", "source_id"})

-	queryHistogram = metricsproxy.NewHistogramVec(&promutil.PromFactory{},
+	queryHistogram = f.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "loader",
@@ -39,7 +38,7 @@ var (
Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25),
}, []string{"task", "source_id"})

-	txnHistogram = metricsproxy.NewHistogramVec(&promutil.PromFactory{},
+	txnHistogram = f.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "loader",
@@ -48,7 +47,7 @@ var (
Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25),
}, []string{"task", "worker", "source_id", "target_schema", "target_table"})

-	stmtHistogram = metricsproxy.NewHistogramVec(&promutil.PromFactory{},
+	stmtHistogram = f.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "loader",
@@ -57,31 +56,31 @@ var (
Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25),
}, []string{"type", "task"})

-	dataFileGauge = metricsproxy.NewGaugeVec(&promutil.PromFactory{},
+	dataFileGauge = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "data_file_gauge",
Help: "data files in total",
}, []string{"task", "source_id"})

-	tableGauge = metricsproxy.NewGaugeVec(&promutil.PromFactory{},
+	tableGauge = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "table_gauge",
Help: "tables in total",
}, []string{"task", "source_id"})

-	dataSizeGauge = metricsproxy.NewGaugeVec(&promutil.PromFactory{},
+	dataSizeGauge = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "data_size_gauge",
Help: "data size in total",
}, []string{"task", "source_id"})

-	progressGauge = metricsproxy.NewGaugeVec(&promutil.PromFactory{},
+	progressGauge = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
@@ -90,15 +89,15 @@ var (
}, []string{"task", "source_id"})

// should alert.
-	loaderExitWithErrorCounter = metricsproxy.NewCounterVec(&promutil.PromFactory{},
+	loaderExitWithErrorCounter = f.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "exit_with_error_count",
Help: "counter for loader exits with error",
}, []string{"task", "source_id"})

-	remainingTimeGauge = metricsproxy.NewGaugeVec(&promutil.PromFactory{},
+	remainingTimeGauge = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
@@ -122,14 +121,14 @@ func RegisterMetrics(registry *prometheus.Registry) {
}

func (l *Loader) removeLabelValuesWithTaskInMetrics(task string) {
-	tidbExecutionErrorCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task})
-	txnHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task})
-	queryHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task})
-	stmtHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task})
-	dataFileGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
-	tableGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
-	dataSizeGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
-	progressGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
-	loaderExitWithErrorCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task})
-	remainingTimeGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
+	tidbExecutionErrorCounter.DeletePartialMatch(prometheus.Labels{"task": task})
+	txnHistogram.DeletePartialMatch(prometheus.Labels{"task": task})
+	queryHistogram.DeletePartialMatch(prometheus.Labels{"task": task})
+	stmtHistogram.DeletePartialMatch(prometheus.Labels{"task": task})
+	dataFileGauge.DeletePartialMatch(prometheus.Labels{"task": task})
+	tableGauge.DeletePartialMatch(prometheus.Labels{"task": task})
+	dataSizeGauge.DeletePartialMatch(prometheus.Labels{"task": task})
+	progressGauge.DeletePartialMatch(prometheus.Labels{"task": task})
+	loaderExitWithErrorCounter.DeletePartialMatch(prometheus.Labels{"task": task})
+	remainingTimeGauge.DeletePartialMatch(prometheus.Labels{"task": task})
}
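Because every deletion above keys only on the `task` label, series belonging to other tasks must survive a cleanup. A hedged test sketch of that invariant, using client_golang's `testutil` (this test is not part of the commit; names and values are illustrative):

```go
package loader_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestPerTaskCleanup(t *testing.T) {
	g := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "dm_loader_table_gauge", Help: "demo",
	}, []string{"task", "source_id"})

	g.WithLabelValues("t1", "s1").Set(3)
	g.WithLabelValues("t2", "s1").Set(5)

	// Mirror of the cleanup above: delete everything for task t1.
	g.DeletePartialMatch(prometheus.Labels{"task": "t1"})

	// Only t2's series should remain.
	if got := testutil.CollectAndCount(g); got != 1 {
		t.Fatalf("expected 1 remaining series, got %d", got)
	}
}
```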
15 changes: 7 additions & 8 deletions dm/master/metrics/metrics.go
@@ -20,8 +20,6 @@ import (
	cpu "github.com/pingcap/tidb-tools/pkg/utils"
	"github.com/pingcap/tiflow/engine/pkg/promutil"
	"github.com/prometheus/client_golang/prometheus"
-
-	"github.com/pingcap/tiflow/dm/pkg/metricsproxy"
)

// used for ddlPendingCounter, no "Resolved" lock because they will be
@@ -48,7 +46,8 @@ const (
)

var (
-	workerState = metricsproxy.NewGaugeVec(&promutil.PromFactory{},
+	f = &promutil.PromFactory{}
+	workerState = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "master",
@@ -64,23 +63,23 @@ var (
Help: "the cpu usage of master",
})

-	ddlPendingCounter = metricsproxy.NewGaugeVec(&promutil.PromFactory{},
+	ddlPendingCounter = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "master",
Name: "ddl_state_number",
Help: "number of pending DDL in different states, Un-synced (waiting all upstream), Synced (all upstream finished, waiting all downstream)",
}, []string{"task", "type"})

-	ddlErrCounter = metricsproxy.NewCounterVec(&promutil.PromFactory{},
+	ddlErrCounter = f.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "master",
Name: "shard_ddl_error",
Help: "number of shard DDL lock/operation error",
}, []string{"task", "type"})

-	workerEventErrCounter = metricsproxy.NewCounterVec(&promutil.PromFactory{},
+	workerEventErrCounter = f.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "master",
@@ -137,7 +136,7 @@ func ReportWorkerStage(name string, state float64) {

// RemoveWorkerState cleans state of deleted worker.
func RemoveWorkerState(name string) {
-	workerState.DeleteAllAboutLabels(prometheus.Labels{"worker": name})
+	workerState.DeletePartialMatch(prometheus.Labels{"worker": name})
}

// ReportDDLPending inc/dec by 1 to ddlPendingCounter.
@@ -152,7 +151,7 @@ func ReportDDLPending(task, oldStatus, newStatus string) {

// RemoveDDLPending removes all counter of this task.
func RemoveDDLPending(task string) {
-	ddlPendingCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task})
+	ddlPendingCounter.DeletePartialMatch(prometheus.Labels{"task": task})
}

// ReportDDLError is a setter for ddlErrCounter.
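The comment on `ReportDDLPending` above describes a move between gauge states. A rough reconstruction of that inc/dec pattern (hedged: the real function body and its status constants are not shown in this diff; `reportDDLPending` here is hypothetical):

```go
package example

import "github.com/prometheus/client_golang/prometheus"

// reportDDLPending is a hypothetical sketch: moving a pending DDL from
// oldStatus to newStatus decrements one labeled gauge and increments the
// other, mirroring the "inc/dec by 1 to ddlPendingCounter" comment above.
func reportDDLPending(ddlPendingCounter *prometheus.GaugeVec, task, oldStatus, newStatus string) {
	if oldStatus != "" {
		ddlPendingCounter.WithLabelValues(task, oldStatus).Dec()
	}
	if newStatus != "" {
		ddlPendingCounter.WithLabelValues(task, newStatus).Inc()
	}
}
```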
6 changes: 3 additions & 3 deletions dm/pkg/conn/baseconn.go
@@ -23,11 +23,11 @@ import (
	gmysql "github.com/go-mysql-org/go-mysql/mysql"
	"github.com/go-sql-driver/mysql"
	"github.com/pingcap/failpoint"
+	"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

	tcontext "github.com/pingcap/tiflow/dm/pkg/context"
	"github.com/pingcap/tiflow/dm/pkg/log"
-	"github.com/pingcap/tiflow/dm/pkg/metricsproxy"
"github.com/pingcap/tiflow/dm/pkg/retry"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
@@ -117,7 +117,7 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int
// return
// 1. failed: (the index of sqls executed error, error)
// 2. succeed: (len(sqls), nil).
-func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *metricsproxy.HistogramVecProxy, task string, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error) {
+func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error) {
// inject an error to trigger retry, this should be placed before the real execution of the SQL statement.
failpoint.Inject("retryableError", func(val failpoint.Value) {
if mark, ok := val.(string); ok {
@@ -216,7 +216,7 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *me
// return
// 1. failed: (the index of sqls executed error, error)
// 2. succeed: (len(sqls), nil).
-func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, hVec *metricsproxy.HistogramVecProxy, task string, queries []string, args ...[]interface{}) (int, error) {
+func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, queries []string, args ...[]interface{}) (int, error) {
return conn.ExecuteSQLWithIgnoreError(tctx, hVec, task, nil, queries, args...)
}

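With the exported signatures above now taking a `*prometheus.HistogramVec`, callers hand over any vanilla vector directly. A rough caller sketch (assumes an already-initialized `BaseConn` and context from tiflow's dm packages; the helper name and query are illustrative):

```go
package example

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/pingcap/tiflow/dm/pkg/conn"
	tcontext "github.com/pingcap/tiflow/dm/pkg/context"
)

// execWithTiming is a hypothetical helper: any *prometheus.HistogramVec can
// now be passed to ExecuteSQL directly, with no proxy type in between.
func execWithTiming(tctx *tcontext.Context, base *conn.BaseConn, hVec *prometheus.HistogramVec) (int, error) {
	queries := []string{"CREATE TABLE IF NOT EXISTS t (id INT PRIMARY KEY)"}
	return base.ExecuteSQL(tctx, hVec, "my-task", queries)
}
```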
5 changes: 3 additions & 2 deletions dm/pkg/conn/baseconn_test.go
@@ -19,7 +19,6 @@ import (
	"testing"

	tcontext "github.com/pingcap/tiflow/dm/pkg/context"
-	"github.com/pingcap/tiflow/dm/pkg/metricsproxy"
"github.com/pingcap/tiflow/dm/pkg/retry"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/engine/pkg/promutil"
@@ -37,7 +36,9 @@

type testBaseConnSuite struct{}

-var testStmtHistogram = metricsproxy.NewHistogramVec(&promutil.PromFactory{},
+var f = &promutil.PromFactory{}
+
+var testStmtHistogram = f.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "conn",
105 changes: 0 additions & 105 deletions dm/pkg/metricsproxy/countervec.go

This file was deleted.
