Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make batch sizes configurable #4149

Merged
merged 22 commits into from Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions Taskfile.yml
Expand Up @@ -218,6 +218,7 @@ tasks:
-postgresql-url='postgres://username@127.0.0.1:5432/ferretdb?search_path='
-compat-url='mongodb://username:password@127.0.0.1:47018/?tls=true&tlsCertificateKeyFile=../build/certs/client.pem&tlsCaFile=../build/certs/rootCA-cert.pem&replicaSet=rs0'
-disable-pushdown={{.DISABLE_PUSHDOWN}}
-batch-size=100

test-integration-sqlite:
desc: "Run integration tests for `sqlite` backend"
Expand All @@ -244,6 +245,7 @@ tasks:
-target-tls
-compat-url='mongodb://username:password@127.0.0.1:47018/?tls=true&tlsCertificateKeyFile=../build/certs/client.pem&tlsCaFile=../build/certs/rootCA-cert.pem&replicaSet=rs0'
-disable-pushdown={{.DISABLE_PUSHDOWN}}
-batch-size=1000

test-integration-mysql:
desc: "Run integration tests for `mysql` handler"
Expand Down Expand Up @@ -364,6 +366,7 @@ tasks:
--handler=pg
--postgresql-url='postgres://username@127.0.0.1:5432/ferretdb?search_path='
--test-records-dir=tmp/records
--test-batch-size=100

run-sqlite:
desc: "Run FerretDB with `sqlite` backend"
Expand All @@ -378,6 +381,7 @@ tasks:
--handler=sqlite
--sqlite-url=file:tmp/sqlite/
--test-records-dir=tmp/records
--test-batch-size=1000

run-mysql:
desc: "Run FerretDB with `mysql` backend"
Expand All @@ -404,6 +408,7 @@ tasks:
--mode=diff-normal
--handler=hana
--test-records-dir=tmp/records
--test-batch-size=100

run-secured:
desc: "Run FerretDB with `postgresql` backend (TLS, auth required)"
Expand All @@ -421,6 +426,7 @@ tasks:
--handler=pg
--postgresql-url='postgres://127.0.0.1:5433/ferretdb?search_path='
--test-records-dir=tmp/records
--test-batch-size=100

run-proxy:
desc: "Run FerretDB in diff-proxy mode"
Expand All @@ -434,6 +440,7 @@ tasks:
--handler=pg
--postgresql-url='postgres://username@127.0.0.1:5432/ferretdb?search_path='
--test-records-dir=tmp/records
--test-batch-size=100

run-sqlite-proxy:
desc: "Run FerretDB with `sqlite` handler in diff-proxy mode"
Expand All @@ -448,6 +455,7 @@ tasks:
--handler=sqlite
--sqlite-url=file:tmp/sqlite/
--test-records-dir=tmp/records
--test-batch-size=1000

run-proxy-secured:
desc: "Run FerretDB in diff-proxy mode (TLS, auth required)"
Expand All @@ -468,6 +476,7 @@ tasks:
--handler=pg
--postgresql-url='postgres://username@127.0.0.1:5433/ferretdb?search_path='
--test-records-dir=tmp/records
--test-batch-size=100

lint:
desc: "Run linters"
Expand Down
2 changes: 2 additions & 0 deletions cmd/ferretdb/main.go
Expand Up @@ -101,6 +101,7 @@ var cli struct {
} `embed:"" prefix:"capped-cleanup-"`

EnableNewAuth bool `default:"false" help:"Experimental: enable new authentication."`
BatchSize int `default:"100" help:"number of maximum insertion batch size"`

Telemetry struct {
URL string `default:"https://beacon.ferretdb.com/" help:"Telemetry: reporting URL."`
Expand Down Expand Up @@ -412,6 +413,7 @@ func run() {
CappedCleanupInterval: cli.Test.CappedCleanup.Interval,
CappedCleanupPercentage: cli.Test.CappedCleanup.Percentage,
EnableNewAuth: cli.Test.EnableNewAuth,
BatchSize: cli.Test.BatchSize,
},
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions integration/setup/listener.go
Expand Up @@ -185,6 +185,7 @@
CappedCleanupPercentage: opts.CappedCleanupPercentage,
CappedCleanupInterval: opts.CappedCleanupInterval,
EnableNewAuth: true,
BatchSize: *batchSizeF,

Check warning on line 188 in integration/setup/listener.go

View check run for this annotation

Codecov / codecov/patch

integration/setup/listener.go#L188

Added line #L188 was not covered by tests
},
}
h, closeBackend, err := registry.NewHandler(handler, handlerOpts)
Expand Down
2 changes: 2 additions & 0 deletions integration/setup/setup.go
Expand Up @@ -56,6 +56,8 @@ var (
mysqlURLF = flag.String("mysql-url", "", "in-process FerretDB: MySQL URL for 'mysql' handler.")
hanaURLF = flag.String("hana-url", "", "in-process FerretDB: Hana URL for 'hana' handler.")

batchSizeF = flag.Int("batch-size", 100, "number of maximum insertion batch size")

compatURLF = flag.String("compat-url", "", "compat system's (MongoDB) URL for compatibility tests; if empty, they are skipped")

benchDocsF = flag.Int("bench-docs", 1000, "benchmarks: number of documents to generate per iteration")
Expand Down
8 changes: 5 additions & 3 deletions internal/backends/hana/backend.go
Expand Up @@ -37,9 +37,10 @@
//
//nolint:vet // for readability
type NewBackendParams struct {
URI string
L *zap.Logger
P *state.Provider
URI string
L *zap.Logger
P *state.Provider
BatchSize int
}

// NewBackend creates a new Backend.
Expand All @@ -50,6 +51,7 @@
}

