From 0c84c1f72a679770e305ab0edc94b33642023f43 Mon Sep 17 00:00:00 2001 From: adetunjii Date: Thu, 15 Feb 2024 19:48:09 +0100 Subject: [PATCH 1/9] Add MySQL backend collection --- internal/backends/mysql/collection.go | 416 +++++++++++++++++- internal/backends/mysql/helpers.go | 74 ++++ internal/backends/mysql/insert.go | 62 +++ internal/backends/mysql/mysql.go | 5 + internal/backends/mysql/query_iterator.go | 21 + internal/backends/postgresql/helpers.go | 2 +- .../backends/postgresql/query_iterator.go | 2 +- 7 files changed, 571 insertions(+), 11 deletions(-) create mode 100644 internal/backends/mysql/helpers.go create mode 100644 internal/backends/mysql/insert.go diff --git a/internal/backends/mysql/collection.go b/internal/backends/mysql/collection.go index aa45cef8a1c8..cd44d73a2b5a 100644 --- a/internal/backends/mysql/collection.go +++ b/internal/backends/mysql/collection.go @@ -16,10 +16,21 @@ package mysql import ( "context" + "database/sql" + "errors" + "fmt" + "sort" + "strings" + + "github.com/go-sql-driver/mysql" "github.com/FerretDB/FerretDB/internal/backends" "github.com/FerretDB/FerretDB/internal/backends/mysql/metadata" + "github.com/FerretDB/FerretDB/internal/handler/sjson" + "github.com/FerretDB/FerretDB/internal/types" + "github.com/FerretDB/FerretDB/internal/util/fsql" "github.com/FerretDB/FerretDB/internal/util/lazyerrors" + "github.com/FerretDB/FerretDB/internal/util/must" ) // collection implements backends.Collection interface. @@ -40,27 +51,312 @@ func newCollection(r *metadata.Registry, dbName, name string) backends.Collectio // Query implements backends.Collection interface. func (c *collection) Query(ctx context.Context, params *backends.QueryParams) (*backends.QueryResult, error) { - return nil, lazyerrors.New("not yet implemented") + p, err := c.r.DatabaseGetExisting(ctx, c.dbName) + if err != nil { + return nil, lazyerrors.Error(err) + } + + if p == nil { + return &backends.QueryResult{ + Iter: newQueryIterator(ctx, nil, params.OnlyRecordIDs), + }, nil + } + + meta, err := c.r.CollectionGet(ctx, c.dbName, c.name) + if err != nil { + return nil, lazyerrors.Error(err) + } + + if meta == nil { + return &backends.QueryResult{ + Iter: newQueryIterator(ctx, nil, params.OnlyRecordIDs), + }, nil + } + + q := prepareSelectClause(&selectParams{ + Schema: c.dbName, + Table: meta.TableName, + Comment: params.Comment, + Capped: meta.Capped(), + OnlyRecordIDs: params.OnlyRecordIDs, + }) + + var where string + var args []any + + where, args, err = prepareWhereClause(params.Filter) + if err != nil { + return nil, lazyerrors.Error(err) + } + + q += where + + sort, sortArgs := prepareOrderByClause(params.Sort) + + q += sort + args = append(args, sortArgs...) + + if params.Limit != 0 { + q += ` LIMIT ?` + args = append(args, params.Limit) + } + + rows, err := p.QueryContext(ctx, q, args...) + if err != nil { + return nil, lazyerrors.Error(err) + } + + return &backends.QueryResult{ + Iter: newQueryIterator(ctx, rows, params.OnlyRecordIDs), + }, nil } // InsertAll implements backends.Collection interface. func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllParams) (*backends.InsertAllResult, error) { - return nil, lazyerrors.New("not yet implemented") + p, err := c.r.DatabaseGetExisting(ctx, c.dbName) + if err != nil { + return nil, lazyerrors.Error(err) + } + + if _, err = c.r.CollectionCreate(ctx, &metadata.CollectionCreateParams{ + DBName: c.dbName, + Name: c.name, + }); err != nil { + return nil, lazyerrors.Error(err) + } + + meta, err := c.r.CollectionGet(ctx, c.dbName, c.name) + if err != nil { + return nil, lazyerrors.Error(err) + } + + err = p.InTransaction(ctx, func(tx *fsql.Tx) error { + const batchSize = 100 + + var batch []*types.Document + docs := params.Docs + + for len(docs) > 0 { + i := min(batchSize, len(docs)) + batch, docs = docs[:i], docs[i:] + + var q string + var args []any + + q, args, err = prepareInsertStatement(c.dbName, meta.TableName, meta.Capped(), batch) + if err != nil { + return lazyerrors.Error(err) + } + + if _, err = tx.ExecContext(ctx, q, args...); err != nil { + var mysqlErr *mysql.MySQLError + if errors.As(err, &mysqlErr) && mysqlErr.Number == ErrDuplicateEntry { + return backends.NewError(backends.ErrorCodeInsertDuplicateID, err) + } + + return lazyerrors.Error(err) + } + } + + return nil + }) + if err != nil { + return nil, err + } + + return new(backends.InsertAllResult), nil } // UpdateAll implements backend.Collection interface. func (c *collection) UpdateAll(ctx context.Context, params *backends.UpdateAllParams) (*backends.UpdateAllResult, error) { - return nil, lazyerrors.New("not yet implemented") + p, err := c.r.DatabaseGetExisting(ctx, c.dbName) + if err != nil { + return nil, lazyerrors.Error(err) + } + + var res backends.UpdateAllResult + if p == nil { + return &res, nil + } + + meta, err := c.r.CollectionGet(ctx, c.dbName, c.name) + if err != nil { + return nil, lazyerrors.Error(err) + } + + if meta == nil { + return &res, nil + } + + q := fmt.Sprintf( + `UPDATE %s.%s SET %s = ? WHERE %s = ?`, + c.dbName, meta.TableName, + metadata.DefaultColumn, + metadata.IDColumn, + ) + + err = p.InTransaction(ctx, func(tx *fsql.Tx) error { + for _, doc := range params.Docs { + var b []byte + if b, err = sjson.Marshal(doc); err != nil { + return lazyerrors.Error(err) + } + + id, _ := doc.Get("_id") + must.NotBeZero(id) + + arg := must.NotFail(sjson.MarshalSingleValue(id)) + + var stats sql.Result + if stats, err = tx.ExecContext(ctx, q, b, arg); err != nil { + return lazyerrors.Error(err) + } + + var ra int64 + ra, err = stats.RowsAffected() + if err != nil { + return lazyerrors.Error(err) + } + + res.Updated += int32(ra) + } + + return nil + }) + if err != nil { + return nil, lazyerrors.Error(err) + } + + return &res, nil } // DeleteAll implements backend.Collection interface. func (c *collection) DeleteAll(ctx context.Context, params *backends.DeleteAllParams) (*backends.DeleteAllResult, error) { - return nil, lazyerrors.New("not yet implemented") + p, err := c.r.DatabaseGetExisting(ctx, c.dbName) + if err != nil { + return nil, lazyerrors.Error(err) + } + + if p == nil { + return &backends.DeleteAllResult{Deleted: 0}, nil + } + + meta, err := c.r.CollectionGet(ctx, c.dbName, c.name) + if err != nil { + return nil, lazyerrors.Error(err) + } + + if meta == nil { + return &backends.DeleteAllResult{Deleted: 0}, nil + } + + var column string + var placeholders []string + var args []any + + if params.RecordIDs == nil { + placeholders = make([]string, len(params.IDs)) + args = make([]any, len(params.RecordIDs)) + + for i, id := range params.RecordIDs { + placeholders[i] = "?" + args[i] = id + } + + column = metadata.RecordIDColumn + } + + q := fmt.Sprintf( + `DELETE FROM %s.%s WHERE %s IN (%s)`, + c.dbName, meta.TableName, + column, + strings.Join(placeholders, ", "), + ) + + res, err := p.ExecContext(ctx, q, args...) + if err != nil { + return nil, lazyerrors.Error(err) + } + + ra, err := res.RowsAffected() + if err != nil { + return nil, lazyerrors.Error(err) + } + + return &backends.DeleteAllResult{ + Deleted: int32(ra), + }, nil } // Explain implements backends.Collection interface. func (c *collection) Explain(ctx context.Context, params *backends.ExplainParams) (*backends.ExplainResult, error) { - return nil, lazyerrors.New("not yet implemented") + p, err := c.r.DatabaseGetExisting(ctx, c.dbName) + if err != nil { + return &backends.ExplainResult{ + QueryPlanner: must.NotFail(types.NewDocument()), + }, nil + } + + if p == nil { + return &backends.ExplainResult{ + QueryPlanner: must.NotFail(types.NewDocument()), + }, nil + } + + meta, err := c.r.CollectionGet(ctx, c.dbName, c.name) + if err != nil { + return nil, lazyerrors.Error(err) + } + + if meta == nil { + return &backends.ExplainResult{ + QueryPlanner: must.NotFail(types.NewDocument()), + }, nil + } + + res := new(backends.ExplainResult) + + opts := &selectParams{ + Schema: c.dbName, + Table: meta.TableName, + Capped: meta.Capped(), + } + + q := `EXPLAIN FORMAT=JSON ` + prepareSelectClause(opts) + + where, args, err := prepareWhereClause(params.Filter) + if err != nil { + return nil, lazyerrors.Error(err) + } + + res.FilterPushdown = where != "" + + q += where + + sort, sortArgs := prepareOrderByClause(params.Sort) + + q += sort + args = append(args, sortArgs...) + + if params.Limit != 0 { + q += ` LIMIT ?` + args = append(args, params.Limit) + res.LimitPushdown = true + } + + var b []byte + if err = p.QueryRowContext(ctx, q, args...).Scan(&b); err != nil { + return nil, lazyerrors.Error(err) + } + + queryPlan, err := unmarshalExplain(b) + if err != nil { + return nil, lazyerrors.Error(err) + } + + res.QueryPlanner = queryPlan + + return res, nil } // Stats implements backends.Collection interface. @@ -70,22 +366,124 @@ func (c *collection) Stats(ctx context.Context, params *backends.CollectionStats // Compact implements backends.Collection interface. func (c *collection) Compact(ctx context.Context, params *backends.CompactParams) (*backends.CompactResult, error) { - return nil, lazyerrors.New("not yet implemented") + p, err := c.r.DatabaseGetExisting(ctx, c.dbName) + if err != nil { + return nil, lazyerrors.Error(err) + } + + if p == nil { + return nil, backends.NewError( + backends.ErrorCodeDatabaseDoesNotExist, + lazyerrors.Errorf("no ns %s.%s", c.dbName, c.name), + ) + } + + coll, err := c.r.CollectionGet(ctx, c.dbName, c.name) + if err != nil { + return nil, lazyerrors.Error(err) + } + + if coll == nil { + return nil, backends.NewError( + backends.ErrorCodeCollectionDoesNotExist, + lazyerrors.Errorf("no ns %s.%s", c.dbName, c.name), + ) + } + + q := "OPTIMIZE TABLE " + q += fmt.Sprintf("%s.%s", c.dbName, coll.TableName) + + if _, err = p.ExecContext(ctx, q); err != nil { + return nil, lazyerrors.Error(err) + } + + return new(backends.CompactResult), nil } // ListIndexes implements backends.Collection interface. func (c *collection) ListIndexes(ctx context.Context, params *backends.ListIndexesParams) (*backends.ListIndexesResult, error) { - return nil, lazyerrors.New("not yet implemented") + db, err := c.r.DatabaseGetExisting(ctx, c.dbName) + if err != nil { + return nil, lazyerrors.Error(err) + } + + if db == nil { + return nil, backends.NewError( + backends.ErrorCodeCollectionDoesNotExist, + lazyerrors.Errorf("no ns %s.%s", c.dbName, c.name), + ) + } + + coll, err := c.r.CollectionGet(ctx, c.dbName, c.name) + if err != nil { + return nil, lazyerrors.Error(err) + } + + if coll == nil { + return nil, backends.NewError( + backends.ErrorCodeCollectionDoesNotExist, + lazyerrors.Errorf("no ns %s.%s", c.dbName, c.name), + ) + } + + res := backends.ListIndexesResult{ + Indexes: make([]backends.IndexInfo, len(coll.Indexes)), + } + + for i, index := range coll.Indexes { + res.Indexes[i] = backends.IndexInfo{ + Name: index.Name, + Unique: index.Unique, + Key: make([]backends.IndexKeyPair, len(index.Key)), + } + + for j, key := range index.Key { + res.Indexes[i].Key[j] = backends.IndexKeyPair{ + Field: key.Field, + Descending: key.Descending, + } + } + } + + sort.Slice(res.Indexes, func(i, j int) bool { return res.Indexes[i].Name < res.Indexes[j].Name }) + + return &res, nil } // CreateIndexes implements backends.Collection interface. func (c *collection) CreateIndexes(ctx context.Context, params *backends.CreateIndexesParams) (*backends.CreateIndexesResult, error) { //nolint:lll // for readability - return nil, lazyerrors.New("not yet implemented") + indexes := make([]metadata.IndexInfo, len(params.Indexes)) + for i, index := range params.Indexes { + indexes[i] = metadata.IndexInfo{ + Name: index.Name, + Key: make([]metadata.IndexKeyPair, len(index.Key)), + Unique: index.Unique, + } + + for j, key := range index.Key { + indexes[i].Key[j] = metadata.IndexKeyPair{ + Field: key.Field, + Descending: key.Descending, + } + } + } + + err := c.r.IndexesCreate(ctx, c.dbName, c.name, indexes) + if err != nil { + return nil, lazyerrors.Error(err) + } + + return new(backends.CreateIndexesResult), nil } // DropIndexes implements backends.Collection interface. func (c *collection) DropIndexes(ctx context.Context, params *backends.DropIndexesParams) (*backends.DropIndexesResult, error) { - return nil, lazyerrors.New("not yet implemented") + err := c.r.IndexesDrop(ctx, c.dbName, c.name, params.Indexes) + if err != nil { + return nil, lazyerrors.Error(err) + } + + return new(backends.DropIndexesResult), nil } // check interfaces diff --git a/internal/backends/mysql/helpers.go b/internal/backends/mysql/helpers.go new file mode 100644 index 000000000000..2921070065c1 --- /dev/null +++ b/internal/backends/mysql/helpers.go @@ -0,0 +1,74 @@ +// Copyright 2021 FerretDB Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "encoding/json" + "errors" + "fmt" + + "golang.org/x/exp/maps" + + "github.com/FerretDB/FerretDB/internal/types" + "github.com/FerretDB/FerretDB/internal/util/lazyerrors" +) + +// unmarshalExplain unmarshalls the plan from EXPLAIN MySQL command. +// EXPLAIN result is not sjson, so it cannot be unmarshalled by sjson.Unmarshal. +func unmarshalExplain(b []byte) (*types.Document, error) { + var plans map[string]any + if err := json.Unmarshal(b, &plans); err != nil { + return nil, lazyerrors.Error(err) + } + + if len(plans) == 0 { + return nil, lazyerrors.Error(errors.New("no execution plan returned")) + } + + return convertJSON(plans).(*types.Document), nil +} + +// convertJSON transforms decoded JSON map[string]any value into *types.Document. +func convertJSON(value any) any { + switch value := value.(type) { + case map[string]any: + d := types.MakeDocument(len(value)) + keys := maps.Keys(value) + + for _, k := range keys { + v := value[k] + d.Set(k, convertJSON(v)) + } + + return d + + case []any: + a := types.MakeArray(len(value)) + for _, v := range value { + a.Append(convertJSON(v)) + } + + return a + + case nil: + return types.Null + + case float64, string, bool: + return value + + default: + panic(fmt.Sprintf("unsupported type: %[1]v (%[1]v)", value)) + } +} diff --git a/internal/backends/mysql/insert.go b/internal/backends/mysql/insert.go new file mode 100644 index 000000000000..3bb664b470b2 --- /dev/null +++ b/internal/backends/mysql/insert.go @@ -0,0 +1,62 @@ +// Copyright 2021 FerretDB Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "fmt" + "strings" + + "github.com/FerretDB/FerretDB/internal/backends/mysql/metadata" + "github.com/FerretDB/FerretDB/internal/handler/sjson" + "github.com/FerretDB/FerretDB/internal/types" + "github.com/FerretDB/FerretDB/internal/util/lazyerrors" +) + +// prepareInsertStatement returns a statement and arguments for inserting the given documents. +// +// If capped is true, it returns a statement and arguments for inserting record IDs and documents. +func prepareInsertStatement(schema, tableName string, capped bool, docs []*types.Document) (string, []any, error) { + var args []any + rows := make([]string, len(docs)) + + for i, doc := range docs { + b, err := sjson.Marshal(doc) + if err != nil { + return "", nil, lazyerrors.Error(err) + } + + if capped { + rows[i] = "(?, ?)" + args = append(args, doc.RecordID(), string(b)) + + continue + } + + rows[i] = "(?)" + args = append(args, string(b)) + } + + columns := metadata.DefaultColumn + if capped { + columns = strings.Join([]string{metadata.RecordIDColumn, metadata.DefaultColumn}, ", ") + } + + return fmt.Sprintf( + `INSERT INTO %s.%s (%s) VALUES %s`, + schema, tableName, + columns, + strings.Join(rows, ", "), + ), args, nil +} diff --git a/internal/backends/mysql/mysql.go b/internal/backends/mysql/mysql.go index e21ad8e84c51..3b04299563b0 100644 --- a/internal/backends/mysql/mysql.go +++ b/internal/backends/mysql/mysql.go @@ -29,6 +29,11 @@ import ( "github.com/FerretDB/FerretDB/internal/util/lazyerrors" ) +const ( + // ErrDuplicateEntry is the unique key violation error code for MySQL. + ErrDuplicateEntry = 1062 +) + // stats represents information about statistics of tables and indexes. type stats struct { countDocuments int64 diff --git a/internal/backends/mysql/query_iterator.go b/internal/backends/mysql/query_iterator.go index 093de3d7d2be..f19938e20aae 100644 --- a/internal/backends/mysql/query_iterator.go +++ b/internal/backends/mysql/query_iterator.go @@ -42,6 +42,27 @@ type queryIterator struct { onlyRecordIDs bool } +// newQueryIterator returns a new queryiterator for the given Rows. +// +// Iterator's Close method closes rows. +// They are also closed by the Next method on any error, including context cancellation, +// to make sure that the database connection is released as early as possible. +// In that case, the iterator's Close method should still be called. +// +// Nil rows are possible and return already done iterator. +// It should still be Closed. +func newQueryIterator(ctx context.Context, rows *fsql.Rows, onlyRecordIDs bool) types.DocumentsIterator { + iter := &queryIterator{ + ctx: ctx, + rows: rows, + onlyRecordIDs: onlyRecordIDs, + token: resource.NewToken(), + } + resource.Track(iter, iter.token) + + return iter +} + // Next implements iterator.Interface. func (iter *queryIterator) Next() (struct{}, *types.Document, error) { defer observability.FuncCall(iter.ctx)() diff --git a/internal/backends/postgresql/helpers.go b/internal/backends/postgresql/helpers.go index b00d958625c5..3718c8d2dcf4 100644 --- a/internal/backends/postgresql/helpers.go +++ b/internal/backends/postgresql/helpers.go @@ -26,7 +26,7 @@ import ( ) // unmarshalExplain unmarshalls the plan from EXPLAIN postgreSQL command. -// EXPLAIN result is not sjson, so it cannot be unmarshaled by sjson.Unmarshal. +// EXPLAIN result is not sjson, so it cannot be unmarshalled by sjson.Unmarshal. func unmarshalExplain(b []byte) (*types.Document, error) { var plans []map[string]any if err := json.Unmarshal(b, &plans); err != nil { diff --git a/internal/backends/postgresql/query_iterator.go b/internal/backends/postgresql/query_iterator.go index 0e770520e90d..36f51487fda5 100644 --- a/internal/backends/postgresql/query_iterator.go +++ b/internal/backends/postgresql/query_iterator.go @@ -51,7 +51,7 @@ type queryIterator struct { // In that case, the iterator's Close method should still be called. // // Nil rows are possible and return already done iterator. -// It still should be Close'd. +// It still should be Closed. func newQueryIterator(ctx context.Context, rows pgx.Rows, onlyRecordIDs bool) types.DocumentsIterator { iter := &queryIterator{ ctx: ctx, From ac76a2ae9832027bf39ec664c8df907f17e853eb Mon Sep 17 00:00:00 2001 From: adetunjii Date: Thu, 15 Feb 2024 19:57:24 +0100 Subject: [PATCH 2/9] fixed linting --- internal/backends/mysql/collection.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/backends/mysql/collection.go b/internal/backends/mysql/collection.go index cd44d73a2b5a..f3aa71adf70e 100644 --- a/internal/backends/mysql/collection.go +++ b/internal/backends/mysql/collection.go @@ -198,7 +198,8 @@ func (c *collection) UpdateAll(ctx context.Context, params *backends.UpdateAllPa err = p.InTransaction(ctx, func(tx *fsql.Tx) error { for _, doc := range params.Docs { var b []byte - if b, err = sjson.Marshal(doc); err != nil { + b, err = sjson.Marshal(doc) + if err != nil { return lazyerrors.Error(err) } @@ -208,7 +209,8 @@ func (c *collection) UpdateAll(ctx context.Context, params *backends.UpdateAllPa arg := must.NotFail(sjson.MarshalSingleValue(id)) var stats sql.Result - if stats, err = tx.ExecContext(ctx, q, b, arg); err != nil { + stats, err = tx.ExecContext(ctx, q, b, arg) + if err != nil { return lazyerrors.Error(err) } From de071d8e5d30943321c032de4a257ce82e9cb0a8 Mon Sep 17 00:00:00 2001 From: adetunjii Date: Thu, 15 Feb 2024 20:32:16 +0100 Subject: [PATCH 3/9] fix linting --- internal/backends/mysql/collection.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/backends/mysql/collection.go b/internal/backends/mysql/collection.go index f3aa71adf70e..2404a7b103da 100644 --- a/internal/backends/mysql/collection.go +++ b/internal/backends/mysql/collection.go @@ -198,6 +198,7 @@ func (c *collection) UpdateAll(ctx context.Context, params *backends.UpdateAllPa err = p.InTransaction(ctx, func(tx *fsql.Tx) error { for _, doc := range params.Docs { var b []byte + b, err = sjson.Marshal(doc) if err != nil { return lazyerrors.Error(err) @@ -209,12 +210,14 @@ func (c *collection) UpdateAll(ctx context.Context, params *backends.UpdateAllPa arg := must.NotFail(sjson.MarshalSingleValue(id)) var stats sql.Result + stats, err = tx.ExecContext(ctx, q, b, arg) if err != nil { return lazyerrors.Error(err) } var ra int64 + ra, err = stats.RowsAffected() if err != nil { return lazyerrors.Error(err) From f1169f5b34464a8388460f4957b67e2f36eb3a0a Mon Sep 17 00:00:00 2001 From: adetunjii Date: Fri, 16 Feb 2024 09:49:38 +0100 Subject: [PATCH 4/9] fix typo --- internal/backends/mysql/helpers.go | 2 +- internal/backends/postgresql/helpers.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/backends/mysql/helpers.go b/internal/backends/mysql/helpers.go index 2921070065c1..a5ec99d5da8b 100644 --- a/internal/backends/mysql/helpers.go +++ b/internal/backends/mysql/helpers.go @@ -26,7 +26,7 @@ import ( ) // unmarshalExplain unmarshalls the plan from EXPLAIN MySQL command. -// EXPLAIN result is not sjson, so it cannot be unmarshalled by sjson.Unmarshal. +// EXPLAIN result is not sjson, so it cannot be unmarshaled by sjson.Unmarshal. func unmarshalExplain(b []byte) (*types.Document, error) { var plans map[string]any if err := json.Unmarshal(b, &plans); err != nil { diff --git a/internal/backends/postgresql/helpers.go b/internal/backends/postgresql/helpers.go index 3718c8d2dcf4..b00d958625c5 100644 --- a/internal/backends/postgresql/helpers.go +++ b/internal/backends/postgresql/helpers.go @@ -26,7 +26,7 @@ import ( ) // unmarshalExplain unmarshalls the plan from EXPLAIN postgreSQL command. -// EXPLAIN result is not sjson, so it cannot be unmarshalled by sjson.Unmarshal. +// EXPLAIN result is not sjson, so it cannot be unmarshaled by sjson.Unmarshal. func unmarshalExplain(b []byte) (*types.Document, error) { var plans []map[string]any if err := json.Unmarshal(b, &plans); err != nil { From 2740647315dbcb70179eda7106be14a172f1f65b Mon Sep 17 00:00:00 2001 From: adetunjii Date: Thu, 22 Feb 2024 11:38:15 +0100 Subject: [PATCH 5/9] Fix InsertAll query --- internal/backends/mysql/collection.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/backends/mysql/collection.go b/internal/backends/mysql/collection.go index 2404a7b103da..0bbbfd776694 100644 --- a/internal/backends/mysql/collection.go +++ b/internal/backends/mysql/collection.go @@ -113,17 +113,17 @@ func (c *collection) Query(ctx context.Context, params *backends.QueryParams) (* // InsertAll implements backends.Collection interface. func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllParams) (*backends.InsertAllResult, error) { - p, err := c.r.DatabaseGetExisting(ctx, c.dbName) - if err != nil { - return nil, lazyerrors.Error(err) - } - - if _, err = c.r.CollectionCreate(ctx, &metadata.CollectionCreateParams{ + if _, err := c.r.CollectionCreate(ctx, &metadata.CollectionCreateParams{ DBName: c.dbName, Name: c.name, }); err != nil { return nil, lazyerrors.Error(err) } + + p, err := c.r.DatabaseGetExisting(ctx, c.dbName) + if err != nil { + return nil, lazyerrors.Error(err) + } meta, err := c.r.CollectionGet(ctx, c.dbName, c.name) if err != nil { From 3687aba1bfc3846087a9466114dc6b93b56d4182 Mon Sep 17 00:00:00 2001 From: adetunjii Date: Thu, 22 Feb 2024 12:04:46 +0100 Subject: [PATCH 6/9] fixed linting --- internal/backends/mysql/collection.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/backends/mysql/collection.go b/internal/backends/mysql/collection.go index 0bbbfd776694..2a7b080fbb2a 100644 --- a/internal/backends/mysql/collection.go +++ b/internal/backends/mysql/collection.go @@ -119,7 +119,7 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa }); err != nil { return nil, lazyerrors.Error(err) } - + p, err := c.r.DatabaseGetExisting(ctx, c.dbName) if err != nil { return nil, lazyerrors.Error(err) From 1ce379a4938db5b649efb414f270098f46734b0d Mon Sep 17 00:00:00 2001 From: adetunjii Date: Fri, 1 Mar 2024 09:21:48 +0100 Subject: [PATCH 7/9] Sanitize database and table names --- internal/backends/mysql/collection.go | 6 +++--- internal/backends/mysql/insert.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/backends/mysql/collection.go b/internal/backends/mysql/collection.go index 2a7b080fbb2a..13c8341b7045 100644 --- a/internal/backends/mysql/collection.go +++ b/internal/backends/mysql/collection.go @@ -189,7 +189,7 @@ func (c *collection) UpdateAll(ctx context.Context, params *backends.UpdateAllPa } q := fmt.Sprintf( - `UPDATE %s.%s SET %s = ? WHERE %s = ?`, + `UPDATE %q.%q SET %s = ? WHERE %s = ?`, c.dbName, meta.TableName, metadata.DefaultColumn, metadata.IDColumn, @@ -272,7 +272,7 @@ func (c *collection) DeleteAll(ctx context.Context, params *backends.DeleteAllPa } q := fmt.Sprintf( - `DELETE FROM %s.%s WHERE %s IN (%s)`, + `DELETE FROM %q.%q WHERE %s IN (%s)`, c.dbName, meta.TableName, column, strings.Join(placeholders, ", "), @@ -396,7 +396,7 @@ func (c *collection) Compact(ctx context.Context, params *backends.CompactParams } q := "OPTIMIZE TABLE " - q += fmt.Sprintf("%s.%s", c.dbName, coll.TableName) + q += fmt.Sprintf("%q.%q", c.dbName, coll.TableName) if _, err = p.ExecContext(ctx, q); err != nil { return nil, lazyerrors.Error(err) diff --git a/internal/backends/mysql/insert.go b/internal/backends/mysql/insert.go index 3bb664b470b2..0413e2fbcbad 100644 --- a/internal/backends/mysql/insert.go +++ b/internal/backends/mysql/insert.go @@ -54,7 +54,7 @@ func prepareInsertStatement(schema, tableName string, capped bool, docs []*types } return fmt.Sprintf( - `INSERT INTO %s.%s (%s) VALUES %s`, + `INSERT INTO %q.%q (%s) VALUES %s`, schema, tableName, columns, strings.Join(rows, ", "), From 809f66cfedc3c7f1158502c491bce52c0d46b4a0 Mon Sep 17 00:00:00 2001 From: adetunjii Date: Fri, 1 Mar 2024 09:56:29 +0100 Subject: [PATCH 8/9] Add MySQL backend --- internal/backends/mysql/backend.go | 109 +++++++++++++++++++++++++++-- internal/util/fsql/db.go | 6 ++ 2 files changed, 109 insertions(+), 6 deletions(-) diff --git a/internal/backends/mysql/backend.go b/internal/backends/mysql/backend.go index 8234cff16919..88ee579cefc5 100644 --- a/internal/backends/mysql/backend.go +++ b/internal/backends/mysql/backend.go @@ -15,7 +15,9 @@ package mysql import ( + "cmp" "context" + "slices" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -43,7 +45,14 @@ type NewBackendParams struct { // NewBackend creates a new Backend. func NewBackend(params *NewBackendParams) (backends.Backend, error) { - return nil, lazyerrors.New("not yet implemented") + r, err := metadata.NewRegistry(params.URI, params.L, params.P) + if err != nil { + return nil, err + } + + return backends.BackendContract(&backend{ + r: r, + }), nil } // Close implements backends.Backend interface. @@ -53,7 +62,56 @@ func (b *backend) Close() { // Status implements backends.Backend interface. func (b *backend) Status(ctx context.Context, params *backends.StatusParams) (*backends.StatusResult, error) { - return nil, lazyerrors.New("not yet implemented.") + dbs, err := b.r.DatabaseList(ctx) + if err != nil { + return nil, lazyerrors.Error(err) + } + + var res backends.StatusResult + + var pingSucceeded bool + + for _, dbName := range dbs { + var cs []*metadata.Collection + + if cs, err = b.r.CollectionList(ctx, dbName); err != nil { + return nil, lazyerrors.Error(err) + } + + res.CountCollections += int64(len(cs)) + + colls, err := newDatabase(b.r, dbName).ListCollections(ctx, new(backends.ListCollectionsParams)) + if err != nil { + return nil, lazyerrors.Error(err) + } + + for _, cInfo := range colls.Collections { + if cInfo.Capped() { + res.CountCappedCollections++ + } + } + + if pingSucceeded { + continue + } + + p, err := b.r.DatabaseGetExisting(ctx, dbName) + if err != nil { + return nil, lazyerrors.Error(err) + } + + if p == nil { + continue + } + + if err = p.Ping(ctx); err != nil { + return nil, lazyerrors.Error(err) + } + + pingSucceeded = true + } + + return &res, nil } // Database implements backends.Backend interface. @@ -65,22 +123,61 @@ func (b *backend) Database(name string) (backends.Database, error) { // //nolint:lll // for readability func (b *backend) ListDatabases(ctx context.Context, params *backends.ListDatabasesParams) (*backends.ListDatabasesResult, error) { - return nil, lazyerrors.New("not yet implemented") + list, err := b.r.DatabaseList(ctx) + if err != nil { + return nil, err + } + + var res *backends.ListDatabasesResult + + if params != nil && len(params.Name) > 0 { + i, found := slices.BinarySearchFunc(list, params.Name, func(dbName, t string) int { + return cmp.Compare(dbName, t) + }) + + var filteredList []string + + if found { + filteredList = append(filteredList, list[i]) + } + + list = filteredList + } + + res = &backends.ListDatabasesResult{ + Databases: make([]backends.DatabaseInfo, 0, len(list)), + } + + for _, dbName := range list { + res.Databases = append(res.Databases, backends.DatabaseInfo{ + Name: dbName, + }) + } + return res, nil } // DropDatabase implements backends.Backend interface. func (b *backend) DropDatabase(ctx context.Context, params *backends.DropDatabaseParams) error { - return lazyerrors.New("not yet implemented.") + dropped, err := b.r.DatabaseDrop(ctx, params.Name) + if err != nil { + return lazyerrors.Error(err) + } + + if !dropped { + return backends.NewError(backends.ErrorCodeDatabaseDoesNotExist, nil) + } + + return nil } // Describe implements prometheus.Collector. func (b *backend) Describe(ch chan<- *prometheus.Desc) { - // b.r.Describe(ch) + b.r.Describe(ch) } // Collect implements prometheus.Collector. func (b *backend) Collect(ch chan<- prometheus.Metric) { - // b.r.Collect(ch) + b.r.Collect(ch) } // check interfaces diff --git a/internal/util/fsql/db.go b/internal/util/fsql/db.go index d21e80ddf98c..f550c0306705 100644 --- a/internal/util/fsql/db.go +++ b/internal/util/fsql/db.go @@ -67,6 +67,12 @@ func (db *DB) Close() error { return db.sqlDB.Close() } +// Ping calls [*sql.DB.Ping]. +func (db *DB) Ping(ctx context.Context) error { + defer observability.FuncCall(ctx)() + return db.sqlDB.Ping() +} + // QueryContext calls [*sql.DB.QueryContext]. func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) { defer observability.FuncCall(ctx)() From 1413518942f095cf3499238378dad5d4aa0a4caf Mon Sep 17 00:00:00 2001 From: adetunjii Date: Fri, 1 Mar 2024 10:21:22 +0100 Subject: [PATCH 9/9] fix linting --- internal/backends/mysql/backend.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/backends/mysql/backend.go b/internal/backends/mysql/backend.go index 88ee579cefc5..5629f1d32e4e 100644 --- a/internal/backends/mysql/backend.go +++ b/internal/backends/mysql/backend.go @@ -153,6 +153,7 @@ func (b *backend) ListDatabases(ctx context.Context, params *backends.ListDataba Name: dbName, }) } + return res, nil }