Skip to content

Commit

Permalink
introduce metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Zach Zhu <zzqshu@126.com>
  • Loading branch information
zqzten committed May 29, 2022
1 parent dc877c3 commit d1727ab
Show file tree
Hide file tree
Showing 16 changed files with 286 additions and 40 deletions.
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -12,6 +12,7 @@ require (
github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee
github.com/nats-io/nats.go v1.13.1-0.20220318132711-e0e03e374228
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
github.com/rancher/wrangler v0.8.3
github.com/shengdoushi/base58 v1.0.0
github.com/sirupsen/logrus v1.7.0
Expand Down
20 changes: 19 additions & 1 deletion main.go
Expand Up @@ -5,16 +5,19 @@ import (
"errors"
"fmt"
"os"
"time"

"github.com/k3s-io/kine/pkg/endpoint"
"github.com/k3s-io/kine/pkg/metrics"
"github.com/k3s-io/kine/pkg/version"
"github.com/rancher/wrangler/pkg/signals"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
)

var (
config endpoint.Config
config endpoint.Config
metricsConfig metrics.Config
)

func main() {
Expand Down Expand Up @@ -76,6 +79,18 @@ func main() {
Usage: "Key file for DB connection",
Destination: &config.BackendTLSConfig.KeyFile,
},
cli.StringFlag{
Name: "metrics-bind-address",
Usage: "The address the metric endpoint binds to. Default :8080, set 0 to disable metrics serving.",
Destination: &metricsConfig.ServerAddress,
Value: ":8080",
},
cli.DurationFlag{
Name: "slow-sql-threshold",
Usage: "The duration which SQL executed longer than will be logged. Default 1s, set <= 0 to disable slow SQL log.",
Destination: &metrics.SlowSQLThreshold,
Value: time.Second,
},
cli.BoolFlag{Name: "debug"},
}
app.Action = run
Expand All @@ -92,6 +107,9 @@ func run(c *cli.Context) error {
logrus.SetLevel(logrus.TraceLevel)
}
ctx := signals.SetupSignalHandler(context.Background())
metricsConfig.ServerTLSConfig = config.ServerTLSConfig
go metrics.Serve(ctx, metricsConfig)
config.MetricsRegisterer = metrics.Registry
_, err := endpoint.Listen(ctx, config)
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions pkg/drivers/dqlite/dqlite.go
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/k3s-io/kine/pkg/drivers/sqlite"
"github.com/k3s-io/kine/pkg/server"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -68,7 +69,7 @@ outer:
return nil
}

