Skip to content

Commit

Permalink
fix(mysql): respect auto_increment_increment when batch inserting
Browse files Browse the repository at this point in the history
Closes #3828
  • Loading branch information
B4nan committed Dec 6, 2022
1 parent d5789b1 commit 516db6d
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 11 deletions.
1 change: 1 addition & 0 deletions packages/core/src/MikroORM.ts
Expand Up @@ -98,6 +98,7 @@ export class MikroORM<D extends IDatabaseDriver = IDatabaseDriver> {

if (await this.isConnected()) {
this.logger.log('info', `MikroORM successfully connected to database ${colors.green(db)}`);
await this.driver.init();
} else {
this.logger.error('info', `MikroORM failed to connect to database ${db}`);
}
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/drivers/DatabaseDriver.ts
Expand Up @@ -28,6 +28,10 @@ export abstract class DatabaseDriver<C extends Connection> implements IDatabaseD
protected constructor(readonly config: Configuration,
protected readonly dependencies: string[]) { }

init(): void {
// do nothing on this level
}

abstract find<T extends object, P extends string = never>(entityName: string, where: FilterQuery<T>, options?: FindOptions<T, P>): Promise<EntityData<T>[]>;

abstract findOne<T extends object, P extends string = never>(entityName: string, where: FilterQuery<T>, options?: FindOneOptions<T, P>): Promise<EntityData<T> | null>;
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/drivers/IDatabaseDriver.ts
Expand Up @@ -18,6 +18,8 @@ export interface IDatabaseDriver<C extends Connection = Connection> {
[EntityManagerType]: EntityManager<this>;
readonly config: Configuration;

init(): void | Promise<void>;

createEntityManager<D extends IDatabaseDriver = IDatabaseDriver>(useContext?: boolean): D[typeof EntityManagerType];

connect(): Promise<C>;
Expand Down
12 changes: 6 additions & 6 deletions packages/knex/src/AbstractSqlDriver.ts
Expand Up @@ -12,22 +12,22 @@ import { QueryBuilder, QueryType } from './query';
import { SqlEntityManager } from './SqlEntityManager';
import type { Field } from './typings';

export abstract class AbstractSqlDriver<C extends AbstractSqlConnection = AbstractSqlConnection> extends DatabaseDriver<C> {
export abstract class AbstractSqlDriver<Connection extends AbstractSqlConnection = AbstractSqlConnection, Platform extends AbstractSqlPlatform = AbstractSqlPlatform> extends DatabaseDriver<Connection> {

[EntityManagerType]!: SqlEntityManager<this>;

protected readonly connection: C;
protected readonly replicas: C[] = [];
protected readonly platform: AbstractSqlPlatform;
protected readonly connection: Connection;
protected readonly replicas: Connection[] = [];
protected readonly platform: Platform;

protected constructor(config: Configuration, platform: AbstractSqlPlatform, connection: Constructor<C>, connector: string[]) {
protected constructor(config: Configuration, platform: Platform, connection: Constructor<Connection>, connector: string[]) {
super(config, connector);
this.connection = new connection(this.config);
this.replicas = this.createReplicas(conf => new connection(this.config, conf, 'read'));
this.platform = platform;
}

getPlatform(): AbstractSqlPlatform {
getPlatform(): Platform {
return this.platform;
}

Expand Down
11 changes: 9 additions & 2 deletions packages/mariadb/src/MariaDbDriver.ts
Expand Up @@ -3,17 +3,24 @@ import { AbstractSqlDriver } from '@mikro-orm/knex';
import { MariaDbConnection } from './MariaDbConnection';
import { MariaDbPlatform } from './MariaDbPlatform';

export class MariaDbDriver extends AbstractSqlDriver<MariaDbConnection> {
export class MariaDbDriver extends AbstractSqlDriver<MariaDbConnection, MariaDbPlatform> {

constructor(config: Configuration) {
super(config, new MariaDbPlatform(), MariaDbConnection, ['knex', 'mariadb']);
}

async init(): Promise<void> {
await super.init();
// preload the value early
await this.platform.getAutoIncrementIncrement(this.connection);
}

async nativeInsertMany<T extends object>(entityName: string, data: EntityDictionary<T>[], options: NativeInsertUpdateManyOptions<T> = {}): Promise<QueryResult<T>> {
options.processCollections ??= true;
const res = await super.nativeInsertMany(entityName, data, options);
const pks = this.getPrimaryKeyFields(entityName);
data.forEach((item, idx) => res.rows![idx] = { [pks[0]]: item[pks[0]] ?? res.insertId as number + idx });
const autoIncrementIncrement = await this.platform.getAutoIncrementIncrement(this.connection);
data.forEach((item, idx) => res.rows![idx] = { [pks[0]]: item[pks[0]] ?? res.insertId as number + (idx * autoIncrementIncrement) });
res.row = res.rows![0];

return res;
Expand Down
15 changes: 15 additions & 0 deletions packages/mariadb/src/MariaDbPlatform.ts
@@ -1,3 +1,4 @@
import type { AbstractSqlConnection } from '@mikro-orm/knex';
import { AbstractSqlPlatform } from '@mikro-orm/knex';
import { MariaDbSchemaHelper } from './MariaDbSchemaHelper';
import { MariaDbExceptionConverter } from './MariaDbExceptionConverter';
Expand All @@ -8,6 +9,20 @@ export class MariaDbPlatform extends AbstractSqlPlatform {

protected readonly schemaHelper: MariaDbSchemaHelper = new MariaDbSchemaHelper(this);
protected readonly exceptionConverter = new MariaDbExceptionConverter();
protected autoIncrementIncrement?: number;

/**
* the increment may differ when running a cluster, see https://github.com/mikro-orm/mikro-orm/issues/3828
* @internal
*/
async getAutoIncrementIncrement(con: AbstractSqlConnection) {
if (this.autoIncrementIncrement == null) {
const res = await con.execute(`show variables like 'auto_increment_increment'`);
this.autoIncrementIncrement = res[0]?.auto_increment_increment ?? 1;
}

return this.autoIncrementIncrement!;
}

getDefaultCharset(): string {
return 'utf8mb4';
Expand Down
11 changes: 9 additions & 2 deletions packages/mysql/src/MySqlDriver.ts
Expand Up @@ -3,17 +3,24 @@ import { AbstractSqlDriver } from '@mikro-orm/knex';
import { MySqlConnection } from './MySqlConnection';
import { MySqlPlatform } from './MySqlPlatform';

export class MySqlDriver extends AbstractSqlDriver<MySqlConnection> {
export class MySqlDriver extends AbstractSqlDriver<MySqlConnection, MySqlPlatform> {

constructor(config: Configuration) {
super(config, new MySqlPlatform(), MySqlConnection, ['knex', 'mysql2']);
}

async init(): Promise<void> {
await super.init();
// preload the value early
await this.platform.getAutoIncrementIncrement(this.connection);
}

async nativeInsertMany<T extends object>(entityName: string, data: EntityDictionary<T>[], options: NativeInsertUpdateManyOptions<T> = {}): Promise<QueryResult<T>> {
options.processCollections ??= true;
const res = await super.nativeInsertMany(entityName, data, options);
const pks = this.getPrimaryKeyFields(entityName);
data.forEach((item, idx) => res.rows![idx] = { [pks[0]]: item[pks[0]] ?? res.insertId as number + idx });
const autoIncrementIncrement = await this.platform.getAutoIncrementIncrement(this.connection);
data.forEach((item, idx) => res.rows![idx] = { [pks[0]]: item[pks[0]] ?? res.insertId as number + (idx * autoIncrementIncrement) });
res.row = res.rows![0];

return res;
Expand Down
15 changes: 15 additions & 0 deletions packages/mysql/src/MySqlPlatform.ts
@@ -1,3 +1,4 @@
import type { AbstractSqlConnection } from '@mikro-orm/knex';
import { AbstractSqlPlatform } from '@mikro-orm/knex';
import { MySqlSchemaHelper } from './MySqlSchemaHelper';
import { MySqlExceptionConverter } from './MySqlExceptionConverter';
Expand All @@ -8,6 +9,20 @@ export class MySqlPlatform extends AbstractSqlPlatform {

protected readonly schemaHelper: MySqlSchemaHelper = new MySqlSchemaHelper(this);
protected readonly exceptionConverter = new MySqlExceptionConverter();
protected autoIncrementIncrement?: number;

/**
* the increment may differ when running a cluster, see https://github.com/mikro-orm/mikro-orm/issues/3828
* @internal
*/
async getAutoIncrementIncrement(con: AbstractSqlConnection) {
if (this.autoIncrementIncrement == null) {
const res = await con.execute(`show variables like 'auto_increment_increment'`);
this.autoIncrementIncrement = res[0]?.auto_increment_increment ?? 1;
}

return this.autoIncrementIncrement!;
}

getDefaultCharset(): string {
return 'utf8mb4';
Expand Down
1 change: 0 additions & 1 deletion tests/features/upsert/upsert.test.ts
Expand Up @@ -84,7 +84,6 @@ describe.each(Object.keys(options))('em.upsert [%s]', type => {
orm = await MikroORM.init({
entities: [Author, Book, FooBar],
type,
debug: true,
loggerFactory: options => new SimpleLogger(options),
...options[type],
});
Expand Down

0 comments on commit 516db6d

Please sign in to comment.