diff --git a/cmd/ferretdb/main.go b/cmd/ferretdb/main.go index bcfd8e3d7a0f..0e6ca88db610 100644 --- a/cmd/ferretdb/main.go +++ b/cmd/ferretdb/main.go @@ -101,6 +101,7 @@ var cli struct { } `embed:"" prefix:"capped-cleanup-"` EnableNewAuth bool `default:"false" help:"Experimental: enable new authentication."` + BatchSize int `default:"100" help:"Experimental: maximum insertion batch size."` Telemetry struct { URL string `default:"https://beacon.ferretdb.com/" help:"Telemetry: reporting URL."` @@ -412,6 +413,7 @@ func run() { CappedCleanupInterval: cli.Test.CappedCleanup.Interval, CappedCleanupPercentage: cli.Test.CappedCleanup.Percentage, EnableNewAuth: cli.Test.EnableNewAuth, + BatchSize: cli.Test.BatchSize, }, }) if err != nil { diff --git a/integration/setup/listener.go b/integration/setup/listener.go index e795d810575d..307e503bd6e1 100644 --- a/integration/setup/listener.go +++ b/integration/setup/listener.go @@ -185,6 +185,7 @@ func setupListener(tb testtb.TB, ctx context.Context, logger *zap.Logger, opts * CappedCleanupPercentage: opts.CappedCleanupPercentage, CappedCleanupInterval: opts.CappedCleanupInterval, EnableNewAuth: true, + BatchSize: *batchSizeF, }, } h, closeBackend, err := registry.NewHandler(handler, handlerOpts) diff --git a/integration/setup/setup.go b/integration/setup/setup.go index 6cd474928555..2bcee1be3948 100644 --- a/integration/setup/setup.go +++ b/integration/setup/setup.go @@ -56,6 +56,8 @@ var ( mysqlURLF = flag.String("mysql-url", "", "in-process FerretDB: MySQL URL for 'mysql' handler.") hanaURLF = flag.String("hana-url", "", "in-process FerretDB: Hana URL for 'hana' handler.") + batchSizeF = flag.Int("batch-size", 100, "maximum insertion batch size") + compatURLF = flag.String("compat-url", "", "compat system's (MongoDB) URL for compatibility tests; if empty, they are skipped") benchDocsF = flag.Int("bench-docs", 1000, "benchmarks: number of documents to generate per iteration") diff --git a/internal/backends/hana/backend.go b/internal/backends/hana/backend.go index e396c978c3f5..d8d759153e24 100644 --- a/internal/backends/hana/backend.go +++ b/internal/backends/hana/backend.go @@ -37,9 +37,10 @@ type backend struct { // //nolint:vet // for readability type NewBackendParams struct { - URI string - L *zap.Logger - P *state.Provider + URI string + L *zap.Logger + P *state.Provider + BatchSize int } // NewBackend creates a new Backend. @@ -50,6 +51,7 @@ func NewBackend(params *NewBackendParams) (backends.Backend, error) { } hdb := fsql.WrapDB(db, "hana", params.L) + hdb.BatchSize = params.BatchSize return backends.BackendContract(&backend{ hdb: hdb, diff --git a/internal/backends/helpers_test.go b/internal/backends/helpers_test.go index 7f3ebd825f26..ef8fe5930809 100644 --- a/internal/backends/helpers_test.go +++ b/internal/backends/helpers_test.go @@ -53,9 +53,10 @@ func testBackends(t *testing.T) map[string]*testBackend { require.NoError(t, err) b, err := postgresql.NewBackend(&postgresql.NewBackendParams{ - URI: testutil.TestPostgreSQLURI(t, context.TODO(), ""), - L: l.Named("postgresql"), - P: sp, + URI: testutil.TestPostgreSQLURI(t, context.TODO(), ""), + L: l.Named("postgresql"), + P: sp, + BatchSize: 1000, }) require.NoError(t, err) t.Cleanup(b.Close) @@ -71,9 +72,10 @@ func testBackends(t *testing.T) map[string]*testBackend { require.NoError(t, err) b, err := sqlite.NewBackend(&sqlite.NewBackendParams{ - URI: testutil.TestSQLiteURI(t, ""), - L: l.Named("sqlite"), - P: sp, + URI: testutil.TestSQLiteURI(t, ""), + L: l.Named("sqlite"), + P: sp, + BatchSize: 100, }) require.NoError(t, err) t.Cleanup(b.Close) diff --git a/internal/backends/postgresql/backend.go b/internal/backends/postgresql/backend.go index 9aed280fb6cc..d9de166b3e16 100644 --- a/internal/backends/postgresql/backend.go +++ b/internal/backends/postgresql/backend.go @@ -37,15 +37,16 @@ type backend struct { // //nolint:vet // for readability type NewBackendParams struct { - URI string - L *zap.Logger - P *state.Provider - _ struct{} // prevent unkeyed literals + URI string + L *zap.Logger + P *state.Provider + BatchSize int + _ struct{} // prevent unkeyed literals } // NewBackend creates a new Backend. func NewBackend(params *NewBackendParams) (backends.Backend, error) { - r, err := metadata.NewRegistry(params.URI, params.L, params.P) + r, err := metadata.NewRegistry(params.URI, params.BatchSize, params.L, params.P) if err != nil { return nil, err } diff --git a/internal/backends/postgresql/collection.go b/internal/backends/postgresql/collection.go index 66f9214eed8f..bf4fd59d4518 100644 --- a/internal/backends/postgresql/collection.go +++ b/internal/backends/postgresql/collection.go @@ -134,8 +134,10 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa } err = pool.InTransaction(ctx, p, func(tx pgx.Tx) error { - // TODO https://github.com/FerretDB/FerretDB/issues/3708 - const batchSize = 100 + batchSize := c.r.BatchSize + if batchSize < 1 { + panic("batch-size should be greater or equal to 1") + } var batch []*types.Document docs := params.Docs diff --git a/internal/backends/postgresql/database_test.go b/internal/backends/postgresql/database_test.go index 99840811f95c..1f4c72685a03 100644 --- a/internal/backends/postgresql/database_test.go +++ b/internal/backends/postgresql/database_test.go @@ -40,9 +40,10 @@ func TestDatabaseStats(t *testing.T) { require.NoError(t, err) params := NewBackendParams{ - URI: testutil.TestPostgreSQLURI(t, ctx, ""), - L: testutil.Logger(t), - P: sp, + URI: testutil.TestPostgreSQLURI(t, ctx, ""), + L: testutil.Logger(t), + P: sp, + BatchSize: 1000, } b, err := NewBackend(¶ms) require.NoError(t, err) diff --git a/internal/backends/postgresql/metadata/registry.go b/internal/backends/postgresql/metadata/registry.go index c920a439a9a8..df368761511f 100644 --- a/internal/backends/postgresql/metadata/registry.go +++ b/internal/backends/postgresql/metadata/registry.go @@ -73,8 +73,9 @@ var specialCharacters = regexp.MustCompile("[^a-z][^a-z0-9_]*") // //nolint:vet // for readability type Registry struct { - p *pool.Pool - l *zap.Logger + p *pool.Pool + l *zap.Logger + BatchSize int // rw protects colls but also acts like a global lock for the whole registry. // The latter effectively replaces transactions (see the postgresql backend package description for more info). @@ -86,15 +87,16 @@ type Registry struct { } // NewRegistry creates a registry for PostgreSQL databases with a given base URI. -func NewRegistry(u string, l *zap.Logger, sp *state.Provider) (*Registry, error) { +func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*Registry, error) { p, err := pool.New(u, l, sp) if err != nil { return nil, err } r := &Registry{ - p: p, - l: l, + p: p, + l: l, + BatchSize: batchSize, } return r, nil diff --git a/internal/backends/postgresql/metadata/registry_test.go b/internal/backends/postgresql/metadata/registry_test.go index ac29e0003042..8c0b25068ec2 100644 --- a/internal/backends/postgresql/metadata/registry_test.go +++ b/internal/backends/postgresql/metadata/registry_test.go @@ -91,7 +91,7 @@ func createDatabase(t *testing.T, ctx context.Context) (*Registry, *pgxpool.Pool sp, err := state.NewProvider("") require.NoError(t, err) - r, err := NewRegistry(u, testutil.Logger(t), sp) + r, err := NewRegistry(u, 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) @@ -140,7 +140,7 @@ func TestAuth(t *testing.T) { sp, err := state.NewProvider("") require.NoError(t, err) - r, err := NewRegistry(tc.uri, testutil.Logger(t), sp) + r, err := NewRegistry(tc.uri, 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) diff --git a/internal/backends/sqlite/backend.go b/internal/backends/sqlite/backend.go index b7678a85b011..44452d7c4429 100644 --- a/internal/backends/sqlite/backend.go +++ b/internal/backends/sqlite/backend.go @@ -37,15 +37,16 @@ type backend struct { // //nolint:vet // for readability type NewBackendParams struct { - URI string - L *zap.Logger - P *state.Provider - _ struct{} // prevent unkeyed literals + URI string + L *zap.Logger + P *state.Provider + BatchSize int + _ struct{} // prevent unkeyed literals } // NewBackend creates a new Backend. func NewBackend(params *NewBackendParams) (backends.Backend, error) { - r, err := metadata.NewRegistry(params.URI, params.L, params.P) + r, err := metadata.NewRegistry(params.URI, params.BatchSize, params.L, params.P) if err != nil { return nil, err } diff --git a/internal/backends/sqlite/collection.go b/internal/backends/sqlite/collection.go index 28c36341086a..6351a93cd22a 100644 --- a/internal/backends/sqlite/collection.go +++ b/internal/backends/sqlite/collection.go @@ -109,8 +109,10 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa meta := c.r.CollectionGet(ctx, c.dbName, c.name) err := db.InTransaction(ctx, func(tx *fsql.Tx) error { - // TODO https://github.com/FerretDB/FerretDB/issues/3708 - const batchSize = 100 + batchSize := c.r.BatchSize + if batchSize < 1 { + panic("batch-size should be greater or equal to 1") + } var batch []*types.Document docs := params.Docs diff --git a/internal/backends/sqlite/collection_test.go b/internal/backends/sqlite/collection_test.go index f257ffcae0a8..d062064cf51e 100644 --- a/internal/backends/sqlite/collection_test.go +++ b/internal/backends/sqlite/collection_test.go @@ -39,7 +39,7 @@ func TestCappedCollectionInsertAllQueryExplain(t *testing.T) { sp, err := state.NewProvider("") require.NoError(t, err) - b, err := NewBackend(&NewBackendParams{URI: testutil.TestSQLiteURI(t, ""), L: testutil.Logger(t), P: sp}) + b, err := NewBackend(&NewBackendParams{URI: testutil.TestSQLiteURI(t, ""), L: testutil.Logger(t), P: sp, BatchSize: 100}) require.NoError(t, err) t.Cleanup(b.Close) diff --git a/internal/backends/sqlite/database_test.go b/internal/backends/sqlite/database_test.go index 2d9b0a343271..d287733fee17 100644 --- a/internal/backends/sqlite/database_test.go +++ b/internal/backends/sqlite/database_test.go @@ -72,7 +72,7 @@ func TestDatabaseStatsFreeStorage(t *testing.T) { name, params := name, params t.Run(name, func(t *testing.T) { uri := testutil.TestSQLiteURI(t, "") + params - b, err := NewBackend(&NewBackendParams{URI: uri, L: testutil.Logger(t), P: sp}) + b, err := NewBackend(&NewBackendParams{URI: uri, L: testutil.Logger(t), P: sp, BatchSize: 100}) require.NoError(t, err) t.Cleanup(b.Close) diff --git a/internal/backends/sqlite/metadata/registry.go b/internal/backends/sqlite/metadata/registry.go index 23e1e6fe4e75..65d12d641e8c 100644 --- a/internal/backends/sqlite/metadata/registry.go +++ b/internal/backends/sqlite/metadata/registry.go @@ -56,8 +56,9 @@ const ( // // Exported methods are safe for concurrent use. Unexported methods are not. type Registry struct { - p *pool.Pool - l *zap.Logger + p *pool.Pool + l *zap.Logger + BatchSize int // rw protects colls but also acts like a global lock for the whole registry. // The latter effectively replaces transactions (see the sqlite backend package description for more info). @@ -69,16 +70,17 @@ type Registry struct { } // NewRegistry creates a registry for SQLite databases in the directory specified by SQLite URI. -func NewRegistry(u string, l *zap.Logger, sp *state.Provider) (*Registry, error) { +func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*Registry, error) { p, initDBs, err := pool.New(u, l, sp) if err != nil { return nil, err } r := &Registry{ - p: p, - l: l, - colls: map[string]map[string]*Collection{}, + p: p, + l: l, + BatchSize: batchSize, + colls: map[string]map[string]*Collection{}, } for name, db := range initDBs { diff --git a/internal/backends/sqlite/metadata/registry_test.go b/internal/backends/sqlite/metadata/registry_test.go index cb3a4ec7074d..64739cfe36c9 100644 --- a/internal/backends/sqlite/metadata/registry_test.go +++ b/internal/backends/sqlite/metadata/registry_test.go @@ -75,7 +75,7 @@ func TestCreateDrop(t *testing.T) { sp, err := state.NewProvider("") require.NoError(t, err) - r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp) + r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) @@ -113,7 +113,7 @@ func TestCreateDropStress(t *testing.T) { } { t.Run(testName, func(t *testing.T) { uri := testutil.TestSQLiteURI(t, "") + params - r, err := NewRegistry(uri, testutil.Logger(t), sp) + r, err := NewRegistry(uri, 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) @@ -158,7 +158,7 @@ func TestCreateSameStress(t *testing.T) { } { t.Run(testName, func(t *testing.T) { uri := testutil.TestSQLiteURI(t, "") + params - r, err := NewRegistry(uri, testutil.Logger(t), sp) + r, err := NewRegistry(uri, 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) @@ -227,7 +227,7 @@ func TestDropSameStress(t *testing.T) { } { t.Run(testName, func(t *testing.T) { uri := testutil.TestSQLiteURI(t, "") + params - r, err := NewRegistry(uri, testutil.Logger(t), sp) + r, err := NewRegistry(uri, 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) @@ -280,7 +280,7 @@ func TestCreateDropSameStress(t *testing.T) { } { t.Run(testName, func(t *testing.T) { uri := testutil.TestSQLiteURI(t, "") + params - r, err := NewRegistry(uri, testutil.Logger(t), sp) + r, err := NewRegistry(uri, 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) @@ -329,7 +329,7 @@ func TestIndexesCreateDrop(t *testing.T) { sp, err := state.NewProvider("") require.NoError(t, err) - r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp) + r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) diff --git a/internal/backends/sqlite/sqlite_test.go b/internal/backends/sqlite/sqlite_test.go index a5251637cdb9..85c574406e3d 100644 --- a/internal/backends/sqlite/sqlite_test.go +++ b/internal/backends/sqlite/sqlite_test.go @@ -31,7 +31,7 @@ func TestCollectionsStats(t *testing.T) { sp, err := state.NewProvider("") require.NoError(t, err) - r, err := metadata.NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp) + r, err := metadata.NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp) require.NoError(t, err) t.Cleanup(r.Close) diff --git a/internal/handler/handler.go b/internal/handler/handler.go index 37cd03ea2cb1..511db6b7eb9b 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -81,6 +81,7 @@ type NewOpts struct { CappedCleanupInterval time.Duration CappedCleanupPercentage uint8 EnableNewAuth bool + BatchSize int } // New returns a new handler. diff --git a/internal/handler/msg_insert.go b/internal/handler/msg_insert.go index 7571ce1dc3c7..69c30fcadf4a 100644 --- a/internal/handler/msg_insert.go +++ b/internal/handler/msg_insert.go @@ -85,13 +85,10 @@ func (h *Handler) MsgInsert(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, var done bool for !done { - // TODO https://github.com/FerretDB/FerretDB/issues/3708 - const batchSize = 1000 + docs := make([]*types.Document, 0, h.BatchSize) + docsIndexes := make([]int, 0, h.BatchSize) - docs := make([]*types.Document, 0, batchSize) - docsIndexes := make([]int, 0, batchSize) - - for j := 0; j < batchSize; j++ { + for j := 0; j < h.BatchSize; j++ { var i int var d any diff --git a/internal/handler/registry/hana.go b/internal/handler/registry/hana.go index 24eb8cf190af..0b94f9f67802 100644 --- a/internal/handler/registry/hana.go +++ b/internal/handler/registry/hana.go @@ -27,9 +27,10 @@ func init() { opts.Logger.Warn("HANA handler is in alpha. It is not supported yet.") b, err := hana.NewBackend(&hana.NewBackendParams{ - URI: opts.HANAURL, - L: opts.Logger.Named("hana"), - P: opts.StateProvider, + URI: opts.HANAURL, + L: opts.Logger.Named("hana"), + P: opts.StateProvider, + BatchSize: opts.BatchSize, }) if err != nil { return nil, nil, err @@ -48,6 +49,7 @@ func init() { CappedCleanupPercentage: opts.CappedCleanupPercentage, CappedCleanupInterval: opts.CappedCleanupInterval, EnableNewAuth: opts.EnableNewAuth, + BatchSize: opts.BatchSize, } h, err := handler.New(handlerOpts) diff --git a/internal/handler/registry/mysql.go b/internal/handler/registry/mysql.go index 11a1f4f803fb..7ac10b6406b6 100644 --- a/internal/handler/registry/mysql.go +++ b/internal/handler/registry/mysql.go @@ -45,6 +45,7 @@ func init() { CappedCleanupPercentage: opts.CappedCleanupPercentage, CappedCleanupInterval: opts.CappedCleanupInterval, EnableNewAuth: opts.EnableNewAuth, + BatchSize: opts.BatchSize, } h, err := handler.New(handlerOpts) diff --git a/internal/handler/registry/postgresql.go b/internal/handler/registry/postgresql.go index 36ce440569c4..f24eadd5b130 100644 --- a/internal/handler/registry/postgresql.go +++ b/internal/handler/registry/postgresql.go @@ -25,9 +25,10 @@ import ( func init() { registry["postgresql"] = func(opts *NewHandlerOpts) (*handler.Handler, CloseBackendFunc, error) { b, err := postgresql.NewBackend(&postgresql.NewBackendParams{ - URI: opts.PostgreSQLURL, - L: opts.Logger.Named("postgresql"), - P: opts.StateProvider, + URI: opts.PostgreSQLURL, + L: opts.Logger.Named("postgresql"), + P: opts.StateProvider, + BatchSize: opts.BatchSize, }) if err != nil { return nil, nil, err @@ -47,6 +48,7 @@ func init() { CappedCleanupPercentage: opts.CappedCleanupPercentage, CappedCleanupInterval: opts.CappedCleanupInterval, EnableNewAuth: opts.EnableNewAuth, + BatchSize: opts.BatchSize, } h, err := handler.New(handlerOpts) diff --git a/internal/handler/registry/registry.go b/internal/handler/registry/registry.go index 47c98923c0b3..a3d4d0e89a64 100644 --- a/internal/handler/registry/registry.go +++ b/internal/handler/registry/registry.go @@ -71,6 +71,7 @@ type TestOpts struct { CappedCleanupInterval time.Duration CappedCleanupPercentage uint8 EnableNewAuth bool + BatchSize int _ struct{} // prevent unkeyed literals } diff --git a/internal/handler/registry/sqlite.go b/internal/handler/registry/sqlite.go index 7c0613256463..efa5c98bc988 100644 --- a/internal/handler/registry/sqlite.go +++ b/internal/handler/registry/sqlite.go @@ -25,9 +25,10 @@ import ( func init() { registry["sqlite"] = func(opts *NewHandlerOpts) (*handler.Handler, CloseBackendFunc, error) { b, err := sqlite.NewBackend(&sqlite.NewBackendParams{ - URI: opts.SQLiteURL, - L: opts.Logger.Named("sqlite"), - P: opts.StateProvider, + URI: opts.SQLiteURL, + L: opts.Logger.Named("sqlite"), + P: opts.StateProvider, + BatchSize: opts.BatchSize, }) if err != nil { return nil, nil, err @@ -47,6 +48,7 @@ func init() { CappedCleanupPercentage: opts.CappedCleanupPercentage, CappedCleanupInterval: opts.CappedCleanupInterval, EnableNewAuth: opts.EnableNewAuth, + BatchSize: opts.BatchSize, } h, err := handler.New(handlerOpts) diff --git a/internal/util/fsql/db.go b/internal/util/fsql/db.go index f550c0306705..2eaa3d75c3da 100644 --- a/internal/util/fsql/db.go +++ b/internal/util/fsql/db.go @@ -35,9 +35,10 @@ import ( type DB struct { *metricsCollector - sqlDB *sql.DB - l *zap.Logger - token *resource.Token + sqlDB *sql.DB + l *zap.Logger + token *resource.Token + BatchSize int } // WrapDB creates a new DB.