func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
opts, err := parseOpts(datasourceName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -97,7 +98,7 @@ func New(ctx context.Context, datasourceName string, connPoolConfig generic.Conn
}

sql.Register("dqlite", d)
backend, generic, err := sqlite.NewVariant(ctx, "dqlite", opts.dsn, connPoolConfig)
backend, generic, err := sqlite.NewVariant(ctx, "dqlite", opts.dsn, connPoolConfig, metricsRegisterer)
if err != nil {
return nil, errors.Wrap(err, "sqlite client")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/drivers/dqlite/no_dqlite.go
Expand Up @@ -9,8 +9,9 @@ import (

"github.com/k3s-io/kine/pkg/drivers/generic"
"github.com/k3s-io/kine/pkg/server"
"github.com/prometheus/client_golang/prometheus"
)

func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
return nil, errors.New(`this binary is built without dqlite support, compile with "-tags dqlite"`)
}
39 changes: 26 additions & 13 deletions pkg/drivers/generic/generic.go
Expand Up @@ -13,7 +13,11 @@ import (

"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/k3s-io/kine/pkg/metrics"
"github.com/k3s-io/kine/pkg/server"
"github.com/k3s-io/kine/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -66,15 +70,9 @@ var (
`, revSQL, compactRevSQL, columns)
)

type Stripped string

func (s Stripped) String() string {
str := strings.ReplaceAll(string(s), "\n", "")
return regexp.MustCompile("[\t ]+").ReplaceAllString(str, " ")
}

type ErrRetry func(error) bool
type TranslateErr func(error) error
type ErrCode func(error) string

type ConnectionPoolConfig struct {
MaxIdle int // zero means defaultMaxIdleConns; negative means 0
Expand Down Expand Up @@ -105,6 +103,7 @@ type Generic struct {
GetSizeSQL string
Retry ErrRetry
TranslateErr TranslateErr
ErrCode ErrCode
}

func q(sql, param string, numbered bool) string {
Expand Down Expand Up @@ -179,7 +178,7 @@ func openAndTest(driverName, dataSourceName string) (*sql.DB, error) {
return db, nil
}

func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig ConnectionPoolConfig, paramCharacter string, numbered bool) (*Generic, error) {
func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig ConnectionPoolConfig, paramCharacter string, numbered bool, metricsRegisterer prometheus.Registerer) (*Generic, error) {
var (
db *sql.DB
err error
Expand All @@ -201,6 +200,10 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig

configureConnectionPooling(connPoolConfig, db, driverName)

if metricsRegisterer != nil {
metricsRegisterer.MustRegister(collectors.NewDBStatsCollector(db, "kine"))
}

return &Generic{
DB: db,

Expand Down Expand Up @@ -248,13 +251,21 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig
}, err
}

func (d *Generic) query(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) {
logrus.Tracef("QUERY %v : %s", args, Stripped(sql))
func (d *Generic) query(ctx context.Context, sql string, args ...interface{}) (result *sql.Rows, err error) {
logrus.Tracef("QUERY %v : %s", args, util.Stripped(sql))
startTime := time.Now()
defer func() {
metrics.ObserveSQL(startTime, d.ErrCode(err), util.Stripped(sql), args)
}()
return d.DB.QueryContext(ctx, sql, args...)
}

func (d *Generic) queryRow(ctx context.Context, sql string, args ...interface{}) *sql.Row {
logrus.Tracef("QUERY ROW %v : %s", args, Stripped(sql))
func (d *Generic) queryRow(ctx context.Context, sql string, args ...interface{}) (result *sql.Row) {
logrus.Tracef("QUERY ROW %v : %s", args, util.Stripped(sql))
startTime := time.Now()
defer func() {
metrics.ObserveSQL(startTime, d.ErrCode(result.Err()), util.Stripped(sql), args)
}()
return d.DB.QueryRowContext(ctx, sql, args...)
}

Expand All @@ -266,8 +277,10 @@ func (d *Generic) execute(ctx context.Context, sql string, args ...interface{})

wait := strategy.Backoff(backoff.Linear(100 + time.Millisecond))
for i := uint(0); i < 20; i++ {
logrus.Tracef("EXEC (try: %d) %v : %s", i, args, Stripped(sql))
logrus.Tracef("EXEC (try: %d) %v : %s", i, args, util.Stripped(sql))
startTime := time.Now()
result, err = d.DB.ExecContext(ctx, sql, args...)
metrics.ObserveSQL(startTime, d.ErrCode(err), util.Stripped(sql), args)
if err != nil && d.Retry != nil && d.Retry(err) {
wait(i)
continue
Expand Down
25 changes: 20 additions & 5 deletions pkg/drivers/generic/tx.go
Expand Up @@ -3,8 +3,11 @@ package generic
import (
"context"
"database/sql"
"time"

"github.com/k3s-io/kine/pkg/metrics"
"github.com/k3s-io/kine/pkg/server"
"github.com/k3s-io/kine/pkg/util"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -97,17 +100,29 @@ func (t *Tx) CurrentRevision(ctx context.Context) (int64, error) {
return id, err
}

func (t *Tx) query(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) {
logrus.Tracef("TX QUERY %v : %s", args, Stripped(sql))
func (t *Tx) query(ctx context.Context, sql string, args ...interface{}) (result *sql.Rows, err error) {
logrus.Tracef("TX QUERY %v : %s", args, util.Stripped(sql))
startTime := time.Now()
defer func() {
metrics.ObserveSQL(startTime, t.d.ErrCode(err), util.Stripped(sql), args)
}()
return t.x.QueryContext(ctx, sql, args...)
}

func (t *Tx) queryRow(ctx context.Context, sql string, args ...interface{}) *sql.Row {
logrus.Tracef("TX QUERY ROW %v : %s", args, Stripped(sql))
func (t *Tx) queryRow(ctx context.Context, sql string, args ...interface{}) (result *sql.Row) {
logrus.Tracef("TX QUERY ROW %v : %s", args, util.Stripped(sql))
startTime := time.Now()
defer func() {
metrics.ObserveSQL(startTime, t.d.ErrCode(result.Err()), util.Stripped(sql), args)
}()
return t.x.QueryRowContext(ctx, sql, args...)
}

func (t *Tx) execute(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error) {
logrus.Tracef("TX EXEC %v : %s", args, Stripped(sql))
logrus.Tracef("TX EXEC %v : %s", args, util.Stripped(sql))
startTime := time.Now()
defer func() {
metrics.ObserveSQL(startTime, t.d.ErrCode(err), util.Stripped(sql), args)
}()
return t.x.ExecContext(ctx, sql, args...)
}
18 changes: 15 additions & 3 deletions pkg/drivers/mysql/mysql.go
Expand Up @@ -4,13 +4,16 @@ import (
"context"
cryptotls "crypto/tls"
"database/sql"
"fmt"

"github.com/go-sql-driver/mysql"
"github.com/k3s-io/kine/pkg/drivers/generic"
"github.com/k3s-io/kine/pkg/logstructured"
"github.com/k3s-io/kine/pkg/logstructured/sqllog"
"github.com/k3s-io/kine/pkg/server"
"github.com/k3s-io/kine/pkg/tls"
"github.com/k3s-io/kine/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -43,7 +46,7 @@ var (
createDB = "CREATE DATABASE IF NOT EXISTS "
)

func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
Expand All @@ -62,7 +65,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
return nil, err
}

dialect, err := generic.Open(ctx, "mysql", parsedDSN, connPoolConfig, "?", false)
dialect, err := generic.Open(ctx, "mysql", parsedDSN, connPoolConfig, "?", false, metricsRegisterer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -95,6 +98,15 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
}
return err
}
dialect.ErrCode = func(err error) string {
if err == nil {
return ""
}
if err, ok := err.(*mysql.MySQLError); ok {
return fmt.Sprint(err.Number)
}
return err.Error()
}
if err := setup(dialect.DB); err != nil {
return nil, err
}
Expand All @@ -107,7 +119,7 @@ func setup(db *sql.DB) error {
logrus.Infof("Configuring database table schema and indexes, this may take a moment...")

for _, stmt := range schema {
logrus.Tracef("SETUP EXEC : %v", generic.Stripped(stmt))
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
_, err := db.Exec(stmt)
if err != nil {
if mysqlError, ok := err.(*mysql.MySQLError); !ok || mysqlError.Number != 1061 {
Expand Down
19 changes: 15 additions & 4 deletions pkg/drivers/pgsql/pgsql.go
Expand Up @@ -13,7 +13,9 @@ import (
"github.com/k3s-io/kine/pkg/logstructured/sqllog"
"github.com/k3s-io/kine/pkg/server"
"github.com/k3s-io/kine/pkg/tls"
"github.com/k3s-io/kine/pkg/util"
"github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -44,7 +46,7 @@ var (
createDB = "CREATE DATABASE "
)

func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
parsedDSN, err := prepareDSN(dataSourceName, tlsInfo)
if err != nil {
return nil, err
Expand All @@ -54,7 +56,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
return nil, err
}

dialect, err := generic.Open(ctx, "postgres", parsedDSN, connPoolConfig, "$", true)
dialect, err := generic.Open(ctx, "postgres", parsedDSN, connPoolConfig, "$", true, metricsRegisterer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -82,6 +84,15 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
}
return err
}
dialect.ErrCode = func(err error) string {
if err == nil {
return ""
}
if err, ok := err.(*pq.Error); ok {
return string(err.Code)
}
return err.Error()
}

if err := setup(dialect.DB); err != nil {
return nil, err
Expand All @@ -95,7 +106,7 @@ func setup(db *sql.DB) error {
logrus.Infof("Configuring database table schema and indexes, this may take a moment...")

for _, stmt := range schema {
logrus.Tracef("SETUP EXEC : %v", generic.Stripped(stmt))
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
_, err := db.Exec(stmt)
if err != nil {
return err
Expand Down Expand Up @@ -136,7 +147,7 @@ func createDBIfNotExist(dataSourceName string) error {
}
defer db.Close()
stmt := createDB + dbName + ";"
logrus.Tracef("SETUP EXEC : %v", generic.Stripped(stmt))
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
_, err = db.Exec(stmt)
if err != nil {
return err
Expand Down

0 comments on commit d1727ab

Please sign in to comment.