Skip to content

Commit

Permalink
introduce metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Zach Zhu <zzqshu@126.com>
  • Loading branch information
zqzten committed May 25, 2022
1 parent dc877c3 commit ff97cfb
Show file tree
Hide file tree
Showing 16 changed files with 238 additions and 23 deletions.
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -12,6 +12,7 @@ require (
github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee
github.com/nats-io/nats.go v1.13.1-0.20220318132711-e0e03e374228
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
github.com/rancher/wrangler v0.8.3
github.com/shengdoushi/base58 v1.0.0
github.com/sirupsen/logrus v1.7.0
Expand Down
21 changes: 20 additions & 1 deletion main.go
Expand Up @@ -5,16 +5,20 @@ import (
"errors"
"fmt"
"os"
"time"

"github.com/k3s-io/kine/pkg/drivers/generic"
"github.com/k3s-io/kine/pkg/endpoint"
"github.com/k3s-io/kine/pkg/metrics"
"github.com/k3s-io/kine/pkg/version"
"github.com/rancher/wrangler/pkg/signals"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
)

var (
config endpoint.Config
config endpoint.Config
metricsConfig metrics.Config
)

func main() {
Expand Down Expand Up @@ -76,6 +80,18 @@ func main() {
Usage: "Key file for DB connection",
Destination: &config.BackendTLSConfig.KeyFile,
},
cli.StringFlag{
Name: "metrics-bind-address",
Usage: "The address the metric endpoint binds to. Default :8080, set 0 to disable metrics serving.",
Destination: &metricsConfig.ServerAddress,
Value: ":8080",
},
cli.DurationFlag{
Name: "slow-sql-threshold",
Usage: "The duration which SQL executed longer than will be logged. Default 1s, set <= 0 to disable slow SQL log.",
Destination: &generic.SlowSQLThreshold,
Value: time.Second,
},
cli.BoolFlag{Name: "debug"},
}
app.Action = run
Expand All @@ -92,6 +108,9 @@ func run(c *cli.Context) error {
logrus.SetLevel(logrus.TraceLevel)
}
ctx := signals.SetupSignalHandler(context.Background())
metricsConfig.ServerTLSConfig = config.ServerTLSConfig
go metrics.Serve(ctx, metricsConfig)
config.MetricsRegisterer = metrics.Registry
_, err := endpoint.Listen(ctx, config)
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions pkg/drivers/dqlite/dqlite.go
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/k3s-io/kine/pkg/drivers/sqlite"
"github.com/k3s-io/kine/pkg/server"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -68,7 +69,7 @@ outer:
return nil
}

func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
opts, err := parseOpts(datasourceName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -97,7 +98,7 @@ func New(ctx context.Context, datasourceName string, connPoolConfig generic.Conn
}

sql.Register("dqlite", d)
backend, generic, err := sqlite.NewVariant(ctx, "dqlite", opts.dsn, connPoolConfig)
backend, generic, err := sqlite.NewVariant(ctx, "dqlite", opts.dsn, connPoolConfig, metricsRegisterer)
if err != nil {
return nil, errors.Wrap(err, "sqlite client")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/drivers/dqlite/no_dqlite.go
Expand Up @@ -9,8 +9,9 @@ import (

"github.com/k3s-io/kine/pkg/drivers/generic"
"github.com/k3s-io/kine/pkg/server"
"github.com/prometheus/client_golang/prometheus"
)

func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
return nil, errors.New(`this binary is built without dqlite support, compile with "-tags dqlite"`)
}
22 changes: 19 additions & 3 deletions pkg/drivers/generic/generic.go
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/k3s-io/kine/pkg/server"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -179,7 +181,7 @@ func openAndTest(driverName, dataSourceName string) (*sql.DB, error) {
return db, nil
}

