Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-3697): reduce serverSession allocation #3171

Merged
merged 14 commits into from Mar 25, 2022
2 changes: 1 addition & 1 deletion src/cmap/connection.ts
Expand Up @@ -114,7 +114,7 @@ export interface CommandOptions extends BSONSerializeOptions {
// Applying a session to a command should happen as part of command construction,
// most likely in the CommandOperation#executeCommand method, where we have access to
// the details we need to determine if a txnNum should also be applied.
willRetryWrite?: true;
willRetryWrite?: boolean;

writeConcern?: WriteConcern;
}
Expand Down
5 changes: 2 additions & 3 deletions src/operations/operation.ts
Expand Up @@ -25,7 +25,7 @@ export interface OperationConstructor extends Function {
export interface OperationOptions extends BSONSerializeOptions {
/** Specify ClientSession for this command */
session?: ClientSession;
willRetryWrites?: boolean;
willRetryWrite?: boolean;

/** The preferred read preference (ReadPreference.primary, ReadPreference.primary_preferred, ReadPreference.secondary, ReadPreference.secondary_preferred, ReadPreference.nearest). */
readPreference?: ReadPreferenceLike;
Expand Down Expand Up @@ -56,8 +56,7 @@ export abstract class AbstractOperation<TResult = any> {
// BSON serialization options
bsonOptions?: BSONSerializeOptions;

// TODO: Each operation defines its own options, there should be better typing here
options: Document;
options: OperationOptions;

[kSession]: ClientSession | undefined;

Expand Down
106 changes: 63 additions & 43 deletions src/sessions.ts
Expand Up @@ -42,20 +42,6 @@ import {

const minWireVersionForShardedTransactions = 8;

function assertAlive(session: ClientSession, callback?: Callback): boolean {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
if (session.serverSession == null) {
const error = new MongoExpiredSessionError();
if (typeof callback === 'function') {
callback(error);
return false;
}

throw error;
}

return true;
}

/** @public */
export interface ClientSessionOptions {
/** Whether causal consistency should be enabled on this session */
Expand Down Expand Up @@ -89,6 +75,8 @@ const kSnapshotTime = Symbol('snapshotTime');
const kSnapshotEnabled = Symbol('snapshotEnabled');
/** @internal */
const kPinnedConnection = Symbol('pinnedConnection');
/** @internal Accumulates total number of increments to perform to txnNumber */
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const kTxnNumberIncrement = Symbol('txnNumberIncrement');

/** @public */
export interface EndSessionOptions {
Expand Down Expand Up @@ -130,6 +118,8 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
[kSnapshotEnabled] = false;
/** @internal */
[kPinnedConnection]?: Connection;
/** @internal Accumulates total number of increments to perform to txnNumber */
dariakp marked this conversation as resolved.
Show resolved Hide resolved
[kTxnNumberIncrement]: number;

/**
* Create a client session.
Expand Down Expand Up @@ -172,7 +162,10 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
this.sessionPool = sessionPool;
this.hasEnded = false;
this.clientOptions = clientOptions;
this[kServerSession] = undefined;

this.explicit = Boolean(options.explicit);
dariakp marked this conversation as resolved.
Show resolved Hide resolved
this[kServerSession] = this.explicit ? this.sessionPool.acquire() : undefined;
this[kTxnNumberIncrement] = 0;

this.supports = {
causalConsistency: options.snapshot !== true && options.causalConsistency !== false
Expand All @@ -181,24 +174,27 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
this.clusterTime = options.initialClusterTime;

this.operationTime = undefined;
this.explicit = !!options.explicit;
this.owner = options.owner;
this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
this.transaction = new Transaction();
}

/** The server id associated with this session */
get id(): ServerSessionId | undefined {
return this.serverSession?.id;
const serverSession = this[kServerSession];
if (serverSession == null) {
return undefined;
}
return serverSession.id;
dariakp marked this conversation as resolved.
Show resolved Hide resolved
}

get serverSession(): ServerSession {
if (this[kServerSession] == null) {
this[kServerSession] = this.sessionPool.acquire();
let serverSession = this[kServerSession];
if (serverSession == null) {
serverSession = this.sessionPool.acquire();
this[kServerSession] = serverSession;
}

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return this[kServerSession]!;
return serverSession;
}

/** Whether or not this session is configured for snapshot reads */
Expand Down Expand Up @@ -267,9 +263,15 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
const completeEndSession = () => {
maybeClearPinnedConnection(this, finalOptions);

// release the server session back to the pool
this.sessionPool.release(this.serverSession);
this[kServerSession] = undefined;
const serverSession = this[kServerSession];
if (serverSession != null) {
// release the server session back to the pool
this.sessionPool.release(serverSession);
// Make sure a new serverSession never makes it on to the ClientSession
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
Object.defineProperty(this, kServerSession, {
value: ServerSession.clone(serverSession)
});
}

// mark the session as ended, and emit a signal
this.hasEnded = true;
Expand All @@ -279,7 +281,9 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
done();
};

if (this.serverSession && this.inTransaction()) {
if (this.inTransaction()) {
// If we've reached endSession and the transaction is still active
// by default we abort it
this.abortTransaction(err => {
if (err) return done(err);
completeEndSession();
Expand Down Expand Up @@ -355,10 +359,7 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {

/** Increment the transaction number on the internal ServerSession */
incrementTransactionNumber(): void {
if (this.serverSession) {
this.serverSession.txnNumber =
typeof this.serverSession.txnNumber === 'number' ? this.serverSession.txnNumber + 1 : 0;
}
this[kTxnNumberIncrement] += 1;
dariakp marked this conversation as resolved.
Show resolved Hide resolved
}

/** @returns whether this session is currently in a transaction or not */
Expand All @@ -376,7 +377,6 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
throw new MongoCompatibilityError('Transactions are not allowed with snapshot sessions');
}

assertAlive(this);
if (this.inTransaction()) {
throw new MongoTransactionError('Transaction already in progress');
}
Expand Down Expand Up @@ -627,7 +627,7 @@ function attemptTransaction<TSchema>(
throw err;
}

if (session.transaction.isActive) {
if (session.inTransaction()) {
return session.abortTransaction().then(() => maybeRetryOrThrow(err));
}

Expand All @@ -641,11 +641,6 @@ function endTransaction(
commandName: 'abortTransaction' | 'commitTransaction',
callback: Callback<Document>
) {
if (!assertAlive(session, callback)) {
// checking result in case callback was called
return;
}

// handle any initial problematic cases
const txnState = session.transaction.state;

Expand Down Expand Up @@ -750,7 +745,6 @@ function endTransaction(
callback(error, result);
}

// Assumption here that commandName is "commitTransaction" or "abortTransaction"
if (session.transaction.recoveryToken) {
command.recoveryToken = session.transaction.recoveryToken;
}
Expand Down Expand Up @@ -832,6 +826,30 @@ export class ServerSession {

return idleTimeMinutes > sessionTimeoutMinutes - 1;
}

/**
* @internal
* Cloning meant to keep a readable reference to the server session data
* after ClientSession has ended
*/
static clone(serverSession: ServerSession): Readonly<ServerSession> {
const arrayBuffer = new ArrayBuffer(16);
const idBytes = Buffer.from(arrayBuffer);
idBytes.set(serverSession.id.id.buffer);

const id = new Binary(idBytes, serverSession.id.id.sub_type);

// Manual prototype construction to avoid modifying the constructor of this class
return Object.setPrototypeOf(
{
id: { id },
lastUse: serverSession.lastUse,
txnNumber: serverSession.txnNumber,
isDirty: serverSession.isDirty
},
ServerSession.prototype
);
}
}

/**
Expand Down Expand Up @@ -944,11 +962,11 @@ export function applySession(
command: Document,
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
options: CommandOptions
): MongoDriverError | undefined {
// TODO: merge this with `assertAlive`, did not want to throw a try/catch here
if (session.hasEnded) {
return new MongoExpiredSessionError();
}

// May acquire serverSession here
const serverSession = session.serverSession;
if (serverSession == null) {
return new MongoRuntimeError('Unable to acquire server session');
Expand All @@ -967,14 +985,16 @@ export function applySession(
command.lsid = serverSession.id;

// first apply non-transaction-specific sessions data
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
const inTransaction = session.inTransaction() || isTransactionCommand(command);
const isRetryableWrite = options?.willRetryWrite || false;
const inTxnOrTxnCommand = session.inTransaction() || isTransactionCommand(command);
const isRetryableWrite = Boolean(options.willRetryWrite);
dariakp marked this conversation as resolved.
Show resolved Hide resolved

if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) {
if (isRetryableWrite || inTxnOrTxnCommand) {
serverSession.txnNumber += session[kTxnNumberIncrement];
session[kTxnNumberIncrement] = 0;
command.txnNumber = Long.fromNumber(serverSession.txnNumber);
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
}

if (!inTransaction) {
if (!inTxnOrTxnCommand) {
if (session.transaction.state !== TxnState.NO_TRANSACTION) {
session.transaction.transition(TxnState.NO_TRANSACTION);
}
Expand Down
50 changes: 50 additions & 0 deletions test/integration/sessions/sessions.spec.prose.test.ts
@@ -0,0 +1,50 @@
import { expect } from 'chai';

import { Collection } from '../../../src/index';

describe('ServerSession', () => {
let client;
let testCollection: Collection<{ _id: number; a?: number }>;
beforeEach(async function () {
const configuration = this.configuration;
client = await configuration.newClient({ maxPoolSize: 1, monitorCommands: true }).connect();

// reset test collection
testCollection = client.db('test').collection('too.many.sessions');
await testCollection.drop().catch(() => null);
});

afterEach(async () => {
await client?.close(true);
});

/**
* TODO(DRIVERS-2218): Refactor tests to align exactly with spec wording. Preliminarily implements:
dariakp marked this conversation as resolved.
Show resolved Hide resolved
* Drivers MAY assert that exactly one session is used for all the concurrent operations listed in the test, however this is a race condition if the session isn't released before checkIn (which SHOULD NOT be attempted)
* Drivers SHOULD assert that after repeated runs they are able to achieve the use of exactly one session, this will statistically prove we've reduced the allocation amount
* Drivers MUST assert that the number of allocated sessions never exceeds the number of concurrent operations executing
*/

it('13. may reuse one server session for many operations', async () => {
const events = [];
client.on('commandStarted', ev => events.push(ev));

const operations = [
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
testCollection.insertOne({ _id: 1 }),
testCollection.deleteOne({ _id: 2 }),
testCollection.updateOne({ _id: 3 }, { $set: { a: 1 } }),
testCollection.bulkWrite([{ updateOne: { filter: { _id: 4 }, update: { $set: { a: 1 } } } }]),
testCollection.findOneAndDelete({ _id: 5 }),
testCollection.findOneAndUpdate({ _id: 6 }, { $set: { a: 1 } }),
testCollection.findOneAndReplace({ _id: 7 }, { a: 8 }),
testCollection.find().toArray()
];

const allResults = await Promise.all(operations);

expect(allResults).to.have.lengthOf(operations.length);
expect(events).to.have.lengthOf(operations.length);

expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex'))).size).to.equal(1); // This is a guarantee in node
});
});
33 changes: 33 additions & 0 deletions test/integration/sessions/sessions.test.ts
Expand Up @@ -367,4 +367,37 @@ describe('Sessions Spec', function () {
});
});
});

describe('Session allocation', () => {
let client;
let testCollection;

beforeEach(async function () {
client = await this.configuration
.newClient({ maxPoolSize: 1, monitorCommands: true })
.connect();
// reset test collection
testCollection = client.db('test').collection('too.many.sessions');
await testCollection.drop().catch(() => null);
});

afterEach(async () => {
await client?.close();
});

it('should only use one session for many operations when maxPoolSize is 1', async () => {
const documents = new Array(50).fill(null).map((_, idx) => ({ _id: idx }));
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved

const events = [];
client.on('commandStarted', ev => events.push(ev));
const allResults = await Promise.all(
documents.map(async doc => testCollection.insertOne(doc))
);

expect(allResults).to.have.lengthOf(documents.length);
expect(events).to.have.lengthOf(documents.length);

expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex'))).size).to.equal(1);
});
});
});
2 changes: 1 addition & 1 deletion test/tools/cluster_setup.sh
Expand Up @@ -17,7 +17,7 @@ if [[ $1 == "replica_set" ]]; then
echo "mongodb://bob:pwd123@localhost:31000,localhost:31001,localhost:31002/?replicaSet=rs"
elif [[ $1 == "sharded_cluster" ]]; then
mkdir -p $SHARDED_DIR
mlaunch init --dir $SHARDED_DIR --auth --username "bob" --password "pwd123" --replicaset --nodes 3 --arbiter --name rs --port 51000 --enableMajorityReadConcern --setParameter enableTestCommands=1 --sharded 1 --mongos 2
mlaunch init --dir $SHARDED_DIR --auth --username "bob" --password "pwd123" --replicaset --nodes 3 --name rs --port 51000 --enableMajorityReadConcern --setParameter enableTestCommands=1 --sharded 1 --mongos 2
echo "mongodb://bob:pwd123@localhost:51000,localhost:51001"
elif [[ $1 == "server" ]]; then
mkdir -p $SINGLE_DIR
Expand Down
2 changes: 2 additions & 0 deletions test/tools/spec-runner/index.js
Expand Up @@ -459,6 +459,8 @@ function validateExpectations(commandEvents, spec, savedSessionData) {
const rawExpectedEvents = spec.expectations.map(x => x.command_started_event);
const expectedEvents = normalizeCommandShapes(rawExpectedEvents);

expect(actualEvents).to.have.lengthOf(expectedEvents.length);

for (const [idx, expectedEvent] of expectedEvents.entries()) {
const actualEvent = actualEvents[idx];

Expand Down