forked from typeorm/typeorm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
EntityPersistExecutor.ts
174 lines (146 loc) · 8.78 KB
/
EntityPersistExecutor.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import {ObjectLiteral} from "../common/ObjectLiteral";
import {SaveOptions} from "../repository/SaveOptions";
import {RemoveOptions} from "../repository/RemoveOptions";
import {MustBeEntityError} from "../error/MustBeEntityError";
import {SubjectExecutor} from "./SubjectExecutor";
import {CannotDetermineEntityError} from "../error/CannotDetermineEntityError";
import {QueryRunner} from "../query-runner/QueryRunner";
import {Connection} from "../connection/Connection";
import {Subject} from "./Subject";
import {OneToManySubjectBuilder} from "./subject-builder/OneToManySubjectBuilder";
import {OneToOneInverseSideSubjectBuilder} from "./subject-builder/OneToOneInverseSideSubjectBuilder";
import {ManyToManySubjectBuilder} from "./subject-builder/ManyToManySubjectBuilder";
import {SubjectDatabaseEntityLoader} from "./SubjectDatabaseEntityLoader";
import {CascadesSubjectBuilder} from "./subject-builder/CascadesSubjectBuilder";
import {OrmUtils} from "../util/OrmUtils";
import {PromiseUtils} from "../util/PromiseUtils";
/**
 * Persists a single entity or multiple entities - saves or removes them.
 *
 * For every given entity a Subject is created; the subject builders and the
 * SubjectDatabaseEntityLoader are then run over those subjects, and the resulting
 * SubjectExecutor instances are executed, by default inside a transaction.
 */
export class EntityPersistExecutor {

    // -------------------------------------------------------------------------
    // Constructor
    // -------------------------------------------------------------------------

    /**
     * @param connection Connection used to look up entity metadata and, when no query runner is given, to create one.
     * @param queryRunner Query runner to execute on. When undefined, `execute` creates a "master" query runner and releases it afterwards.
     * @param mode Whether the given entities must be saved or removed.
     * @param target Explicit entity target. When undefined, the constructor of each entity is used as its target.
     * @param entity Single entity or array of entities to persist.
     * @param options Save / remove options (`data`, `chunk` and `transaction` are read by this class).
     */
    constructor(protected connection: Connection,
                protected queryRunner: QueryRunner|undefined,
                protected mode: "save"|"remove",
                protected target: Function|string|undefined,
                protected entity: ObjectLiteral|ObjectLiteral[],
                protected options?: SaveOptions & RemoveOptions) {
    }

    // -------------------------------------------------------------------------
    // Public Methods
    // -------------------------------------------------------------------------