func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig ConnectionPoolConfig, paramCharacter string, numbered bool) (*Generic, error) {
func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig ConnectionPoolConfig, paramCharacter string, numbered bool, metricsRegisterer prometheus.Registerer) (*Generic, error) {
var (
db *sql.DB
err error
Expand All @@ -201,6 +203,10 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig

configureConnectionPooling(connPoolConfig, db, driverName)

if metricsRegisterer != nil {
metricsRegisterer.MustRegister(collectors.NewDBStatsCollector(db, "kine"))
}

return &Generic{
DB: db,

Expand Down Expand Up @@ -248,13 +254,21 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig
}, err
}

func (d *Generic) query(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) {
func (d *Generic) query(ctx context.Context, sql string, args ...interface{}) (result *sql.Rows, err error) {
logrus.Tracef("QUERY %v : %s", args, Stripped(sql))
startTime := time.Now()
defer func() {
ObserveSQL(startTime, err == nil, Stripped(sql), args)
}()
return d.DB.QueryContext(ctx, sql, args...)
}

func (d *Generic) queryRow(ctx context.Context, sql string, args ...interface{}) *sql.Row {
func (d *Generic) queryRow(ctx context.Context, sql string, args ...interface{}) (result *sql.Row) {
logrus.Tracef("QUERY ROW %v : %s", args, Stripped(sql))
startTime := time.Now()
defer func() {
ObserveSQL(startTime, result.Err() == nil, Stripped(sql), args)
}()
return d.DB.QueryRowContext(ctx, sql, args...)
}

Expand All @@ -267,7 +281,9 @@ func (d *Generic) execute(ctx context.Context, sql string, args ...interface{})
wait := strategy.Backoff(backoff.Linear(100 + time.Millisecond))
for i := uint(0); i < 20; i++ {
logrus.Tracef("EXEC (try: %d) %v : %s", i, args, Stripped(sql))
startTime := time.Now()
result, err = d.DB.ExecContext(ctx, sql, args...)
ObserveSQL(startTime, err == nil, Stripped(sql), args)
if err != nil && d.Retry != nil && d.Retry(err) {
wait(i)
continue
Expand Down
46 changes: 46 additions & 0 deletions pkg/drivers/generic/metrics.go
@@ -0,0 +1,46 @@
package generic

import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

const (
LabelSuccess = "success"
LabelError = "error"
)

var (
SQLTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "kine_sql_total",
Help: "Total number of SQL operations",
}, []string{"result"})

SQLTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "kine_sql_time_seconds",
Help: "Length of time per SQL operation",
Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0,
1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30},
}, []string{"result"})
)

var (
// SlowSQLThreshold is a duration which SQL executed longer than will be logged.
// This can be directly modified to override the default value when kine is used as a library.
SlowSQLThreshold = time.Second
)

