Skip to content

Commit

Permalink
feat(NODE-3697): reduce serverSession allocation (#3171)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Mar 25, 2022
1 parent de9fd7f commit 5132bc9
Show file tree
Hide file tree
Showing 8 changed files with 421 additions and 110 deletions.
2 changes: 1 addition & 1 deletion src/cmap/connection.ts
Expand Up @@ -114,7 +114,7 @@ export interface CommandOptions extends BSONSerializeOptions {
// Applying a session to a command should happen as part of command construction,
// most likely in the CommandOperation#executeCommand method, where we have access to
// the details we need to determine if a txnNum should also be applied.
willRetryWrite?: true;
willRetryWrite?: boolean;

writeConcern?: WriteConcern;
}
Expand Down
5 changes: 2 additions & 3 deletions src/operations/operation.ts
Expand Up @@ -25,7 +25,7 @@ export interface OperationConstructor extends Function {
export interface OperationOptions extends BSONSerializeOptions {
/** Specify ClientSession for this command */
session?: ClientSession;
willRetryWrites?: boolean;
willRetryWrite?: boolean;

/** The preferred read preference (ReadPreference.primary, ReadPreference.primary_preferred, ReadPreference.secondary, ReadPreference.secondary_preferred, ReadPreference.nearest). */
readPreference?: ReadPreferenceLike;
Expand Down Expand Up @@ -56,8 +56,7 @@ export abstract class AbstractOperation<TResult = any> {
// BSON serialization options
bsonOptions?: BSONSerializeOptions;

// TODO: Each operation defines its own options, there should be better typing here
options: Document;
options: OperationOptions;

[kSession]: ClientSession | undefined;

Expand Down
120 changes: 74 additions & 46 deletions src/sessions.ts
Expand Up @@ -42,20 +42,6 @@ import {

const minWireVersionForShardedTransactions = 8;

function assertAlive(session: ClientSession, callback?: Callback): boolean {
if (session.serverSession == null) {
const error = new MongoExpiredSessionError();
if (typeof callback === 'function') {
callback(error);
return false;
}

throw error;
}

return true;
}

/** @public */
export interface ClientSessionOptions {
/** Whether causal consistency should be enabled on this session */
Expand Down Expand Up @@ -89,6 +75,8 @@ const kSnapshotTime = Symbol('snapshotTime');
const kSnapshotEnabled = Symbol('snapshotEnabled');
/** @internal */
const kPinnedConnection = Symbol('pinnedConnection');
/** @internal Accumulates total number of increments to add to txnNumber when applying session to command */
const kTxnNumberIncrement = Symbol('txnNumberIncrement');

/** @public */
export interface EndSessionOptions {
Expand Down Expand Up @@ -123,13 +111,15 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
defaultTransactionOptions: TransactionOptions;
transaction: Transaction;
/** @internal */
[kServerSession]?: ServerSession;
[kServerSession]: ServerSession | null;
/** @internal */
[kSnapshotTime]?: Timestamp;
/** @internal */
[kSnapshotEnabled] = false;
/** @internal */
[kPinnedConnection]?: Connection;
/** @internal */
[kTxnNumberIncrement]: number;

/**
* Create a client session.
Expand Down Expand Up @@ -172,7 +162,10 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
this.sessionPool = sessionPool;
this.hasEnded = false;
this.clientOptions = clientOptions;
this[kServerSession] = undefined;

this.explicit = !!options.explicit;
this[kServerSession] = this.explicit ? this.sessionPool.acquire() : null;
this[kTxnNumberIncrement] = 0;

this.supports = {
causalConsistency: options.snapshot !== true && options.causalConsistency !== false
Expand All @@ -181,24 +174,29 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
this.clusterTime = options.initialClusterTime;

this.operationTime = undefined;
this.explicit = !!options.explicit;
this.owner = options.owner;
this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
this.transaction = new Transaction();
}

/** The server id associated with this session */
get id(): ServerSessionId | undefined {
return this.serverSession?.id;
return this[kServerSession]?.id;
}

get serverSession(): ServerSession {
if (this[kServerSession] == null) {
this[kServerSession] = this.sessionPool.acquire();
let serverSession = this[kServerSession];
if (serverSession == null) {
if (this.explicit) {
throw new MongoRuntimeError('Unexpected null serverSession for an explicit session');
}
if (this.hasEnded) {
throw new MongoRuntimeError('Unexpected null serverSession for an ended implicit session');
}
serverSession = this.sessionPool.acquire();
this[kServerSession] = serverSession;
}

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return this[kServerSession]!;
return serverSession;
}

/** Whether or not this session is configured for snapshot reads */
Expand Down Expand Up @@ -267,9 +265,15 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
const completeEndSession = () => {
maybeClearPinnedConnection(this, finalOptions);

// release the server session back to the pool
this.sessionPool.release(this.serverSession);
this[kServerSession] = undefined;
const serverSession = this[kServerSession];
if (serverSession != null) {
// release the server session back to the pool
this.sessionPool.release(serverSession);
// Make sure a new serverSession never makes it on to the ClientSession
Object.defineProperty(this, kServerSession, {
value: ServerSession.clone(serverSession)
});
}

// mark the session as ended, and emit a signal
this.hasEnded = true;
Expand All @@ -279,7 +283,9 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
done();
};

if (this.serverSession && this.inTransaction()) {
if (this.inTransaction()) {
// If we've reached endSession and the transaction is still active
// by default we abort it
this.abortTransaction(err => {
if (err) return done(err);
completeEndSession();
Expand Down Expand Up @@ -353,12 +359,16 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
return this.id.id.buffer.equals(session.id.id.buffer);
}

/** Increment the transaction number on the internal ServerSession */
/**
* Increment the transaction number on the internal ServerSession
*
* @privateRemarks
* This helper increments a value stored on the client session that will be
* added to the serverSession's txnNumber upon applying it to a command.
* This is because the serverSession is lazily acquired after a connection is obtained
*/
incrementTransactionNumber(): void {
if (this.serverSession) {
this.serverSession.txnNumber =
typeof this.serverSession.txnNumber === 'number' ? this.serverSession.txnNumber + 1 : 0;
}
this[kTxnNumberIncrement] += 1;
}

/** @returns whether this session is currently in a transaction or not */
Expand All @@ -376,7 +386,6 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
throw new MongoCompatibilityError('Transactions are not allowed with snapshot sessions');
}

assertAlive(this);
if (this.inTransaction()) {
throw new MongoTransactionError('Transaction already in progress');
}
Expand Down Expand Up @@ -627,7 +636,7 @@ function attemptTransaction<TSchema>(
throw err;
}

if (session.transaction.isActive) {
if (session.inTransaction()) {
return session.abortTransaction().then(() => maybeRetryOrThrow(err));
}

Expand All @@ -641,11 +650,6 @@ function endTransaction(
commandName: 'abortTransaction' | 'commitTransaction',
callback: Callback<Document>
) {
if (!assertAlive(session, callback)) {
// checking result in case callback was called
return;
}

// handle any initial problematic cases
const txnState = session.transaction.state;

Expand Down Expand Up @@ -750,7 +754,6 @@ function endTransaction(
callback(error, result);
}

// Assumption here that commandName is "commitTransaction" or "abortTransaction"
if (session.transaction.recoveryToken) {
command.recoveryToken = session.transaction.recoveryToken;
}
Expand Down Expand Up @@ -832,6 +835,30 @@ export class ServerSession {

return idleTimeMinutes > sessionTimeoutMinutes - 1;
}

/**
* @internal
* Cloning meant to keep a readable reference to the server session data
* after ClientSession has ended
*/
static clone(serverSession: ServerSession): Readonly<ServerSession> {
const arrayBuffer = new ArrayBuffer(16);
const idBytes = Buffer.from(arrayBuffer);
idBytes.set(serverSession.id.id.buffer);

const id = new Binary(idBytes, serverSession.id.id.sub_type);

// Manual prototype construction to avoid modifying the constructor of this class
return Object.setPrototypeOf(
{
id: { id },
lastUse: serverSession.lastUse,
txnNumber: serverSession.txnNumber,
isDirty: serverSession.isDirty
},
ServerSession.prototype
);
}
}

/**
Expand Down Expand Up @@ -944,11 +971,11 @@ export function applySession(
command: Document,
options: CommandOptions
): MongoDriverError | undefined {
// TODO: merge this with `assertAlive`, did not want to throw a try/catch here
if (session.hasEnded) {
return new MongoExpiredSessionError();
}

// May acquire serverSession here
const serverSession = session.serverSession;
if (serverSession == null) {
return new MongoRuntimeError('Unable to acquire server session');
Expand All @@ -966,15 +993,16 @@ export function applySession(
serverSession.lastUse = now();
command.lsid = serverSession.id;

// first apply non-transaction-specific sessions data
const inTransaction = session.inTransaction() || isTransactionCommand(command);
const isRetryableWrite = options?.willRetryWrite || false;
const inTxnOrTxnCommand = session.inTransaction() || isTransactionCommand(command);
const isRetryableWrite = !!options.willRetryWrite;

if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) {
if (isRetryableWrite || inTxnOrTxnCommand) {
serverSession.txnNumber += session[kTxnNumberIncrement];
session[kTxnNumberIncrement] = 0;
command.txnNumber = Long.fromNumber(serverSession.txnNumber);
}

if (!inTransaction) {
if (!inTxnOrTxnCommand) {
if (session.transaction.state !== TxnState.NO_TRANSACTION) {
session.transaction.transition(TxnState.NO_TRANSACTION);
}
Expand Down
51 changes: 51 additions & 0 deletions test/integration/sessions/sessions.spec.prose.test.ts
@@ -0,0 +1,51 @@
import { expect } from 'chai';

import { Collection } from '../../../src/index';

describe('ServerSession', () => {
let client;
let testCollection: Collection<{ _id: number; a?: number }>;
beforeEach(async function () {
const configuration = this.configuration;
client = await configuration.newClient({ maxPoolSize: 1, monitorCommands: true }).connect();

// reset test collection
testCollection = client.db('test').collection('too.many.sessions');
await testCollection.drop().catch(() => null);
});

afterEach(async () => {
await client?.close(true);
});

/**
* TODO(NODE-4082): Refactor tests to align exactly with spec wording.
* Assert the following across at least 5 retries of the above test: (We do not need to retry in nodejs)
* Drivers MUST assert that exactly one session is used for all operations at least once across the retries of this test.
* Note that it's possible, although rare, for greater than 1 server session to be used because the session is not released until after the connection is checked in.
* Drivers MUST assert that the number of allocated sessions is strictly less than the number of concurrent operations in every retry of this test. In this instance it would less than (but NOT equal to) 8.
*/
it('13. may reuse one server session for many operations', async () => {
const events = [];
client.on('commandStarted', ev => events.push(ev));

const operations = [
testCollection.insertOne({ _id: 1 }),
testCollection.deleteOne({ _id: 2 }),
testCollection.updateOne({ _id: 3 }, { $set: { a: 1 } }),
testCollection.bulkWrite([{ updateOne: { filter: { _id: 4 }, update: { $set: { a: 1 } } } }]),
testCollection.findOneAndDelete({ _id: 5 }),
testCollection.findOneAndUpdate({ _id: 6 }, { $set: { a: 1 } }),
testCollection.findOneAndReplace({ _id: 7 }, { a: 8 }),
testCollection.find().toArray()
];

const allResults = await Promise.all(operations);

expect(allResults).to.have.lengthOf(operations.length);
expect(events).to.have.lengthOf(operations.length);

// This is a guarantee in node, unless you are performing a transaction (which is not being done in this test)
expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex'))).size).to.equal(1);
});
});
33 changes: 33 additions & 0 deletions test/integration/sessions/sessions.test.ts
Expand Up @@ -367,4 +367,37 @@ describe('Sessions Spec', function () {
});
});
});

describe('Session allocation', () => {
let client;
let testCollection;

beforeEach(async function () {
client = await this.configuration
.newClient({ maxPoolSize: 1, monitorCommands: true })
.connect();
// reset test collection
testCollection = client.db('test').collection('too.many.sessions');
await testCollection.drop().catch(() => null);
});

afterEach(async () => {
await client?.close();
});

it('should only use one session for many operations when maxPoolSize is 1', async () => {
const documents = Array.from({ length: 50 }).map((_, idx) => ({ _id: idx }));

const events = [];
client.on('commandStarted', ev => events.push(ev));
const allResults = await Promise.all(
documents.map(async doc => testCollection.insertOne(doc))
);

expect(allResults).to.have.lengthOf(documents.length);
expect(events).to.have.lengthOf(documents.length);

expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex'))).size).to.equal(1);
});
});
});
2 changes: 1 addition & 1 deletion test/tools/cluster_setup.sh
Expand Up @@ -17,7 +17,7 @@ if [[ $1 == "replica_set" ]]; then
echo "mongodb://bob:pwd123@localhost:31000,localhost:31001,localhost:31002/?replicaSet=rs"
elif [[ $1 == "sharded_cluster" ]]; then
mkdir -p $SHARDED_DIR
mlaunch init --dir $SHARDED_DIR --auth --username "bob" --password "pwd123" --replicaset --nodes 3 --arbiter --name rs --port 51000 --enableMajorityReadConcern --setParameter enableTestCommands=1 --sharded 1 --mongos 2
mlaunch init --dir $SHARDED_DIR --auth --username "bob" --password "pwd123" --replicaset --nodes 3 --name rs --port 51000 --enableMajorityReadConcern --setParameter enableTestCommands=1 --sharded 1 --mongos 2
echo "mongodb://bob:pwd123@localhost:51000,localhost:51001"
elif [[ $1 == "server" ]]; then
mkdir -p $SINGLE_DIR
Expand Down
2 changes: 2 additions & 0 deletions test/tools/spec-runner/index.js
Expand Up @@ -459,6 +459,8 @@ function validateExpectations(commandEvents, spec, savedSessionData) {
const rawExpectedEvents = spec.expectations.map(x => x.command_started_event);
const expectedEvents = normalizeCommandShapes(rawExpectedEvents);

expect(actualEvents).to.have.lengthOf(expectedEvents.length);

for (const [idx, expectedEvent] of expectedEvents.entries()) {
const actualEvent = actualEvents[idx];

Expand Down

0 comments on commit 5132bc9

Please sign in to comment.