From d1727ab7ed4565fc28090c82cb457c1877358cb0 Mon Sep 17 00:00:00 2001 From: Zach Zhu Date: Tue, 24 May 2022 22:39:47 +0800 Subject: [PATCH] introduce metrics Signed-off-by: Zach Zhu --- go.mod | 1 + main.go | 20 +++++++++- pkg/drivers/dqlite/dqlite.go | 5 ++- pkg/drivers/dqlite/no_dqlite.go | 3 +- pkg/drivers/generic/generic.go | 39 ++++++++++++------ pkg/drivers/generic/tx.go | 25 +++++++++--- pkg/drivers/mysql/mysql.go | 18 +++++++-- pkg/drivers/pgsql/pgsql.go | 19 +++++++-- pkg/drivers/sqlite/sqlite.go | 22 ++++++++--- pkg/drivers/sqlite/sqlite_nocgo.go | 5 ++- pkg/endpoint/endpoint.go | 19 +++++++-- pkg/logstructured/sqllog/sql.go | 4 ++ pkg/metrics/metrics.go | 48 +++++++++++++++++++++++ pkg/metrics/registry.go | 22 +++++++++++ pkg/metrics/server.go | 63 ++++++++++++++++++++++++++++++ pkg/util/string.go | 13 ++++++ 16 files changed, 286 insertions(+), 40 deletions(-) create mode 100644 pkg/metrics/metrics.go create mode 100644 pkg/metrics/registry.go create mode 100644 pkg/metrics/server.go create mode 100644 pkg/util/string.go diff --git a/go.mod b/go.mod index d586c4b7..a2192bfb 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee github.com/nats-io/nats.go v1.13.1-0.20220318132711-e0e03e374228 github.com/pkg/errors v0.9.1 + github.com/prometheus/client_golang v1.11.0 github.com/rancher/wrangler v0.8.3 github.com/shengdoushi/base58 v1.0.0 github.com/sirupsen/logrus v1.7.0 diff --git a/main.go b/main.go index e8c9a9d8..eb1e6b5d 100644 --- a/main.go +++ b/main.go @@ -5,8 +5,10 @@ import ( "errors" "fmt" "os" + "time" "github.com/k3s-io/kine/pkg/endpoint" + "github.com/k3s-io/kine/pkg/metrics" "github.com/k3s-io/kine/pkg/version" "github.com/rancher/wrangler/pkg/signals" "github.com/sirupsen/logrus" @@ -14,7 +16,8 @@ import ( ) var ( - config endpoint.Config + config endpoint.Config + metricsConfig metrics.Config ) func main() { @@ -76,6 +79,18 @@ func main() { Usage: "Key file for DB connection", Destination: &config.BackendTLSConfig.KeyFile, }, + cli.StringFlag{ + Name: "metrics-bind-address", + Usage: "The address the metric endpoint binds to. Default :8080, set 0 to disable metrics serving.", + Destination: &metricsConfig.ServerAddress, + Value: ":8080", + }, + cli.DurationFlag{ + Name: "slow-sql-threshold", + Usage: "The duration which SQL executed longer than will be logged. Default 1s, set <= 0 to disable slow SQL log.", + Destination: &metrics.SlowSQLThreshold, + Value: time.Second, + }, cli.BoolFlag{Name: "debug"}, } app.Action = run @@ -92,6 +107,9 @@ func run(c *cli.Context) error { logrus.SetLevel(logrus.TraceLevel) } ctx := signals.SetupSignalHandler(context.Background()) + metricsConfig.ServerTLSConfig = config.ServerTLSConfig + go metrics.Serve(ctx, metricsConfig) + config.MetricsRegisterer = metrics.Registry _, err := endpoint.Listen(ctx, config) if err != nil { return err diff --git a/pkg/drivers/dqlite/dqlite.go b/pkg/drivers/dqlite/dqlite.go index efd203b6..9c7602dd 100644 --- a/pkg/drivers/dqlite/dqlite.go +++ b/pkg/drivers/dqlite/dqlite.go @@ -19,6 +19,7 @@ import ( "github.com/k3s-io/kine/pkg/drivers/sqlite" "github.com/k3s-io/kine/pkg/server" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -68,7 +69,7 @@ outer: return nil } -func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) { +func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { opts, err := parseOpts(datasourceName) if err != nil { return nil, err @@ -97,7 +98,7 @@ func New(ctx context.Context, datasourceName string, connPoolConfig generic.Conn } sql.Register("dqlite", d) - backend, generic, err := sqlite.NewVariant(ctx, "dqlite", opts.dsn, connPoolConfig) + backend, generic, err := sqlite.NewVariant(ctx, "dqlite", opts.dsn, connPoolConfig, metricsRegisterer) if err != nil { return nil, errors.Wrap(err, "sqlite client") } diff --git a/pkg/drivers/dqlite/no_dqlite.go b/pkg/drivers/dqlite/no_dqlite.go index b0f48a41..751e9d78 100644 --- a/pkg/drivers/dqlite/no_dqlite.go +++ b/pkg/drivers/dqlite/no_dqlite.go @@ -9,8 +9,9 @@ import ( "github.com/k3s-io/kine/pkg/drivers/generic" "github.com/k3s-io/kine/pkg/server" + "github.com/prometheus/client_golang/prometheus" ) -func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) { +func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { return nil, errors.New(`this binary is built without dqlite support, compile with "-tags dqlite"`) } diff --git a/pkg/drivers/generic/generic.go b/pkg/drivers/generic/generic.go index 864c5d33..54e71344 100644 --- a/pkg/drivers/generic/generic.go +++ b/pkg/drivers/generic/generic.go @@ -13,7 +13,11 @@ import ( "github.com/Rican7/retry/backoff" "github.com/Rican7/retry/strategy" + "github.com/k3s-io/kine/pkg/metrics" "github.com/k3s-io/kine/pkg/server" + "github.com/k3s-io/kine/pkg/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" "github.com/sirupsen/logrus" ) @@ -66,15 +70,9 @@ var ( `, revSQL, compactRevSQL, columns) ) -type Stripped string - -func (s Stripped) String() string { - str := strings.ReplaceAll(string(s), "\n", "") - return regexp.MustCompile("[\t ]+").ReplaceAllString(str, " ") -} - type ErrRetry func(error) bool type TranslateErr func(error) error +type ErrCode func(error) string type ConnectionPoolConfig struct { MaxIdle int // zero means defaultMaxIdleConns; negative means 0 @@ -105,6 +103,7 @@ type Generic struct { GetSizeSQL string Retry ErrRetry TranslateErr TranslateErr + ErrCode ErrCode } func q(sql, param string, numbered bool) string { @@ -179,7 +178,7 @@ func openAndTest(driverName, dataSourceName string) (*sql.DB, error) { return db, nil } -func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig ConnectionPoolConfig, paramCharacter string, numbered bool) (*Generic, error) { +func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig ConnectionPoolConfig, paramCharacter string, numbered bool, metricsRegisterer prometheus.Registerer) (*Generic, error) { var ( db *sql.DB err error @@ -201,6 +200,10 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig configureConnectionPooling(connPoolConfig, db, driverName) + if metricsRegisterer != nil { + metricsRegisterer.MustRegister(collectors.NewDBStatsCollector(db, "kine")) + } + return &Generic{ DB: db, @@ -248,13 +251,21 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig }, err } -func (d *Generic) query(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) { - logrus.Tracef("QUERY %v : %s", args, Stripped(sql)) +func (d *Generic) query(ctx context.Context, sql string, args ...interface{}) (result *sql.Rows, err error) { + logrus.Tracef("QUERY %v : %s", args, util.Stripped(sql)) + startTime := time.Now() + defer func() { + metrics.ObserveSQL(startTime, d.ErrCode(err), util.Stripped(sql), args) + }() return d.DB.QueryContext(ctx, sql, args...) } -func (d *Generic) queryRow(ctx context.Context, sql string, args ...interface{}) *sql.Row { - logrus.Tracef("QUERY ROW %v : %s", args, Stripped(sql)) +func (d *Generic) queryRow(ctx context.Context, sql string, args ...interface{}) (result *sql.Row) { + logrus.Tracef("QUERY ROW %v : %s", args, util.Stripped(sql)) + startTime := time.Now() + defer func() { + metrics.ObserveSQL(startTime, d.ErrCode(result.Err()), util.Stripped(sql), args) + }() return d.DB.QueryRowContext(ctx, sql, args...) } @@ -266,8 +277,10 @@ func (d *Generic) execute(ctx context.Context, sql string, args ...interface{}) wait := strategy.Backoff(backoff.Linear(100 + time.Millisecond)) for i := uint(0); i < 20; i++ { - logrus.Tracef("EXEC (try: %d) %v : %s", i, args, Stripped(sql)) + logrus.Tracef("EXEC (try: %d) %v : %s", i, args, util.Stripped(sql)) + startTime := time.Now() result, err = d.DB.ExecContext(ctx, sql, args...) + metrics.ObserveSQL(startTime, d.ErrCode(err), util.Stripped(sql), args) if err != nil && d.Retry != nil && d.Retry(err) { wait(i) continue diff --git a/pkg/drivers/generic/tx.go b/pkg/drivers/generic/tx.go index 092e8107..a67a105a 100644 --- a/pkg/drivers/generic/tx.go +++ b/pkg/drivers/generic/tx.go @@ -3,8 +3,11 @@ package generic import ( "context" "database/sql" + "time" + "github.com/k3s-io/kine/pkg/metrics" "github.com/k3s-io/kine/pkg/server" + "github.com/k3s-io/kine/pkg/util" "github.com/sirupsen/logrus" ) @@ -97,17 +100,29 @@ func (t *Tx) CurrentRevision(ctx context.Context) (int64, error) { return id, err } -func (t *Tx) query(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) { - logrus.Tracef("TX QUERY %v : %s", args, Stripped(sql)) +func (t *Tx) query(ctx context.Context, sql string, args ...interface{}) (result *sql.Rows, err error) { + logrus.Tracef("TX QUERY %v : %s", args, util.Stripped(sql)) + startTime := time.Now() + defer func() { + metrics.ObserveSQL(startTime, t.d.ErrCode(err), util.Stripped(sql), args) + }() return t.x.QueryContext(ctx, sql, args...) } -func (t *Tx) queryRow(ctx context.Context, sql string, args ...interface{}) *sql.Row { - logrus.Tracef("TX QUERY ROW %v : %s", args, Stripped(sql)) +func (t *Tx) queryRow(ctx context.Context, sql string, args ...interface{}) (result *sql.Row) { + logrus.Tracef("TX QUERY ROW %v : %s", args, util.Stripped(sql)) + startTime := time.Now() + defer func() { + metrics.ObserveSQL(startTime, t.d.ErrCode(result.Err()), util.Stripped(sql), args) + }() return t.x.QueryRowContext(ctx, sql, args...) } func (t *Tx) execute(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error) { - logrus.Tracef("TX EXEC %v : %s", args, Stripped(sql)) + logrus.Tracef("TX EXEC %v : %s", args, util.Stripped(sql)) + startTime := time.Now() + defer func() { + metrics.ObserveSQL(startTime, t.d.ErrCode(err), util.Stripped(sql), args) + }() return t.x.ExecContext(ctx, sql, args...) } diff --git a/pkg/drivers/mysql/mysql.go b/pkg/drivers/mysql/mysql.go index 0e7c1629..763955fb 100644 --- a/pkg/drivers/mysql/mysql.go +++ b/pkg/drivers/mysql/mysql.go @@ -4,6 +4,7 @@ import ( "context" cryptotls "crypto/tls" "database/sql" + "fmt" "github.com/go-sql-driver/mysql" "github.com/k3s-io/kine/pkg/drivers/generic" @@ -11,6 +12,8 @@ import ( "github.com/k3s-io/kine/pkg/logstructured/sqllog" "github.com/k3s-io/kine/pkg/server" "github.com/k3s-io/kine/pkg/tls" + "github.com/k3s-io/kine/pkg/util" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -43,7 +46,7 @@ var ( createDB = "CREATE DATABASE IF NOT EXISTS " ) -func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) { +func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { tlsConfig, err := tlsInfo.ClientConfig() if err != nil { return nil, err @@ -62,7 +65,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo return nil, err } - dialect, err := generic.Open(ctx, "mysql", parsedDSN, connPoolConfig, "?", false) + dialect, err := generic.Open(ctx, "mysql", parsedDSN, connPoolConfig, "?", false, metricsRegisterer) if err != nil { return nil, err } @@ -95,6 +98,15 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo } return err } + dialect.ErrCode = func(err error) string { + if err == nil { + return "" + } + if err, ok := err.(*mysql.MySQLError); ok { + return fmt.Sprint(err.Number) + } + return err.Error() + } if err := setup(dialect.DB); err != nil { return nil, err } @@ -107,7 +119,7 @@ func setup(db *sql.DB) error { logrus.Infof("Configuring database table schema and indexes, this may take a moment...") for _, stmt := range schema { - logrus.Tracef("SETUP EXEC : %v", generic.Stripped(stmt)) + logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt)) _, err := db.Exec(stmt) if err != nil { if mysqlError, ok := err.(*mysql.MySQLError); !ok || mysqlError.Number != 1061 { diff --git a/pkg/drivers/pgsql/pgsql.go b/pkg/drivers/pgsql/pgsql.go index 328b800f..4fe3d297 100644 --- a/pkg/drivers/pgsql/pgsql.go +++ b/pkg/drivers/pgsql/pgsql.go @@ -13,7 +13,9 @@ import ( "github.com/k3s-io/kine/pkg/logstructured/sqllog" "github.com/k3s-io/kine/pkg/server" "github.com/k3s-io/kine/pkg/tls" + "github.com/k3s-io/kine/pkg/util" "github.com/lib/pq" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -44,7 +46,7 @@ var ( createDB = "CREATE DATABASE " ) -func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) { +func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { parsedDSN, err := prepareDSN(dataSourceName, tlsInfo) if err != nil { return nil, err @@ -54,7 +56,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo return nil, err } - dialect, err := generic.Open(ctx, "postgres", parsedDSN, connPoolConfig, "$", true) + dialect, err := generic.Open(ctx, "postgres", parsedDSN, connPoolConfig, "$", true, metricsRegisterer) if err != nil { return nil, err } @@ -82,6 +84,15 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo } return err } + dialect.ErrCode = func(err error) string { + if err == nil { + return "" + } + if err, ok := err.(*pq.Error); ok { + return string(err.Code) + } + return err.Error() + } if err := setup(dialect.DB); err != nil { return nil, err @@ -95,7 +106,7 @@ func setup(db *sql.DB) error { logrus.Infof("Configuring database table schema and indexes, this may take a moment...") for _, stmt := range schema { - logrus.Tracef("SETUP EXEC : %v", generic.Stripped(stmt)) + logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt)) _, err := db.Exec(stmt) if err != nil { return err @@ -136,7 +147,7 @@ func createDBIfNotExist(dataSourceName string) error { } defer db.Close() stmt := createDB + dbName + ";" - logrus.Tracef("SETUP EXEC : %v", generic.Stripped(stmt)) + logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt)) _, err = db.Exec(stmt) if err != nil { return err diff --git a/pkg/drivers/sqlite/sqlite.go b/pkg/drivers/sqlite/sqlite.go index 2fb9b1b2..17973ec9 100644 --- a/pkg/drivers/sqlite/sqlite.go +++ b/pkg/drivers/sqlite/sqlite.go @@ -6,6 +6,7 @@ package sqlite import ( "context" "database/sql" + "fmt" "os" "time" @@ -13,8 +14,10 @@ import ( "github.com/k3s-io/kine/pkg/logstructured" "github.com/k3s-io/kine/pkg/logstructured/sqllog" "github.com/k3s-io/kine/pkg/server" + "github.com/k3s-io/kine/pkg/util" "github.com/mattn/go-sqlite3" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" // sqlite db driver @@ -44,12 +47,12 @@ var ( } ) -func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) { - backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName, connPoolConfig) +func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { + backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName, connPoolConfig, metricsRegisterer) return backend, err } -func NewVariant(ctx context.Context, driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, *generic.Generic, error) { +func NewVariant(ctx context.Context, driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, *generic.Generic, error) { if dataSourceName == "" { if err := os.MkdirAll("./db", 0700); err != nil { return nil, nil, err @@ -57,7 +60,7 @@ func NewVariant(ctx context.Context, driverName, dataSourceName string, connPool dataSourceName = "./db/state.db?_journal=WAL&cache=shared" } - dialect, err := generic.Open(ctx, driverName, dataSourceName, connPoolConfig, "?", false) + dialect, err := generic.Open(ctx, driverName, dataSourceName, connPoolConfig, "?", false, metricsRegisterer) if err != nil { return nil, nil, err } @@ -88,6 +91,15 @@ func NewVariant(ctx context.Context, driverName, dataSourceName string, connPool } return err } + dialect.ErrCode = func(err error) string { + if err == nil { + return "" + } + if err, ok := err.(sqlite3.Error); ok { + return fmt.Sprint(err.ExtendedCode) + } + return err.Error() + } // this is the first SQL that will be executed on a new DB conn so // loop on failure here because in the case of dqlite it could still be initializing @@ -116,7 +128,7 @@ func setup(db *sql.DB) error { logrus.Infof("Configuring database table schema and indexes, this may take a moment...") for _, stmt := range schema { - logrus.Tracef("SETUP EXEC : %v", generic.Stripped(stmt)) + logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt)) _, err := db.Exec(stmt) if err != nil { return err diff --git a/pkg/drivers/sqlite/sqlite_nocgo.go b/pkg/drivers/sqlite/sqlite_nocgo.go index 31062cdd..d730b406 100644 --- a/pkg/drivers/sqlite/sqlite_nocgo.go +++ b/pkg/drivers/sqlite/sqlite_nocgo.go @@ -10,15 +10,16 @@ import ( "github.com/k3s-io/kine/pkg/drivers/generic" "github.com/k3s-io/kine/pkg/server" + "github.com/prometheus/client_golang/prometheus" ) var errNoCgo = errors.New("this binary is built without CGO, sqlite is disabled") -func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) { +func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { return nil, errNoCgo } -func NewVariant(driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, *generic.Generic, error) { +func NewVariant(driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, *generic.Generic, error) { return nil, nil, errNoCgo } diff --git a/pkg/endpoint/endpoint.go b/pkg/endpoint/endpoint.go index bee1cf86..189a9cad 100644 --- a/pkg/endpoint/endpoint.go +++ b/pkg/endpoint/endpoint.go @@ -13,9 +13,11 @@ import ( "github.com/k3s-io/kine/pkg/drivers/mysql" "github.com/k3s-io/kine/pkg/drivers/pgsql" "github.com/k3s-io/kine/pkg/drivers/sqlite" + "github.com/k3s-io/kine/pkg/metrics" "github.com/k3s-io/kine/pkg/server" "github.com/k3s-io/kine/pkg/tls" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "github.com/soheilhy/cmux" "go.etcd.io/etcd/server/v3/embed" @@ -41,6 +43,7 @@ type Config struct { ConnectionPoolConfig generic.ConnectionPoolConfig ServerTLSConfig tls.Config BackendTLSConfig tls.Config + MetricsRegisterer prometheus.Registerer } type ETCDConfig struct { @@ -64,6 +67,14 @@ func Listen(ctx context.Context, config Config) (ETCDConfig, error) { return ETCDConfig{}, errors.Wrap(err, "building kine") } + if config.MetricsRegisterer != nil { + config.MetricsRegisterer.MustRegister( + metrics.SQLTotal, + metrics.SQLTime, + metrics.CompactTotal, + ) + } + if err := backend.Start(ctx); err != nil { return ETCDConfig{}, errors.Wrap(err, "starting kine backend") } @@ -228,13 +239,13 @@ func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config) switch driver { case SQLiteBackend: leaderElect = false - backend, err = sqlite.New(ctx, dsn, cfg.ConnectionPoolConfig) + backend, err = sqlite.New(ctx, dsn, cfg.ConnectionPoolConfig, cfg.MetricsRegisterer) case DQLiteBackend: - backend, err = dqlite.New(ctx, dsn, cfg.ConnectionPoolConfig) + backend, err = dqlite.New(ctx, dsn, cfg.ConnectionPoolConfig, cfg.MetricsRegisterer) case PostgresBackend: - backend, err = pgsql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig) + backend, err = pgsql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig, cfg.MetricsRegisterer) case MySQLBackend: - backend, err = mysql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig) + backend, err = mysql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig, cfg.MetricsRegisterer) case JetStreamBackend: backend, err = jetstream.New(ctx, dsn, cfg.BackendTLSConfig) default: diff --git a/pkg/logstructured/sqllog/sql.go b/pkg/logstructured/sqllog/sql.go index 8f137993..8018ab5c 100644 --- a/pkg/logstructured/sqllog/sql.go +++ b/pkg/logstructured/sqllog/sql.go @@ -7,6 +7,7 @@ import ( "time" "github.com/k3s-io/kine/pkg/broadcaster" + "github.com/k3s-io/kine/pkg/metrics" "github.com/k3s-io/kine/pkg/server" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -150,6 +151,7 @@ outer: break } else { logrus.Errorf("Compact failed: %v", err) + metrics.CompactTotal.WithLabelValues(metrics.ResultError).Inc() continue outer } } @@ -162,6 +164,8 @@ outer: // Record the final results for the outer loop compactRev = compactedRev targetCompactRev = currentRev + + metrics.CompactTotal.WithLabelValues(metrics.ResultSuccess).Inc() } } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 00000000..1eac3621 --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,48 @@ +package metrics + +import ( + "time" + + "github.com/k3s-io/kine/pkg/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" +) + +const ( + ResultSuccess = "success" + ResultError = "error" +) + +var ( + SQLTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "kine_sql_total", + Help: "Total number of SQL operations", + }, []string{"error_code"}) + + SQLTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "kine_sql_time_seconds", + Help: "Length of time per SQL operation", + Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, + 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30}, + }, []string{"error_code"}) + + CompactTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "kine_compact_total", + Help: "Total number of compactions", + }, []string{"result"}) +) + +var ( + // SlowSQLThreshold is a duration which SQL executed longer than will be logged. + // This can be directly modified to override the default value when kine is used as a library. + SlowSQLThreshold = time.Second +) + +func ObserveSQL(start time.Time, errCode string, sql util.Stripped, args ...interface{}) { + SQLTotal.WithLabelValues(errCode).Inc() + duration := time.Since(start) + SQLTime.WithLabelValues(errCode).Observe(duration.Seconds()) + if SlowSQLThreshold > 0 && duration >= SlowSQLThreshold { + logrus.Infof("Slow SQL (started: %v) (total time: %v): %s : %v", start, duration, sql, args) + } +} diff --git a/pkg/metrics/registry.go b/pkg/metrics/registry.go new file mode 100644 index 00000000..159c457f --- /dev/null +++ b/pkg/metrics/registry.go @@ -0,0 +1,22 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" +) + +type RegistererGatherer interface { + prometheus.Registerer + prometheus.Gatherer +} + +var Registry RegistererGatherer = prometheus.NewRegistry() + +func init() { + Registry.MustRegister( + // expose process metrics like CPU, Memory, file descriptor usage etc. + collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), + // expose Go runtime metrics like GC stats, memory stats etc. + collectors.NewGoCollector(), + ) +} diff --git a/pkg/metrics/server.go b/pkg/metrics/server.go new file mode 100644 index 00000000..c8d0a5b1 --- /dev/null +++ b/pkg/metrics/server.go @@ -0,0 +1,63 @@ +package metrics + +import ( + "context" + "net" + "net/http" + + "github.com/k3s-io/kine/pkg/tls" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" +) + +type Config struct { + ServerAddress string + ServerTLSConfig tls.Config +} + +const ( + defaultBindAddress = ":8080" + metricsPath = "/metrics" +) + +func Serve(ctx context.Context, config Config) { + if config.ServerAddress == "" { + config.ServerAddress = defaultBindAddress + } + if config.ServerAddress == "0" { + return + } + + logrus.Infof("metrics server is starting to listen at %s", config.ServerAddress) + listener, err := net.Listen("tcp", config.ServerAddress) + if err != nil { + logrus.Fatalf("error creating the metrics listener: %v", err) + } + + handler := promhttp.HandlerFor(Registry, promhttp.HandlerOpts{ + ErrorHandling: promhttp.HTTPErrorOnError, + }) + mux := http.NewServeMux() + mux.Handle(metricsPath, handler) + server := http.Server{ + Handler: mux, + } + + go func() { + logrus.Infof("starting metrics server path %s", metricsPath) + var err error + if config.ServerTLSConfig.CertFile != "" && config.ServerTLSConfig.KeyFile != "" { + err = server.ServeTLS(listener, config.ServerTLSConfig.CertFile, config.ServerTLSConfig.KeyFile) + } else { + err = server.Serve(listener) + } + if err != nil && err != http.ErrServerClosed { + logrus.Fatalf("error starting the metrics server: %v", err) + } + }() + + <-ctx.Done() + if err := server.Shutdown(context.Background()); err != nil { + logrus.Fatalf("error shutting down the metrics server: %v", err) + } +} diff --git a/pkg/util/string.go b/pkg/util/string.go new file mode 100644 index 00000000..67741b17 --- /dev/null +++ b/pkg/util/string.go @@ -0,0 +1,13 @@ +package util + +import ( + "regexp" + "strings" +) + +type Stripped string + +func (s Stripped) String() string { + str := strings.ReplaceAll(string(s), "\n", "") + return regexp.MustCompile("[\t ]+").ReplaceAllString(str, " ") +}