export: support dataflow engine metrics factory (#34946)
ref #34948
lance6716 committed May 30, 2022
1 parent 1fac141 commit 6fbda74
Showing 13 changed files with 302 additions and 123 deletions.
31 changes: 18 additions & 13 deletions dumpling/export/config.go
@@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/promutil"
filter "github.com/pingcap/tidb/util/table-filter"
)

@@ -80,7 +81,6 @@ const (
// Config is the dump config for dumpling
type Config struct {
storage.BackendOptions
ExtStorage storage.ExternalStorage `json:"-"`

specifiedTables bool
AllowCleartextPasswords bool
@@ -124,21 +124,26 @@ type Config struct {
CsvDelimiter string
Databases []string

TableFilter filter.Filter `json:"-"`
Where string
FileType string
ServerInfo version.ServerInfo
Logger *zap.Logger `json:"-"`
OutputFileTemplate *template.Template `json:"-"`
Rows uint64
ReadTimeout time.Duration
TiDBMemQuotaQuery uint64
FileSize uint64
StatementSize uint64
SessionParams map[string]interface{}
TableFilter filter.Filter `json:"-"`
Where string
FileType string
ServerInfo version.ServerInfo
Logger *zap.Logger `json:"-"`
OutputFileTemplate *template.Template `json:"-"`
Rows uint64
ReadTimeout time.Duration
TiDBMemQuotaQuery uint64
FileSize uint64
StatementSize uint64
SessionParams map[string]interface{}
// TODO: deprecate it
Labels prometheus.Labels `json:"-"`
Tables DatabaseTables
CollationCompatible string

// fields below are injected from DM or dataflow engine
ExtStorage storage.ExternalStorage `json:"-"`
PromFactory promutil.Factory
}

// ServerInfoUnknown is the unknown database type to dumpling
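The `PromFactory` field joins `ExtStorage` under the "fields below are injected from DM or dataflow engine" comment, so a host process can hand dumpling a metrics factory before the dump starts. A minimal sketch of that injection, assuming the exported `DefaultConfig` constructor and the `PlainNoAutoRegisterFactory` type used elsewhere in this diff (the package name is illustrative):

```go
package engineintegration

import (
	"github.com/pingcap/tidb/dumpling/export"
	"github.com/pingcap/tidb/util/promutil"
)

// buildDumpConfig prepares a Config the way an embedding process might:
// it injects a metrics factory instead of relying on dumpling's
// package-level prometheus vectors.
func buildDumpConfig() *export.Config {
	conf := export.DefaultConfig()
	conf.PromFactory = &promutil.PlainNoAutoRegisterFactory{}
	// conf.ExtStorage would be set here too when the host already holds an
	// opened storage.ExternalStorage (omitted from this sketch).
	return conf
}
```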
24 changes: 16 additions & 8 deletions dumpling/export/dump.go
@@ -45,8 +45,9 @@ var emptyHandleValsErr = errors.New("empty handleVals for TiDB table")
// Dumper is the dump progress structure
type Dumper struct {
tctx *tcontext.Context
conf *Config
cancelCtx context.CancelFunc
conf *Config
metrics *metrics

extStore storage.ExternalStorage
dbHandle *sql.DB
@@ -79,6 +80,13 @@ func NewDumper(ctx context.Context, conf *Config) (*Dumper, error) {
cancelCtx: cancelFn,
selectTiDBTableRegionFunc: selectTiDBTableRegion,
}

if conf.PromFactory == nil {
d.metrics = defaultMetrics
} else {
d.metrics = newMetrics(conf.PromFactory, []string{})
}

err := adjustConfig(conf,
registerTLSConfig,
validateSpecifiedSQL,
@@ -211,7 +219,7 @@ func (d *Dumper) Dump() (dumpErr error) {
}

taskChan := make(chan Task, defaultDumpThreads)
AddGauge(taskChannelCapacity, conf.Labels, defaultDumpThreads)
AddGauge(d.metrics.taskChannelCapacity, conf.Labels, defaultDumpThreads)
wg, writingCtx := errgroup.WithContext(tctx)
writerCtx := tctx.WithContext(writingCtx)
writers, tearDownWriters, err := d.startWriters(writerCtx, wg, taskChan, rebuildConn)
@@ -290,11 +298,11 @@ func (d *Dumper) startWriters(tctx *tcontext.Context, wg *errgroup.Group, taskCh
if err != nil {
return nil, func() {}, err
}
writer := NewWriter(tctx, int64(i), conf, conn, d.extStore)
writer := NewWriter(tctx, int64(i), conf, conn, d.extStore, d.metrics)
writer.rebuildConnFn = rebuildConnFn
writer.setFinishTableCallBack(func(task Task) {
if _, ok := task.(*TaskTableData); ok {
IncCounter(finishedTablesCounter, conf.Labels)
IncCounter(d.metrics.finishedTablesCounter, conf.Labels)
// FIXME: actually finishing the last chunk doesn't mean this table is 'finished'.
// We can only call this table 'finished' when all its chunks are finished.
// Comment this log now to avoid ambiguity.
@@ -304,7 +312,7 @@
}
})
writer.setFinishTaskCallBack(func(task Task) {
IncGauge(taskChannelCapacity, conf.Labels)
IncGauge(d.metrics.taskChannelCapacity, conf.Labels)
if td, ok := task.(*TaskTableData); ok {
tctx.L().Debug("finish dumping table data task",
zap.String("database", td.Meta.DatabaseName()),
@@ -560,7 +568,7 @@ func (d *Dumper) dumpTableData(tctx *tcontext.Context, conn *BaseConn, meta Tabl
// Update total rows
fieldName, _ := pickupPossibleField(tctx, meta, conn)
c := estimateCount(tctx, meta.DatabaseName(), meta.TableName(), conn, fieldName, conf)
AddCounter(estimateTotalRowsCounter, conf.Labels, float64(c))
AddCounter(d.metrics.estimateTotalRowsCounter, conf.Labels, float64(c))

if conf.Rows == UnspecifiedSize {
return d.sequentialDumpTable(tctx, conn, meta, taskChan)
@@ -765,7 +773,7 @@ func (d *Dumper) sendTaskToChan(tctx *tcontext.Context, task Task, taskChan chan
case taskChan <- task:
tctx.L().Debug("send task to writer",
zap.String("task", task.Brief()))
DecGauge(taskChannelCapacity, conf.Labels)
DecGauge(d.metrics.taskChannelCapacity, conf.Labels)
return false
}
}
@@ -1201,7 +1209,7 @@ func (d *Dumper) dumpSQL(tctx *tcontext.Context, metaConn *BaseConn, taskChan ch
data := newTableData(conf.SQL, 0, true)
task := NewTaskTableData(meta, data, 0, 1)
c := detectEstimateRows(tctx, metaConn, fmt.Sprintf("EXPLAIN %s", conf.SQL), []string{"rows", "estRows", "count"})
AddCounter(estimateTotalRowsCounter, conf.Labels, float64(c))
AddCounter(d.metrics.estimateTotalRowsCounter, conf.Labels, float64(c))
atomic.StoreInt64(&d.totalTables, int64(1))
d.sendTaskToChan(tctx, task, taskChan)
}
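`NewDumper` now decides between the shared `defaultMetrics` and a private set built from `conf.PromFactory`, and every `AddGauge`/`IncCounter`/`DecGauge` call site reads from `d.metrics`. A hedged end-to-end sketch of how an embedding caller might drive a dump with its own factory; `Close` is assumed to be the usual Dumper teardown and is not part of this diff:

```go
package engineintegration

import (
	"context"

	"github.com/pingcap/tidb/dumpling/export"
	"github.com/pingcap/tidb/util/promutil"
)

// runDump builds a Dumper whose metrics live in a per-instance set:
// with PromFactory non-nil, NewDumper calls newMetrics(conf.PromFactory, ...)
// instead of falling back to the package-level defaultMetrics.
func runDump(ctx context.Context, conf *export.Config) error {
	conf.PromFactory = &promutil.PlainNoAutoRegisterFactory{}
	d, err := export.NewDumper(ctx, conf)
	if err != nil {
		return err
	}
	defer d.Close() // assumed teardown; not shown in this commit
	return d.Dump()
}
```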
4 changes: 3 additions & 1 deletion dumpling/export/ir_impl_test.go
@@ -7,6 +7,7 @@ import (
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/tidb/util/promutil"
"github.com/stretchr/testify/require"
)

@@ -98,7 +99,8 @@ func TestChunkRowIter(t *testing.T) {
sqlRowIter := newRowIter(rows, 2)

res := newSimpleRowReceiver(2)
wp := newWriterPipe(nil, testFileSize, testStatementSize, nil)
metrics := newMetrics(&promutil.PlainNoAutoRegisterFactory{}, []string{})
wp := newWriterPipe(nil, testFileSize, testStatementSize, metrics, nil)

var resSize [][]uint64
for sqlRowIter.HasNext() {
79 changes: 44 additions & 35 deletions dumpling/export/metrics.go
@@ -5,11 +5,12 @@ package export
import (
"math"

"github.com/pingcap/tidb/util/promutil"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

var (
type metrics struct {
finishedSizeGauge *prometheus.GaugeVec
finishedRowsGauge *prometheus.GaugeVec
finishedTablesCounter *prometheus.CounterVec
@@ -18,103 +19,111 @@
receiveWriteChunkTimeHistogram *prometheus.HistogramVec
errorCount *prometheus.CounterVec
taskChannelCapacity *prometheus.GaugeVec
)
}

// InitMetricsVector inits metrics vectors.
// This function must run before RegisterMetrics
func InitMetricsVector(labels prometheus.Labels) {
labelNames := make([]string, 0, len(labels))
for name := range labels {
labelNames = append(labelNames, name)
}
finishedSizeGauge = prometheus.NewGaugeVec(
var defaultMetrics *metrics

func newMetrics(f promutil.Factory, labelNames []string) *metrics {
m := metrics{}
m.finishedSizeGauge = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dumpling",
Subsystem: "dump",
Name: "finished_size",
Help: "counter for dumpling finished file size",
}, labelNames)
estimateTotalRowsCounter = prometheus.NewCounterVec(
m.estimateTotalRowsCounter = f.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dumpling",
Subsystem: "dump",
Name: "estimate_total_rows",
Help: "estimate total rows for dumpling tables",
}, labelNames)
finishedRowsGauge = prometheus.NewGaugeVec(
m.finishedRowsGauge = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dumpling",
Subsystem: "dump",
Name: "finished_rows",
Help: "counter for dumpling finished rows",
}, labelNames)
finishedTablesCounter = prometheus.NewCounterVec(
m.finishedTablesCounter = f.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dumpling",
Subsystem: "dump",
Name: "finished_tables",
Help: "counter for dumpling finished tables",
}, labelNames)
writeTimeHistogram = prometheus.NewHistogramVec(
m.writeTimeHistogram = f.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dumpling",
Subsystem: "write",
Name: "write_duration_time",
Help: "Bucketed histogram of write time (s) of files",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
}, labelNames)
receiveWriteChunkTimeHistogram = prometheus.NewHistogramVec(
m.receiveWriteChunkTimeHistogram = f.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dumpling",
Subsystem: "write",
Name: "receive_chunk_duration_time",
Help: "Bucketed histogram of receiving time (s) of chunks",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
}, labelNames)
errorCount = prometheus.NewCounterVec(
m.errorCount = f.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dumpling",
Subsystem: "dump",
Name: "error_count",
Help: "Total error count during dumping progress",
}, labelNames)
taskChannelCapacity = prometheus.NewGaugeVec(
m.taskChannelCapacity = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dumpling",
Subsystem: "dump",
Name: "channel_capacity",
Help: "The task channel capacity during dumping progress",
}, labelNames)
return &m
}

// InitMetricsVector inits metrics vectors.
// This function must run before RegisterMetrics
func InitMetricsVector(labels prometheus.Labels) {
labelNames := make([]string, 0, len(labels))
for name := range labels {
labelNames = append(labelNames, name)
}
defaultMetrics = newMetrics(&promutil.PlainNoAutoRegisterFactory{}, labelNames)
}

// RegisterMetrics registers metrics.
func RegisterMetrics(registry *prometheus.Registry) {
if finishedSizeGauge == nil {
if defaultMetrics == nil || defaultMetrics.finishedSizeGauge == nil {
return
}
registry.MustRegister(finishedSizeGauge)
registry.MustRegister(finishedRowsGauge)
registry.MustRegister(estimateTotalRowsCounter)
registry.MustRegister(finishedTablesCounter)
registry.MustRegister(writeTimeHistogram)
registry.MustRegister(receiveWriteChunkTimeHistogram)
registry.MustRegister(errorCount)
registry.MustRegister(taskChannelCapacity)
registry.MustRegister(defaultMetrics.finishedSizeGauge)
registry.MustRegister(defaultMetrics.finishedRowsGauge)
registry.MustRegister(defaultMetrics.estimateTotalRowsCounter)
registry.MustRegister(defaultMetrics.finishedTablesCounter)
registry.MustRegister(defaultMetrics.writeTimeHistogram)
registry.MustRegister(defaultMetrics.receiveWriteChunkTimeHistogram)
registry.MustRegister(defaultMetrics.errorCount)
registry.MustRegister(defaultMetrics.taskChannelCapacity)
}

// RemoveLabelValuesWithTaskInMetrics removes metrics of specified labels.
func RemoveLabelValuesWithTaskInMetrics(labels prometheus.Labels) {
if finishedSizeGauge == nil {
if defaultMetrics.finishedSizeGauge == nil {
return
}
finishedSizeGauge.Delete(labels)
finishedRowsGauge.Delete(labels)
estimateTotalRowsCounter.Delete(labels)
finishedTablesCounter.Delete(labels)
writeTimeHistogram.Delete(labels)
receiveWriteChunkTimeHistogram.Delete(labels)
errorCount.Delete(labels)
taskChannelCapacity.Delete(labels)
defaultMetrics.finishedSizeGauge.Delete(labels)
defaultMetrics.finishedRowsGauge.Delete(labels)
defaultMetrics.estimateTotalRowsCounter.Delete(labels)
defaultMetrics.finishedTablesCounter.Delete(labels)
defaultMetrics.writeTimeHistogram.Delete(labels)
defaultMetrics.receiveWriteChunkTimeHistogram.Delete(labels)
defaultMetrics.errorCount.Delete(labels)
defaultMetrics.taskChannelCapacity.Delete(labels)
}

// ReadCounter reports the current value of the counter.
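The standalone-binary path is preserved: `InitMetricsVector` builds `defaultMetrics` through a `PlainNoAutoRegisterFactory`, and `RegisterMetrics` then registers those vectors on an explicit registry. A small sketch of that flow, assuming the exported functions keep the signatures shown above (the "task" label name is illustrative):

```go
package engineintegration

import (
	"github.com/pingcap/tidb/dumpling/export"
	"github.com/prometheus/client_golang/prometheus"
)

// registerDefaultDumplingMetrics mirrors the standalone path: create the
// default metric vectors with a set of constant label names, then register
// them on a registry the caller owns.
func registerDefaultDumplingMetrics(registry *prometheus.Registry) {
	export.InitMetricsVector(prometheus.Labels{"task": "example-task"})
	export.RegisterMetrics(registry)
}
```

`RemoveLabelValuesWithTaskInMetrics` can still be called afterwards to drop a finished task's label values from the default set.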
10 changes: 10 additions & 0 deletions dumpling/export/sql_test.go
@@ -16,6 +16,7 @@ import (
"testing"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/tidb/util/promutil"

"github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/errors"
@@ -535,11 +536,13 @@ func TestBuildTableSampleQueries(t *testing.T) {
require.NoError(t, err)
baseConn := newBaseConn(conn, true, nil)
tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel()
metrics := newMetrics(&promutil.PlainNoAutoRegisterFactory{}, []string{})

d := &Dumper{
tctx: tctx,
conf: DefaultConfig(),
cancelCtx: cancel,
metrics: metrics,
selectTiDBTableRegionFunc: selectTiDBTableRegion,
}
d.conf.ServerInfo = version.ServerInfo{
@@ -945,11 +948,13 @@ func TestBuildRegionQueriesWithoutPartition(t *testing.T) {
require.NoError(t, err)
baseConn := newBaseConn(conn, true, nil)
tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel()
metrics := newMetrics(&promutil.PlainNoAutoRegisterFactory{}, []string{})

d := &Dumper{
tctx: tctx,
conf: DefaultConfig(),
cancelCtx: cancel,
metrics: metrics,
selectTiDBTableRegionFunc: selectTiDBTableRegion,
}
d.conf.ServerInfo = version.ServerInfo{
@@ -1104,11 +1109,13 @@ func TestBuildRegionQueriesWithPartitions(t *testing.T) {
require.NoError(t, err)
baseConn := newBaseConn(conn, true, nil)
tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel()
metrics := newMetrics(&promutil.PlainNoAutoRegisterFactory{}, []string{})

d := &Dumper{
tctx: tctx,
conf: DefaultConfig(),
cancelCtx: cancel,
metrics: metrics,
selectTiDBTableRegionFunc: selectTiDBTableRegion,
}
d.conf.ServerInfo = version.ServerInfo{
@@ -1360,10 +1367,13 @@ func TestBuildVersion3RegionQueries(t *testing.T) {
{"t4", 0, TableTypeBase},
},
}
metrics := newMetrics(&promutil.PlainNoAutoRegisterFactory{}, []string{})

d := &Dumper{
tctx: tctx,
conf: conf,
cancelCtx: cancel,
metrics: metrics,
selectTiDBTableRegionFunc: selectTiDBTableRegion,
}
showStatsHistograms := buildMockNewRows(mock, []string{"Db_name", "Table_name", "Partition_name", "Column_name", "Is_index", "Update_time", "Distinct_count", "Null_count", "Avg_col_size", "Correlation"},
8 changes: 4 additions & 4 deletions dumpling/export/status.go
@@ -54,10 +54,10 @@ func (d *Dumper) GetParameters() (midparams *Midparams) {
conf := d.conf
mid := &Midparams{}
mid.TotalTables = atomic.LoadInt64(&d.totalTables)
mid.CompletedTables = ReadCounter(finishedTablesCounter, conf.Labels)
mid.FinishedBytes = ReadGauge(finishedSizeGauge, conf.Labels)
mid.FinishedRows = ReadGauge(finishedRowsGauge, conf.Labels)
mid.EstimateTotalRows = ReadCounter(estimateTotalRowsCounter, conf.Labels)
mid.CompletedTables = ReadCounter(d.metrics.finishedTablesCounter, conf.Labels)
mid.FinishedBytes = ReadGauge(d.metrics.finishedSizeGauge, conf.Labels)
mid.FinishedRows = ReadGauge(d.metrics.finishedRowsGauge, conf.Labels)
mid.EstimateTotalRows = ReadCounter(d.metrics.estimateTotalRowsCounter, conf.Labels)
return mid
}

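Because `GetParameters` now reads the dumper's own metric set, progress can be reported per `Dumper` instance even when several run in one process. A hedged sketch of a caller polling it (the ticker loop and logging callback are illustrative):

```go
package engineintegration

import (
	"context"
	"time"

	"github.com/pingcap/tidb/dumpling/export"
)

// reportProgress periodically reads the per-dumper progress counters.
// Field names match the Midparams accesses shown in the diff above.
func reportProgress(ctx context.Context, d *export.Dumper, logf func(string, ...interface{})) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			mid := d.GetParameters()
			logf("dump progress: completed tables %v of %v, rows %v of ~%v, finished bytes %v",
				mid.CompletedTables, mid.TotalTables,
				mid.FinishedRows, mid.EstimateTotalRows, mid.FinishedBytes)
		}
	}
}
```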
