diff --git a/internal/backends/mysql/backend.go b/internal/backends/mysql/backend.go index 8234cff16919..5629f1d32e4e 100644 --- a/internal/backends/mysql/backend.go +++ b/internal/backends/mysql/backend.go @@ -15,7 +15,9 @@ package mysql import ( + "cmp" "context" + "slices" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -43,7 +45,14 @@ type NewBackendParams struct { // NewBackend creates a new Backend. func NewBackend(params *NewBackendParams) (backends.Backend, error) { - return nil, lazyerrors.New("not yet implemented") + r, err := metadata.NewRegistry(params.URI, params.L, params.P) + if err != nil { + return nil, err + } + + return backends.BackendContract(&backend{ + r: r, + }), nil } // Close implements backends.Backend interface. @@ -53,7 +62,56 @@ func (b *backend) Close() { // Status implements backends.Backend interface. func (b *backend) Status(ctx context.Context, params *backends.StatusParams) (*backends.StatusResult, error) { - return nil, lazyerrors.New("not yet implemented.") + dbs, err := b.r.DatabaseList(ctx) + if err != nil { + return nil, lazyerrors.Error(err) + } + + var res backends.StatusResult + + var pingSucceeded bool + + for _, dbName := range dbs { + var cs []*metadata.Collection + + if cs, err = b.r.CollectionList(ctx, dbName); err != nil { + return nil, lazyerrors.Error(err) + } + + res.CountCollections += int64(len(cs)) + + colls, err := newDatabase(b.r, dbName).ListCollections(ctx, new(backends.ListCollectionsParams)) + if err != nil { + return nil, lazyerrors.Error(err) + } + + for _, cInfo := range colls.Collections { + if cInfo.Capped() { + res.CountCappedCollections++ + } + } + + if pingSucceeded { + continue + } + + p, err := b.r.DatabaseGetExisting(ctx, dbName) + if err != nil { + return nil, lazyerrors.Error(err) + } + + if p == nil { + continue + } + + if err = p.Ping(ctx); err != nil { + return nil, lazyerrors.Error(err) + } + + pingSucceeded = true + } + + return &res, nil } // Database implements backends.Backend interface. @@ -65,22 +123,62 @@ func (b *backend) Database(name string) (backends.Database, error) { // //nolint:lll // for readability func (b *backend) ListDatabases(ctx context.Context, params *backends.ListDatabasesParams) (*backends.ListDatabasesResult, error) { - return nil, lazyerrors.New("not yet implemented") + list, err := b.r.DatabaseList(ctx) + if err != nil { + return nil, err + } + + var res *backends.ListDatabasesResult + + if params != nil && len(params.Name) > 0 { + i, found := slices.BinarySearchFunc(list, params.Name, func(dbName, t string) int { + return cmp.Compare(dbName, t) + }) + + var filteredList []string + + if found { + filteredList = append(filteredList, list[i]) + } + + list = filteredList + } + + res = &backends.ListDatabasesResult{ + Databases: make([]backends.DatabaseInfo, 0, len(list)), + } + + for _, dbName := range list { + res.Databases = append(res.Databases, backends.DatabaseInfo{ + Name: dbName, + }) + } + + return res, nil } // DropDatabase implements backends.Backend interface. func (b *backend) DropDatabase(ctx context.Context, params *backends.DropDatabaseParams) error { - return lazyerrors.New("not yet implemented.") + dropped, err := b.r.DatabaseDrop(ctx, params.Name) + if err != nil { + return lazyerrors.Error(err) + } + + if !dropped { + return backends.NewError(backends.ErrorCodeDatabaseDoesNotExist, nil) + } + + return nil } // Describe implements prometheus.Collector. func (b *backend) Describe(ch chan<- *prometheus.Desc) { - // b.r.Describe(ch) + b.r.Describe(ch) } // Collect implements prometheus.Collector. func (b *backend) Collect(ch chan<- prometheus.Metric) { - // b.r.Collect(ch) + b.r.Collect(ch) } // check interfaces diff --git a/internal/backends/mysql/collection.go b/internal/backends/mysql/collection.go index 2a7b080fbb2a..13c8341b7045 100644 --- a/internal/backends/mysql/collection.go +++ b/internal/backends/mysql/collection.go @@ -189,7 +189,7 @@ func (c *collection) UpdateAll(ctx context.Context, params *backends.UpdateAllPa } q := fmt.Sprintf( - `UPDATE %s.%s SET %s = ? WHERE %s = ?`, + `UPDATE %q.%q SET %s = ? WHERE %s = ?`, c.dbName, meta.TableName, metadata.DefaultColumn, metadata.IDColumn, @@ -272,7 +272,7 @@ func (c *collection) DeleteAll(ctx context.Context, params *backends.DeleteAllPa } q := fmt.Sprintf( - `DELETE FROM %s.%s WHERE %s IN (%s)`, + `DELETE FROM %q.%q WHERE %s IN (%s)`, c.dbName, meta.TableName, column, strings.Join(placeholders, ", "), @@ -396,7 +396,7 @@ func (c *collection) Compact(ctx context.Context, params *backends.CompactParams } q := "OPTIMIZE TABLE " - q += fmt.Sprintf("%s.%s", c.dbName, coll.TableName) + q += fmt.Sprintf("%q.%q", c.dbName, coll.TableName) if _, err = p.ExecContext(ctx, q); err != nil { return nil, lazyerrors.Error(err) diff --git a/internal/backends/mysql/insert.go b/internal/backends/mysql/insert.go index 3bb664b470b2..0413e2fbcbad 100644 --- a/internal/backends/mysql/insert.go +++ b/internal/backends/mysql/insert.go @@ -54,7 +54,7 @@ func prepareInsertStatement(schema, tableName string, capped bool, docs []*types } return fmt.Sprintf( - `INSERT INTO %s.%s (%s) VALUES %s`, + `INSERT INTO %q.%q (%s) VALUES %s`, schema, tableName, columns, strings.Join(rows, ", "), diff --git a/internal/util/fsql/db.go b/internal/util/fsql/db.go index d21e80ddf98c..f550c0306705 100644 --- a/internal/util/fsql/db.go +++ b/internal/util/fsql/db.go @@ -67,6 +67,12 @@ func (db *DB) Close() error { return db.sqlDB.Close() } +// Ping calls [*sql.DB.Ping]. +func (db *DB) Ping(ctx context.Context) error { + defer observability.FuncCall(ctx)() + return db.sqlDB.Ping() +} + // QueryContext calls [*sql.DB.QueryContext]. func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) { defer observability.FuncCall(ctx)()