hdb := fsql.WrapDB(db, "hana", params.L)
hdb.BatchSize = params.BatchSize

Check warning on line 54 in internal/backends/hana/backend.go

View check run for this annotation

Codecov / codecov/patch

internal/backends/hana/backend.go#L54

Added line #L54 was not covered by tests

return backends.BackendContract(&backend{
hdb: hdb,
Expand Down
14 changes: 8 additions & 6 deletions internal/backends/helpers_test.go
Expand Up @@ -53,9 +53,10 @@ func testBackends(t *testing.T) map[string]*testBackend {
require.NoError(t, err)

b, err := postgresql.NewBackend(&postgresql.NewBackendParams{
URI: testutil.TestPostgreSQLURI(t, context.TODO(), ""),
L: l.Named("postgresql"),
P: sp,
URI: testutil.TestPostgreSQLURI(t, context.TODO(), ""),
L: l.Named("postgresql"),
P: sp,
BatchSize: 1000,
})
require.NoError(t, err)
t.Cleanup(b.Close)
Expand All @@ -71,9 +72,10 @@ func testBackends(t *testing.T) map[string]*testBackend {
require.NoError(t, err)

b, err := sqlite.NewBackend(&sqlite.NewBackendParams{
URI: testutil.TestSQLiteURI(t, ""),
L: l.Named("sqlite"),
P: sp,
URI: testutil.TestSQLiteURI(t, ""),
L: l.Named("sqlite"),
P: sp,
BatchSize: 100,
})
require.NoError(t, err)
t.Cleanup(b.Close)
Expand Down
11 changes: 6 additions & 5 deletions internal/backends/postgresql/backend.go
Expand Up @@ -37,15 +37,16 @@ type backend struct {
//
//nolint:vet // for readability
type NewBackendParams struct {
URI string
L *zap.Logger
P *state.Provider
_ struct{} // prevent unkeyed literals
URI string
L *zap.Logger
P *state.Provider
BatchSize int
_ struct{} // prevent unkeyed literals
}

// NewBackend creates a new Backend.
func NewBackend(params *NewBackendParams) (backends.Backend, error) {
r, err := metadata.NewRegistry(params.URI, params.L, params.P)
r, err := metadata.NewRegistry(params.URI, params.BatchSize, params.L, params.P)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions internal/backends/postgresql/collection.go
Expand Up @@ -134,8 +134,10 @@
}

err = pool.InTransaction(ctx, p, func(tx pgx.Tx) error {
// TODO https://github.com/FerretDB/FerretDB/issues/3708
const batchSize = 100
batchSize := c.r.BatchSize
kropidlowsky marked this conversation as resolved.
Show resolved Hide resolved
if batchSize < 1 {
panic("batch-size should be greater or equal to 1")

Check warning on line 139 in internal/backends/postgresql/collection.go

View check run for this annotation

Codecov / codecov/patch

internal/backends/postgresql/collection.go#L139

Added line #L139 was not covered by tests
}

var batch []*types.Document
docs := params.Docs
Expand Down
7 changes: 4 additions & 3 deletions internal/backends/postgresql/database_test.go
Expand Up @@ -40,9 +40,10 @@ func TestDatabaseStats(t *testing.T) {
require.NoError(t, err)

params := NewBackendParams{
URI: testutil.TestPostgreSQLURI(t, ctx, ""),
L: testutil.Logger(t),
P: sp,
URI: testutil.TestPostgreSQLURI(t, ctx, ""),
L: testutil.Logger(t),
P: sp,
BatchSize: 1000,
}
b, err := NewBackend(&params)
require.NoError(t, err)
Expand Down
12 changes: 7 additions & 5 deletions internal/backends/postgresql/metadata/registry.go
Expand Up @@ -73,8 +73,9 @@ var specialCharacters = regexp.MustCompile("[^a-z][^a-z0-9_]*")
//
//nolint:vet // for readability
type Registry struct {
p *pool.Pool
l *zap.Logger
p *pool.Pool
l *zap.Logger
BatchSize int

// rw protects colls but also acts like a global lock for the whole registry.
// The latter effectively replaces transactions (see the postgresql backend package description for more info).
Expand All @@ -86,15 +87,16 @@ type Registry struct {
}

// NewRegistry creates a registry for PostgreSQL databases with a given base URI.
func NewRegistry(u string, l *zap.Logger, sp *state.Provider) (*Registry, error) {
func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*Registry, error) {
p, err := pool.New(u, l, sp)
if err != nil {
return nil, err
}

r := &Registry{
p: p,
l: l,
p: p,
l: l,
BatchSize: batchSize,
}

return r, nil
Expand Down
4 changes: 2 additions & 2 deletions internal/backends/postgresql/metadata/registry_test.go
Expand Up @@ -91,7 +91,7 @@ func createDatabase(t *testing.T, ctx context.Context) (*Registry, *pgxpool.Pool
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(u, testutil.Logger(t), sp)
r, err := NewRegistry(u, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -140,7 +140,7 @@ func TestAuth(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(tc.uri, testutil.Logger(t), sp)
r, err := NewRegistry(tc.uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down
11 changes: 6 additions & 5 deletions internal/backends/sqlite/backend.go
Expand Up @@ -37,15 +37,16 @@ type backend struct {
//
//nolint:vet // for readability
type NewBackendParams struct {
URI string
L *zap.Logger
P *state.Provider
_ struct{} // prevent unkeyed literals
URI string
L *zap.Logger
P *state.Provider
BatchSize int
_ struct{} // prevent unkeyed literals
}

// NewBackend creates a new Backend.
func NewBackend(params *NewBackendParams) (backends.Backend, error) {
r, err := metadata.NewRegistry(params.URI, params.L, params.P)
r, err := metadata.NewRegistry(params.URI, params.BatchSize, params.L, params.P)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions internal/backends/sqlite/collection.go
Expand Up @@ -109,8 +109,10 @@
meta := c.r.CollectionGet(ctx, c.dbName, c.name)

err := db.InTransaction(ctx, func(tx *fsql.Tx) error {
// TODO https://github.com/FerretDB/FerretDB/issues/3708
const batchSize = 100
batchSize := c.r.BatchSize
kropidlowsky marked this conversation as resolved.
Show resolved Hide resolved
if batchSize < 1 {
panic("batch-size should be greater or equal to 1")

Check warning on line 114 in internal/backends/sqlite/collection.go

View check run for this annotation

Codecov / codecov/patch

internal/backends/sqlite/collection.go#L114

Added line #L114 was not covered by tests
}

var batch []*types.Document
docs := params.Docs
Expand Down
2 changes: 1 addition & 1 deletion internal/backends/sqlite/collection_test.go
Expand Up @@ -39,7 +39,7 @@ func TestCappedCollectionInsertAllQueryExplain(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

b, err := NewBackend(&NewBackendParams{URI: testutil.TestSQLiteURI(t, ""), L: testutil.Logger(t), P: sp})
b, err := NewBackend(&NewBackendParams{URI: testutil.TestSQLiteURI(t, ""), L: testutil.Logger(t), P: sp, BatchSize: 100})
require.NoError(t, err)
t.Cleanup(b.Close)

Expand Down
2 changes: 1 addition & 1 deletion internal/backends/sqlite/database_test.go
Expand Up @@ -72,7 +72,7 @@ func TestDatabaseStatsFreeStorage(t *testing.T) {
name, params := name, params
t.Run(name, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
b, err := NewBackend(&NewBackendParams{URI: uri, L: testutil.Logger(t), P: sp})
b, err := NewBackend(&NewBackendParams{URI: uri, L: testutil.Logger(t), P: sp, BatchSize: 100})
require.NoError(t, err)

t.Cleanup(b.Close)
Expand Down
14 changes: 8 additions & 6 deletions internal/backends/sqlite/metadata/registry.go
Expand Up @@ -56,8 +56,9 @@ const (
//
// Exported methods are safe for concurrent use. Unexported methods are not.
type Registry struct {
p *pool.Pool
l *zap.Logger
p *pool.Pool
l *zap.Logger
BatchSize int

// rw protects colls but also acts like a global lock for the whole registry.
// The latter effectively replaces transactions (see the sqlite backend package description for more info).
Expand All @@ -69,16 +70,17 @@ type Registry struct {
}

// NewRegistry creates a registry for SQLite databases in the directory specified by SQLite URI.
func NewRegistry(u string, l *zap.Logger, sp *state.Provider) (*Registry, error) {
func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*Registry, error) {
p, initDBs, err := pool.New(u, l, sp)
if err != nil {
return nil, err
}

r := &Registry{
p: p,
l: l,
colls: map[string]map[string]*Collection{},
p: p,
l: l,
BatchSize: batchSize,
colls: map[string]map[string]*Collection{},
}

for name, db := range initDBs {
Expand Down
12 changes: 6 additions & 6 deletions internal/backends/sqlite/metadata/registry_test.go
Expand Up @@ -75,7 +75,7 @@ func TestCreateDrop(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp)
r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -113,7 +113,7 @@ func TestCreateDropStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -158,7 +158,7 @@ func TestCreateSameStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -227,7 +227,7 @@ func TestDropSameStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -280,7 +280,7 @@ func TestCreateDropSameStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -329,7 +329,7 @@ func TestIndexesCreateDrop(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp)
r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down