From 5132bc9d8ba816b0d98e1fceb6dfc96be0beb1da Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 25 Mar 2022 18:01:02 -0400 Subject: [PATCH] feat(NODE-3697): reduce serverSession allocation (#3171) --- src/cmap/connection.ts | 2 +- src/operations/operation.ts | 5 +- src/sessions.ts | 120 ++++--- .../sessions/sessions.spec.prose.test.ts | 51 +++ test/integration/sessions/sessions.test.ts | 33 ++ test/tools/cluster_setup.sh | 2 +- test/tools/spec-runner/index.js | 2 + test/unit/sessions.test.js | 316 ++++++++++++++---- 8 files changed, 421 insertions(+), 110 deletions(-) create mode 100644 test/integration/sessions/sessions.spec.prose.test.ts diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index e5665e9116..c15d0fadfb 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -114,7 +114,7 @@ export interface CommandOptions extends BSONSerializeOptions { // Applying a session to a command should happen as part of command construction, // most likely in the CommandOperation#executeCommand method, where we have access to // the details we need to determine if a txnNum should also be applied. - willRetryWrite?: true; + willRetryWrite?: boolean; writeConcern?: WriteConcern; } diff --git a/src/operations/operation.ts b/src/operations/operation.ts index 82265d06f3..e21a87585c 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -25,7 +25,7 @@ export interface OperationConstructor extends Function { export interface OperationOptions extends BSONSerializeOptions { /** Specify ClientSession for this command */ session?: ClientSession; - willRetryWrites?: boolean; + willRetryWrite?: boolean; /** The preferred read preference (ReadPreference.primary, ReadPreference.primary_preferred, ReadPreference.secondary, ReadPreference.secondary_preferred, ReadPreference.nearest). */ readPreference?: ReadPreferenceLike; @@ -56,8 +56,7 @@ export abstract class AbstractOperation { // BSON serialization options bsonOptions?: BSONSerializeOptions; - // TODO: Each operation defines its own options, there should be better typing here - options: Document; + options: OperationOptions; [kSession]: ClientSession | undefined; diff --git a/src/sessions.ts b/src/sessions.ts index 7c853740f7..3fe4620a33 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -42,20 +42,6 @@ import { const minWireVersionForShardedTransactions = 8; -function assertAlive(session: ClientSession, callback?: Callback): boolean { - if (session.serverSession == null) { - const error = new MongoExpiredSessionError(); - if (typeof callback === 'function') { - callback(error); - return false; - } - - throw error; - } - - return true; -} - /** @public */ export interface ClientSessionOptions { /** Whether causal consistency should be enabled on this session */ @@ -89,6 +75,8 @@ const kSnapshotTime = Symbol('snapshotTime'); const kSnapshotEnabled = Symbol('snapshotEnabled'); /** @internal */ const kPinnedConnection = Symbol('pinnedConnection'); +/** @internal Accumulates total number of increments to add to txnNumber when applying session to command */ +const kTxnNumberIncrement = Symbol('txnNumberIncrement'); /** @public */ export interface EndSessionOptions { @@ -123,13 +111,15 @@ export class ClientSession extends TypedEventEmitter { defaultTransactionOptions: TransactionOptions; transaction: Transaction; /** @internal */ - [kServerSession]?: ServerSession; + [kServerSession]: ServerSession | null; /** @internal */ [kSnapshotTime]?: Timestamp; /** @internal */ [kSnapshotEnabled] = false; /** @internal */ [kPinnedConnection]?: Connection; + /** @internal */ + [kTxnNumberIncrement]: number; /** * Create a client session. @@ -172,7 +162,10 @@ export class ClientSession extends TypedEventEmitter { this.sessionPool = sessionPool; this.hasEnded = false; this.clientOptions = clientOptions; - this[kServerSession] = undefined; + + this.explicit = !!options.explicit; + this[kServerSession] = this.explicit ? this.sessionPool.acquire() : null; + this[kTxnNumberIncrement] = 0; this.supports = { causalConsistency: options.snapshot !== true && options.causalConsistency !== false @@ -181,7 +174,6 @@ export class ClientSession extends TypedEventEmitter { this.clusterTime = options.initialClusterTime; this.operationTime = undefined; - this.explicit = !!options.explicit; this.owner = options.owner; this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions); this.transaction = new Transaction(); @@ -189,16 +181,22 @@ export class ClientSession extends TypedEventEmitter { /** The server id associated with this session */ get id(): ServerSessionId | undefined { - return this.serverSession?.id; + return this[kServerSession]?.id; } get serverSession(): ServerSession { - if (this[kServerSession] == null) { - this[kServerSession] = this.sessionPool.acquire(); + let serverSession = this[kServerSession]; + if (serverSession == null) { + if (this.explicit) { + throw new MongoRuntimeError('Unexpected null serverSession for an explicit session'); + } + if (this.hasEnded) { + throw new MongoRuntimeError('Unexpected null serverSession for an ended implicit session'); + } + serverSession = this.sessionPool.acquire(); + this[kServerSession] = serverSession; } - - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - return this[kServerSession]!; + return serverSession; } /** Whether or not this session is configured for snapshot reads */ @@ -267,9 +265,15 @@ export class ClientSession extends TypedEventEmitter { const completeEndSession = () => { maybeClearPinnedConnection(this, finalOptions); - // release the server session back to the pool - this.sessionPool.release(this.serverSession); - this[kServerSession] = undefined; + const serverSession = this[kServerSession]; + if (serverSession != null) { + // release the server session back to the pool + this.sessionPool.release(serverSession); + // Make sure a new serverSession never makes it on to the ClientSession + Object.defineProperty(this, kServerSession, { + value: ServerSession.clone(serverSession) + }); + } // mark the session as ended, and emit a signal this.hasEnded = true; @@ -279,7 +283,9 @@ export class ClientSession extends TypedEventEmitter { done(); }; - if (this.serverSession && this.inTransaction()) { + if (this.inTransaction()) { + // If we've reached endSession and the transaction is still active + // by default we abort it this.abortTransaction(err => { if (err) return done(err); completeEndSession(); @@ -353,12 +359,16 @@ export class ClientSession extends TypedEventEmitter { return this.id.id.buffer.equals(session.id.id.buffer); } - /** Increment the transaction number on the internal ServerSession */ + /** + * Increment the transaction number on the internal ServerSession + * + * @privateRemarks + * This helper increments a value stored on the client session that will be + * added to the serverSession's txnNumber upon applying it to a command. + * This is because the serverSession is lazily acquired after a connection is obtained + */ incrementTransactionNumber(): void { - if (this.serverSession) { - this.serverSession.txnNumber = - typeof this.serverSession.txnNumber === 'number' ? this.serverSession.txnNumber + 1 : 0; - } + this[kTxnNumberIncrement] += 1; } /** @returns whether this session is currently in a transaction or not */ @@ -376,7 +386,6 @@ export class ClientSession extends TypedEventEmitter { throw new MongoCompatibilityError('Transactions are not allowed with snapshot sessions'); } - assertAlive(this); if (this.inTransaction()) { throw new MongoTransactionError('Transaction already in progress'); } @@ -627,7 +636,7 @@ function attemptTransaction( throw err; } - if (session.transaction.isActive) { + if (session.inTransaction()) { return session.abortTransaction().then(() => maybeRetryOrThrow(err)); } @@ -641,11 +650,6 @@ function endTransaction( commandName: 'abortTransaction' | 'commitTransaction', callback: Callback ) { - if (!assertAlive(session, callback)) { - // checking result in case callback was called - return; - } - // handle any initial problematic cases const txnState = session.transaction.state; @@ -750,7 +754,6 @@ function endTransaction( callback(error, result); } - // Assumption here that commandName is "commitTransaction" or "abortTransaction" if (session.transaction.recoveryToken) { command.recoveryToken = session.transaction.recoveryToken; } @@ -832,6 +835,30 @@ export class ServerSession { return idleTimeMinutes > sessionTimeoutMinutes - 1; } + + /** + * @internal + * Cloning meant to keep a readable reference to the server session data + * after ClientSession has ended + */ + static clone(serverSession: ServerSession): Readonly { + const arrayBuffer = new ArrayBuffer(16); + const idBytes = Buffer.from(arrayBuffer); + idBytes.set(serverSession.id.id.buffer); + + const id = new Binary(idBytes, serverSession.id.id.sub_type); + + // Manual prototype construction to avoid modifying the constructor of this class + return Object.setPrototypeOf( + { + id: { id }, + lastUse: serverSession.lastUse, + txnNumber: serverSession.txnNumber, + isDirty: serverSession.isDirty + }, + ServerSession.prototype + ); + } } /** @@ -944,11 +971,11 @@ export function applySession( command: Document, options: CommandOptions ): MongoDriverError | undefined { - // TODO: merge this with `assertAlive`, did not want to throw a try/catch here if (session.hasEnded) { return new MongoExpiredSessionError(); } + // May acquire serverSession here const serverSession = session.serverSession; if (serverSession == null) { return new MongoRuntimeError('Unable to acquire server session'); @@ -966,15 +993,16 @@ export function applySession( serverSession.lastUse = now(); command.lsid = serverSession.id; - // first apply non-transaction-specific sessions data - const inTransaction = session.inTransaction() || isTransactionCommand(command); - const isRetryableWrite = options?.willRetryWrite || false; + const inTxnOrTxnCommand = session.inTransaction() || isTransactionCommand(command); + const isRetryableWrite = !!options.willRetryWrite; - if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) { + if (isRetryableWrite || inTxnOrTxnCommand) { + serverSession.txnNumber += session[kTxnNumberIncrement]; + session[kTxnNumberIncrement] = 0; command.txnNumber = Long.fromNumber(serverSession.txnNumber); } - if (!inTransaction) { + if (!inTxnOrTxnCommand) { if (session.transaction.state !== TxnState.NO_TRANSACTION) { session.transaction.transition(TxnState.NO_TRANSACTION); } diff --git a/test/integration/sessions/sessions.spec.prose.test.ts b/test/integration/sessions/sessions.spec.prose.test.ts new file mode 100644 index 0000000000..a2bfbd958f --- /dev/null +++ b/test/integration/sessions/sessions.spec.prose.test.ts @@ -0,0 +1,51 @@ +import { expect } from 'chai'; + +import { Collection } from '../../../src/index'; + +describe('ServerSession', () => { + let client; + let testCollection: Collection<{ _id: number; a?: number }>; + beforeEach(async function () { + const configuration = this.configuration; + client = await configuration.newClient({ maxPoolSize: 1, monitorCommands: true }).connect(); + + // reset test collection + testCollection = client.db('test').collection('too.many.sessions'); + await testCollection.drop().catch(() => null); + }); + + afterEach(async () => { + await client?.close(true); + }); + + /** + * TODO(NODE-4082): Refactor tests to align exactly with spec wording. + * Assert the following across at least 5 retries of the above test: (We do not need to retry in nodejs) + * Drivers MUST assert that exactly one session is used for all operations at least once across the retries of this test. + * Note that it's possible, although rare, for greater than 1 server session to be used because the session is not released until after the connection is checked in. + * Drivers MUST assert that the number of allocated sessions is strictly less than the number of concurrent operations in every retry of this test. In this instance it would less than (but NOT equal to) 8. + */ + it('13. may reuse one server session for many operations', async () => { + const events = []; + client.on('commandStarted', ev => events.push(ev)); + + const operations = [ + testCollection.insertOne({ _id: 1 }), + testCollection.deleteOne({ _id: 2 }), + testCollection.updateOne({ _id: 3 }, { $set: { a: 1 } }), + testCollection.bulkWrite([{ updateOne: { filter: { _id: 4 }, update: { $set: { a: 1 } } } }]), + testCollection.findOneAndDelete({ _id: 5 }), + testCollection.findOneAndUpdate({ _id: 6 }, { $set: { a: 1 } }), + testCollection.findOneAndReplace({ _id: 7 }, { a: 8 }), + testCollection.find().toArray() + ]; + + const allResults = await Promise.all(operations); + + expect(allResults).to.have.lengthOf(operations.length); + expect(events).to.have.lengthOf(operations.length); + + // This is a guarantee in node, unless you are performing a transaction (which is not being done in this test) + expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex'))).size).to.equal(1); + }); +}); diff --git a/test/integration/sessions/sessions.test.ts b/test/integration/sessions/sessions.test.ts index 9d365812b2..96cc6703fd 100644 --- a/test/integration/sessions/sessions.test.ts +++ b/test/integration/sessions/sessions.test.ts @@ -367,4 +367,37 @@ describe('Sessions Spec', function () { }); }); }); + + describe('Session allocation', () => { + let client; + let testCollection; + + beforeEach(async function () { + client = await this.configuration + .newClient({ maxPoolSize: 1, monitorCommands: true }) + .connect(); + // reset test collection + testCollection = client.db('test').collection('too.many.sessions'); + await testCollection.drop().catch(() => null); + }); + + afterEach(async () => { + await client?.close(); + }); + + it('should only use one session for many operations when maxPoolSize is 1', async () => { + const documents = Array.from({ length: 50 }).map((_, idx) => ({ _id: idx })); + + const events = []; + client.on('commandStarted', ev => events.push(ev)); + const allResults = await Promise.all( + documents.map(async doc => testCollection.insertOne(doc)) + ); + + expect(allResults).to.have.lengthOf(documents.length); + expect(events).to.have.lengthOf(documents.length); + + expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex'))).size).to.equal(1); + }); + }); }); diff --git a/test/tools/cluster_setup.sh b/test/tools/cluster_setup.sh index 44e8704278..3e665e3867 100755 --- a/test/tools/cluster_setup.sh +++ b/test/tools/cluster_setup.sh @@ -17,7 +17,7 @@ if [[ $1 == "replica_set" ]]; then echo "mongodb://bob:pwd123@localhost:31000,localhost:31001,localhost:31002/?replicaSet=rs" elif [[ $1 == "sharded_cluster" ]]; then mkdir -p $SHARDED_DIR - mlaunch init --dir $SHARDED_DIR --auth --username "bob" --password "pwd123" --replicaset --nodes 3 --arbiter --name rs --port 51000 --enableMajorityReadConcern --setParameter enableTestCommands=1 --sharded 1 --mongos 2 + mlaunch init --dir $SHARDED_DIR --auth --username "bob" --password "pwd123" --replicaset --nodes 3 --name rs --port 51000 --enableMajorityReadConcern --setParameter enableTestCommands=1 --sharded 1 --mongos 2 echo "mongodb://bob:pwd123@localhost:51000,localhost:51001" elif [[ $1 == "server" ]]; then mkdir -p $SINGLE_DIR diff --git a/test/tools/spec-runner/index.js b/test/tools/spec-runner/index.js index c9878a6a4a..2039125492 100644 --- a/test/tools/spec-runner/index.js +++ b/test/tools/spec-runner/index.js @@ -459,6 +459,8 @@ function validateExpectations(commandEvents, spec, savedSessionData) { const rawExpectedEvents = spec.expectations.map(x => x.command_started_event); const expectedEvents = normalizeCommandShapes(rawExpectedEvents); + expect(actualEvents).to.have.lengthOf(expectedEvents.length); + for (const [idx, expectedEvent] of expectedEvents.entries()) { const actualEvent = actualEvents[idx]; diff --git a/test/unit/sessions.test.js b/test/unit/sessions.test.js index 45ed170799..84de683ab2 100644 --- a/test/unit/sessions.test.js +++ b/test/unit/sessions.test.js @@ -4,13 +4,27 @@ const mock = require('../tools/mongodb-mock/index'); const { expect } = require('chai'); const { genClusterTime, sessionCleanupHandler } = require('../tools/common'); const { Topology } = require('../../src/sdam/topology'); -const { ServerSessionPool, ServerSession, ClientSession } = require('../../src/sessions'); +const { + ServerSessionPool, + ServerSession, + ClientSession, + applySession +} = require('../../src/sessions'); const { now, isHello } = require('../../src/utils'); +const { getSymbolFrom } = require('../tools/utils'); +const { Long } = require('../../src/bson'); +const { MongoRuntimeError } = require('../../src/error'); +const sinon = require('sinon'); -let test = {}; +let test = { + topology: null +}; -describe('Sessions - unit/core', function () { - describe('ClientSession', function () { +describe('Sessions - unit', function () { + const topology = {}; + const serverSessionPool = new ServerSessionPool(topology); + + describe('class ClientSession', function () { let session; let sessionPool; @@ -22,48 +36,11 @@ describe('Sessions - unit/core', function () { } }); - it('should throw errors with invalid parameters', function () { - expect(() => { - new ClientSession(); - }).to.throw(/ClientSession requires a topology/); - - expect(() => { - new ClientSession({}); - }).to.throw(/ClientSession requires a ServerSessionPool/); - - expect(() => { - new ClientSession({}, {}); - }).to.throw(/ClientSession requires a ServerSessionPool/); - }); - - it('should throw an error if snapshot and causalConsistency options are both set to true', function () { - const client = new Topology('localhost:27017', {}); - sessionPool = client.s.sessionPool; - expect( - () => new ClientSession(client, sessionPool, { causalConsistency: true, snapshot: true }) - ).to.throw('Properties "causalConsistency" and "snapshot" are mutually exclusive'); - }); - - it('should default to `null` for `clusterTime`', function () { - const client = new Topology('localhost:27017', {}); - sessionPool = client.s.sessionPool; - session = new ClientSession(client, sessionPool); - expect(session.clusterTime).to.not.exist; - }); - - it('should set the internal clusterTime to `initialClusterTime` if provided', function () { - const clusterTime = genClusterTime(Date.now()); - const client = new Topology('localhost:27017'); - sessionPool = client.s.sessionPool; - session = new ClientSession(client, sessionPool, { initialClusterTime: clusterTime }); - expect(session.clusterTime).to.eql(clusterTime); - }); - describe('startTransaction()', () => { it('should throw an error if the session is snapshot enabled', function () { - const client = new Topology('localhost:27017', {}); - sessionPool = client.s.sessionPool; - session = new ClientSession(client, sessionPool, { snapshot: true }); + const topology = new Topology('localhost:27017', {}); + sessionPool = topology.s.sessionPool; + session = new ClientSession(topology, sessionPool, { snapshot: true }); expect(session.snapshotEnabled).to.equal(true); expect(() => session.startTransaction()).to.throw( 'Transactions are not allowed with snapshot sessions' @@ -73,9 +50,9 @@ describe('Sessions - unit/core', function () { describe('advanceClusterTime()', () => { beforeEach(() => { - const client = new Topology('localhost:27017', {}); - sessionPool = client.s.sessionPool; - session = new ClientSession(client, sessionPool, {}); + const topology = new Topology('localhost:27017', {}); + sessionPool = topology.s.sessionPool; + session = new ClientSession(topology, sessionPool, {}); }); it('should throw an error if the input cluster time is not an object', function () { @@ -181,11 +158,232 @@ describe('Sessions - unit/core', function () { expect(session).property('clusterTime').to.equal(validInitialTime); }); }); + + describe('new ClientSession()', () => { + it('should throw errors with invalid parameters', function () { + expect(() => { + new ClientSession(); + }).to.throw(/ClientSession requires a topology/); + + expect(() => { + new ClientSession({}); + }).to.throw(/ClientSession requires a ServerSessionPool/); + + expect(() => { + new ClientSession({}, {}); + }).to.throw(/ClientSession requires a ServerSessionPool/); + }); + + it('should throw an error if snapshot and causalConsistency options are both set to true', function () { + const topology = new Topology('localhost:27017', {}); + sessionPool = topology.s.sessionPool; + expect( + () => + new ClientSession(topology, sessionPool, { causalConsistency: true, snapshot: true }) + ).to.throw('Properties "causalConsistency" and "snapshot" are mutually exclusive'); + }); + + it('should default to `null` for `clusterTime`', function () { + const topology = new Topology('localhost:27017', {}); + sessionPool = topology.s.sessionPool; + session = new ClientSession(topology, sessionPool); + expect(session.clusterTime).to.not.exist; + }); + + it('should set the internal clusterTime to `initialClusterTime` if provided', function () { + const clusterTime = genClusterTime(Date.now()); + const topology = new Topology('localhost:27017'); + sessionPool = topology.s.sessionPool; + session = new ClientSession(topology, sessionPool, { initialClusterTime: clusterTime }); + expect(session.clusterTime).to.eql(clusterTime); + }); + + it('should acquire a serverSession in the constructor if the session is explicit', () => { + const session = new ClientSession(topology, serverSessionPool, { explicit: true }); + const serverSessionSymbol = getSymbolFrom(session, 'serverSession'); + expect(session).to.have.property(serverSessionSymbol).that.is.an.instanceOf(ServerSession); + }); + + it('should leave serverSession null if the session is implicit', () => { + // implicit via false (this should not be allowed...) + let session = new ClientSession(topology, serverSessionPool, { explicit: false }); + const serverSessionSymbol = getSymbolFrom(session, 'serverSession'); + expect(session).to.have.property(serverSessionSymbol, null); + // implicit via omission + session = new ClientSession(topology, serverSessionPool, {}); + expect(session).to.have.property(serverSessionSymbol, null); + }); + + it('should start the txnNumberIncrement at zero', () => { + const session = new ClientSession(topology, serverSessionPool); + const txnNumberIncrementSymbol = getSymbolFrom(session, 'txnNumberIncrement'); + expect(session).to.have.property(txnNumberIncrementSymbol, 0); + }); + }); + + describe('get serverSession()', () => { + let serverSessionSymbol; + before(() => { + serverSessionSymbol = getSymbolFrom( + new ClientSession({}, serverSessionPool, {}), + 'serverSession' + ); + }); + + describe('from an explicit session', () => { + it('should always have a non-null serverSession after construction', () => { + const session = new ClientSession(topology, serverSessionPool, { explicit: true }); + expect(session).to.have.a.property(serverSessionSymbol).be.an.instanceOf(ServerSession); + expect(session.serverSession).be.an.instanceOf(ServerSession); + }); + + it('should always have non-null serverSession even if it is ended before getter called', () => { + const session = new ClientSession(topology, serverSessionPool, { explicit: true }); + session.hasEnded = true; + expect(session).to.have.a.property(serverSessionSymbol).be.an.instanceOf(ServerSession); + expect(session.serverSession).be.an.instanceOf(ServerSession); + }); + + it('should throw if the serverSession at the symbol property goes missing', () => { + const session = new ClientSession(topology, serverSessionPool, { explicit: true }); + // We really want to make sure a ClientSession is not separated from its serverSession + session[serverSessionSymbol] = null; + expect(session).to.have.a.property(serverSessionSymbol).be.null; + expect(() => session.serverSession).throw(MongoRuntimeError); + }); + }); + + describe('from an implicit session', () => { + it('should throw if the session ended before serverSession was acquired', () => { + const session = new ClientSession(topology, serverSessionPool, { explicit: false }); // make an implicit session + expect(session).to.have.property(serverSessionSymbol, null); + session.hasEnded = true; + expect(() => session.serverSession).to.throw(MongoRuntimeError); + }); + + it('should acquire a serverSession if clientSession.hasEnded is false and serverSession is not set', () => { + const session = new ClientSession(topology, serverSessionPool, { explicit: false }); // make an implicit session + expect(session).to.have.property(serverSessionSymbol, null); + session.hasEnded = false; + const acquireSpy = sinon.spy(serverSessionPool, 'acquire'); + expect(session.serverSession).to.be.instanceOf(ServerSession); + expect(acquireSpy.calledOnce).to.be.true; + acquireSpy.restore(); + }); + + it('should return the existing serverSession and not acquire a new one if one is already set', () => { + const session = new ClientSession(topology, serverSessionPool, { explicit: false }); // make an implicit session + expect(session).to.have.property(serverSessionSymbol, null); + const acquireSpy = sinon.spy(serverSessionPool, 'acquire'); + const firstServerSessionGetResult = session.serverSession; + expect(firstServerSessionGetResult).to.be.instanceOf(ServerSession); + expect(acquireSpy.calledOnce).to.be.true; + + // call the getter a bunch more times + expect(session.serverSession).to.be.instanceOf(ServerSession); + expect(session.serverSession).to.be.instanceOf(ServerSession); + expect(session.serverSession).to.be.instanceOf(ServerSession); + + expect(session.serverSession.id.id.buffer.toString('hex')).to.equal( + firstServerSessionGetResult.id.id.buffer.toString('hex') + ); + + // acquire never called again + expect(acquireSpy.calledOnce).to.be.true; + + acquireSpy.restore(); + }); + + it('should return the existing serverSession and not acquire a new one if one is already set and session is ended', () => { + const session = new ClientSession(topology, serverSessionPool, { explicit: false }); // make an implicit session + expect(session).to.have.property(serverSessionSymbol, null); + const acquireSpy = sinon.spy(serverSessionPool, 'acquire'); + const firstServerSessionGetResult = session.serverSession; + expect(firstServerSessionGetResult).to.be.instanceOf(ServerSession); + expect(acquireSpy.calledOnce).to.be.true; + + session.hasEnded = true; + + // call the getter a bunch more times + expect(session.serverSession).to.be.instanceOf(ServerSession); + expect(session.serverSession).to.be.instanceOf(ServerSession); + expect(session.serverSession).to.be.instanceOf(ServerSession); + + expect(session.serverSession.id.id.buffer.toString('hex')).to.equal( + firstServerSessionGetResult.id.id.buffer.toString('hex') + ); + + // acquire never called again + expect(acquireSpy.calledOnce).to.be.true; + + acquireSpy.restore(); + }); + }); + }); + + describe('incrementTransactionNumber()', () => { + it('should not allocate serverSession', () => { + const session = new ClientSession(topology, serverSessionPool); + const txnNumberIncrementSymbol = getSymbolFrom(session, 'txnNumberIncrement'); + + session.incrementTransactionNumber(); + expect(session).to.have.property(txnNumberIncrementSymbol, 1); + + const serverSessionSymbol = getSymbolFrom(session, 'serverSession'); + expect(session).to.have.property(serverSessionSymbol, null); + }); + + it('should save increments to txnNumberIncrement symbol', () => { + const session = new ClientSession(topology, serverSessionPool); + const txnNumberIncrementSymbol = getSymbolFrom(session, 'txnNumberIncrement'); + + session.incrementTransactionNumber(); + session.incrementTransactionNumber(); + session.incrementTransactionNumber(); + + expect(session).to.have.property(txnNumberIncrementSymbol, 3); + }); + }); + + describe('applySession()', () => { + it('should allocate serverSession', () => { + const session = new ClientSession(topology, serverSessionPool); + const serverSessionSymbol = getSymbolFrom(session, 'serverSession'); + + const command = { magic: 1 }; + const result = applySession(session, command, {}); + + expect(result).to.not.exist; + expect(command).to.have.property('lsid'); + expect(session).to.have.property(serverSessionSymbol).that.is.instanceOf(ServerSession); + }); + + it('should apply saved txnNumberIncrements', () => { + const session = new ClientSession(topology, serverSessionPool); + const serverSessionSymbol = getSymbolFrom(session, 'serverSession'); + + session.incrementTransactionNumber(); + session.incrementTransactionNumber(); + session.incrementTransactionNumber(); + + const command = { magic: 1 }; + const result = applySession(session, command, { + // txnNumber will be applied for retryable write command + willRetryWrite: true + }); + + expect(result).to.not.exist; + expect(command).to.have.property('lsid'); + expect(command).to.have.property('txnNumber').instanceOf(Long); + expect(command.txnNumber.toNumber()).to.equal(3); + expect(session).to.have.property(serverSessionSymbol).that.is.instanceOf(ServerSession); + }); + }); }); - describe('ServerSessionPool', function () { + describe('class ServerSessionPool', function () { afterEach(() => { - test.client.close(); + test.topology.close(); return mock.cleanup(); }); @@ -202,12 +400,12 @@ describe('Sessions - unit/core', function () { }); }) .then(() => { - test.client = new Topology(test.server.hostAddress()); + test.topology = new Topology(test.server.hostAddress()); return new Promise((resolve, reject) => { - test.client.once('error', reject); - test.client.once('connect', resolve); - test.client.connect(); + test.topology.once('error', reject); + test.topology.once('connect', resolve); + test.topology.connect(); }); }); }); @@ -219,7 +417,7 @@ describe('Sessions - unit/core', function () { }); it('should create a new session if the pool is empty', function (done) { - const pool = new ServerSessionPool(test.client); + const pool = new ServerSessionPool(test.topology); done = sessionCleanupHandler(null, pool, done); expect(pool.sessions).to.have.length(0); @@ -233,7 +431,7 @@ describe('Sessions - unit/core', function () { it('should reuse sessions which have not timed out yet on acquire', function (done) { const oldSession = new ServerSession(); - const pool = new ServerSessionPool(test.client); + const pool = new ServerSessionPool(test.topology); done = sessionCleanupHandler(null, pool, done); pool.sessions.push(oldSession); @@ -249,7 +447,7 @@ describe('Sessions - unit/core', function () { const oldSession = new ServerSession(); oldSession.lastUse = now() - 30 * 60 * 1000; // add 30min - const pool = new ServerSessionPool(test.client); + const pool = new ServerSessionPool(test.topology); done = sessionCleanupHandler(null, pool, done); pool.sessions.push(oldSession); @@ -268,7 +466,7 @@ describe('Sessions - unit/core', function () { return session; }); - const pool = new ServerSessionPool(test.client); + const pool = new ServerSessionPool(test.topology); done = sessionCleanupHandler(null, pool, done); pool.sessions = pool.sessions.concat(oldSessions); @@ -282,7 +480,7 @@ describe('Sessions - unit/core', function () { const session = new ServerSession(); session.lastUse = now() - 9.5 * 60 * 1000; // add 9.5min - const pool = new ServerSessionPool(test.client); + const pool = new ServerSessionPool(test.topology); done = sessionCleanupHandler(null, pool, done); pool.release(session); @@ -291,7 +489,7 @@ describe('Sessions - unit/core', function () { }); it('should maintain a LIFO queue of sessions', function (done) { - const pool = new ServerSessionPool(test.client); + const pool = new ServerSessionPool(test.topology); done = sessionCleanupHandler(null, pool, done); const sessionA = new ServerSession();