Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce metrics #121

Merged
merged 1 commit into from Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -12,6 +12,7 @@ require (
github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee
github.com/nats-io/nats.go v1.13.1-0.20220318132711-e0e03e374228
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
github.com/rancher/wrangler v0.8.3
github.com/shengdoushi/base58 v1.0.0
github.com/sirupsen/logrus v1.7.0
Expand Down
20 changes: 19 additions & 1 deletion main.go
Expand Up @@ -5,16 +5,19 @@ import (
"errors"
"fmt"
"os"
"time"

"github.com/k3s-io/kine/pkg/endpoint"
"github.com/k3s-io/kine/pkg/metrics"
"github.com/k3s-io/kine/pkg/version"
"github.com/rancher/wrangler/pkg/signals"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
)

var (
config endpoint.Config
config endpoint.Config
metricsConfig metrics.Config
)

func main() {
Expand Down Expand Up @@ -76,6 +79,18 @@ func main() {
Usage: "Key file for DB connection",
Destination: &config.BackendTLSConfig.KeyFile,
},
cli.StringFlag{
Name: "metrics-bind-address",
Usage: "The address the metric endpoint binds to. Default :8080, set 0 to disable metrics serving.",
Destination: &metricsConfig.ServerAddress,
Value: ":8080",
},
cli.DurationFlag{
Name: "slow-sql-threshold",
Usage: "The duration which SQL executed longer than will be logged. Default 1s, set <= 0 to disable slow SQL log.",
Destination: &metrics.SlowSQLThreshold,
Value: time.Second,
},
cli.BoolFlag{Name: "debug"},
}
app.Action = run
Expand All @@ -92,6 +107,9 @@ func run(c *cli.Context) error {
logrus.SetLevel(logrus.TraceLevel)
}
ctx := signals.SetupSignalHandler(context.Background())
metricsConfig.ServerTLSConfig = config.ServerTLSConfig
go metrics.Serve(ctx, metricsConfig)
config.MetricsRegisterer = metrics.Registry
_, err := endpoint.Listen(ctx, config)
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions pkg/drivers/dqlite/dqlite.go
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/k3s-io/kine/pkg/drivers/sqlite"
"github.com/k3s-io/kine/pkg/server"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -68,7 +69,7 @@ outer:
return nil
}

