-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
connect.ts
113 lines (94 loc) · 3.25 KB
/
connect.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import { MongoRuntimeError, MongoInvalidArgumentError } from '../error';
import { Topology, TOPOLOGY_EVENTS } from '../sdam/topology';
import { resolveSRVRecord } from '../connection_string';
import type { Callback } from '../utils';
import type { MongoClient, MongoOptions } from '../mongo_client';
import { CMAP_EVENTS } from '../cmap/connection_pool';
import { APM_EVENTS } from '../cmap/connection';
import { HEARTBEAT_EVENTS } from '../sdam/server';
/** @public */
export const MONGO_CLIENT_EVENTS = [
...CMAP_EVENTS,
...APM_EVENTS,
...TOPOLOGY_EVENTS,
...HEARTBEAT_EVENTS
] as const;
export function connect(
mongoClient: MongoClient,
options: MongoOptions,
callback: Callback<MongoClient>
): void {
if (!callback) {
throw new MongoInvalidArgumentError('Callback function must be provided');
}
// If a connection already been established, we can terminate early
if (mongoClient.topology && mongoClient.topology.isConnected()) {
return callback(undefined, mongoClient);
}
const logger = mongoClient.logger;
const connectCallback: Callback = err => {
const warningMessage =
'seed list contains no mongos proxies, replicaset connections requires ' +
'the parameter replicaSet to be supplied in the URI or options object, ' +
'mongodb://server:port/db?replicaSet=name';
if (err && err.message === 'no mongos proxies found in seed list') {
if (logger.isWarn()) {
logger.warn(warningMessage);
}
// Return a more specific error message for MongoClient.connect
// TODO(NODE-3483)
return callback(new MongoRuntimeError(warningMessage));
}
callback(err, mongoClient);
};
if (typeof options.srvHost === 'string') {
return resolveSRVRecord(options, (err, hosts) => {
if (err || !hosts) return callback(err);
return createTopology(mongoClient, options, connectCallback);
});
}
return createTopology(mongoClient, options, connectCallback);
}
function createTopology(
mongoClient: MongoClient,
options: MongoOptions,
callback: Callback<Topology>
) {
// Create the topology
const topology = new Topology(options.hosts, options);
// Events can be emitted before initialization is complete so we have to
// save the reference to the topology on the client ASAP if the event handlers need to access it
mongoClient.topology = topology;
topology.once(Topology.OPEN, () => mongoClient.emit('open', mongoClient));
for (const event of MONGO_CLIENT_EVENTS) {
topology.on(event, (...args: any[]) => mongoClient.emit(event, ...(args as any)));
}
// initialize CSFLE if requested
if (mongoClient.autoEncrypter) {
mongoClient.autoEncrypter.init(err => {
if (err) {
return callback(err);
}
topology.connect(options, err => {
if (err) {
topology.close({ force: true });
return callback(err);
}
options.encrypter.connectInternalClient(error => {
if (error) return callback(error);
callback(undefined, topology);
});
});
});
return;
}
// otherwise connect normally
topology.connect(options, err => {
if (err) {
topology.close({ force: true });
return callback(err);
}
callback(undefined, topology);
return;
});
}