func ObserveSQL(start time.Time, success bool, sql Stripped, args ...interface{}) {
result := LabelSuccess
if !success {
result = LabelError
}
SQLTotal.WithLabelValues(result).Inc()
duration := time.Since(start)
SQLTime.WithLabelValues(result).Observe(duration.Seconds())
if SlowSQLThreshold > 0 && duration >= SlowSQLThreshold {
logrus.Infof("Slow SQL (started: %v) (total time: %v): %s : %v", start, duration, sql, args)
}
}
17 changes: 15 additions & 2 deletions pkg/drivers/generic/tx.go
Expand Up @@ -3,6 +3,7 @@ package generic
import (
"context"
"database/sql"
"time"

"github.com/k3s-io/kine/pkg/server"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -97,17 +98,29 @@ func (t *Tx) CurrentRevision(ctx context.Context) (int64, error) {
return id, err
}

func (t *Tx) query(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) {
func (t *Tx) query(ctx context.Context, sql string, args ...interface{}) (result *sql.Rows, err error) {
logrus.Tracef("TX QUERY %v : %s", args, Stripped(sql))
startTime := time.Now()
defer func() {
ObserveSQL(startTime, err == nil, Stripped(sql), args)
}()
return t.x.QueryContext(ctx, sql, args...)
}

func (t *Tx) queryRow(ctx context.Context, sql string, args ...interface{}) *sql.Row {
func (t *Tx) queryRow(ctx context.Context, sql string, args ...interface{}) (result *sql.Row) {
logrus.Tracef("TX QUERY ROW %v : %s", args, Stripped(sql))
startTime := time.Now()
defer func() {
ObserveSQL(startTime, result.Err() == nil, Stripped(sql), args)
}()
return t.x.QueryRowContext(ctx, sql, args...)
}

func (t *Tx) execute(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error) {
logrus.Tracef("TX EXEC %v : %s", args, Stripped(sql))
startTime := time.Now()
defer func() {
ObserveSQL(startTime, err == nil, Stripped(sql), args)
}()
return t.x.ExecContext(ctx, sql, args...)
}
5 changes: 3 additions & 2 deletions pkg/drivers/mysql/mysql.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/k3s-io/kine/pkg/logstructured/sqllog"
"github.com/k3s-io/kine/pkg/server"
"github.com/k3s-io/kine/pkg/tls"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -43,7 +44,7 @@ var (
createDB = "CREATE DATABASE IF NOT EXISTS "
)

func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
Expand All @@ -62,7 +63,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
return nil, err
}

dialect, err := generic.Open(ctx, "mysql", parsedDSN, connPoolConfig, "?", false)
dialect, err := generic.Open(ctx, "mysql", parsedDSN, connPoolConfig, "?", false, metricsRegisterer)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/drivers/pgsql/pgsql.go
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/k3s-io/kine/pkg/server"
"github.com/k3s-io/kine/pkg/tls"
"github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -44,7 +45,7 @@ var (
createDB = "CREATE DATABASE "
)

func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
parsedDSN, err := prepareDSN(dataSourceName, tlsInfo)
if err != nil {
return nil, err
Expand All @@ -54,7 +55,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
return nil, err
}

dialect, err := generic.Open(ctx, "postgres", parsedDSN, connPoolConfig, "$", true)
dialect, err := generic.Open(ctx, "postgres", parsedDSN, connPoolConfig, "$", true, metricsRegisterer)
if err != nil {
return nil, err
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/drivers/sqlite/sqlite.go
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/k3s-io/kine/pkg/server"
"github.com/mattn/go-sqlite3"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"

// sqlite db driver
Expand Down Expand Up @@ -44,20 +45,20 @@ var (
}
)

func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName, connPoolConfig)
func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName, connPoolConfig, metricsRegisterer)
return backend, err
}

func NewVariant(ctx context.Context, driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, *generic.Generic, error) {
func NewVariant(ctx context.Context, driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, *generic.Generic, error) {
if dataSourceName == "" {
if err := os.MkdirAll("./db", 0700); err != nil {
return nil, nil, err
}
dataSourceName = "./db/state.db?_journal=WAL&cache=shared"
}

dialect, err := generic.Open(ctx, driverName, dataSourceName, connPoolConfig, "?", false)
dialect, err := generic.Open(ctx, driverName, dataSourceName, connPoolConfig, "?", false, metricsRegisterer)
if err != nil {
return nil, nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/drivers/sqlite/sqlite_nocgo.go
Expand Up @@ -10,15 +10,16 @@ import (

"github.com/k3s-io/kine/pkg/drivers/generic"
"github.com/k3s-io/kine/pkg/server"
"github.com/prometheus/client_golang/prometheus"
)

var errNoCgo = errors.New("this binary is built without CGO, sqlite is disabled")

func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
return nil, errNoCgo
}

func NewVariant(driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, *generic.Generic, error) {
func NewVariant(driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, *generic.Generic, error) {
return nil, nil, errNoCgo
}

Expand Down

0 comments on commit ff97cfb

Please sign in to comment.