func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
opts, err := parseOpts(datasourceName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -97,7 +98,7 @@ func New(ctx context.Context, datasourceName string, connPoolConfig generic.Conn
}

sql.Register("dqlite", d)
backend, generic, err := sqlite.NewVariant(ctx, "dqlite", opts.dsn, connPoolConfig)
backend, generic, err := sqlite.NewVariant(ctx, "dqlite", opts.dsn, connPoolConfig, metricsRegisterer)
if err != nil {
return nil, errors.Wrap(err, "sqlite client")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/drivers/dqlite/no_dqlite.go
Expand Up @@ -9,8 +9,9 @@ import (

"github.com/k3s-io/kine/pkg/drivers/generic"
"github.com/k3s-io/kine/pkg/server"
"github.com/prometheus/client_golang/prometheus"
)

func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
return nil, errors.New(`this binary is built without dqlite support, compile with "-tags dqlite"`)
}
39 changes: 26 additions & 13 deletions pkg/drivers/generic/generic.go
Expand Up @@ -13,7 +13,11 @@ import (

"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/k3s-io/kine/pkg/metrics"
"github.com/k3s-io/kine/pkg/server"
"github.com/k3s-io/kine/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -66,15 +70,9 @@ var (
`, revSQL, compactRevSQL, columns)
)

type Stripped string

func (s Stripped) String() string {
str := strings.ReplaceAll(string(s), "\n", "")
return regexp.MustCompile("[\t ]+").ReplaceAllString(str, " ")
}

type ErrRetry func(error) bool
type TranslateErr func(error) error
type ErrCode func(error) string

type ConnectionPoolConfig struct {
MaxIdle int // zero means defaultMaxIdleConns; negative means 0
Expand Down Expand Up @@ -105,6 +103,7 @@ type Generic struct {
GetSizeSQL string
Retry ErrRetry
TranslateErr TranslateErr
ErrCode ErrCode
}

func q(sql, param string, numbered bool) string {
Expand Down Expand Up @@ -179,7 +178,7 @@ func openAndTest(driverName, dataSourceName string) (*sql.DB, error) {
return db, nil
}

func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig ConnectionPoolConfig, paramCharacter string, numbered bool) (*Generic, error) {
func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig ConnectionPoolConfig, paramCharacter string, numbered bool, metricsRegisterer prometheus.Registerer) (*Generic, error) {
var (
db *sql.DB
err error
Expand All @@ -201,6 +200,10 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig

configureConnectionPooling(connPoolConfig, db, driverName)

if metricsRegisterer != nil {
metricsRegisterer.MustRegister(collectors.NewDBStatsCollector(db, "kine"))
}

return &Generic{
DB: db,

Expand Down Expand Up @@ -248,13 +251,21 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig
}, err
}

func (d *Generic) query(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) {
logrus.Tracef("QUERY %v : %s", args, Stripped(sql))
func (d *Generic) query(ctx context.Context, sql string, args ...interface{}) (result *sql.Rows, err error) {
logrus.Tracef("QUERY %v : %s", args, util.Stripped(sql))
startTime := time.Now()
defer func() {
metrics.ObserveSQL(startTime, d.ErrCode(err), util.Stripped(sql), args)
}()
return d.DB.QueryContext(ctx, sql, args...)
}

func (d *Generic) queryRow(ctx context.Context, sql string, args ...interface{}) *sql.Row {
logrus.Tracef("QUERY ROW %v : %s", args, Stripped(sql))
func (d *Generic) queryRow(ctx context.Context, sql string, args ...interface{}) (result *sql.Row) {
logrus.Tracef("QUERY ROW %v : %s", args, util.Stripped(sql))
startTime := time.Now()
defer func() {
metrics.ObserveSQL(startTime, d.ErrCode(result.Err()), util.Stripped(sql), args)
}()
return d.DB.QueryRowContext(ctx, sql, args...)
}

Expand All @@ -266,8 +277,10 @@ func (d *Generic) execute(ctx context.Context, sql string, args ...interface{})

wait := strategy.Backoff(backoff.Linear(100 + time.Millisecond))
for i := uint(0); i < 20; i++ {
logrus.Tracef("EXEC (try: %d) %v : %s", i, args, Stripped(sql))
logrus.Tracef("EXEC (try: %d) %v : %s", i, args, util.Stripped(sql))
startTime := time.Now()
result, err = d.DB.ExecContext(ctx, sql, args...)
metrics.ObserveSQL(startTime, d.ErrCode(err), util.Stripped(sql), args)
if err != nil && d.Retry != nil && d.Retry(err) {
wait(i)
continue
Expand Down
25 changes: 20 additions & 5 deletions pkg/drivers/generic/tx.go
Expand Up @@ -3,8 +3,11 @@ package generic
import (
"context"
"database/sql"
"time"

"github.com/k3s-io/kine/pkg/metrics"
"github.com/k3s-io/kine/pkg/server"
"github.com/k3s-io/kine/pkg/util"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -97,17 +100,29 @@ func (t *Tx) CurrentRevision(ctx context.Context) (int64, error) {
return id, err
}

func (t *Tx) query(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) {
logrus.Tracef("TX QUERY %v : %s", args, Stripped(sql))
func (t *Tx) query(ctx context.Context, sql string, args ...interface{}) (result *sql.Rows, err error) {
logrus.Tracef("TX QUERY %v : %s", args, util.Stripped(sql))
startTime := time.Now()
defer func() {
metrics.ObserveSQL(startTime, t.d.ErrCode(err), util.Stripped(sql), args)
}()
return t.x.QueryContext(ctx, sql, args...)
}

func (t *Tx) queryRow(ctx context.Context, sql string, args ...interface{}) *sql.Row {
logrus.Tracef("TX QUERY ROW %v : %s", args, Stripped(sql))
func (t *Tx) queryRow(ctx context.Context, sql string, args ...interface{}) (result *sql.Row) {
logrus.Tracef("TX QUERY ROW %v : %s", args, util.Stripped(sql))
startTime := time.Now()
defer func() {
metrics.ObserveSQL(startTime, t.d.ErrCode(result.Err()), util.Stripped(sql), args)
}()
return t.x.QueryRowContext(ctx, sql, args...)
}

func (t *Tx) execute(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error) {
logrus.Tracef("TX EXEC %v : %s", args, Stripped(sql))
logrus.Tracef("TX EXEC %v : %s", args, util.Stripped(sql))
startTime := time.Now()
defer func() {
metrics.ObserveSQL(startTime, t.d.ErrCode(err), util.Stripped(sql), args)
}()
return t.x.ExecContext(ctx, sql, args...)
}
18 changes: 15 additions & 3 deletions pkg/drivers/mysql/mysql.go
Expand Up @@ -4,13 +4,16 @@ import (
"context"
cryptotls "crypto/tls"
"database/sql"
"fmt"

"github.com/go-sql-driver/mysql"
"github.com/k3s-io/kine/pkg/drivers/generic"
"github.com/k3s-io/kine/pkg/logstructured"
"github.com/k3s-io/kine/pkg/logstructured/sqllog"
"github.com/k3s-io/kine/pkg/server"
"github.com/k3s-io/kine/pkg/tls"
"github.com/k3s-io/kine/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -43,7 +46,7 @@ var (
createDB = "CREATE DATABASE IF NOT EXISTS "
)

func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
Expand All @@ -62,7 +65,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
return nil, err
}

dialect, err := generic.Open(ctx, "mysql", parsedDSN, connPoolConfig, "?", false)
dialect, err := generic.Open(ctx, "mysql", parsedDSN, connPoolConfig, "?", false, metricsRegisterer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -95,6 +98,15 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
}
return err
}
dialect.ErrCode = func(err error) string {
if err == nil {
return ""
}
if err, ok := err.(*mysql.MySQLError); ok {
return fmt.Sprint(err.Number)
}
return err.Error()
}
if err := setup(dialect.DB); err != nil {
return nil, err
}
Expand All @@ -107,7 +119,7 @@ func setup(db *sql.DB) error {
logrus.Infof("Configuring database table schema and indexes, this may take a moment...")

for _, stmt := range schema {
logrus.Tracef("SETUP EXEC : %v", generic.Stripped(stmt))
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
_, err := db.Exec(stmt)
if err != nil {
if mysqlError, ok := err.(*mysql.MySQLError); !ok || mysqlError.Number != 1061 {
Expand Down
19 changes: 15 additions & 4 deletions pkg/drivers/pgsql/pgsql.go
Expand Up @@ -13,7 +13,9 @@ import (
"github.com/k3s-io/kine/pkg/logstructured/sqllog"
"github.com/k3s-io/kine/pkg/server"
"github.com/k3s-io/kine/pkg/tls"
"github.com/k3s-io/kine/pkg/util"
"github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -44,7 +46,7 @@ var (
createDB = "CREATE DATABASE "
)

func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
parsedDSN, err := prepareDSN(dataSourceName, tlsInfo)
if err != nil {
return nil, err
Expand All @@ -54,7 +56,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
return nil, err
}

dialect, err := generic.Open(ctx, "postgres", parsedDSN, connPoolConfig, "$", true)
dialect, err := generic.Open(ctx, "postgres", parsedDSN, connPoolConfig, "$", true, metricsRegisterer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -82,6 +84,15 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
}
return err
}
dialect.ErrCode = func(err error) string {
if err == nil {
return ""
}
if err, ok := err.(*pq.Error); ok {
return string(err.Code)
}
return err.Error()
}

if err := setup(dialect.DB); err != nil {
return nil, err
Expand All @@ -95,7 +106,7 @@ func setup(db *sql.DB) error {
logrus.Infof("Configuring database table schema and indexes, this may take a moment...")

for _, stmt := range schema {
logrus.Tracef("SETUP EXEC : %v", generic.Stripped(stmt))
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
_, err := db.Exec(stmt)
if err != nil {
return err
Expand Down Expand Up @@ -136,7 +147,7 @@ func createDBIfNotExist(dataSourceName string) error {
}
defer db.Close()
stmt := createDB + dbName + ";"
logrus.Tracef("SETUP EXEC : %v", generic.Stripped(stmt))
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
_, err = db.Exec(stmt)
if err != nil {
return err
Expand Down