From 1bf99019b1052eea6c87d326a805057e0080e86d Mon Sep 17 00:00:00 2001 From: kropidlowsky Date: Wed, 28 Feb 2024 00:12:47 +0100 Subject: [PATCH 01/14] add batchsize support for Handler --- cmd/ferretdb/main.go | 2 ++ internal/handler/handler.go | 1 + internal/handler/msg_insert.go | 8 +++----- internal/handler/registry/hana.go | 1 + internal/handler/registry/mysql.go | 1 + internal/handler/registry/postgresql.go | 1 + internal/handler/registry/registry.go | 1 + internal/handler/registry/sqlite.go | 1 + 8 files changed, 11 insertions(+), 5 deletions(-) diff --git a/cmd/ferretdb/main.go b/cmd/ferretdb/main.go index bcfd8e3d7a0f..1adcef45ce6f 100644 --- a/cmd/ferretdb/main.go +++ b/cmd/ferretdb/main.go @@ -101,6 +101,7 @@ var cli struct { } `embed:"" prefix:"capped-cleanup-"` EnableNewAuth bool `default:"false" help:"Experimental: enable new authentication."` + BatchSize int `default:"100" help:"number of maximum size of query parameters"` Telemetry struct { URL string `default:"https://beacon.ferretdb.com/" help:"Telemetry: reporting URL."` @@ -412,6 +413,7 @@ func run() { CappedCleanupInterval: cli.Test.CappedCleanup.Interval, CappedCleanupPercentage: cli.Test.CappedCleanup.Percentage, EnableNewAuth: cli.Test.EnableNewAuth, + BatchSize: cli.Test.BatchSize, }, }) if err != nil { diff --git a/internal/handler/handler.go b/internal/handler/handler.go index afcad5878b41..90a7d7a47682 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -81,6 +81,7 @@ type NewOpts struct { CappedCleanupInterval time.Duration CappedCleanupPercentage uint8 EnableNewAuth bool + BatchSize int } // New returns a new handler. 
diff --git a/internal/handler/msg_insert.go b/internal/handler/msg_insert.go index 7571ce1dc3c7..4359cf190787 100644 --- a/internal/handler/msg_insert.go +++ b/internal/handler/msg_insert.go @@ -85,13 +85,11 @@ func (h *Handler) MsgInsert(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, var done bool for !done { - // TODO https://github.com/FerretDB/FerretDB/issues/3708 - const batchSize = 1000 - docs := make([]*types.Document, 0, batchSize) - docsIndexes := make([]int, 0, batchSize) + docs := make([]*types.Document, 0, h.BatchSize) + docsIndexes := make([]int, 0, h.BatchSize) - for j := 0; j < batchSize; j++ { + for j := 0; j < h.BatchSize; j++ { var i int var d any diff --git a/internal/handler/registry/hana.go b/internal/handler/registry/hana.go index 24eb8cf190af..976390966a71 100644 --- a/internal/handler/registry/hana.go +++ b/internal/handler/registry/hana.go @@ -48,6 +48,7 @@ func init() { CappedCleanupPercentage: opts.CappedCleanupPercentage, CappedCleanupInterval: opts.CappedCleanupInterval, EnableNewAuth: opts.EnableNewAuth, + BatchSize: opts.BatchSize } h, err := handler.New(handlerOpts) diff --git a/internal/handler/registry/mysql.go b/internal/handler/registry/mysql.go index 11a1f4f803fb..7ac10b6406b6 100644 --- a/internal/handler/registry/mysql.go +++ b/internal/handler/registry/mysql.go @@ -45,6 +45,7 @@ func init() { CappedCleanupPercentage: opts.CappedCleanupPercentage, CappedCleanupInterval: opts.CappedCleanupInterval, EnableNewAuth: opts.EnableNewAuth, + BatchSize: opts.BatchSize, } h, err := handler.New(handlerOpts) diff --git a/internal/handler/registry/postgresql.go b/internal/handler/registry/postgresql.go index 36ce440569c4..61ef99a12149 100644 --- a/internal/handler/registry/postgresql.go +++ b/internal/handler/registry/postgresql.go @@ -47,6 +47,7 @@ func init() { CappedCleanupPercentage: opts.CappedCleanupPercentage, CappedCleanupInterval: opts.CappedCleanupInterval, EnableNewAuth: opts.EnableNewAuth, + BatchSize: opts.BatchSize, } h, 
err := handler.New(handlerOpts) diff --git a/internal/handler/registry/registry.go b/internal/handler/registry/registry.go index 47c98923c0b3..a3d4d0e89a64 100644 --- a/internal/handler/registry/registry.go +++ b/internal/handler/registry/registry.go @@ -71,6 +71,7 @@ type TestOpts struct { CappedCleanupInterval time.Duration CappedCleanupPercentage uint8 EnableNewAuth bool + BatchSize int _ struct{} // prevent unkeyed literals } diff --git a/internal/handler/registry/sqlite.go b/internal/handler/registry/sqlite.go index 7c0613256463..48725d6bd412 100644 --- a/internal/handler/registry/sqlite.go +++ b/internal/handler/registry/sqlite.go @@ -47,6 +47,7 @@ func init() { CappedCleanupPercentage: opts.CappedCleanupPercentage, CappedCleanupInterval: opts.CappedCleanupInterval, EnableNewAuth: opts.EnableNewAuth, + BatchSize: opts.BatchSize, } h, err := handler.New(handlerOpts) From b22809393940a14f2375c84609647e26865936e3 Mon Sep 17 00:00:00 2001 From: kropidlowsky Date: Tue, 5 Mar 2024 00:14:02 +0100 Subject: [PATCH 02/14] WIP --- internal/backends/postgresql/backend.go | 11 +++++----- internal/backends/postgresql/collection.go | 3 +-- .../backends/postgresql/metadata/registry.go | 20 ++++++++++++----- .../postgresql/metadata/registry_test.go | 4 ++-- internal/backends/sqlite/backend.go | 11 +++++----- internal/backends/sqlite/collection.go | 3 +-- internal/backends/sqlite/metadata/registry.go | 22 ++++++++++++++----- .../backends/sqlite/metadata/registry_test.go | 12 +++++----- internal/backends/sqlite/sqlite_test.go | 2 +- internal/handler/registry/hana.go | 2 +- internal/handler/registry/postgresql.go | 7 +++--- internal/handler/registry/sqlite.go | 7 +++--- 12 files changed, 63 insertions(+), 41 deletions(-) diff --git a/internal/backends/postgresql/backend.go b/internal/backends/postgresql/backend.go index 9aed280fb6cc..d9de166b3e16 100644 --- a/internal/backends/postgresql/backend.go +++ b/internal/backends/postgresql/backend.go @@ -37,15 +37,16 @@ type backend 
struct { // //nolint:vet // for readability type NewBackendParams struct { - URI string - L *zap.Logger - P *state.Provider - _ struct{} // prevent unkeyed literals + URI string + L *zap.Logger + P *state.Provider + BatchSize int + _ struct{} // prevent unkeyed literals } // NewBackend creates a new Backend. func NewBackend(params *NewBackendParams) (backends.Backend, error) { - r, err := metadata.NewRegistry(params.URI, params.L, params.P) + r, err := metadata.NewRegistry(params.URI, params.BatchSize, params.L, params.P) if err != nil { return nil, err } diff --git a/internal/backends/postgresql/collection.go b/internal/backends/postgresql/collection.go index 66f9214eed8f..fe0c535206b7 100644 --- a/internal/backends/postgresql/collection.go +++ b/internal/backends/postgresql/collection.go @@ -134,8 +134,7 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa } err = pool.InTransaction(ctx, p, func(tx pgx.Tx) error { - // TODO https://github.com/FerretDB/FerretDB/issues/3708 - const batchSize = 100 + batchSize := c.r.BatchSize() var batch []*types.Document docs := params.Docs diff --git a/internal/backends/postgresql/metadata/registry.go b/internal/backends/postgresql/metadata/registry.go index eaec9ed156b8..ed09b1a9a318 100644 --- a/internal/backends/postgresql/metadata/registry.go +++ b/internal/backends/postgresql/metadata/registry.go @@ -81,20 +81,22 @@ type Registry struct { // One global lock should be replaced by more granular locks – one per database or even one per collection. // But that requires some redesign. // TODO https://github.com/FerretDB/FerretDB/issues/2755 - rw sync.RWMutex - colls map[string]map[string]*Collection // database name -> collection name -> collection + rw sync.RWMutex + colls map[string]map[string]*Collection // database name -> collection name -> collection + batchSize int } // NewRegistry creates a registry for PostgreSQL databases with a given base URI. 
-func NewRegistry(u string, l *zap.Logger, sp *state.Provider) (*Registry, error) { +func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*Registry, error) { p, err := pool.New(u, l, sp) if err != nil { return nil, err } r := &Registry{ - p: p, - l: l, + p: p, + l: l, + batchSize: batchSize, } return r, nil @@ -1032,6 +1034,14 @@ func (r *Registry) Collect(ch chan<- prometheus.Metric) { } } +// BatchSize returns number of maximum size of query parameters. +func (r *Registry) BatchSize() int { + r.rw.RLock() + batchSize := r.batchSize + r.rw.RUnlock() + return batchSize +} + // check interfaces var ( _ prometheus.Collector = (*Registry)(nil) diff --git a/internal/backends/postgresql/metadata/registry_test.go b/internal/backends/postgresql/metadata/registry_test.go index ac29e0003042..8c0b25068ec2 100644 --- a/internal/backends/postgresql/metadata/registry_test.go +++ b/internal/backends/postgresql/metadata/registry_test.go @@ -91,7 +91,7 @@ func createDatabase(t *testing.T, ctx context.Context) (*Registry, *pgxpool.Pool sp, err := state.NewProvider("") require.NoError(t, err) - r, err := NewRegistry(u, testutil.Logger(t), sp) + r, err := NewRegistry(u, 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) @@ -140,7 +140,7 @@ func TestAuth(t *testing.T) { sp, err := state.NewProvider("") require.NoError(t, err) - r, err := NewRegistry(tc.uri, testutil.Logger(t), sp) + r, err := NewRegistry(tc.uri, 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) diff --git a/internal/backends/sqlite/backend.go b/internal/backends/sqlite/backend.go index b7678a85b011..44452d7c4429 100644 --- a/internal/backends/sqlite/backend.go +++ b/internal/backends/sqlite/backend.go @@ -37,15 +37,16 @@ type backend struct { // //nolint:vet // for readability type NewBackendParams struct { - URI string - L *zap.Logger - P *state.Provider - _ struct{} // prevent unkeyed literals + URI string + L *zap.Logger + P *state.Provider + 
BatchSize int + _ struct{} // prevent unkeyed literals } // NewBackend creates a new Backend. func NewBackend(params *NewBackendParams) (backends.Backend, error) { - r, err := metadata.NewRegistry(params.URI, params.L, params.P) + r, err := metadata.NewRegistry(params.URI, params.BatchSize, params.L, params.P) if err != nil { return nil, err } diff --git a/internal/backends/sqlite/collection.go b/internal/backends/sqlite/collection.go index 28c36341086a..5629f2ef4c86 100644 --- a/internal/backends/sqlite/collection.go +++ b/internal/backends/sqlite/collection.go @@ -109,8 +109,7 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa meta := c.r.CollectionGet(ctx, c.dbName, c.name) err := db.InTransaction(ctx, func(tx *fsql.Tx) error { - // TODO https://github.com/FerretDB/FerretDB/issues/3708 - const batchSize = 100 + batchSize := c.r.BatchSize() var batch []*types.Document docs := params.Docs diff --git a/internal/backends/sqlite/metadata/registry.go b/internal/backends/sqlite/metadata/registry.go index 23e1e6fe4e75..603b3f5fb4ac 100644 --- a/internal/backends/sqlite/metadata/registry.go +++ b/internal/backends/sqlite/metadata/registry.go @@ -64,21 +64,23 @@ type Registry struct { // One global lock should be replaced by more granular locks – one per database or even one per collection. // But that requires some redesign. // TODO https://github.com/FerretDB/FerretDB/issues/2755 - rw sync.RWMutex - colls map[string]map[string]*Collection // database name -> collection name -> collection + rw sync.RWMutex + colls map[string]map[string]*Collection // database name -> collection name -> collection + batchSize int } // NewRegistry creates a registry for SQLite databases in the directory specified by SQLite URI. 
-func NewRegistry(u string, l *zap.Logger, sp *state.Provider) (*Registry, error) { +func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*Registry, error) { p, initDBs, err := pool.New(u, l, sp) if err != nil { return nil, err } r := &Registry{ - p: p, - l: l, - colls: map[string]map[string]*Collection{}, + p: p, + l: l, + colls: map[string]map[string]*Collection{}, + batchSize: batchSize, } for name, db := range initDBs { @@ -640,6 +642,14 @@ func (r *Registry) Collect(ch chan<- prometheus.Metric) { } } +// BatchSize returns number of maximum size of query parameters. +func (r *Registry) BatchSize() int { + r.rw.RLock() + batchSize := r.batchSize + r.rw.RUnlock() + return batchSize +} + // check interfaces var ( _ prometheus.Collector = (*Registry)(nil) diff --git a/internal/backends/sqlite/metadata/registry_test.go b/internal/backends/sqlite/metadata/registry_test.go index cb3a4ec7074d..64739cfe36c9 100644 --- a/internal/backends/sqlite/metadata/registry_test.go +++ b/internal/backends/sqlite/metadata/registry_test.go @@ -75,7 +75,7 @@ func TestCreateDrop(t *testing.T) { sp, err := state.NewProvider("") require.NoError(t, err) - r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp) + r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) @@ -113,7 +113,7 @@ func TestCreateDropStress(t *testing.T) { } { t.Run(testName, func(t *testing.T) { uri := testutil.TestSQLiteURI(t, "") + params - r, err := NewRegistry(uri, testutil.Logger(t), sp) + r, err := NewRegistry(uri, 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) @@ -158,7 +158,7 @@ func TestCreateSameStress(t *testing.T) { } { t.Run(testName, func(t *testing.T) { uri := testutil.TestSQLiteURI(t, "") + params - r, err := NewRegistry(uri, testutil.Logger(t), sp) + r, err := NewRegistry(uri, 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) @@ -227,7 
+227,7 @@ func TestDropSameStress(t *testing.T) { } { t.Run(testName, func(t *testing.T) { uri := testutil.TestSQLiteURI(t, "") + params - r, err := NewRegistry(uri, testutil.Logger(t), sp) + r, err := NewRegistry(uri, 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) @@ -280,7 +280,7 @@ func TestCreateDropSameStress(t *testing.T) { } { t.Run(testName, func(t *testing.T) { uri := testutil.TestSQLiteURI(t, "") + params - r, err := NewRegistry(uri, testutil.Logger(t), sp) + r, err := NewRegistry(uri, 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) @@ -329,7 +329,7 @@ func TestIndexesCreateDrop(t *testing.T) { sp, err := state.NewProvider("") require.NoError(t, err) - r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp) + r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) diff --git a/internal/backends/sqlite/sqlite_test.go b/internal/backends/sqlite/sqlite_test.go index a5251637cdb9..85c574406e3d 100644 --- a/internal/backends/sqlite/sqlite_test.go +++ b/internal/backends/sqlite/sqlite_test.go @@ -31,7 +31,7 @@ func TestCollectionsStats(t *testing.T) { sp, err := state.NewProvider("") require.NoError(t, err) - r, err := metadata.NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp) + r, err := metadata.NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) diff --git a/internal/handler/registry/hana.go b/internal/handler/registry/hana.go index 976390966a71..0c20a153cc27 100644 --- a/internal/handler/registry/hana.go +++ b/internal/handler/registry/hana.go @@ -48,7 +48,7 @@ func init() { CappedCleanupPercentage: opts.CappedCleanupPercentage, CappedCleanupInterval: opts.CappedCleanupInterval, EnableNewAuth: opts.EnableNewAuth, - BatchSize: opts.BatchSize + BatchSize: opts.BatchSize, } h, err := handler.New(handlerOpts) diff --git 
a/internal/handler/registry/postgresql.go b/internal/handler/registry/postgresql.go index 61ef99a12149..f24eadd5b130 100644 --- a/internal/handler/registry/postgresql.go +++ b/internal/handler/registry/postgresql.go @@ -25,9 +25,10 @@ import ( func init() { registry["postgresql"] = func(opts *NewHandlerOpts) (*handler.Handler, CloseBackendFunc, error) { b, err := postgresql.NewBackend(&postgresql.NewBackendParams{ - URI: opts.PostgreSQLURL, - L: opts.Logger.Named("postgresql"), - P: opts.StateProvider, + URI: opts.PostgreSQLURL, + L: opts.Logger.Named("postgresql"), + P: opts.StateProvider, + BatchSize: opts.BatchSize, }) if err != nil { return nil, nil, err diff --git a/internal/handler/registry/sqlite.go b/internal/handler/registry/sqlite.go index 48725d6bd412..efa5c98bc988 100644 --- a/internal/handler/registry/sqlite.go +++ b/internal/handler/registry/sqlite.go @@ -25,9 +25,10 @@ import ( func init() { registry["sqlite"] = func(opts *NewHandlerOpts) (*handler.Handler, CloseBackendFunc, error) { b, err := sqlite.NewBackend(&sqlite.NewBackendParams{ - URI: opts.SQLiteURL, - L: opts.Logger.Named("sqlite"), - P: opts.StateProvider, + URI: opts.SQLiteURL, + L: opts.Logger.Named("sqlite"), + P: opts.StateProvider, + BatchSize: opts.BatchSize, }) if err != nil { return nil, nil, err From 309ccbfb7782cf1efe08c7770e2fb2fc71cccdfb Mon Sep 17 00:00:00 2001 From: kropidlowsky Date: Tue, 5 Mar 2024 00:32:24 +0100 Subject: [PATCH 03/14] WIP --- internal/backends/helpers_test.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/internal/backends/helpers_test.go b/internal/backends/helpers_test.go index 7f3ebd825f26..ef8fe5930809 100644 --- a/internal/backends/helpers_test.go +++ b/internal/backends/helpers_test.go @@ -53,9 +53,10 @@ func testBackends(t *testing.T) map[string]*testBackend { require.NoError(t, err) b, err := postgresql.NewBackend(&postgresql.NewBackendParams{ - URI: testutil.TestPostgreSQLURI(t, context.TODO(), ""), - L: 
l.Named("postgresql"), - P: sp, + URI: testutil.TestPostgreSQLURI(t, context.TODO(), ""), + L: l.Named("postgresql"), + P: sp, + BatchSize: 1000, }) require.NoError(t, err) t.Cleanup(b.Close) @@ -71,9 +72,10 @@ func testBackends(t *testing.T) map[string]*testBackend { require.NoError(t, err) b, err := sqlite.NewBackend(&sqlite.NewBackendParams{ - URI: testutil.TestSQLiteURI(t, ""), - L: l.Named("sqlite"), - P: sp, + URI: testutil.TestSQLiteURI(t, ""), + L: l.Named("sqlite"), + P: sp, + BatchSize: 100, }) require.NoError(t, err) t.Cleanup(b.Close) From d43898bdf82157336a58a9eaade7fb5ca19c3604 Mon Sep 17 00:00:00 2001 From: kropidlowsky Date: Tue, 5 Mar 2024 00:38:09 +0100 Subject: [PATCH 04/14] fix postgres TestDatabaseStats --- internal/backends/postgresql/database_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/backends/postgresql/database_test.go b/internal/backends/postgresql/database_test.go index 99840811f95c..1f4c72685a03 100644 --- a/internal/backends/postgresql/database_test.go +++ b/internal/backends/postgresql/database_test.go @@ -40,9 +40,10 @@ func TestDatabaseStats(t *testing.T) { require.NoError(t, err) params := NewBackendParams{ - URI: testutil.TestPostgreSQLURI(t, ctx, ""), - L: testutil.Logger(t), - P: sp, + URI: testutil.TestPostgreSQLURI(t, ctx, ""), + L: testutil.Logger(t), + P: sp, + BatchSize: 1000, } b, err := NewBackend(¶ms) require.NoError(t, err) From b277e01da57748035ce4bba194812cca77f6ba4a Mon Sep 17 00:00:00 2001 From: kropidlowsky Date: Tue, 5 Mar 2024 00:40:36 +0100 Subject: [PATCH 05/14] fix sqlite TestCappedCollectionInsertAllQueryExplain --- internal/backends/sqlite/collection_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/backends/sqlite/collection_test.go b/internal/backends/sqlite/collection_test.go index f257ffcae0a8..d062064cf51e 100644 --- a/internal/backends/sqlite/collection_test.go +++ b/internal/backends/sqlite/collection_test.go @@ -39,7 +39,7 
@@ func TestCappedCollectionInsertAllQueryExplain(t *testing.T) { sp, err := state.NewProvider("") require.NoError(t, err) - b, err := NewBackend(&NewBackendParams{URI: testutil.TestSQLiteURI(t, ""), L: testutil.Logger(t), P: sp}) + b, err := NewBackend(&NewBackendParams{URI: testutil.TestSQLiteURI(t, ""), L: testutil.Logger(t), P: sp, BatchSize: 100}) require.NoError(t, err) t.Cleanup(b.Close) From 2a6219ab45e9eb4ac5ca335174269a0dd34e049e Mon Sep 17 00:00:00 2001 From: kropidlowsky Date: Tue, 5 Mar 2024 00:42:21 +0100 Subject: [PATCH 06/14] fix sqlite TestDatabaseStatsFreeStorage --- internal/backends/sqlite/database_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/backends/sqlite/database_test.go b/internal/backends/sqlite/database_test.go index 2d9b0a343271..d287733fee17 100644 --- a/internal/backends/sqlite/database_test.go +++ b/internal/backends/sqlite/database_test.go @@ -72,7 +72,7 @@ func TestDatabaseStatsFreeStorage(t *testing.T) { name, params := name, params t.Run(name, func(t *testing.T) { uri := testutil.TestSQLiteURI(t, "") + params - b, err := NewBackend(&NewBackendParams{URI: uri, L: testutil.Logger(t), P: sp}) + b, err := NewBackend(&NewBackendParams{URI: uri, L: testutil.Logger(t), P: sp, BatchSize: 100}) require.NoError(t, err) t.Cleanup(b.Close) From 2c51cabf9741b1016ef174df4b18a226ae38eedf Mon Sep 17 00:00:00 2001 From: kropidlowsky Date: Wed, 6 Mar 2024 00:33:39 +0100 Subject: [PATCH 07/14] WIP --- Taskfile.yml | 6 ++++++ cmd/ferretdb/main.go | 2 +- internal/backends/hana/backend.go | 8 +++++--- internal/backends/postgresql/collection.go | 2 +- .../backends/postgresql/metadata/registry.go | 20 ++++++------------- internal/backends/sqlite/collection.go | 2 +- internal/backends/sqlite/metadata/registry.go | 20 ++++++------------- internal/handler/msg_insert.go | 1 - internal/handler/registry/hana.go | 7 ++++--- internal/util/fsql/db.go | 7 ++++--- 10 files changed, 34 insertions(+), 41 deletions(-) diff --git 
a/Taskfile.yml b/Taskfile.yml index af97ffce5656..ae53b569723f 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -381,6 +381,7 @@ tasks: --handler=sqlite --sqlite-url=file:tmp/sqlite/ --test-records-dir=tmp/records + --test-batch-size=1000 run-mysql: desc: "Run FerretDB with `mysql` backend" @@ -407,6 +408,7 @@ tasks: --mode=diff-normal --handler=hana --test-records-dir=tmp/records + --test-batch-size=100 run-secured: desc: "Run FerretDB with `postgresql` backend (TLS, auth required)" @@ -424,6 +426,7 @@ tasks: --handler=pg --postgresql-url='postgres://127.0.0.1:5433/ferretdb?search_path=' --test-records-dir=tmp/records + --test-batch-size=100 run-proxy: desc: "Run FerretDB in diff-proxy mode" @@ -437,6 +440,7 @@ tasks: --handler=pg --postgresql-url='postgres://username@127.0.0.1:5432/ferretdb?search_path=' --test-records-dir=tmp/records + --test-batch-size=100 run-sqlite-proxy: desc: "Run FerretDB with `sqlite` handler in diff-proxy mode" @@ -451,6 +455,7 @@ tasks: --handler=sqlite --sqlite-url=file:tmp/sqlite/ --test-records-dir=tmp/records + --test-batch-size=1000 run-proxy-secured: desc: "Run FerretDB in diff-proxy mode (TLS, auth required)" @@ -471,6 +476,7 @@ tasks: --handler=pg --postgresql-url='postgres://username@127.0.0.1:5433/ferretdb?search_path=' --test-records-dir=tmp/records + --test-batch-size=100 lint: desc: "Run linters" diff --git a/cmd/ferretdb/main.go b/cmd/ferretdb/main.go index 1adcef45ce6f..2b31108805fa 100644 --- a/cmd/ferretdb/main.go +++ b/cmd/ferretdb/main.go @@ -101,7 +101,7 @@ var cli struct { } `embed:"" prefix:"capped-cleanup-"` EnableNewAuth bool `default:"false" help:"Experimental: enable new authentication."` - BatchSize int `default:"100" help:"number of maximum size of query parameters"` + BatchSize int `default:"100" help:"number of maximum size of query parameters"` Telemetry struct { URL string `default:"https://beacon.ferretdb.com/" help:"Telemetry: reporting URL."` diff --git a/internal/backends/hana/backend.go 
b/internal/backends/hana/backend.go index e396c978c3f5..d8d759153e24 100644 --- a/internal/backends/hana/backend.go +++ b/internal/backends/hana/backend.go @@ -37,9 +37,10 @@ type backend struct { // //nolint:vet // for readability type NewBackendParams struct { - URI string - L *zap.Logger - P *state.Provider + URI string + L *zap.Logger + P *state.Provider + BatchSize int } // NewBackend creates a new Backend. @@ -50,6 +51,7 @@ func NewBackend(params *NewBackendParams) (backends.Backend, error) { } hdb := fsql.WrapDB(db, "hana", params.L) + hdb.BatchSize = params.BatchSize return backends.BackendContract(&backend{ hdb: hdb, diff --git a/internal/backends/postgresql/collection.go b/internal/backends/postgresql/collection.go index fe0c535206b7..3539c04f6c9d 100644 --- a/internal/backends/postgresql/collection.go +++ b/internal/backends/postgresql/collection.go @@ -134,7 +134,7 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa } err = pool.InTransaction(ctx, p, func(tx pgx.Tx) error { - batchSize := c.r.BatchSize() + batchSize := c.r.BatchSize var batch []*types.Document docs := params.Docs diff --git a/internal/backends/postgresql/metadata/registry.go b/internal/backends/postgresql/metadata/registry.go index ed09b1a9a318..2ca65a4a49c8 100644 --- a/internal/backends/postgresql/metadata/registry.go +++ b/internal/backends/postgresql/metadata/registry.go @@ -73,17 +73,17 @@ var specialCharacters = regexp.MustCompile("[^a-z][^a-z0-9_]*") // //nolint:vet // for readability type Registry struct { - p *pool.Pool - l *zap.Logger + p *pool.Pool + l *zap.Logger + BatchSize int // rw protects colls but also acts like a global lock for the whole registry. // The latter effectively replaces transactions (see the postgresql backend package description for more info). // One global lock should be replaced by more granular locks – one per database or even one per collection. // But that requires some redesign. 
// TODO https://github.com/FerretDB/FerretDB/issues/2755 - rw sync.RWMutex - colls map[string]map[string]*Collection // database name -> collection name -> collection - batchSize int + rw sync.RWMutex + colls map[string]map[string]*Collection // database name -> collection name -> collection } // NewRegistry creates a registry for PostgreSQL databases with a given base URI. @@ -96,7 +96,7 @@ func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*R r := &Registry{ p: p, l: l, - batchSize: batchSize, + BatchSize: batchSize, } return r, nil @@ -1034,14 +1034,6 @@ func (r *Registry) Collect(ch chan<- prometheus.Metric) { } } -// BatchSize returns number of maximum size of query parameters. -func (r *Registry) BatchSize() int { - r.rw.RLock() - batchSize := r.batchSize - r.rw.RUnlock() - return batchSize -} - // check interfaces var ( _ prometheus.Collector = (*Registry)(nil) diff --git a/internal/backends/sqlite/collection.go b/internal/backends/sqlite/collection.go index 5629f2ef4c86..1d0c96a35e8c 100644 --- a/internal/backends/sqlite/collection.go +++ b/internal/backends/sqlite/collection.go @@ -109,7 +109,7 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa meta := c.r.CollectionGet(ctx, c.dbName, c.name) err := db.InTransaction(ctx, func(tx *fsql.Tx) error { - batchSize := c.r.BatchSize() + batchSize := c.r.BatchSize var batch []*types.Document docs := params.Docs diff --git a/internal/backends/sqlite/metadata/registry.go b/internal/backends/sqlite/metadata/registry.go index 603b3f5fb4ac..65d12d641e8c 100644 --- a/internal/backends/sqlite/metadata/registry.go +++ b/internal/backends/sqlite/metadata/registry.go @@ -56,17 +56,17 @@ const ( // // Exported methods are safe for concurrent use. Unexported methods are not. type Registry struct { - p *pool.Pool - l *zap.Logger + p *pool.Pool + l *zap.Logger + BatchSize int // rw protects colls but also acts like a global lock for the whole registry. 
// The latter effectively replaces transactions (see the sqlite backend package description for more info). // One global lock should be replaced by more granular locks – one per database or even one per collection. // But that requires some redesign. // TODO https://github.com/FerretDB/FerretDB/issues/2755 - rw sync.RWMutex - colls map[string]map[string]*Collection // database name -> collection name -> collection - batchSize int + rw sync.RWMutex + colls map[string]map[string]*Collection // database name -> collection name -> collection } // NewRegistry creates a registry for SQLite databases in the directory specified by SQLite URI. @@ -79,8 +79,8 @@ func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*R r := &Registry{ p: p, l: l, + BatchSize: batchSize, colls: map[string]map[string]*Collection{}, - batchSize: batchSize, } for name, db := range initDBs { @@ -642,14 +642,6 @@ func (r *Registry) Collect(ch chan<- prometheus.Metric) { } } -// BatchSize returns number of maximum size of query parameters. -func (r *Registry) BatchSize() int { - r.rw.RLock() - batchSize := r.batchSize - r.rw.RUnlock() - return batchSize -} - // check interfaces var ( _ prometheus.Collector = (*Registry)(nil) diff --git a/internal/handler/msg_insert.go b/internal/handler/msg_insert.go index 4359cf190787..69c30fcadf4a 100644 --- a/internal/handler/msg_insert.go +++ b/internal/handler/msg_insert.go @@ -85,7 +85,6 @@ func (h *Handler) MsgInsert(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, var done bool for !done { - docs := make([]*types.Document, 0, h.BatchSize) docsIndexes := make([]int, 0, h.BatchSize) diff --git a/internal/handler/registry/hana.go b/internal/handler/registry/hana.go index 0c20a153cc27..0b94f9f67802 100644 --- a/internal/handler/registry/hana.go +++ b/internal/handler/registry/hana.go @@ -27,9 +27,10 @@ func init() { opts.Logger.Warn("HANA handler is in alpha. 
It is not supported yet.") b, err := hana.NewBackend(&hana.NewBackendParams{ - URI: opts.HANAURL, - L: opts.Logger.Named("hana"), - P: opts.StateProvider, + URI: opts.HANAURL, + L: opts.Logger.Named("hana"), + P: opts.StateProvider, + BatchSize: opts.BatchSize, }) if err != nil { return nil, nil, err diff --git a/internal/util/fsql/db.go b/internal/util/fsql/db.go index d21e80ddf98c..b358e5716f43 100644 --- a/internal/util/fsql/db.go +++ b/internal/util/fsql/db.go @@ -35,9 +35,10 @@ import ( type DB struct { *metricsCollector - sqlDB *sql.DB - l *zap.Logger - token *resource.Token + sqlDB *sql.DB + l *zap.Logger + token *resource.Token + BatchSize int } // WrapDB creates a new DB. From a421052d33844e3a38b44243c14fee80ed07d141 Mon Sep 17 00:00:00 2001 From: kropidlowsky Date: Wed, 6 Mar 2024 00:43:14 +0100 Subject: [PATCH 08/14] wip --- cmd/ferretdb/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/ferretdb/main.go b/cmd/ferretdb/main.go index 2b31108805fa..9723758fa67e 100644 --- a/cmd/ferretdb/main.go +++ b/cmd/ferretdb/main.go @@ -101,7 +101,7 @@ var cli struct { } `embed:"" prefix:"capped-cleanup-"` EnableNewAuth bool `default:"false" help:"Experimental: enable new authentication."` - BatchSize int `default:"100" help:"number of maximum size of query parameters"` + BatchSize int `default:"100" help:"number of maximum size of query parameters"` Telemetry struct { URL string `default:"https://beacon.ferretdb.com/" help:"Telemetry: reporting URL."` From cae4c98cb69084747821d6f9d77b4a246e22c76f Mon Sep 17 00:00:00 2001 From: kropidlowsky Date: Tue, 12 Mar 2024 00:27:14 +0100 Subject: [PATCH 09/14] add panic for `batchSize` at `InsertAll` --- internal/backends/postgresql/collection.go | 3 +++ internal/backends/sqlite/collection.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/internal/backends/postgresql/collection.go b/internal/backends/postgresql/collection.go index 3539c04f6c9d..3a3b8b556659 100644 --- 
a/internal/backends/postgresql/collection.go +++ b/internal/backends/postgresql/collection.go @@ -135,6 +135,9 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa err = pool.InTransaction(ctx, p, func(tx pgx.Tx) error { batchSize := c.r.BatchSize + if batchSize < 1 { + panic("batch-size should be greater than 1") + } var batch []*types.Document docs := params.Docs diff --git a/internal/backends/sqlite/collection.go b/internal/backends/sqlite/collection.go index 1d0c96a35e8c..37476b28ddc9 100644 --- a/internal/backends/sqlite/collection.go +++ b/internal/backends/sqlite/collection.go @@ -110,6 +110,9 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa err := db.InTransaction(ctx, func(tx *fsql.Tx) error { batchSize := c.r.BatchSize + if batchSize < 1 { + panic("batch-size should be greater than 1") + } var batch []*types.Document docs := params.Docs From 5092b596578a60b5b9a8aa62c5a732c997f83a42 Mon Sep 17 00:00:00 2001 From: kropidlowsky Date: Wed, 13 Mar 2024 23:38:49 +0100 Subject: [PATCH 10/14] add `batch-size` for integration --- Taskfile.yml | 3 +++ integration/setup/listener.go | 1 + integration/setup/setup.go | 2 ++ 3 files changed, 6 insertions(+) diff --git a/Taskfile.yml b/Taskfile.yml index b592df97b998..ba54c4e84795 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -218,6 +218,7 @@ tasks: -postgresql-url='postgres://username@127.0.0.1:5432/ferretdb?search_path=' -compat-url='mongodb://username:password@127.0.0.1:47018/?tls=true&tlsCertificateKeyFile=../build/certs/client.pem&tlsCaFile=../build/certs/rootCA-cert.pem&replicaSet=rs0' -disable-pushdown={{.DISABLE_PUSHDOWN}} + -batch-size=100 test-integration-sqlite: desc: "Run integration tests for `sqlite` backend" @@ -244,6 +245,7 @@ tasks: -target-tls -compat-url='mongodb://username:password@127.0.0.1:47018/?tls=true&tlsCertificateKeyFile=../build/certs/client.pem&tlsCaFile=../build/certs/rootCA-cert.pem&replicaSet=rs0' 
-disable-pushdown={{.DISABLE_PUSHDOWN}} + -batch-size=1000 test-integration-mysql: desc: "Run integration tests for `mysql` handler" @@ -364,6 +366,7 @@ tasks: --handler=pg --postgresql-url='postgres://username@127.0.0.1:5432/ferretdb?search_path=' --test-records-dir=tmp/records + --test-batch-size=100 run-sqlite: desc: "Run FerretDB with `sqlite` backend" diff --git a/integration/setup/listener.go b/integration/setup/listener.go index 1eba1d53d55f..eef28f28a435 100644 --- a/integration/setup/listener.go +++ b/integration/setup/listener.go @@ -183,6 +183,7 @@ func setupListener(tb testtb.TB, ctx context.Context, logger *zap.Logger) string CappedCleanupPercentage: 20, CappedCleanupInterval: 0, EnableNewAuth: true, + BatchSize: *batchSizeF, }, } h, closeBackend, err := registry.NewHandler(handler, handlerOpts) diff --git a/integration/setup/setup.go b/integration/setup/setup.go index 83970fc5fab8..b5623c7d133a 100644 --- a/integration/setup/setup.go +++ b/integration/setup/setup.go @@ -52,6 +52,8 @@ var ( mysqlURLF = flag.String("mysql-url", "", "in-process FerretDB: MySQL URL for 'mysql' handler.") hanaURLF = flag.String("hana-url", "", "in-process FerretDB: Hana URL for 'hana' handler.") + batchSizeF = flag.Int("batch-size", 100, "number of maximum size of query parameters") + compatURLF = flag.String("compat-url", "", "compat system's (MongoDB) URL for compatibility tests; if empty, they are skipped") benchDocsF = flag.Int("bench-docs", 1000, "benchmarks: number of documents to generate per iteration") From 8099fc22316e5fe0745dd1289f2fe01025447ef3 Mon Sep 17 00:00:00 2001 From: kropidlowsky Date: Wed, 13 Mar 2024 23:39:46 +0100 Subject: [PATCH 11/14] improve `batch-size` panic message --- internal/backends/postgresql/collection.go | 2 +- internal/backends/sqlite/collection.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/backends/postgresql/collection.go b/internal/backends/postgresql/collection.go index 3a3b8b556659..bf4fd59d4518
100644 --- a/internal/backends/postgresql/collection.go +++ b/internal/backends/postgresql/collection.go @@ -136,7 +136,7 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa err = pool.InTransaction(ctx, p, func(tx pgx.Tx) error { batchSize := c.r.BatchSize if batchSize < 1 { - panic("batch-size should be greater than 1") + panic("batch-size should be greater or equal to 1") } var batch []*types.Document diff --git a/internal/backends/sqlite/collection.go b/internal/backends/sqlite/collection.go index 37476b28ddc9..6351a93cd22a 100644 --- a/internal/backends/sqlite/collection.go +++ b/internal/backends/sqlite/collection.go @@ -111,7 +111,7 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa err := db.InTransaction(ctx, func(tx *fsql.Tx) error { batchSize := c.r.BatchSize if batchSize < 1 { - panic("batch-size should be greater than 1") + panic("batch-size should be greater or equal to 1") } var batch []*types.Document From 75b8ae63d4998ed41b56ee52957a9e0a9552ddba Mon Sep 17 00:00:00 2001 From: kropidlowsky Date: Thu, 14 Mar 2024 12:27:55 +0100 Subject: [PATCH 12/14] correct flag help info --- cmd/ferretdb/main.go | 2 +- integration/setup/setup.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/ferretdb/main.go b/cmd/ferretdb/main.go index 9723758fa67e..7a177bdd65bb 100644 --- a/cmd/ferretdb/main.go +++ b/cmd/ferretdb/main.go @@ -101,7 +101,7 @@ var cli struct { } `embed:"" prefix:"capped-cleanup-"` EnableNewAuth bool `default:"false" help:"Experimental: enable new authentication."` - BatchSize int `default:"100" help:"number of maximum size of query parameters"` + BatchSize int `default:"100" help:"number of maximum insertion batch size"` Telemetry struct { URL string `default:"https://beacon.ferretdb.com/" help:"Telemetry: reporting URL."` diff --git a/integration/setup/setup.go b/integration/setup/setup.go index b5623c7d133a..7c02f5c371e1 100644 --- 
a/integration/setup/setup.go +++ b/integration/setup/setup.go @@ -52,7 +52,7 @@ var ( mysqlURLF = flag.String("mysql-url", "", "in-process FerretDB: MySQL URL for 'mysql' handler.") hanaURLF = flag.String("hana-url", "", "in-process FerretDB: Hana URL for 'hana' handler.") - batchSizeF = flag.Int("batch-size", 100, "number of maximum size of query parameters") + batchSizeF = flag.Int("batch-size", 100, "number of maximum insertion batch size") compatURLF = flag.String("compat-url", "", "compat system's (MongoDB) URL for compatibility tests; if empty, they are skipped") From 7aea53bdc2bd2028a4615bf83aa239010ca6522f Mon Sep 17 00:00:00 2001 From: Alexey Palazhchenko Date: Tue, 19 Mar 2024 17:56:00 +0400 Subject: [PATCH 13/14] Improve flag description --- cmd/ferretdb/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/ferretdb/main.go b/cmd/ferretdb/main.go index 7a177bdd65bb..0e6ca88db610 100644 --- a/cmd/ferretdb/main.go +++ b/cmd/ferretdb/main.go @@ -101,7 +101,7 @@ var cli struct { } `embed:"" prefix:"capped-cleanup-"` EnableNewAuth bool `default:"false" help:"Experimental: enable new authentication."` - BatchSize int `default:"100" help:"number of maximum insertion batch size"` + BatchSize int `default:"100" help:"Experimental: maximum insertion batch size."` Telemetry struct { URL string `default:"https://beacon.ferretdb.com/" help:"Telemetry: reporting URL."` From 0a86ac7665481490adb2eab4b899b54e20b7f2ad Mon Sep 17 00:00:00 2001 From: Alexey Palazhchenko Date: Tue, 19 Mar 2024 18:08:09 +0400 Subject: [PATCH 14/14] Simplify --- Taskfile.yml | 9 --------- integration/setup/setup.go | 2 +- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/Taskfile.yml b/Taskfile.yml index f783d2696260..d62b943faaa9 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -218,7 +218,6 @@ tasks: -postgresql-url='postgres://username@127.0.0.1:5432/ferretdb?search_path=' 
-compat-url='mongodb://username:password@127.0.0.1:47018/?tls=true&tlsCertificateKeyFile=../build/certs/client.pem&tlsCaFile=../build/certs/rootCA-cert.pem&replicaSet=rs0' -disable-pushdown={{.DISABLE_PUSHDOWN}} - -batch-size=100 test-integration-sqlite: desc: "Run integration tests for `sqlite` backend" @@ -245,7 +244,6 @@ tasks: -target-tls -compat-url='mongodb://username:password@127.0.0.1:47018/?tls=true&tlsCertificateKeyFile=../build/certs/client.pem&tlsCaFile=../build/certs/rootCA-cert.pem&replicaSet=rs0' -disable-pushdown={{.DISABLE_PUSHDOWN}} - -batch-size=1000 test-integration-mysql: desc: "Run integration tests for `mysql` handler" @@ -366,7 +364,6 @@ tasks: --handler=pg --postgresql-url='postgres://username@127.0.0.1:5432/ferretdb?search_path=' --test-records-dir=tmp/records - --test-batch-size=100 run-sqlite: desc: "Run FerretDB with `sqlite` backend" @@ -381,7 +378,6 @@ tasks: --handler=sqlite --sqlite-url=file:tmp/sqlite/ --test-records-dir=tmp/records - --test-batch-size=1000 run-mysql: desc: "Run FerretDB with `mysql` backend" @@ -408,7 +404,6 @@ tasks: --mode=diff-normal --handler=hana --test-records-dir=tmp/records - --test-batch-size=100 run-secured: desc: "Run FerretDB with `postgresql` backend (TLS, auth required)" @@ -426,7 +421,6 @@ tasks: --handler=pg --postgresql-url='postgres://127.0.0.1:5433/ferretdb?search_path=' --test-records-dir=tmp/records - --test-batch-size=100 run-proxy: desc: "Run FerretDB in diff-proxy mode" @@ -440,7 +434,6 @@ tasks: --handler=pg --postgresql-url='postgres://username@127.0.0.1:5432/ferretdb?search_path=' --test-records-dir=tmp/records - --test-batch-size=100 run-sqlite-proxy: desc: "Run FerretDB with `sqlite` handler in diff-proxy mode" @@ -455,7 +448,6 @@ tasks: --handler=sqlite --sqlite-url=file:tmp/sqlite/ --test-records-dir=tmp/records - --test-batch-size=1000 run-proxy-secured: desc: "Run FerretDB in diff-proxy mode (TLS, auth required)" @@ -476,7 +468,6 @@ tasks: --handler=pg 
--postgresql-url='postgres://username@127.0.0.1:5433/ferretdb?search_path=' --test-records-dir=tmp/records - --test-batch-size=100 lint: desc: "Run linters" diff --git a/integration/setup/setup.go b/integration/setup/setup.go index e016e9735dc9..2bcee1be3948 100644 --- a/integration/setup/setup.go +++ b/integration/setup/setup.go @@ -56,7 +56,7 @@ var ( mysqlURLF = flag.String("mysql-url", "", "in-process FerretDB: MySQL URL for 'mysql' handler.") hanaURLF = flag.String("hana-url", "", "in-process FerretDB: Hana URL for 'hana' handler.") - batchSizeF = flag.Int("batch-size", 100, "number of maximum insertion batch size") + batchSizeF = flag.Int("batch-size", 100, "maximum insertion batch size") compatURLF = flag.String("compat-url", "", "compat system's (MongoDB) URL for compatibility tests; if empty, they are skipped")