diff --git a/.gitignore b/.gitignore index d6dd2cc16d..9ad3d724ce 100644 --- a/.gitignore +++ b/.gitignore @@ -77,4 +77,4 @@ etc/docs/build !docs/*/lib !docs/**/*.png !docs/**/*.css -!docs/**/*.js \ No newline at end of file +!docs/**/*.js diff --git a/src/cmap/commands.ts b/src/cmap/commands.ts index 87acf9caa7..78e4445f25 100644 --- a/src/cmap/commands.ts +++ b/src/cmap/commands.ts @@ -458,6 +458,7 @@ export class KillCursor { } } +/** @internal */ export interface MessageHeader { length: number; requestId: number; @@ -466,6 +467,7 @@ export interface MessageHeader { fromCompressed?: boolean; } +/** @internal */ export interface OpResponseOptions extends BSONSerializeOptions { raw?: boolean; documentsReturnedIn?: string | null; @@ -640,6 +642,7 @@ const OPTS_CHECKSUM_PRESENT = 1; const OPTS_MORE_TO_COME = 2; const OPTS_EXHAUST_ALLOWED = 1 << 16; +/** @internal */ export interface OpMsgOptions { requestId: number; serializeFunctions: boolean; diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index e5665e9116..f411316aac 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -75,6 +75,8 @@ const kHello = Symbol('hello'); const kAutoEncrypter = Symbol('autoEncrypter'); /** @internal */ const kFullResult = Symbol('fullResult'); +/** @internal */ +const kDelayedTimeoutId = Symbol('delayedTimeoutId'); /** @internal */ export interface QueryOptions extends BSONSerializeOptions { @@ -191,6 +193,9 @@ export class Connection extends TypedEventEmitter { lastHelloMS?: number; serverApi?: ServerApi; helloOk?: boolean; + + /**@internal */ + [kDelayedTimeoutId]: NodeJS.Timeout | null; /** @internal */ [kDescription]: StreamDescription; /** @internal */ @@ -245,19 +250,21 @@ export class Connection extends TypedEventEmitter { ...options, maxBsonMessageSize: this.hello?.maxBsonMessageSize }); - this[kMessageStream].on('message', messageHandler(this)); this[kStream] = stream; - stream.on('error', () => { + + this[kDelayedTimeoutId] = null; + + this[kMessageStream].on('message', message => this.onMessage(message)); + this[kMessageStream].on('error', error => this.onError(error)); + this[kStream].on('close', () => this.onClose()); + this[kStream].on('timeout', () => this.onTimeout()); + this[kStream].on('error', () => { /* ignore errors, listen to `close` instead */ }); - this[kMessageStream].on('error', error => this.handleIssue({ destroy: error })); - stream.on('close', () => this.handleIssue({ isClose: true })); - stream.on('timeout', () => this.handleIssue({ isTimeout: true, destroy: true })); - // hook the message stream up to the passed in stream - stream.pipe(this[kMessageStream]); - this[kMessageStream].pipe(stream); + this[kStream].pipe(this[kMessageStream]); + this[kMessageStream].pipe(this[kStream]); } get description(): StreamDescription { @@ -309,40 +316,133 @@ export class Connection extends TypedEventEmitter { this[kLastUseTime] = now(); } - handleIssue(issue: { isTimeout?: boolean; isClose?: boolean; destroy?: boolean | Error }): void { + onError(error: Error) { if (this.closed) { return; } - if (issue.destroy) { - this[kStream].destroy(typeof issue.destroy === 'boolean' ? undefined : issue.destroy); + this[kStream].destroy(error); + + this.closed = true; + + for (const op of this[kQueue].values()) { + op.cb(error); + } + + this[kQueue].clear(); + this.emit(Connection.CLOSE); + } + + onClose() { + if (this.closed) { + return; } this.closed = true; - for (const [, op] of this[kQueue]) { - if (issue.isTimeout) { - op.cb( - new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, { - beforeHandshake: this.hello == null - }) - ); - } else if (issue.isClose) { - op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`)); - } else { - op.cb(typeof issue.destroy === 'boolean' ? undefined : issue.destroy); - } + const message = `connection ${this.id} to ${this.address} closed`; + for (const op of this[kQueue].values()) { + op.cb(new MongoNetworkError(message)); } this[kQueue].clear(); this.emit(Connection.CLOSE); } - destroy(): void; - destroy(callback: Callback): void; - destroy(options: DestroyOptions): void; - destroy(options: DestroyOptions, callback: Callback): void; - destroy(options?: DestroyOptions | Callback, callback?: Callback): void { + onTimeout() { + if (this.closed) { + return; + } + + this[kDelayedTimeoutId] = setTimeout(() => { + this[kStream].destroy(); + + this.closed = true; + + const message = `connection ${this.id} to ${this.address} timed out`; + const beforeHandshake = this.hello == null; + for (const op of this[kQueue].values()) { + op.cb(new MongoNetworkTimeoutError(message, { beforeHandshake })); + } + + this[kQueue].clear(); + this.emit(Connection.CLOSE); + }, 1).unref(); // No need for this timer to hold the event loop open + } + + onMessage(message: BinMsg | Response) { + const delayedTimeoutId = this[kDelayedTimeoutId]; + if (delayedTimeoutId != null) { + clearTimeout(delayedTimeoutId); + this[kDelayedTimeoutId] = null; + } + + // always emit the message, in case we are streaming + this.emit('message', message); + const operationDescription = this[kQueue].get(message.responseTo); + if (!operationDescription) { + return; + } + + const callback = operationDescription.cb; + + // SERVER-45775: For exhaust responses we should be able to use the same requestId to + // track response, however the server currently synthetically produces remote requests + // making the `responseTo` change on each response + this[kQueue].delete(message.responseTo); + if ('moreToCome' in message && message.moreToCome) { + // requeue the callback for next synthetic request + this[kQueue].set(message.requestId, operationDescription); + } else if (operationDescription.socketTimeoutOverride) { + this[kStream].setTimeout(this.socketTimeoutMS); + } + + try { + // Pass in the entire description because it has BSON parsing options + message.parse(operationDescription); + } catch (err) { + // If this error is generated by our own code, it will already have the correct class applied + // if it is not, then it is coming from a catastrophic data parse failure or the BSON library + // in either case, it should not be wrapped + callback(err); + return; + } + + if (message.documents[0]) { + const document: Document = message.documents[0]; + const session = operationDescription.session; + if (session) { + updateSessionFromResponse(session, document); + } + + if (document.$clusterTime) { + this[kClusterTime] = document.$clusterTime; + this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime); + } + + if (operationDescription.command) { + if (document.writeConcernError) { + callback(new MongoWriteConcernError(document.writeConcernError, document)); + return; + } + + if (document.ok === 0 || document.$err || document.errmsg || document.code) { + callback(new MongoServerError(document)); + return; + } + } else { + // Pre 3.2 support + if (document.ok === 0 || document.$err || document.errmsg) { + callback(new MongoServerError(document)); + return; + } + } + } + + callback(undefined, operationDescription.fullResult ? message : message.documents[0]); + } + + destroy(options?: DestroyOptions, callback?: Callback): void { if (typeof options === 'function') { callback = options; options = { force: false }; @@ -379,7 +479,6 @@ export class Connection extends TypedEventEmitter { }); } - /** @internal */ command( ns: MongoDBNamespace, cmd: Document, @@ -456,7 +555,6 @@ export class Connection extends TypedEventEmitter { } } - /** @internal */ query(ns: MongoDBNamespace, cmd: Document, options: QueryOptions, callback: Callback): void { const isExplain = cmd.$explain != null; const readPreference = options.readPreference ?? ReadPreference.primary; @@ -529,7 +627,6 @@ export class Connection extends TypedEventEmitter { ); } - /** @internal */ getMore( ns: MongoDBNamespace, cursorId: Long, @@ -586,7 +683,6 @@ export class Connection extends TypedEventEmitter { this.command(ns, getMoreCmd, commandOptions, callback); } - /** @internal */ killCursors( ns: MongoDBNamespace, cursorIds: Long[], @@ -706,74 +802,6 @@ function supportsOpMsg(conn: Connection) { return maxWireVersion(conn) >= 6 && !description.__nodejs_mock_server__; } -function messageHandler(conn: Connection) { - return function messageHandler(message: BinMsg | Response) { - // always emit the message, in case we are streaming - conn.emit('message', message); - const operationDescription = conn[kQueue].get(message.responseTo); - if (!operationDescription) { - return; - } - - const callback = operationDescription.cb; - - // SERVER-45775: For exhaust responses we should be able to use the same requestId to - // track response, however the server currently synthetically produces remote requests - // making the `responseTo` change on each response - conn[kQueue].delete(message.responseTo); - if ('moreToCome' in message && message.moreToCome) { - // requeue the callback for next synthetic request - conn[kQueue].set(message.requestId, operationDescription); - } else if (operationDescription.socketTimeoutOverride) { - conn[kStream].setTimeout(conn.socketTimeoutMS); - } - - try { - // Pass in the entire description because it has BSON parsing options - message.parse(operationDescription); - } catch (err) { - // If this error is generated by our own code, it will already have the correct class applied - // if it is not, then it is coming from a catastrophic data parse failure or the BSON library - // in either case, it should not be wrapped - callback(err); - return; - } - - if (message.documents[0]) { - const document: Document = message.documents[0]; - const session = operationDescription.session; - if (session) { - updateSessionFromResponse(session, document); - } - - if (document.$clusterTime) { - conn[kClusterTime] = document.$clusterTime; - conn.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime); - } - - if (operationDescription.command) { - if (document.writeConcernError) { - callback(new MongoWriteConcernError(document.writeConcernError, document)); - return; - } - - if (document.ok === 0 || document.$err || document.errmsg || document.code) { - callback(new MongoServerError(document)); - return; - } - } else { - // Pre 3.2 support - if (document.ok === 0 || document.$err || document.errmsg) { - callback(new MongoServerError(document)); - return; - } - } - } - - callback(undefined, operationDescription.fullResult ? message : message.documents[0]); - }; -} - function streamIdentifier(stream: Stream, options: ConnectionOptions): string { if (options.proxyHost) { // If proxy options are specified, the properties of `stream` itself diff --git a/src/index.ts b/src/index.ts index da8cc2614d..efac9dfe0b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -186,12 +186,17 @@ export type { MongoCredentialsOptions } from './cmap/auth/mongo_credentials'; export type { + BinMsg, GetMore, KillCursor, + MessageHeader, Msg, OpGetMoreOptions, + OpMsgOptions, OpQueryOptions, + OpResponseOptions, Query, + Response, WriteProtocolMessageType } from './cmap/commands'; export type { LEGAL_TCP_SOCKET_OPTIONS, LEGAL_TLS_SOCKET_OPTIONS, Stream } from './cmap/connect'; diff --git a/test/tools/runner/hooks/configuration.js b/test/tools/runner/hooks/configuration.js index 881dedcac3..f7ad51d89d 100644 --- a/test/tools/runner/hooks/configuration.js +++ b/test/tools/runner/hooks/configuration.js @@ -66,15 +66,14 @@ async function initializeFilters(client) { } const testSkipBeforeEachHook = async function () { - // `metadata` always exists, `requires` is optional - const requires = this.currentTest.metadata.requires; + const metadata = this.currentTest.metadata; - if (requires && Object.keys(requires).length > 0) { + if (metadata && metadata.requires && Object.keys(metadata.requires).length > 0) { const failedFilter = filters.find(filter => !filter.filter(this.currentTest)); if (failedFilter) { const filterName = failedFilter.constructor.name; - const metadataString = inspect(requires, { + const metadataString = inspect(metadata.requires, { colors: true, compact: true, depth: 10, diff --git a/test/unit/cmap/connection.test.js b/test/unit/cmap/connection.test.js deleted file mode 100644 index 4b2a1fa09e..0000000000 --- a/test/unit/cmap/connection.test.js +++ /dev/null @@ -1,155 +0,0 @@ -'use strict'; - -const mock = require('../../tools/mongodb-mock/index'); -const { connect } = require('../../../src/cmap/connect'); -const { Connection, hasSessionSupport } = require('../../../src/cmap/connection'); -const { expect } = require('chai'); -const { Socket } = require('net'); -const { ns, isHello } = require('../../../src/utils'); -const { getSymbolFrom } = require('../../tools/utils'); - -describe('Connection - unit/cmap', function () { - let server; - after(() => mock.cleanup()); - before(() => mock.createServer().then(s => (server = s))); - - it('should support fire-and-forget messages', function (done) { - server.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - request.reply(mock.HELLO); - } - - // blackhole all other requests - }); - - connect({ connectionType: Connection, hostAddress: server.hostAddress() }, (err, conn) => { - expect(err).to.not.exist; - expect(conn).to.exist; - - conn.command(ns('$admin.cmd'), { ping: 1 }, { noResponse: true }, (err, result) => { - expect(err).to.not.exist; - expect(result).to.not.exist; - - done(); - }); - }); - }); - - it('should destroy streams which time out', function (done) { - server.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - request.reply(mock.HELLO); - } - - // blackhole all other requests - }); - - connect({ connectionType: Connection, hostAddress: server.hostAddress() }, (err, conn) => { - expect(err).to.not.exist; - expect(conn).to.exist; - - conn.command(ns('$admin.cmd'), { ping: 1 }, { socketTimeoutMS: 50 }, (err, result) => { - expect(err).to.exist; - expect(result).to.not.exist; - - expect(conn).property('stream').property('destroyed').to.be.true; - - done(); - }); - }); - }); - - it('should throw a network error with kBeforeHandshake set to false on timeout after hand shake', function (done) { - server.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - request.reply(mock.HELLO); - } - // respond to no other requests to trigger timeout event - }); - - const options = { - hostAddress: server.hostAddress() - }; - - connect(options, (err, conn) => { - expect(err).to.be.a('undefined'); - expect(conn).to.be.instanceOf(Connection); - expect(conn).to.have.property('hello').that.is.a('object'); - - conn.command(ns('$admin.cmd'), { ping: 1 }, { socketTimeoutMS: 50 }, err => { - const beforeHandshakeSymbol = getSymbolFrom(err, 'beforeHandshake', false); - expect(beforeHandshakeSymbol).to.be.a('symbol'); - expect(err).to.have.property(beforeHandshakeSymbol, false); - - done(); - }); - }); - }); - - it('should throw a network error with kBeforeHandshake set to true on timeout before hand shake', function (done) { - // respond to no requests to trigger timeout event - server.setMessageHandler(() => {}); - - const options = { - hostAddress: server.hostAddress(), - socketTimeoutMS: 50 - }; - - connect(options, (err, conn) => { - expect(conn).to.be.a('undefined'); - - const beforeHandshakeSymbol = getSymbolFrom(err, 'beforeHandshake'); - expect(err).to.have.property(beforeHandshakeSymbol, true); - - done(); - }); - }); - - describe('.hasSessionSupport', function () { - let connection; - const stream = new Socket(); - - context('when logicalSessionTimeoutMinutes is present', function () { - beforeEach(function () { - connection = new Connection(stream, { - hostAddress: server.hostAddress(), - logicalSessionTimeoutMinutes: 5 - }); - }); - - it('returns true', function () { - expect(hasSessionSupport(connection)).to.be.true; - }); - }); - - context('when logicalSessionTimeoutMinutes is not present', function () { - context('when in load balancing mode', function () { - beforeEach(function () { - connection = new Connection(stream, { - hostAddress: server.hostAddress(), - loadBalanced: true - }); - }); - - it('returns true', function () { - expect(hasSessionSupport(connection)).to.be.true; - }); - }); - - context('when not in load balancing mode', function () { - beforeEach(function () { - connection = new Connection(stream, { - hostAddress: server.hostAddress() - }); - }); - - it('returns false', function () { - expect(hasSessionSupport(connection)).to.be.false; - }); - }); - }); - }); -}); diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts new file mode 100644 index 0000000000..952b138349 --- /dev/null +++ b/test/unit/cmap/connection.test.ts @@ -0,0 +1,271 @@ +import { expect } from 'chai'; +import { EventEmitter } from 'events'; +import { Socket } from 'net'; +import * as sinon from 'sinon'; + +import { connect } from '../../../src/cmap/connect'; +import { Connection, hasSessionSupport } from '../../../src/cmap/connection'; +import { MessageStream } from '../../../src/cmap/message_stream'; +import { MongoNetworkTimeoutError } from '../../../src/error'; +import { isHello, ns } from '../../../src/utils'; +import * as mock from '../../tools/mongodb-mock/index'; +import { getSymbolFrom } from '../../tools/utils'; + +const connectionOptionsDefaults = { + id: 0, + generation: 0, + monitorCommands: false, + tls: false, + metadata: undefined, + loadBalanced: false +}; + +describe('new Connection()', function () { + let server; + after(() => mock.cleanup()); + before(() => mock.createServer().then(s => (server = s))); + + it('should support fire-and-forget messages', function (done) { + server.setMessageHandler(request => { + const doc = request.document; + if (isHello(doc)) { + request.reply(mock.HELLO); + } + + // blackhole all other requests + }); + + const options = { + ...connectionOptionsDefaults, + connectionType: Connection, + hostAddress: server.hostAddress() + }; + + connect(options, (err, conn) => { + expect(err).to.not.exist; + expect(conn).to.exist; + + conn.command(ns('$admin.cmd'), { ping: 1 }, { noResponse: true }, (err, result) => { + expect(err).to.not.exist; + expect(result).to.not.exist; + + done(); + }); + }); + }); + + it('should destroy streams which time out', function (done) { + server.setMessageHandler(request => { + const doc = request.document; + if (isHello(doc)) { + request.reply(mock.HELLO); + } + + // blackhole all other requests + }); + + const options = { + ...connectionOptionsDefaults, + connectionType: Connection, + hostAddress: server.hostAddress() + }; + + connect(options, (err, conn) => { + expect(err).to.not.exist; + expect(conn).to.exist; + + conn.command(ns('$admin.cmd'), { ping: 1 }, { socketTimeoutMS: 50 }, (err, result) => { + expect(err).to.be.instanceOf(MongoNetworkTimeoutError); + expect(result).to.not.exist; + + expect(conn).property('stream').property('destroyed', true); + + done(); + }); + }); + }); + + it('should throw a network error with kBeforeHandshake set to false on timeout after handshake', function (done) { + server.setMessageHandler(request => { + const doc = request.document; + if (isHello(doc)) { + request.reply(mock.HELLO); + } + // respond to no other requests to trigger timeout event + }); + + const options = { + hostAddress: server.hostAddress(), + ...connectionOptionsDefaults + }; + + connect(options, (err, conn) => { + expect(err).to.be.a('undefined'); + expect(conn).to.be.instanceOf(Connection); + expect(conn).to.have.property('hello').that.is.a('object'); + + conn.command(ns('$admin.cmd'), { ping: 1 }, { socketTimeoutMS: 50 }, err => { + const beforeHandshakeSymbol = getSymbolFrom(err, 'beforeHandshake', false); + expect(beforeHandshakeSymbol).to.be.a('symbol'); + expect(err).to.have.property(beforeHandshakeSymbol, false); + + done(); + }); + }); + }); + + it('should throw a network error with kBeforeHandshake set to true on timeout before handshake', function (done) { + server.setMessageHandler(() => { + // respond to no requests to trigger timeout event + }); + + const options = { + ...connectionOptionsDefaults, + hostAddress: server.hostAddress(), + socketTimeoutMS: 50 + }; + + connect(options, (err, conn) => { + expect(conn).to.be.a('undefined'); + + const beforeHandshakeSymbol = getSymbolFrom(err, 'beforeHandshake'); + expect(err).to.have.property(beforeHandshakeSymbol, true); + + done(); + }); + }); + + describe('onTimeout()', () => { + let connection: sinon.SinonSpiedInstance; + let clock: sinon.SinonFakeTimers; + let driverSocket: sinon.SinonSpiedInstance; + let messageStream: MessageStream; + let kDelayedTimeoutId: symbol; + let NodeJSTimeoutClass: any; + + /** The absolute minimum socket API needed by Connection as of writing this test */ + class FakeSocket extends EventEmitter { + address() { + // is never called + } + pipe() { + // does not need to do anything + } + destroy() { + // is called, has no side effects + } + get remoteAddress() { + return 'iLoveJavaScript'; + } + get remotePort() { + return 123; + } + } + + beforeEach(() => { + clock = sinon.useFakeTimers(); + + NodeJSTimeoutClass = setTimeout(() => null, 1).constructor; + + driverSocket = sinon.spy(new FakeSocket()); + // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay + connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); + const messageStreamSymbol = getSymbolFrom(connection, 'messageStream'); + kDelayedTimeoutId = getSymbolFrom(connection, 'delayedTimeoutId'); + messageStream = connection[messageStreamSymbol]; + }); + + afterEach(() => { + clock.restore(); + }); + + it('should delay timeout errors by one tick', async () => { + expect(connection).to.have.property(kDelayedTimeoutId, null); + + driverSocket.emit('timeout'); + expect(connection.onTimeout).to.have.been.calledOnce; + expect(connection).to.have.property(kDelayedTimeoutId).that.is.instanceOf(NodeJSTimeoutClass); + expect(connection).to.have.property('closed', false); + expect(driverSocket.destroy).to.not.have.been.called; + + clock.tick(1); + + expect(driverSocket.destroy).to.have.been.calledOnce; + expect(connection).to.have.property('closed', true); + }); + + it('should clear timeout errors if more data is available', () => { + expect(connection).to.have.property(kDelayedTimeoutId, null); + + driverSocket.emit('timeout'); + expect(connection.onTimeout).to.have.been.calledOnce; + expect(connection).to.have.property(kDelayedTimeoutId).that.is.instanceOf(NodeJSTimeoutClass); + + // emit a message before the clock ticks even once + // onMessage ignores unknown 'responseTo' value + messageStream.emit('message', { responseTo: null }); + + // New message before clock ticks 1 will clear the timeout + expect(connection).to.have.property(kDelayedTimeoutId, null); + + // ticking the clock should do nothing, there is no timeout anymore + clock.tick(1); + + expect(driverSocket.destroy).to.not.have.been.called; + expect(connection).to.have.property('closed', false); + expect(connection).to.have.property(kDelayedTimeoutId, null); + }); + }); + + describe('.hasSessionSupport', function () { + let connection; + const stream = new Socket(); + + context('when logicalSessionTimeoutMinutes is present', function () { + beforeEach(function () { + const options = { + ...connectionOptionsDefaults, + hostAddress: server.hostAddress(), + logicalSessionTimeoutMinutes: 5 + }; + connection = new Connection(stream, options); + }); + + it('returns true', function () { + expect(hasSessionSupport(connection)).to.be.true; + }); + }); + + context('when logicalSessionTimeoutMinutes is not present', function () { + context('when in load balancing mode', function () { + beforeEach(function () { + const options = { + ...connectionOptionsDefaults, + hostAddress: server.hostAddress(), + loadBalanced: true + }; + connection = new Connection(stream, options); + }); + + it('returns true', function () { + expect(hasSessionSupport(connection)).to.be.true; + }); + }); + + context('when not in load balancing mode', function () { + beforeEach(function () { + const options = { + ...connectionOptionsDefaults, + hostAddress: server.hostAddress(), + loadBalanced: false + }; + connection = new Connection(stream, options); + }); + + it('returns false', function () { + expect(hasSessionSupport(connection)).to.be.false; + }); + }); + }); + }); +});