Skip to content

Commit

Permalink
Add MySQL backend (#4137)
Browse files Browse the repository at this point in the history
Closes #3413.
  • Loading branch information
adetunjii committed Mar 4, 2024
1 parent 1615d26 commit dc757b9
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 10 deletions.
110 changes: 104 additions & 6 deletions internal/backends/mysql/backend.go
Expand Up @@ -15,7 +15,9 @@
package mysql

import (
"cmp"
"context"
"slices"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
Expand Down Expand Up @@ -43,7 +45,14 @@ type NewBackendParams struct {

// NewBackend creates a new Backend.
func NewBackend(params *NewBackendParams) (backends.Backend, error) {
return nil, lazyerrors.New("not yet implemented")
r, err := metadata.NewRegistry(params.URI, params.L, params.P)
if err != nil {
return nil, err
}

return backends.BackendContract(&backend{
r: r,
}), nil
}

// Close implements backends.Backend interface.
Expand All @@ -53,7 +62,56 @@ func (b *backend) Close() {

// Status implements backends.Backend interface.
func (b *backend) Status(ctx context.Context, params *backends.StatusParams) (*backends.StatusResult, error) {
return nil, lazyerrors.New("not yet implemented.")
dbs, err := b.r.DatabaseList(ctx)
if err != nil {
return nil, lazyerrors.Error(err)
}

var res backends.StatusResult

var pingSucceeded bool

for _, dbName := range dbs {
var cs []*metadata.Collection

if cs, err = b.r.CollectionList(ctx, dbName); err != nil {
return nil, lazyerrors.Error(err)
}

res.CountCollections += int64(len(cs))

colls, err := newDatabase(b.r, dbName).ListCollections(ctx, new(backends.ListCollectionsParams))
if err != nil {
return nil, lazyerrors.Error(err)
}

for _, cInfo := range colls.Collections {
if cInfo.Capped() {
res.CountCappedCollections++
}
}

if pingSucceeded {
continue
}

p, err := b.r.DatabaseGetExisting(ctx, dbName)
if err != nil {
return nil, lazyerrors.Error(err)
}

if p == nil {
continue
}

if err = p.Ping(ctx); err != nil {
return nil, lazyerrors.Error(err)
}

pingSucceeded = true
}

return &res, nil
}

// Database implements backends.Backend interface.
Expand All @@ -65,22 +123,62 @@ func (b *backend) Database(name string) (backends.Database, error) {
//
//nolint:lll // for readability
func (b *backend) ListDatabases(ctx context.Context, params *backends.ListDatabasesParams) (*backends.ListDatabasesResult, error) {
return nil, lazyerrors.New("not yet implemented")
list, err := b.r.DatabaseList(ctx)
if err != nil {
return nil, err
}

var res *backends.ListDatabasesResult

if params != nil && len(params.Name) > 0 {
i, found := slices.BinarySearchFunc(list, params.Name, func(dbName, t string) int {
return cmp.Compare(dbName, t)
})

var filteredList []string

if found {
filteredList = append(filteredList, list[i])
}

list = filteredList
}

res = &backends.ListDatabasesResult{
Databases: make([]backends.DatabaseInfo, 0, len(list)),
}

for _, dbName := range list {
res.Databases = append(res.Databases, backends.DatabaseInfo{
Name: dbName,
})
}

return res, nil
}

// DropDatabase implements backends.Backend interface.
func (b *backend) DropDatabase(ctx context.Context, params *backends.DropDatabaseParams) error {
return lazyerrors.New("not yet implemented.")
dropped, err := b.r.DatabaseDrop(ctx, params.Name)
if err != nil {
return lazyerrors.Error(err)
}

if !dropped {
return backends.NewError(backends.ErrorCodeDatabaseDoesNotExist, nil)
}

return nil
}

// Describe implements prometheus.Collector.
func (b *backend) Describe(ch chan<- *prometheus.Desc) {
// b.r.Describe(ch)
b.r.Describe(ch)
}

// Collect implements prometheus.Collector.
func (b *backend) Collect(ch chan<- prometheus.Metric) {
// b.r.Collect(ch)
b.r.Collect(ch)
}

// check interfaces
Expand Down
6 changes: 3 additions & 3 deletions internal/backends/mysql/collection.go
Expand Up @@ -189,7 +189,7 @@ func (c *collection) UpdateAll(ctx context.Context, params *backends.UpdateAllPa
}

q := fmt.Sprintf(
`UPDATE %s.%s SET %s = ? WHERE %s = ?`,
`UPDATE %q.%q SET %s = ? WHERE %s = ?`,
c.dbName, meta.TableName,
metadata.DefaultColumn,
metadata.IDColumn,
Expand Down Expand Up @@ -272,7 +272,7 @@ func (c *collection) DeleteAll(ctx context.Context, params *backends.DeleteAllPa
}

q := fmt.Sprintf(
`DELETE FROM %s.%s WHERE %s IN (%s)`,
`DELETE FROM %q.%q WHERE %s IN (%s)`,
c.dbName, meta.TableName,
column,
strings.Join(placeholders, ", "),
Expand Down Expand Up @@ -396,7 +396,7 @@ func (c *collection) Compact(ctx context.Context, params *backends.CompactParams
}

q := "OPTIMIZE TABLE "
q += fmt.Sprintf("%s.%s", c.dbName, coll.TableName)
q += fmt.Sprintf("%q.%q", c.dbName, coll.TableName)

if _, err = p.ExecContext(ctx, q); err != nil {
return nil, lazyerrors.Error(err)
Expand Down
2 changes: 1 addition & 1 deletion internal/backends/mysql/insert.go
Expand Up @@ -54,7 +54,7 @@ func prepareInsertStatement(schema, tableName string, capped bool, docs []*types
}

return fmt.Sprintf(
`INSERT INTO %s.%s (%s) VALUES %s`,
`INSERT INTO %q.%q (%s) VALUES %s`,
schema, tableName,
columns,
strings.Join(rows, ", "),
Expand Down
6 changes: 6 additions & 0 deletions internal/util/fsql/db.go
Expand Up @@ -67,6 +67,12 @@ func (db *DB) Close() error {
return db.sqlDB.Close()
}

// Ping calls [*sql.DB.Ping].
func (db *DB) Ping(ctx context.Context) error {
defer observability.FuncCall(ctx)()
return db.sqlDB.Ping()
}

// QueryContext calls [*sql.DB.QueryContext].
func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
defer observability.FuncCall(ctx)()
Expand Down

0 comments on commit dc757b9

Please sign in to comment.