    /**
     * Executes persistence operation on given entity or entities.
     *
     * @returns Promise resolved once all operations are executed (or when there is nothing to execute).
     * Rejected with MustBeEntityError when the given entity is not an object, with
     * CannotDetermineEntityError when no target was given and an entity is a plain Object,
     * or with whatever error the underlying operations raise.
     */
    execute(): Promise<void> {

        // check if entity we are going to save is valid and is an object
        if (!this.entity || !(this.entity instanceof Object))
            return Promise.reject(new MustBeEntityError(this.mode, this.entity));

        // we MUST call "fake" resolve here to make sure all properties of lazily loaded relations are resolved
        return Promise.resolve().then(async () => {

            // if query runner is already defined in this class, it means this entity manager was already created for a single connection
            // if its not defined we create a new query runner - single connection where we'll execute all our operations
            const queryRunner = this.queryRunner || this.connection.createQueryRunner("master");

            // save data in the query runner - this is useful functionality to share data from outside of the world
            // with third classes - like subscribers and listener methods.
            // the previous value is remembered because the query runner may belong to the caller:
            // data passed for this single operation must not stay on it once we are done
            const previousQueryRunnerData = queryRunner.data;
            if (this.options && this.options.data)
                queryRunner.data = this.options.data;

            try {

                // collect all operate subjects
                const entities: ObjectLiteral[] = Array.isArray(this.entity) ? this.entity : [this.entity];
                const entitiesInChunks = this.options && this.options.chunk && this.options.chunk > 0
                    ? OrmUtils.chunk(entities, this.options.chunk)
                    : [entities];

                // every chunk of entities gets its own list of subjects and its own subject executor
                const executors = await Promise.all(entitiesInChunks.map(async chunk => {
                    const subjects: Subject[] = [];

                    // create subjects for all entities we received for the persistence
                    chunk.forEach(entity => {
                        const entityTarget = this.target ? this.target : entity.constructor;
                        if (entityTarget === Object)
                            throw new CannotDetermineEntityError(this.mode);

                        subjects.push(new Subject({
                            metadata: this.connection.getMetadata(entityTarget),
                            entity: entity,
                            canBeInserted: this.mode === "save",
                            canBeUpdated: this.mode === "save",
                            mustBeRemoved: this.mode === "remove"
                        }));
                    });

                    // go through each entity with metadata and create subjects and subjects by cascades for them
                    // note: "subjects" is shared with the builders, so array methods (which only visit
                    // the elements present when iteration started) are used on purpose here and below
                    const cascadesSubjectBuilder = new CascadesSubjectBuilder(subjects);
                    await Promise.all(subjects.map(async subject => {
                        // next step we build list of subjects we will operate with
                        // these subjects are subjects that we need to insert or update alongside with main persisted entity
                        await cascadesSubjectBuilder.build(subject);
                    }));

                    // next step is to load database entities for all operate subjects
                    await new SubjectDatabaseEntityLoader(queryRunner, subjects).load(this.mode);

                    // build all related subjects and change maps
                    if (this.mode === "save") {
                        new OneToManySubjectBuilder(subjects).build();
                        new OneToOneInverseSideSubjectBuilder(subjects).build();
                        new ManyToManySubjectBuilder(subjects).build();

                    } else {
                        subjects.forEach(subject => {
                            if (subject.mustBeRemoved) {
                                new ManyToManySubjectBuilder(subjects).buildForAllRemoval(subject);
                            }
                        });
                    }

                    // create a subject executor
                    return new SubjectExecutor(queryRunner, subjects, this.options);
                }));

                // make sure we have at least one executable operation before we create a transaction and proceed
                // if we don't have operations it means we don't really need to update or remove something
                const executorsWithExecutableOperations = executors.filter(executor => executor.hasExecutableOperations);
                if (executorsWithExecutableOperations.length === 0)
                    return;

                // start execute queries in a transaction
                // if transaction is already opened in this query runner then we don't touch it
                // if its not opened yet then we open it here, and once we finish - we close it
                let isTransactionStartedByUs = false;
                try {

                    // open transaction if its not opened yet
                    if (!queryRunner.isTransactionActive) {
                        if (!this.options || this.options.transaction !== false) { // start transaction until it was not explicitly disabled
                            isTransactionStartedByUs = true;
                            await queryRunner.startTransaction();
                        }
                    }

                    // execute all persistence operations for all entities we have
                    await PromiseUtils.runInSequence(executorsWithExecutableOperations, executor => executor.execute());

                    // commit transaction if it was started by us
                    if (isTransactionStartedByUs)
                        await queryRunner.commitTransaction();

                } catch (error) {

                    // rollback transaction if it was started by us
                    if (isTransactionStartedByUs) {
                        try {
                            await queryRunner.rollbackTransaction();
                        } catch (rollbackError) {
                            // deliberately ignored: the original error below is the one the caller must see
                        }
                    }
                    throw error;
                }

            } finally {

                // put back whatever data the query runner carried before this operation
                queryRunner.data = previousQueryRunnerData;

                // release query runner only if its created by us
                if (!this.queryRunner)
                    await queryRunner.release();
            }
        });
    }

}