diff --git a/go.mod b/go.mod index d586c4b7..a2192bfb 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee github.com/nats-io/nats.go v1.13.1-0.20220318132711-e0e03e374228 github.com/pkg/errors v0.9.1 + github.com/prometheus/client_golang v1.11.0 github.com/rancher/wrangler v0.8.3 github.com/shengdoushi/base58 v1.0.0 github.com/sirupsen/logrus v1.7.0 diff --git a/main.go b/main.go index e8c9a9d8..8a0d6294 100644 --- a/main.go +++ b/main.go @@ -5,8 +5,11 @@ import ( "errors" "fmt" "os" + "time" + "github.com/k3s-io/kine/pkg/drivers/generic" "github.com/k3s-io/kine/pkg/endpoint" + "github.com/k3s-io/kine/pkg/metrics" "github.com/k3s-io/kine/pkg/version" "github.com/rancher/wrangler/pkg/signals" "github.com/sirupsen/logrus" @@ -14,7 +17,8 @@ import ( ) var ( - config endpoint.Config + config endpoint.Config + metricsConfig metrics.Config ) func main() { @@ -76,6 +80,18 @@ func main() { Usage: "Key file for DB connection", Destination: &config.BackendTLSConfig.KeyFile, }, + cli.StringFlag{ + Name: "metrics-bind-address", + Usage: "The address the metric endpoint binds to. Default :8080, set 0 to disable metrics serving.", + Destination: &metricsConfig.ServerAddress, + Value: ":8080", + }, + cli.DurationFlag{ + Name: "slow-sql-threshold", + Usage: "The duration which SQL executed longer than will be logged. Default 1s, set <= 0 to disable slow SQL log.", + Destination: &generic.SlowSQLThreshold, + Value: time.Second, + }, cli.BoolFlag{Name: "debug"}, } app.Action = run @@ -92,6 +108,9 @@ func run(c *cli.Context) error { logrus.SetLevel(logrus.TraceLevel) } ctx := signals.SetupSignalHandler(context.Background()) + metricsConfig.ServerTLSConfig = config.ServerTLSConfig + go metrics.Serve(ctx, metricsConfig) + config.MetricsRegisterer = metrics.Registry _, err := endpoint.Listen(ctx, config) if err != nil { return err diff --git a/pkg/drivers/dqlite/dqlite.go b/pkg/drivers/dqlite/dqlite.go index efd203b6..9c7602dd 100644 --- a/pkg/drivers/dqlite/dqlite.go +++ b/pkg/drivers/dqlite/dqlite.go @@ -19,6 +19,7 @@ import ( "github.com/k3s-io/kine/pkg/drivers/sqlite" "github.com/k3s-io/kine/pkg/server" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -68,7 +69,7 @@ outer: return nil } -func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) { +func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { opts, err := parseOpts(datasourceName) if err != nil { return nil, err @@ -97,7 +98,7 @@ func New(ctx context.Context, datasourceName string, connPoolConfig generic.Conn } sql.Register("dqlite", d) - backend, generic, err := sqlite.NewVariant(ctx, "dqlite", opts.dsn, connPoolConfig) + backend, generic, err := sqlite.NewVariant(ctx, "dqlite", opts.dsn, connPoolConfig, metricsRegisterer) if err != nil { return nil, errors.Wrap(err, "sqlite client") } diff --git a/pkg/drivers/dqlite/no_dqlite.go b/pkg/drivers/dqlite/no_dqlite.go index b0f48a41..751e9d78 100644 --- a/pkg/drivers/dqlite/no_dqlite.go +++ b/pkg/drivers/dqlite/no_dqlite.go @@ -9,8 +9,9 @@ import ( "github.com/k3s-io/kine/pkg/drivers/generic" "github.com/k3s-io/kine/pkg/server" + "github.com/prometheus/client_golang/prometheus" ) -func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) { +func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { return nil, errors.New(`this binary is built without dqlite support, compile with "-tags dqlite"`) } diff --git a/pkg/drivers/generic/generic.go b/pkg/drivers/generic/generic.go index 864c5d33..56549f15 100644 --- a/pkg/drivers/generic/generic.go +++ b/pkg/drivers/generic/generic.go @@ -14,6 +14,8 @@ import ( "github.com/Rican7/retry/backoff" "github.com/Rican7/retry/strategy" "github.com/k3s-io/kine/pkg/server" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" "github.com/sirupsen/logrus" ) @@ -179,7 +181,7 @@ func openAndTest(driverName, dataSourceName string) (*sql.DB, error) { return db, nil } -func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig ConnectionPoolConfig, paramCharacter string, numbered bool) (*Generic, error) { +func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig ConnectionPoolConfig, paramCharacter string, numbered bool, metricsRegisterer prometheus.Registerer) (*Generic, error) { var ( db *sql.DB err error @@ -201,6 +203,10 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig configureConnectionPooling(connPoolConfig, db, driverName) + if metricsRegisterer != nil { + metricsRegisterer.MustRegister(collectors.NewDBStatsCollector(db, "kine")) + } + return &Generic{ DB: db, @@ -248,13 +254,21 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig }, err } -func (d *Generic) query(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) { +func (d *Generic) query(ctx context.Context, sql string, args ...interface{}) (result *sql.Rows, err error) { logrus.Tracef("QUERY %v : %s", args, Stripped(sql)) + startTime := time.Now() + defer func() { + ObserveSQL(startTime, err == nil, Stripped(sql), args) + }() return d.DB.QueryContext(ctx, sql, args...) } -func (d *Generic) queryRow(ctx context.Context, sql string, args ...interface{}) *sql.Row { +func (d *Generic) queryRow(ctx context.Context, sql string, args ...interface{}) (result *sql.Row) { logrus.Tracef("QUERY ROW %v : %s", args, Stripped(sql)) + startTime := time.Now() + defer func() { + ObserveSQL(startTime, result.Err() == nil, Stripped(sql), args) + }() return d.DB.QueryRowContext(ctx, sql, args...) } @@ -267,7 +281,9 @@ func (d *Generic) execute(ctx context.Context, sql string, args ...interface{}) wait := strategy.Backoff(backoff.Linear(100 + time.Millisecond)) for i := uint(0); i < 20; i++ { logrus.Tracef("EXEC (try: %d) %v : %s", i, args, Stripped(sql)) + startTime := time.Now() result, err = d.DB.ExecContext(ctx, sql, args...) + ObserveSQL(startTime, err == nil, Stripped(sql), args) if err != nil && d.Retry != nil && d.Retry(err) { wait(i) continue diff --git a/pkg/drivers/generic/metrics.go b/pkg/drivers/generic/metrics.go new file mode 100644 index 00000000..c2bd4576 --- /dev/null +++ b/pkg/drivers/generic/metrics.go @@ -0,0 +1,46 @@ +package generic + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" +) + +const ( + LabelSuccess = "success" + LabelError = "error" +) + +var ( + SQLTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "kine_sql_total", + Help: "Total number of SQL operations", + }, []string{"result"}) + + SQLTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "kine_sql_time_seconds", + Help: "Length of time per SQL operation", + Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, + 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30}, + }, []string{"result"}) +) + +var ( + // SlowSQLThreshold is a duration which SQL executed longer than will be logged. + // This can be directly modified to override the default value when kine is used as a library. + SlowSQLThreshold = time.Second +) + +func ObserveSQL(start time.Time, success bool, sql Stripped, args ...interface{}) { + result := LabelSuccess + if !success { + result = LabelError + } + SQLTotal.WithLabelValues(result).Inc() + duration := time.Since(start) + SQLTime.WithLabelValues(result).Observe(duration.Seconds()) + if SlowSQLThreshold > 0 && duration >= SlowSQLThreshold { + logrus.Infof("Slow SQL (started: %v) (total time: %v): %s : %v", start, duration, sql, args) + } +} diff --git a/pkg/drivers/generic/tx.go b/pkg/drivers/generic/tx.go index 092e8107..11e39a74 100644 --- a/pkg/drivers/generic/tx.go +++ b/pkg/drivers/generic/tx.go @@ -3,6 +3,7 @@ package generic import ( "context" "database/sql" + "time" "github.com/k3s-io/kine/pkg/server" "github.com/sirupsen/logrus" @@ -97,17 +98,29 @@ func (t *Tx) CurrentRevision(ctx context.Context) (int64, error) { return id, err } -func (t *Tx) query(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) { +func (t *Tx) query(ctx context.Context, sql string, args ...interface{}) (result *sql.Rows, err error) { logrus.Tracef("TX QUERY %v : %s", args, Stripped(sql)) + startTime := time.Now() + defer func() { + ObserveSQL(startTime, err == nil, Stripped(sql), args) + }() return t.x.QueryContext(ctx, sql, args...) } -func (t *Tx) queryRow(ctx context.Context, sql string, args ...interface{}) *sql.Row { +func (t *Tx) queryRow(ctx context.Context, sql string, args ...interface{}) (result *sql.Row) { logrus.Tracef("TX QUERY ROW %v : %s", args, Stripped(sql)) + startTime := time.Now() + defer func() { + ObserveSQL(startTime, result.Err() == nil, Stripped(sql), args) + }() return t.x.QueryRowContext(ctx, sql, args...) } func (t *Tx) execute(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error) { logrus.Tracef("TX EXEC %v : %s", args, Stripped(sql)) + startTime := time.Now() + defer func() { + ObserveSQL(startTime, err == nil, Stripped(sql), args) + }() return t.x.ExecContext(ctx, sql, args...) } diff --git a/pkg/drivers/mysql/mysql.go b/pkg/drivers/mysql/mysql.go index 0e7c1629..ca9b3da4 100644 --- a/pkg/drivers/mysql/mysql.go +++ b/pkg/drivers/mysql/mysql.go @@ -11,6 +11,7 @@ import ( "github.com/k3s-io/kine/pkg/logstructured/sqllog" "github.com/k3s-io/kine/pkg/server" "github.com/k3s-io/kine/pkg/tls" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -43,7 +44,7 @@ var ( createDB = "CREATE DATABASE IF NOT EXISTS " ) -func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) { +func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { tlsConfig, err := tlsInfo.ClientConfig() if err != nil { return nil, err @@ -62,7 +63,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo return nil, err } - dialect, err := generic.Open(ctx, "mysql", parsedDSN, connPoolConfig, "?", false) + dialect, err := generic.Open(ctx, "mysql", parsedDSN, connPoolConfig, "?", false, metricsRegisterer) if err != nil { return nil, err } diff --git a/pkg/drivers/pgsql/pgsql.go b/pkg/drivers/pgsql/pgsql.go index 328b800f..f734a6ad 100644 --- a/pkg/drivers/pgsql/pgsql.go +++ b/pkg/drivers/pgsql/pgsql.go @@ -14,6 +14,7 @@ import ( "github.com/k3s-io/kine/pkg/server" "github.com/k3s-io/kine/pkg/tls" "github.com/lib/pq" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -44,7 +45,7 @@ var ( createDB = "CREATE DATABASE " ) -func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) { +func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { parsedDSN, err := prepareDSN(dataSourceName, tlsInfo) if err != nil { return nil, err @@ -54,7 +55,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo return nil, err } - dialect, err := generic.Open(ctx, "postgres", parsedDSN, connPoolConfig, "$", true) + dialect, err := generic.Open(ctx, "postgres", parsedDSN, connPoolConfig, "$", true, metricsRegisterer) if err != nil { return nil, err } diff --git a/pkg/drivers/sqlite/sqlite.go b/pkg/drivers/sqlite/sqlite.go index 2fb9b1b2..d87eb760 100644 --- a/pkg/drivers/sqlite/sqlite.go +++ b/pkg/drivers/sqlite/sqlite.go @@ -15,6 +15,7 @@ import ( "github.com/k3s-io/kine/pkg/server" "github.com/mattn/go-sqlite3" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" // sqlite db driver @@ -44,12 +45,12 @@ var ( } ) -func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) { - backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName, connPoolConfig) +func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { + backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName, connPoolConfig, metricsRegisterer) return backend, err } -func NewVariant(ctx context.Context, driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, *generic.Generic, error) { +func NewVariant(ctx context.Context, driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, *generic.Generic, error) { if dataSourceName == "" { if err := os.MkdirAll("./db", 0700); err != nil { return nil, nil, err @@ -57,7 +58,7 @@ func NewVariant(ctx context.Context, driverName, dataSourceName string, connPool dataSourceName = "./db/state.db?_journal=WAL&cache=shared" } - dialect, err := generic.Open(ctx, driverName, dataSourceName, connPoolConfig, "?", false) + dialect, err := generic.Open(ctx, driverName, dataSourceName, connPoolConfig, "?", false, metricsRegisterer) if err != nil { return nil, nil, err } diff --git a/pkg/drivers/sqlite/sqlite_nocgo.go b/pkg/drivers/sqlite/sqlite_nocgo.go index 31062cdd..d730b406 100644 --- a/pkg/drivers/sqlite/sqlite_nocgo.go +++ b/pkg/drivers/sqlite/sqlite_nocgo.go @@ -10,15 +10,16 @@ import ( "github.com/k3s-io/kine/pkg/drivers/generic" "github.com/k3s-io/kine/pkg/server" + "github.com/prometheus/client_golang/prometheus" ) var errNoCgo = errors.New("this binary is built without CGO, sqlite is disabled") -func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) { +func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { return nil, errNoCgo } -func NewVariant(driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, *generic.Generic, error) { +func NewVariant(driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, *generic.Generic, error) { return nil, nil, errNoCgo } diff --git a/pkg/endpoint/endpoint.go b/pkg/endpoint/endpoint.go index bee1cf86..81412912 100644 --- a/pkg/endpoint/endpoint.go +++ b/pkg/endpoint/endpoint.go @@ -13,9 +13,11 @@ import ( "github.com/k3s-io/kine/pkg/drivers/mysql" "github.com/k3s-io/kine/pkg/drivers/pgsql" "github.com/k3s-io/kine/pkg/drivers/sqlite" + "github.com/k3s-io/kine/pkg/logstructured/sqllog" "github.com/k3s-io/kine/pkg/server" "github.com/k3s-io/kine/pkg/tls" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "github.com/soheilhy/cmux" "go.etcd.io/etcd/server/v3/embed" @@ -41,6 +43,7 @@ type Config struct { ConnectionPoolConfig generic.ConnectionPoolConfig ServerTLSConfig tls.Config BackendTLSConfig tls.Config + MetricsRegisterer prometheus.Registerer } type ETCDConfig struct { @@ -64,6 +67,14 @@ func Listen(ctx context.Context, config Config) (ETCDConfig, error) { return ETCDConfig{}, errors.Wrap(err, "building kine") } + if config.MetricsRegisterer != nil { + config.MetricsRegisterer.MustRegister( + generic.SQLTotal, + generic.SQLTime, + sqllog.CompactTotal, + ) + } + if err := backend.Start(ctx); err != nil { return ETCDConfig{}, errors.Wrap(err, "starting kine backend") } @@ -228,13 +239,13 @@ func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config) switch driver { case SQLiteBackend: leaderElect = false - backend, err = sqlite.New(ctx, dsn, cfg.ConnectionPoolConfig) + backend, err = sqlite.New(ctx, dsn, cfg.ConnectionPoolConfig, cfg.MetricsRegisterer) case DQLiteBackend: - backend, err = dqlite.New(ctx, dsn, cfg.ConnectionPoolConfig) + backend, err = dqlite.New(ctx, dsn, cfg.ConnectionPoolConfig, cfg.MetricsRegisterer) case PostgresBackend: - backend, err = pgsql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig) + backend, err = pgsql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig, cfg.MetricsRegisterer) case MySQLBackend: - backend, err = mysql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig) + backend, err = mysql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig, cfg.MetricsRegisterer) case JetStreamBackend: backend, err = jetstream.New(ctx, dsn, cfg.BackendTLSConfig) default: diff --git a/pkg/logstructured/sqllog/metrics.go b/pkg/logstructured/sqllog/metrics.go new file mode 100644 index 00000000..6a18b733 --- /dev/null +++ b/pkg/logstructured/sqllog/metrics.go @@ -0,0 +1,15 @@ +package sqllog + +import "github.com/prometheus/client_golang/prometheus" + +const ( + LabelSuccess = "success" + LabelError = "error" +) + +var ( + CompactTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "kine_compact_total", + Help: "Total number of compactions", + }, []string{"result"}) +) diff --git a/pkg/logstructured/sqllog/sql.go b/pkg/logstructured/sqllog/sql.go index 8f137993..242caa1d 100644 --- a/pkg/logstructured/sqllog/sql.go +++ b/pkg/logstructured/sqllog/sql.go @@ -150,6 +150,7 @@ outer: break } else { logrus.Errorf("Compact failed: %v", err) + CompactTotal.WithLabelValues(LabelError).Inc() continue outer } } @@ -162,6 +163,8 @@ outer: // Record the final results for the outer loop compactRev = compactedRev targetCompactRev = currentRev + + CompactTotal.WithLabelValues(LabelSuccess).Inc() } } diff --git a/pkg/metrics/registry.go b/pkg/metrics/registry.go new file mode 100644 index 00000000..159c457f --- /dev/null +++ b/pkg/metrics/registry.go @@ -0,0 +1,22 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" +) + +type RegistererGatherer interface { + prometheus.Registerer + prometheus.Gatherer +} + +var Registry RegistererGatherer = prometheus.NewRegistry() + +func init() { + Registry.MustRegister( + // expose process metrics like CPU, Memory, file descriptor usage etc. + collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), + // expose Go runtime metrics like GC stats, memory stats etc. + collectors.NewGoCollector(), + ) +} diff --git a/pkg/metrics/server.go b/pkg/metrics/server.go new file mode 100644 index 00000000..c8d0a5b1 --- /dev/null +++ b/pkg/metrics/server.go @@ -0,0 +1,63 @@ +package metrics + +import ( + "context" + "net" + "net/http" + + "github.com/k3s-io/kine/pkg/tls" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" +) + +type Config struct { + ServerAddress string + ServerTLSConfig tls.Config +} + +const ( + defaultBindAddress = ":8080" + metricsPath = "/metrics" +) + +func Serve(ctx context.Context, config Config) { + if config.ServerAddress == "" { + config.ServerAddress = defaultBindAddress + } + if config.ServerAddress == "0" { + return + } + + logrus.Infof("metrics server is starting to listen at %s", config.ServerAddress) + listener, err := net.Listen("tcp", config.ServerAddress) + if err != nil { + logrus.Fatalf("error creating the metrics listener: %v", err) + } + + handler := promhttp.HandlerFor(Registry, promhttp.HandlerOpts{ + ErrorHandling: promhttp.HTTPErrorOnError, + }) + mux := http.NewServeMux() + mux.Handle(metricsPath, handler) + server := http.Server{ + Handler: mux, + } + + go func() { + logrus.Infof("starting metrics server path %s", metricsPath) + var err error + if config.ServerTLSConfig.CertFile != "" && config.ServerTLSConfig.KeyFile != "" { + err = server.ServeTLS(listener, config.ServerTLSConfig.CertFile, config.ServerTLSConfig.KeyFile) + } else { + err = server.Serve(listener) + } + if err != nil && err != http.ErrServerClosed { + logrus.Fatalf("error starting the metrics server: %v", err) + } + }() + + <-ctx.Done() + if err := server.Shutdown(context.Background()); err != nil { + logrus.Fatalf("error shutting down the metrics server: %v", err) + } +}