Skip to content

Commit 2d3576b

Browse files
authoredMar 7, 2023
fix(NODE-5097): set timeout on write and reset on message (#3590)

File tree

7 files changed

+407
-196
lines changed

7 files changed

+407
-196
lines changed
 

‎src/cmap/connect.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ function makeConnection(options: MakeConnectionOptions, _callback: Callback<Stre
428428
}
429429
}
430430

431-
socket.setTimeout(socketTimeoutMS);
431+
socket.setTimeout(0);
432432
callback(undefined, socket);
433433
}
434434

‎src/cmap/connection.ts

+6-3
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
370370
this[kDelayedTimeoutId] = null;
371371
}
372372

373+
const socketTimeoutMS = this[kStream].timeout ?? 0;
374+
this[kStream].setTimeout(0);
375+
373376
// always emit the message, in case we are streaming
374377
this.emit('message', message);
375378
let operationDescription = this[kQueue].get(message.responseTo);
@@ -410,8 +413,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
410413
// back in the queue with the correct requestId and will resolve not being able
411414
// to find the next one via the responseTo of the next streaming hello.
412415
this[kQueue].set(message.requestId, operationDescription);
413-
} else if (operationDescription.socketTimeoutOverride) {
414-
this[kStream].setTimeout(this.socketTimeoutMS);
416+
this[kStream].setTimeout(socketTimeoutMS);
415417
}
416418

