-
Notifications
You must be signed in to change notification settings - Fork 269
/
metrics.go
178 lines (152 loc) · 4.85 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package metrics
import (
"context"
"time"
cpu "github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tiflow/engine/pkg/promutil"
"github.com/prometheus/client_golang/prometheus"
)
// Pending states of a shard DDL lock, used as the "type" label value of
// ddlPendingCounter. There is no "Resolved" state because resolved locks are
// removed quickly and are no longer pending.
const (
// DDLPendingNone marks the absence of a pending state; ReportDDLPending
// does not touch any gauge for this value.
DDLPendingNone = "None"
// DDLPendingUnSynced is described by the ddlPendingCounter help text as
// "waiting all upstream".
DDLPendingUnSynced = "Un-synced"
// DDLPendingSynced is described by the ddlPendingCounter help text as
// "all upstream finished, waiting all downstream".
DDLPendingSynced = "Synced"
)
// Error types reported while handling shard DDLs.
// NOTE(review): presumably passed as errType to ReportDDLError, where the
// value becomes the "type" label of ddlErrCounter — confirm against callers.
const (
InfoErrSyncLock = "InfoPut - SyncLockError"
InfoErrHandleLock = "InfoPut - HandleLockError"
OpErrRemoveLock = "OperationPut - RemoveLockError"
OpErrLockUnSynced = "OperationPut - LockUnSyncedError"
OpErrPutNonOwnerOp = "OperationPut - PutNonOwnerOpError"
)
// Error types for worker events; the workerEventErrCounter help text
// distinguishes errors raised "during handling or watching".
// NOTE(review): presumably passed as errType to ReportWorkerEventErr —
// confirm against callers.
const (
// WorkerEventHandle labels an error as a "handle" error.
WorkerEventHandle = "handle"
// WorkerEventWatch labels an error as a "watch" error.
WorkerEventWatch = "watch"
)
// Metrics exposed by dm-master. Every metric lives under the "dm" namespace
// and the "master" subsystem and is created through the shared factory f, so
// that all of them are built the same way. RegistryMetrics registers them
// with the default Prometheus registerer.
var (
	// f is the factory used to build every metric declared below.
	f = &promutil.PromFactory{}

	// workerState holds one gauge per worker (label "worker"); the meaning of
	// the numeric value is given in the help text.
	workerState = f.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "dm",
			Subsystem: "master",
			Name:      "worker_state",
			Help:      "state of worker, -1 - unrecognized, 0 - offline, 1 - free, 2 - bound",
		}, []string{"worker"})

	// cpuUsageGauge is refreshed by collectMetrics with the value returned by
	// cpu.GetCPUPercentage.
	cpuUsageGauge = f.NewGauge(
		prometheus.GaugeOpts{
			Namespace: "dm",
			Subsystem: "master",
			Name:      "cpu_usage",
			Help:      "the cpu usage of master",
		})

	// ddlPendingCounter is, despite its name, a gauge: ReportDDLPending both
	// increments and decrements it. It is keyed by task and pending state
	// ("type").
	ddlPendingCounter = f.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "dm",
			Subsystem: "master",
			Name:      "ddl_state_number",
			Help:      "number of pending DDL in different states, Un-synced (waiting all upstream), Synced (all upstream finished, waiting all downstream)",
		}, []string{"task", "type"})

	// ddlErrCounter counts shard DDL errors per task and error type.
	ddlErrCounter = f.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "dm",
			Subsystem: "master",
			Name:      "shard_ddl_error",
			Help:      "number of shard DDL lock/operation error",
		}, []string{"task", "type"})

	// workerEventErrCounter counts worker event errors per error type.
	workerEventErrCounter = f.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "dm",
			Subsystem: "master",
			Name:      "worker_event_error",
			Help:      "number of error related to worker event, during handling or watching",
		}, []string{"type"})

	// startLeaderCounter is incremented by ReportStartLeader.
	startLeaderCounter = f.NewCounter(
		prometheus.CounterOpts{
			Namespace: "dm",
			Subsystem: "master",
			Name:      "start_leader_counter",
			Help:      "number of this dm-master try to start leader components",
		})
)
func collectMetrics() {
cpuUsage := cpu.GetCPUPercentage()
cpuUsageGauge.Set(cpuUsage)
}
// RunBackgroundJob calls collectMetrics once every ten seconds and returns
// when ctx is done. It blocks the calling goroutine for as long as it runs;
// the first collection happens on the first tick, not immediately.
func RunBackgroundJob(ctx context.Context) {
	const collectInterval = 10 * time.Second

	tick := time.NewTicker(collectInterval)
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			collectMetrics()
		}
	}
}
// RegistryMetrics registers every dm-master metric declared in this package
// with prometheus.DefaultRegisterer. Registration uses MustRegister, so a
// failed registration panics.
func RegistryMetrics() {
	prometheus.DefaultRegisterer.MustRegister(
		workerState,
		cpuUsageGauge,
		ddlPendingCounter,
		ddlErrCounter,
		workerEventErrCounter,
		startLeaderCounter,
	)
}
// ReportWorkerStage sets the workerState gauge of the worker called name to
// state. The meaning of the numeric values is listed in the gauge's help
// text.
func ReportWorkerStage(name string, state float64) {
	gauge := workerState.WithLabelValues(name)
	gauge.Set(state)
}
// RemoveWorkerState cleans state of deleted worker.
func RemoveWorkerState(name string) {
workerState.DeletePartialMatch(prometheus.Labels{"worker": name})
}
// ReportDDLPending moves one pending DDL of task from oldStatus to newStatus
// in ddlPendingCounter: the gauge for oldStatus is decremented by one and the
// gauge for newStatus is incremented by one. DDLPendingNone on either side
// means "no state", so no gauge is touched for that side.
func ReportDDLPending(task, oldStatus, newStatus string) {
	if DDLPendingNone != oldStatus {
		leaving := ddlPendingCounter.WithLabelValues(task, oldStatus)
		leaving.Dec()
	}

	if DDLPendingNone != newStatus {
		entering := ddlPendingCounter.WithLabelValues(task, newStatus)
		entering.Inc()
	}
}
// RemoveDDLPending removes all counter of this task.
func RemoveDDLPending(task string) {
ddlPendingCounter.DeletePartialMatch(prometheus.Labels{"task": task})
}
// ReportDDLError increments ddlErrCounter by one for the given task and
// error type.
func ReportDDLError(task, errType string) {
	counter := ddlErrCounter.WithLabelValues(task, errType)
	counter.Inc()
}
// ReportWorkerEventErr increments workerEventErrCounter by one for the given
// error type.
func ReportWorkerEventErr(errType string) {
	counter := workerEventErrCounter.WithLabelValues(errType)
	counter.Inc()
}
// ReportStartLeader increments startLeaderCounter by one. Per the counter's
// help text, it counts how many times this dm-master tried to start its
// leader components.
func ReportStartLeader() {
startLeaderCounter.Inc()
}
// OnRetireLeader resets the labelled metric vectors (workerState,
// ddlErrCounter, ddlPendingCounter and workerEventErrCounter) when this
// master retires from leadership. cpuUsageGauge and startLeaderCounter are
// left untouched.
func OnRetireLeader() {
	leaderVectors := []interface{ Reset() }{
		workerState,
		ddlErrCounter,
		ddlPendingCounter,
		workerEventErrCounter,
	}
	for _, vec := range leaderVectors {
		vec.Reset()
	}
}