From 8338bae933c777dee4e7e49dbcf52c4fd7047528 Mon Sep 17 00:00:00 2001 From: Warren James <28974128+W-A-James@users.noreply.github.com> Date: Fri, 16 Dec 2022 12:56:56 -0500 Subject: [PATCH] fix(NODE-4834): ensure that MessageStream is destroyed when connections are destroyed (#3482) --- src/cmap/connect.ts | 2 +- src/cmap/connection.ts | 53 ++--- src/cmap/connection_pool.ts | 4 +- src/sdam/server.ts | 5 +- src/sdam/topology.ts | 17 +- test/integration/crud/misc_cursors.test.js | 69 ++---- .../node-specific/topology.test.js | 18 +- ...records_for_mongos_discovery.prose.test.ts | 2 +- .../assorted/server_selection_spec_helper.js | 2 +- test/unit/cmap/connection.test.ts | 209 +++++++++++++++++- test/unit/error.test.ts | 2 +- test/unit/sdam/monitor.test.ts | 4 +- test/unit/sdam/topology.test.js | 18 +- 13 files changed, 278 insertions(+), 127 deletions(-) diff --git a/src/cmap/connect.ts b/src/cmap/connect.ts index fedf731132..0fc10c93e0 100644 --- a/src/cmap/connect.ts +++ b/src/cmap/connect.ts @@ -96,7 +96,7 @@ function performInitialHandshake( ) { const callback: Callback = function (err, ret) { if (err && conn) { - conn.destroy(); + conn.destroy({ force: false }); } _callback(err, ret); }; diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 85f6000817..06ab8f396f 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -149,7 +149,7 @@ export interface ConnectionOptions /** @public */ export interface DestroyOptions { /** Force the destruction. */ - force?: boolean; + force: boolean; } /** @public */ @@ -170,8 +170,8 @@ export class Connection extends TypedEventEmitter { address: string; socketTimeoutMS: number; monitorCommands: boolean; + /** Indicates that the connection (including underlying TCP socket) has been closed. */ closed: boolean; - destroyed: boolean; lastHelloMS?: number; serverApi?: ServerApi; helloOk?: boolean; @@ -220,7 +220,6 @@ export class Connection extends TypedEventEmitter { this.monitorCommands = options.monitorCommands; this.serverApi = options.serverApi; this.closed = false; - this.destroyed = false; this[kHello] = null; this[kClusterTime] = null; @@ -313,10 +312,7 @@ export class Connection extends TypedEventEmitter { if (this.closed) { return; } - - this[kStream].destroy(error); - - this.closed = true; + this.destroy({ force: false }); for (const op of this[kQueue].values()) { op.cb(error); @@ -330,8 +326,7 @@ export class Connection extends TypedEventEmitter { if (this.closed) { return; } - - this.closed = true; + this.destroy({ force: false }); const message = `connection ${this.id} to ${this.address} closed`; for (const op of this[kQueue].values()) { @@ -348,9 +343,7 @@ export class Connection extends TypedEventEmitter { } this[kDelayedTimeoutId] = setTimeout(() => { - this[kStream].destroy(); - - this.closed = true; + this.destroy({ force: false }); const message = `connection ${this.id} to ${this.address} timed out`; const beforeHandshake = this.hello == null; @@ -459,41 +452,27 @@ export class Connection extends TypedEventEmitter { callback(undefined, message.documents[0]); } - destroy(options?: DestroyOptions, callback?: Callback): void { - if (typeof options === 'function') { - callback = options; - options = { force: false }; - } - + destroy(options: DestroyOptions, callback?: Callback): void { this.removeAllListeners(Connection.PINNED); this.removeAllListeners(Connection.UNPINNED); - options = Object.assign({ force: false }, options); - if (this[kStream] == null || this.destroyed) { - this.destroyed = true; - if (typeof callback === 'function') { - callback(); - } - - return; - } + this[kMessageStream].destroy(); + this.closed = true; if (options.force) { this[kStream].destroy(); - this.destroyed = true; - if (typeof callback === 'function') { - callback(); + if (callback) { + return process.nextTick(callback); } - - return; } - this[kStream].end(() => { - this.destroyed = true; - if (typeof callback === 'function') { - callback(); + if (!this[kStream].writableEnded) { + this[kStream].end(callback); + } else { + if (callback) { + return process.nextTick(callback); } - }); + } } command( diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 5c8cbc9765..d6e19a7e3d 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -515,7 +515,7 @@ export class ConnectionPool extends TypedEventEmitter { ConnectionPool.CONNECTION_CLOSED, new ConnectionClosedEvent(this, conn, 'poolClosed') ); - conn.destroy(options, cb); + conn.destroy({ force: !!options.force }, cb); }, err => { this[kConnections].clear(); @@ -591,7 +591,7 @@ export class ConnectionPool extends TypedEventEmitter { new ConnectionClosedEvent(this, connection, reason) ); // destroy the connection - process.nextTick(() => connection.destroy()); + process.nextTick(() => connection.destroy({ force: false })); } private connectionIsStale(connection: Connection) { diff --git a/src/sdam/server.ts b/src/sdam/server.ts index ae7a1fd5f6..9cd204ebf5 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -244,7 +244,10 @@ export class Server extends TypedEventEmitter { /** Destroy the server connection */ destroy(options?: DestroyOptions, callback?: Callback): void { - if (typeof options === 'function') (callback = options), (options = {}); + if (typeof options === 'function') { + callback = options; + options = { force: false }; + } options = Object.assign({}, { force: false }, options); if (this.s.state === STATE_CLOSED) { diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 601ae2c382..8850063c36 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -484,26 +484,17 @@ export class Topology extends TypedEventEmitter { } /** Close this topology */ - close(callback: Callback): void; close(options: CloseOptions): void; close(options: CloseOptions, callback: Callback): void; - close(options?: CloseOptions | Callback, callback?: Callback): void { - if (typeof options === 'function') { - callback = options; - options = {}; - } - - if (typeof options === 'boolean') { - options = { force: options }; - } - options = options ?? {}; + close(options?: CloseOptions, callback?: Callback): void { + options = options ?? { force: false }; if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) { return callback?.(); } const destroyedServers = Array.from(this.s.servers.values(), server => { - return promisify(destroyServer)(server, this, options as CloseOptions); + return promisify(destroyServer)(server, this, { force: !!options?.force }); }); Promise.all(destroyedServers) @@ -765,7 +756,7 @@ function destroyServer( options?: DestroyOptions, callback?: Callback ) { - options = options ?? {}; + options = options ?? { force: false }; for (const event of LOCAL_SERVER_EVENTS) { server.removeAllListeners(event); } diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index ff3776ff35..87fd9187f1 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -13,6 +13,7 @@ const { ReadPreference } = require('../../../src/read_preference'); const { ServerType } = require('../../../src/sdam/common'); const { formatSort } = require('../../../src/sort'); const { getSymbolFrom } = require('../../tools/utils'); +const { MongoExpiredSessionError } = require('../../../src/error'); describe('Cursor', function () { before(function () { @@ -1905,61 +1906,31 @@ describe('Cursor', function () { } }); - it('should close dead tailable cursors', { - metadata: { - os: '!win32' // NODE-2943: timeout on windows - }, - - test: function (done) { - // http://www.mongodb.org/display/DOCS/Tailable+Cursors - - const configuration = this.configuration; - client.connect((err, client) => { - expect(err).to.not.exist; - this.defer(() => client.close()); - - const db = client.db(configuration.db); - const options = { capped: true, size: 10000000 }; - db.createCollection( - 'test_if_dead_tailable_cursors_close', - options, - function (err, collection) { - expect(err).to.not.exist; + it('closes cursors when client is closed even if it has not been exhausted', async function () { + await client + .db() + .dropCollection('test_cleanup_tailable') + .catch(() => null); - let closeCount = 0; - const docs = Array.from({ length: 100 }).map(() => ({ a: 1 })); - collection.insertMany(docs, { w: 'majority', wtimeoutMS: 5000 }, err => { - expect(err).to.not.exist; - - const cursor = collection.find({}, { tailable: true, awaitData: true }); - const stream = cursor.stream(); + const collection = await client + .db() + .createCollection('test_cleanup_tailable', { capped: true, size: 1000, max: 3 }); - stream.resume(); - - var validator = () => { - closeCount++; - if (closeCount === 2) { - done(); - } - }; + // insert only 2 docs in capped coll of 3 + await collection.insertMany([{ a: 1 }, { a: 1 }]); - // we validate that the stream "ends" either cleanly or with an error - stream.on('end', validator); - stream.on('error', validator); + const cursor = collection.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 2000 }); - cursor.on('close', validator); + await cursor.next(); + await cursor.next(); + // will block for maxAwaitTimeMS (except we are closing the client) + const rejectedEarlyBecauseClientClosed = cursor.next().catch(error => error); - const docs = Array.from({ length: 100 }).map(() => ({ a: 1 })); - collection.insertMany(docs, err => { - expect(err).to.not.exist; + await client.close(); + expect(cursor).to.have.property('killed', true); - setTimeout(() => client.close()); - }); - }); - } - ); - }); - } + const error = await rejectedEarlyBecauseClientClosed; + expect(error).to.be.instanceOf(MongoExpiredSessionError); }); it('shouldAwaitData', { diff --git a/test/integration/node-specific/topology.test.js b/test/integration/node-specific/topology.test.js index ee806b9691..912c1443c4 100644 --- a/test/integration/node-specific/topology.test.js +++ b/test/integration/node-specific/topology.test.js @@ -10,12 +10,20 @@ describe('Topology', function () { const states = []; topology.on('stateChanged', (_, newState) => states.push(newState)); topology.connect(err => { - expect(err).to.not.exist; - topology.close(err => { + try { expect(err).to.not.exist; - expect(topology.isDestroyed()).to.be.true; - expect(states).to.eql(['connecting', 'connected', 'closing', 'closed']); - done(); + } catch (error) { + done(error); + } + topology.close({}, err => { + try { + expect(err).to.not.exist; + expect(topology.isDestroyed()).to.be.true; + expect(states).to.eql(['connecting', 'connected', 'closing', 'closed']); + done(); + } catch (error) { + done(error); + } }); }); } diff --git a/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts b/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts index c96f738fa9..339f7c065b 100644 --- a/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts +++ b/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts @@ -97,7 +97,7 @@ describe('Polling Srv Records for Mongos Discovery', () => { afterEach(function (done) { if (context.topology) { - context.topology.close(done); + context.topology.close({}, done); } else { done(); } diff --git a/test/unit/assorted/server_selection_spec_helper.js b/test/unit/assorted/server_selection_spec_helper.js index 3490ae9aac..724a8e4981 100644 --- a/test/unit/assorted/server_selection_spec_helper.js +++ b/test/unit/assorted/server_selection_spec_helper.js @@ -109,7 +109,7 @@ function executeServerSelectionTest(testDefinition, testDone) { }); function done(err) { - topology.close(e => testDone(e || err)); + topology.close({}, e => testDone(e || err)); } topology.connect(err => { diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index 5c8d872bb8..b29bf3a2e9 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -26,6 +26,7 @@ const connectionOptionsDefaults = { /** The absolute minimum socket API needed by Connection as of writing this test */ class FakeSocket extends EventEmitter { + writableEnded: boolean; address() { // is never called } @@ -34,6 +35,14 @@ class FakeSocket extends EventEmitter { } destroy() { // is called, has no side effects + this.writableEnded = true; + } + end(cb) { + this.writableEnded = true; + // nextTick to simulate I/O delay + if (typeof cb === 'function') { + process.nextTick(cb); + } } get remoteAddress() { return 'iLoveJavaScript'; @@ -43,6 +52,20 @@ class FakeSocket extends EventEmitter { } } +class InputStream extends Readable { + writableEnded: boolean; + constructor(options?) { + super(options); + } + + end(cb) { + this.writableEnded = true; + if (typeof cb === 'function') { + process.nextTick(cb); + } + } +} + describe('new Connection()', function () { let server; after(() => mock.cleanup()); @@ -101,7 +124,7 @@ describe('new Connection()', function () { expect(err).to.be.instanceOf(MongoNetworkTimeoutError); expect(result).to.not.exist; - expect(conn).property('stream').property('destroyed', true); + expect(conn).property('stream').property('writableEnded', true); done(); }); @@ -170,7 +193,7 @@ describe('new Connection()', function () { context('when multiple hellos exist on the stream', function () { let callbackSpy; - const inputStream = new Readable(); + const inputStream = new InputStream(); const document = { ok: 1 }; const last = { isWritablePrimary: true }; @@ -389,7 +412,7 @@ describe('new Connection()', function () { connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); const messageStreamSymbol = getSymbolFrom(connection, 'messageStream'); kDelayedTimeoutId = getSymbolFrom(connection, 'delayedTimeoutId'); - messageStream = connection[messageStreamSymbol]; + messageStream = sinon.spy(connection[messageStreamSymbol]); }); afterEach(() => { @@ -402,13 +425,15 @@ describe('new Connection()', function () { driverSocket.emit('timeout'); expect(connection.onTimeout).to.have.been.calledOnce; + expect(connection.destroy).to.not.have.been.called; expect(connection).to.have.property(kDelayedTimeoutId).that.is.instanceOf(NodeJSTimeoutClass); expect(connection).to.have.property('closed', false); - expect(driverSocket.destroy).to.not.have.been.called; + expect(driverSocket.end).to.not.have.been.called; clock.tick(1); - expect(driverSocket.destroy).to.have.been.calledOnce; + expect(driverSocket.end).to.have.been.calledOnce; + expect(connection.destroy).to.have.been.calledOnce; expect(connection).to.have.property('closed', true); }); @@ -433,6 +458,88 @@ describe('new Connection()', function () { expect(connection).to.have.property('closed', false); expect(connection).to.have.property(kDelayedTimeoutId, null); }); + + it('destroys the message stream and socket', () => { + expect(connection).to.have.property(kDelayedTimeoutId, null); + + driverSocket.emit('timeout'); + + clock.tick(1); + + expect(connection.onTimeout).to.have.been.calledOnce; + expect(connection).to.have.property(kDelayedTimeoutId).that.is.instanceOf(NodeJSTimeoutClass); + + expect(messageStream.destroy).to.have.been.calledOnce; + expect(driverSocket.destroy).to.not.have.been.called; + expect(driverSocket.end).to.have.been.calledOnce; + }); + }); + + describe('onError()', () => { + let connection: sinon.SinonSpiedInstance; + let clock: sinon.SinonFakeTimers; + let timerSandbox: sinon.SinonFakeTimers; + let driverSocket: sinon.SinonSpiedInstance; + let messageStream: MessageStream; + beforeEach(() => { + timerSandbox = createTimerSandbox(); + clock = sinon.useFakeTimers(); + driverSocket = sinon.spy(new FakeSocket()); + // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay + connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); + const messageStreamSymbol = getSymbolFrom(connection, 'messageStream'); + messageStream = sinon.spy(connection[messageStreamSymbol]); + }); + + afterEach(() => { + timerSandbox.restore(); + clock.restore(); + }); + + it('destroys the message stream and socket', () => { + messageStream.emit('error'); + clock.tick(1); + expect(connection.onError).to.have.been.calledOnce; + connection.destroy({ force: false }); + clock.tick(1); + expect(messageStream.destroy).to.have.been.called; + expect(driverSocket.destroy).to.not.have.been.called; + expect(driverSocket.end).to.have.been.calledOnce; + }); + }); + + describe('onClose()', () => { + let connection: sinon.SinonSpiedInstance; + let clock: sinon.SinonFakeTimers; + let timerSandbox: sinon.SinonFakeTimers; + let driverSocket: sinon.SinonSpiedInstance; + let messageStream: MessageStream; + beforeEach(() => { + timerSandbox = createTimerSandbox(); + clock = sinon.useFakeTimers(); + + driverSocket = sinon.spy(new FakeSocket()); + // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay + connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); + const messageStreamSymbol = getSymbolFrom(connection, 'messageStream'); + messageStream = sinon.spy(connection[messageStreamSymbol]); + }); + + afterEach(() => { + timerSandbox.restore(); + clock.restore(); + }); + + it('destroys the message stream and socket', () => { + driverSocket.emit('close'); + clock.tick(1); + expect(connection.onClose).to.have.been.calledOnce; + connection.destroy({ force: false }); + clock.tick(1); + expect(messageStream.destroy).to.have.been.called; + expect(driverSocket.destroy).to.not.have.been.called; + expect(driverSocket.end).to.have.been.calledOnce; + }); }); describe('.hasSessionSupport', function () { @@ -486,4 +593,96 @@ describe('new Connection()', function () { }); }); }); + + describe('destroy()', () => { + let connection: sinon.SinonSpiedInstance; + let clock: sinon.SinonFakeTimers; + let timerSandbox: sinon.SinonFakeTimers; + let driverSocket: sinon.SinonSpiedInstance; + let messageStream: MessageStream; + beforeEach(() => { + timerSandbox = createTimerSandbox(); + clock = sinon.useFakeTimers(); + + driverSocket = sinon.spy(new FakeSocket()); + // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay + connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); + const messageStreamSymbol = getSymbolFrom(connection, 'messageStream'); + messageStream = sinon.spy(connection[messageStreamSymbol]); + }); + + afterEach(() => { + timerSandbox.restore(); + clock.restore(); + }); + + context('when options.force == true', function () { + it('calls stream.destroy', () => { + connection.destroy({ force: true }); + clock.tick(1); + expect(driverSocket.destroy).to.have.been.calledOnce; + }); + + it('does not call stream.end', () => { + connection.destroy({ force: true }); + clock.tick(1); + expect(driverSocket.end).to.not.have.been.called; + }); + + it('destroys the tcp socket', () => { + connection.destroy({ force: true }); + clock.tick(1); + expect(driverSocket.destroy).to.have.been.calledOnce; + }); + + it('destroys the messageStream', () => { + connection.destroy({ force: true }); + clock.tick(1); + expect(messageStream.destroy).to.have.been.calledOnce; + }); + + it('calls stream.destroy whenever destroy is called ', () => { + connection.destroy({ force: true }); + connection.destroy({ force: true }); + connection.destroy({ force: true }); + clock.tick(1); + expect(driverSocket.destroy).to.have.been.calledThrice; + }); + }); + + context('when options.force == false', function () { + it('calls stream.end', () => { + connection.destroy({ force: false }); + clock.tick(1); + expect(driverSocket.end).to.have.been.calledOnce; + }); + + it('does not call stream.destroy', () => { + connection.destroy({ force: false }); + clock.tick(1); + expect(driverSocket.destroy).to.not.have.been.called; + }); + + it('ends the tcp socket', () => { + connection.destroy({ force: false }); + clock.tick(1); + expect(driverSocket.end).to.have.been.calledOnce; + }); + + it('destroys the messageStream', () => { + connection.destroy({ force: false }); + clock.tick(1); + expect(messageStream.destroy).to.have.been.calledOnce; + }); + + it('calls stream.end exactly once when destroy is called multiple times', () => { + connection.destroy({ force: false }); + connection.destroy({ force: false }); + connection.destroy({ force: false }); + connection.destroy({ force: false }); + clock.tick(1); + expect(driverSocket.end).to.have.been.calledOnce; + }); + }); + }); }); diff --git a/test/unit/error.test.ts b/test/unit/error.test.ts index 3ae43e35ea..14a624738f 100644 --- a/test/unit/error.test.ts +++ b/test/unit/error.test.ts @@ -379,7 +379,7 @@ describe('MongoErrors', () => { makeAndConnectReplSet((err, topology) => { // cleanup the server before calling done - const cleanup = err => topology.close(err2 => done(err || err2)); + const cleanup = err => topology.close({}, err2 => done(err || err2)); if (err) { return cleanup(err); diff --git a/test/unit/sdam/monitor.test.ts b/test/unit/sdam/monitor.test.ts index be7998d224..4bb611155d 100644 --- a/test/unit/sdam/monitor.test.ts +++ b/test/unit/sdam/monitor.test.ts @@ -52,7 +52,7 @@ describe('monitoring', function () { const serverDescription = Array.from(topology.description.servers.values())[0]; expect(serverDescription).property('roundTripTime').to.be.greaterThan(0); - topology.close(done as any); + topology.close({}, done as any); }, 500); }); }).skipReason = 'TODO(NODE-3819): Unskip flaky tests'; @@ -92,7 +92,7 @@ describe('monitoring', function () { const serverDescription = Array.from(topology.description.servers.values())[0]; expect(serverDescription).property('roundTripTime').to.be.greaterThan(0); - topology.close(done); + topology.close({}, done); }); }).skipReason = 'TODO(NODE-3600): Unskip flaky tests'; diff --git a/test/unit/sdam/topology.test.js b/test/unit/sdam/topology.test.js index 978386697b..d3313010f3 100644 --- a/test/unit/sdam/topology.test.js +++ b/test/unit/sdam/topology.test.js @@ -26,7 +26,7 @@ describe('Topology (unit)', function () { } if (topology) { - topology.close(); + topology.close({}); } }); @@ -107,7 +107,7 @@ describe('Topology (unit)', function () { topology.connect(() => { expect(topology.shouldCheckForSessionSupport()).to.be.true; - topology.close(done); + topology.close({}, done); }); }); @@ -127,7 +127,7 @@ describe('Topology (unit)', function () { topology.connect(() => { expect(topology.shouldCheckForSessionSupport()).to.be.false; - topology.close(done); + topology.close({}, done); }); }); @@ -147,7 +147,7 @@ describe('Topology (unit)', function () { topology.connect(() => { expect(topology.shouldCheckForSessionSupport()).to.be.false; - topology.close(done); + topology.close({}, done); }); }); }); @@ -182,7 +182,7 @@ describe('Topology (unit)', function () { expect(err).to.exist; expect(err).to.match(/timed out/); - topology.close(done); + topology.close({}, done); }); }); }); @@ -325,7 +325,7 @@ describe('Topology (unit)', function () { expect(err).to.exist; expect(err).to.eql(serverDescription.error); expect(poolCleared).to.be.false; - topology.close(done); + topology.close({}, done); }); }); }); @@ -467,7 +467,7 @@ describe('Topology (unit)', function () { it('should clean up listeners on close', function (done) { topology.s.state = 'connected'; // fake state to test clean up logic - topology.close(e => { + topology.close({}, e => { const srvPollerListeners = topology.s.srvPoller.listeners( SrvPoller.SRV_RECORD_DISCOVERY ); @@ -547,7 +547,7 @@ describe('Topology (unit)', function () { // occurs `requestCheck` will be called for an immediate check. expect(requestCheck).property('callCount').to.equal(1); - topology.close(done); + topology.close({}, done); }); }); }); @@ -559,7 +559,7 @@ describe('Topology (unit)', function () { this.emit('connect'); }); - topology.close(() => { + topology.close({}, () => { topology.selectServer(ReadPreference.primary, { serverSelectionTimeoutMS: 2000 }, err => { expect(err).to.exist; expect(err).to.match(/Topology is closed/);