417419
try {
@@ -707,8 +709,9 @@ function write(
707709
}
708710

709711
if (typeof options.socketTimeoutMS === 'number') {
710-
operationDescription.socketTimeoutOverride = true;
711712
conn[kStream].setTimeout(options.socketTimeoutMS);
713+
} else if (conn.socketTimeoutMS !== 0) {
714+
conn[kStream].setTimeout(conn.socketTimeoutMS);
712715
}
713716

714717
// if command monitoring is enabled we need to modify the callback here

‎src/cmap/message_stream.ts

-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ export interface OperationDescription extends BSONSerializeOptions {
3636
raw: boolean;
3737
requestId: number;
3838
session?: ClientSession;
39-
socketTimeoutOverride?: boolean;
4039
agreedCompressor?: CompressorName;
4140
zlibCompressionLevel?: number;
4241
$clusterTime?: Document;

‎test/integration/connection-monitoring-and-pooling/connection.test.ts

+1-23
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { connect } from '../../../src/cmap/connect';
55
import { Connection } from '../../../src/cmap/connection';
66
import { LEGACY_HELLO_COMMAND } from '../../../src/constants';
77
import { Topology } from '../../../src/sdam/topology';
8-
import { HostAddress, ns } from '../../../src/utils';
8+
import { ns } from '../../../src/utils';
99
import { skipBrokenAuthTestBeforeEachHook } from '../../tools/runner/hooks/configuration';
1010
import { assert as test, setupDatabase } from '../shared';
1111

@@ -74,28 +74,6 @@ describe('Connection', function () {
7474
}
7575
});
7676

77-
it.skip('should support socket timeouts', {
78-
// FIXME: NODE-2941
79-
metadata: {
80-
requires: {
81-
os: '!win32' // 240.0.0.1 doesnt work for windows
82-
}
83-
},
84-
test: function (done) {
85-
const connectOptions = {
86-
hostAddress: new HostAddress('240.0.0.1'),
87-
connectionType: Connection,
88-
connectionTimeout: 500
89-
};
90-
91-
connect(connectOptions, err => {
92-
expect(err).to.exist;
93-
expect(err).to.match(/timed out/);
94-
done();
95-
});
96-
}
97-
});
98-
9977
it('should support calling back multiple times on exhaust commands', {
10078
metadata: {
10179
requires: { apiVersion: false, mongodb: '>=4.2.0', topology: ['single'] }

‎test/unit/cmap/connect.test.js

-165
This file was deleted.

‎test/unit/cmap/connect.test.ts

+276
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
import { expect } from 'chai';
2+
import { promisify } from 'util';
3+
4+
import { CancellationToken } from '../../../src';
5+
import { MongoCredentials } from '../../../src/cmap/auth/mongo_credentials';
6+
import {
7+
connect,
8+
prepareHandshakeDocument as prepareHandshakeDocumentCb
9+
} from '../../../src/cmap/connect';
10+
import { Connection, ConnectionOptions } from '../../../src/cmap/connection';
11+
import { LEGACY_HELLO_COMMAND } from '../../../src/constants';
12+
import { MongoNetworkError } from '../../../src/error';
13+
import { ClientMetadata, HostAddress, isHello } from '../../../src/utils';
14+
import { genClusterTime } from '../../tools/common';
15+
import * as mock from '../../tools/mongodb-mock/index';
16+
17+
const CONNECT_DEFAULTS = {
18+
id: 1,
19+
tls: false,
20+
generation: 1,
21+
monitorCommands: false,
22+
metadata: {} as ClientMetadata,
23+
loadBalanced: false
24+
};
25+
26+
describe('Connect Tests', function () {
27+
context('when PLAIN auth enabled', () => {
28+
const test: {
29+
server?: any;
30+
connectOptions?: ConnectionOptions;
31+
} = {};
32+
33+
beforeEach(async () => {
34+
const mockServer = await mock.createServer();
35+
test.server = mockServer;
36+
test.connectOptions = {
37+
...CONNECT_DEFAULTS,
38+
hostAddress: test.server.hostAddress() as HostAddress,
39+
credentials: new MongoCredentials({
40+
username: 'testUser',
41+
password: 'pencil',
42+
source: 'admin',
43+
mechanism: 'PLAIN',
44+
mechanismProperties: {}
45+
})
46+
};
47+
});
48+
49+
afterEach(() => mock.cleanup());
50+
51+
it('should auth against a non-arbiter', function (done) {
52+
const whatHappened = {};
53+
54+
test.server.setMessageHandler(request => {
55+
const doc = request.document;
56+
const $clusterTime = genClusterTime(Date.now());
57+
58+
if (isHello(doc)) {
59+
whatHappened[LEGACY_HELLO_COMMAND] = true;
60+
request.reply(
61+
Object.assign({}, mock.HELLO, {
62+
$clusterTime
63+
})
64+
);
65+
} else if (doc.saslStart) {
66+
whatHappened.saslStart = true;
67+
request.reply({ ok: 1 });
68+
}
69+
});
70+
71+
connect(test.connectOptions, err => {
72+
try {
73+
expect(whatHappened).to.have.property(LEGACY_HELLO_COMMAND, true);
74+
expect(whatHappened).to.have.property('saslStart', true);
75+
} catch (_err) {
76+
err = _err;
77+
}
78+
79+
done(err);
80+
});
81+
});
82+
83+
it('should not auth against an arbiter', function (done) {
84+
const whatHappened = {};
85+
test.server.setMessageHandler(request => {
86+
const doc = request.document;
87+
const $clusterTime = genClusterTime(Date.now());
88+
if (isHello(doc)) {
89+
whatHappened[LEGACY_HELLO_COMMAND] = true;
90+
request.reply(
91+
Object.assign({}, mock.HELLO, {
92+
$clusterTime,
93+
arbiterOnly: true
94+
})
95+
);
96+
} else if (doc.saslStart) {
97+
whatHappened.saslStart = true;
98+
request.reply({ ok: 0 });
99+
}
100+
});
101+
102+
connect(test.connectOptions, err => {
103+
try {
104+
expect(whatHappened).to.have.property(LEGACY_HELLO_COMMAND, true);
105+
expect(whatHappened).to.not.have.property('saslStart');
106+
} catch (_err) {
107+
err = _err;
108+
}
109+
110+
done(err);
111+
});
112+
});
113+
});
114+
115+
context('when creating a connection', () => {
116+
let server;
117+
let connectOptions;
118+
let connection: Connection;
119+
120+
beforeEach(async () => {
121+
server = await mock.createServer();
122+
server.setMessageHandler(request => {
123+
if (isHello(request.document)) {
124+
request.reply(mock.HELLO);
125+
}
126+
});
127+
connectOptions = {
128+
...CONNECT_DEFAULTS,
129+
hostAddress: server.hostAddress() as HostAddress,
130+
socketTimeoutMS: 15000
131+
};
132+
133+
connection = await promisify<Connection>(callback =>
134+
//@ts-expect-error: Callbacks do not have mutual exclusion for error/result existence
135+
connect(connectOptions, callback)
136+
)();
137+
});
138+
139+
afterEach(async () => {
140+
connection.destroy({ force: true });
141+
await mock.cleanup();
142+
});
143+
144+
it('creates a connection with an infinite timeout', async () => {
145+
expect(connection.stream).to.have.property('timeout', 0);
146+
});
147+
148+
it('connection instance has property socketTimeoutMS equal to the value passed in the connectOptions', async () => {
149+
expect(connection).to.have.property('socketTimeoutMS', 15000);
150+
});
151+
152+
context('when the provided cancellation token emits cancel', () => {
153+
it('interrupts the connection with an error', async () => {
154+
// set no response handler for mock server, effectively black hole requests
155+
server.setMessageHandler(() => null);
156+
157+
const cancellationToken = new CancellationToken();
158+
// Make sure the cancel listener is added before emitting cancel
159+
cancellationToken.addListener('newListener', () => {
160+
process.nextTick(() => {
161+
cancellationToken.emit('cancel');
162+
});
163+
});
164+
165+
const error = await promisify<Connection>(callback =>
166+
connect(
167+
{
168+
...connectOptions,
169+
// Ensure these timeouts do not fire first
170+
socketTimeoutMS: 5000,
171+
connectTimeoutMS: 5000,
172+
cancellationToken
173+
},
174+
//@ts-expect-error: Callbacks do not have mutual exclusion for error/result existence
175+
callback
176+
)
177+
)().catch(error => error);
178+
179+
expect(error, error.stack).to.match(/connection establishment was cancelled/);
180+
});
181+
});
182+
183+
context('when connecting takes longer than connectTimeoutMS', () => {
184+
it('interrupts the connection with an error', async () => {
185+
// set no response handler for mock server, effectively black hole requests
186+
server.setMessageHandler(() => null);
187+
188+
const error = await promisify<Connection>(callback =>
189+
//@ts-expect-error: Callbacks do not have mutual exclusion for error/result existence
190+
connect({ ...connectOptions, connectTimeoutMS: 5 }, callback)
191+
)().catch(error => error);
192+
193+
expect(error).to.match(/timed out/);
194+
});
195+
});
196+
});
197+
198+
it('should emit `MongoNetworkError` for network errors', function (done) {
199+
connect({ hostAddress: new HostAddress('non-existent:27018') }, err => {
200+
expect(err).to.be.instanceOf(MongoNetworkError);
201+
done();
202+
});
203+
});
204+
205+
context('prepareHandshakeDocument', () => {
206+
const prepareHandshakeDocument = promisify(prepareHandshakeDocumentCb);
207+
208+
context('when serverApi.version is present', () => {
209+
const options = {};
210+
const authContext = {
211+
connection: { serverApi: { version: '1' } },
212+
options
213+
};
214+
215+
it('sets the hello parameter to true', async () => {
216+
const handshakeDocument = await prepareHandshakeDocument(authContext);
217+
expect(handshakeDocument).to.have.property('hello', true);
218+
});
219+
});
220+
221+
context('when serverApi is not present', () => {
222+
const options = {};
223+
const authContext = {
224+
connection: {},
225+
options
226+
};
227+
228+
it('sets the legacy hello parameter to true', async () => {
229+
const handshakeDocument = await prepareHandshakeDocument(authContext);
230+
expect(handshakeDocument).to.have.property(LEGACY_HELLO_COMMAND, true);
231+
});
232+
});
233+
234+
context('loadBalanced option', () => {
235+
context('when loadBalanced is not set as an option', () => {
236+
it('does not set loadBalanced on the handshake document', async () => {
237+
const options = {};
238+
const authContext = {
239+
connection: {},
240+
options
241+
};
242+
const handshakeDocument = await prepareHandshakeDocument(authContext);
243+
expect(handshakeDocument).not.to.have.property('loadBalanced');
244+
});
245+
});
246+
247+
context('when loadBalanced is set to false', () => {
248+
it('does not set loadBalanced on the handshake document', async () => {
249+
const options = {
250+
loadBalanced: false
251+
};
252+
const authContext = {
253+
connection: {},
254+
options
255+
};
256+
const handshakeDocument = await prepareHandshakeDocument(authContext);
257+
expect(handshakeDocument).not.to.have.property('loadBalanced');
258+
});
259+
});
260+
261+
context('when loadBalanced is set to true', () => {
262+
it('does set loadBalanced on the handshake document', async () => {
263+
const options = {
264+
loadBalanced: true
265+
};
266+
const authContext = {
267+
connection: {},
268+
options
269+
};
270+
const handshakeDocument = await prepareHandshakeDocument(authContext);
271+
expect(handshakeDocument).to.have.property('loadBalanced', true);
272+
});
273+
});
274+
});
275+
});
276+
});

‎test/unit/cmap/connection.test.ts

+123-3
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ import { Socket } from 'net';
44
import * as sinon from 'sinon';
55
import { Readable } from 'stream';
66
import { setTimeout } from 'timers';
7+
import { promisify } from 'util';
78

89
import { BinMsg } from '../../../src/cmap/commands';
910
import { connect } from '../../../src/cmap/connect';
1011
import { Connection, hasSessionSupport } from '../../../src/cmap/connection';
11-
import { MessageStream } from '../../../src/cmap/message_stream';
12+
import { MessageStream, OperationDescription } from '../../../src/cmap/message_stream';
1213
import { MongoNetworkTimeoutError, MongoRuntimeError } from '../../../src/error';
1314
import { isHello, ns } from '../../../src/utils';
1415
import * as mock from '../../tools/mongodb-mock/index';
@@ -24,8 +25,15 @@ const connectionOptionsDefaults = {
2425
loadBalanced: false
2526
};
2627

27-
/** The absolute minimum socket API needed by Connection as of writing this test */
28+
/**
29+
* The absolute minimum socket API needed by these tests
30+
*
31+
* The driver has a greater API requirement for sockets detailed in: NODE-4785
32+
*/
2833
class FakeSocket extends EventEmitter {
34+
destroyed = false;
35+
writableEnded: boolean;
36+
timeout = 0;
2937
address() {
3038
// is never called
3139
}
@@ -41,6 +49,29 @@ class FakeSocket extends EventEmitter {
4149
get remotePort() {
4250
return 123;
4351
}
52+
setTimeout(timeout) {
53+
this.timeout = timeout;
54+
}
55+
}
56+
57+
class InputStream extends Readable {
58+
writableEnded: boolean;
59+
timeout = 0;
60+
61+
constructor(options?) {
62+
super(options);
63+
}
64+
65+
end(cb) {
66+
this.writableEnded = true;
67+
if (typeof cb === 'function') {
68+
process.nextTick(cb);
69+
}
70+
}
71+
72+
setTimeout(timeout) {
73+
this.timeout = timeout;
74+
}
4475
}
4576

4677
describe('new Connection()', function () {
@@ -170,7 +201,7 @@ describe('new Connection()', function () {
170201

171202
context('when multiple hellos exist on the stream', function () {
172203
let callbackSpy;
173-
const inputStream = new Readable();
204+
const inputStream = new InputStream();
174205
const document = { ok: 1 };
175206
const last = { isWritablePrimary: true };
176207

@@ -367,6 +398,95 @@ describe('new Connection()', function () {
367398
});
368399
});
369400
});
401+
402+
context('when sending commands on a connection', () => {
403+
const CONNECT_DEFAULTS = {
404+
id: 1,
405+
tls: false,
406+
generation: 1,
407+
monitorCommands: false,
408+
metadata: {} as ClientMetadata,
409+
loadBalanced: false
410+
};
411+
let server;
412+
let connectOptions;
413+
let connection: Connection;
414+
let streamSetTimeoutSpy;
415+
416+
beforeEach(async () => {
417+
server = await mock.createServer();
418+
server.setMessageHandler(request => {
419+
if (isHello(request.document)) {
420+
request.reply(mock.HELLO);
421+
}
422+
});
423+
connectOptions = {
424+
...CONNECT_DEFAULTS,
425+
hostAddress: server.hostAddress() as HostAddress,
426+
socketTimeoutMS: 15000
427+
};
428+
429+
connection = await promisify<Connection>(callback =>
430+
//@ts-expect-error: Callbacks do not have mutual exclusion for error/result existence
431+
connect(connectOptions, callback)
432+
)();
433+
434+
streamSetTimeoutSpy = sinon.spy(connection.stream, 'setTimeout');
435+
});
436+
437+
afterEach(async () => {
438+
connection.destroy({ force: true });
439+
sinon.restore();
440+
await mock.cleanup();
441+
});
442+
443+
it('sets timeout specified on class before writing to the socket', async () => {
444+
await promisify(callback =>
445+
connection.command(ns('admin.$cmd'), { hello: 1 }, {}, callback)
446+
)();
447+
expect(streamSetTimeoutSpy).to.have.been.calledWith(15000);
448+
});
449+
450+
it('sets timeout specified on options before writing to the socket', async () => {
451+
await promisify(callback =>
452+
connection.command(ns('admin.$cmd'), { hello: 1 }, { socketTimeoutMS: 2000 }, callback)
453+
)();
454+
expect(streamSetTimeoutSpy).to.have.been.calledWith(2000);
455+
});
456+
457+
it('clears timeout after getting a message if moreToCome=false', async () => {
458+
connection.stream.setTimeout(1);
459+
const msg = generateOpMsgBuffer({ hello: 1 });
460+
const msgHeader = {
461+
length: msg.readInt32LE(0),
462+
requestId: 1,
463+
responseTo: 0,
464+
opCode: msg.readInt32LE(12)
465+
};
466+
const msgBody = msg.subarray(16);
467+
msgBody.writeInt32LE(0, 0); // OPTS_MORE_TO_COME
468+
connection.onMessage(new BinMsg(msg, msgHeader, msgBody));
469+
// timeout is still reset
470+
expect(connection.stream).to.have.property('timeout', 0);
471+
});
472+
473+
it('does not clear timeout after getting a message if moreToCome=true', async () => {
474+
connection.stream.setTimeout(1);
475+
const msg = generateOpMsgBuffer({ hello: 1 });
476+
const msgHeader = {
477+
length: msg.readInt32LE(0),
478+
requestId: 1,
479+
responseTo: 0,
480+
opCode: msg.readInt32LE(12)
481+
};
482+
const msgBody = msg.subarray(16);
483+
msgBody.writeInt32LE(2, 0); // OPTS_MORE_TO_COME
484+
connection[getSymbolFrom(connection, 'queue')].set(0, { cb: () => null });
485+
connection.onMessage(new BinMsg(msg, msgHeader, msgBody));
486+
// timeout is still set
487+
expect(connection.stream).to.have.property('timeout', 1);
488+
});
489+
});
370490
});
371491

372492
describe('onTimeout()', () => {

0 commit comments

Comments
 (0)
Please sign in to comment.