Skip to content

Commit

Permalink
fix(NODE-4262): make startSession work without a connection (#3286)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Jun 15, 2022
1 parent cfe18f9 commit 89ad7c3
Show file tree
Hide file tree
Showing 31 changed files with 630 additions and 1,127 deletions.
40 changes: 16 additions & 24 deletions src/cursor/abstract_cursor.ts
Expand Up @@ -14,6 +14,7 @@ import type { MongoClient } from '../mongo_client';
import { TODO_NODE_3286, TypedEventEmitter } from '../mongo_types';
import { executeOperation, ExecutionResult } from '../operations/execute_operation';
import { GetMoreOperation } from '../operations/get_more';
import { KillCursorsOperation } from '../operations/kill_cursors';
import { ReadConcern, ReadConcernLike } from '../read_concern';
import { ReadPreference, ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
Expand Down Expand Up @@ -118,7 +119,7 @@ export abstract class AbstractCursor<
/** @internal */
[kId]?: Long;
/** @internal */
[kSession]?: ClientSession;
[kSession]: ClientSession;
/** @internal */
[kServer]?: Server;
/** @internal */
Expand Down Expand Up @@ -187,6 +188,8 @@ export abstract class AbstractCursor<

if (options.session instanceof ClientSession) {
this[kSession] = options.session;
} else {
this[kSession] = this[kClient].startSession({ owner: this, explicit: false });
}
}

Expand Down Expand Up @@ -217,11 +220,11 @@ export abstract class AbstractCursor<
}

/** @internal */
get session(): ClientSession | undefined {
get session(): ClientSession {
return this[kSession];
}

set session(clientSession: ClientSession | undefined) {
set session(clientSession: ClientSession) {
this[kSession] = clientSession;
}

Expand Down Expand Up @@ -592,11 +595,12 @@ export abstract class AbstractCursor<
const session = this[kSession];
if (session) {
// We only want to end this session if we created it, and it hasn't ended yet
if (session.explicit === false && !session.hasEnded) {
session.endSession();
if (session.explicit === false) {
if (!session.hasEnded) {
session.endSession(() => null);
}
this[kSession] = this.client.startSession({ owner: this, explicit: false });
}

this[kSession] = undefined;
}
}

Expand Down Expand Up @@ -644,22 +648,10 @@ export abstract class AbstractCursor<
* a significant refactor.
*/
[kInit](callback: Callback<TSchema | null>): void {
if (this[kSession] == null) {
if (this[kClient].topology?.shouldCheckForSessionSupport()) {
return this[kClient].topology?.selectServer(ReadPreference.primaryPreferred, {}, err => {
if (err) return callback(err);
return this[kInit](callback);
});
} else if (this[kClient].topology?.hasSessionSupport()) {
this[kSession] = this[kClient].topology?.startSession({ owner: this, explicit: false });
}
}

this._initialize(this[kSession], (err, state) => {
if (state) {
const response = state.response;
this[kServer] = state.server;
this[kSession] = state.session;

if (response.cursor) {
this[kId] =
Expand Down Expand Up @@ -843,11 +835,11 @@ function cleanupCursor(
}

cursor[kKilled] = true;
server.killCursors(
cursorNs,
[cursorId],
{ ...pluckBSONSerializeOptions(cursor[kOptions]), session },
() => completeCleanup()

return executeOperation(
cursor[kClient],
new KillCursorsOperation(cursorId, cursorNs, server, { session }),
completeCleanup
);
}

Expand Down
88 changes: 64 additions & 24 deletions src/mongo_client.ts
Expand Up @@ -13,17 +13,17 @@ import type { MONGO_CLIENT_EVENTS } from './constants';
import { Db, DbOptions } from './db';
import type { AutoEncrypter, AutoEncryptionOptions } from './deps';
import type { Encrypter } from './encrypter';
import { MongoInvalidArgumentError, MongoNotConnectedError } from './error';
import { MongoInvalidArgumentError } from './error';
import type { Logger, LoggerLevel } from './logger';
import { TypedEventEmitter } from './mongo_types';
import { connect } from './operations/connect';
import { PromiseProvider } from './promise_provider';
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
import type { ReadPreference, ReadPreferenceMode } from './read_preference';
import { ReadPreference, ReadPreferenceMode } from './read_preference';
import type { TagSet } from './sdam/server_description';
import type { SrvPoller } from './sdam/srv_polling';
import type { Topology, TopologyEvents } from './sdam/topology';
import type { ClientSession, ClientSessionOptions } from './sessions';
import { ClientSession, ClientSessionOptions, ServerSessionPool } from './sessions';
import {
Callback,
ClientMetadata,
Expand Down Expand Up @@ -267,10 +267,16 @@ export type WithSessionCallback = (session: ClientSession) => Promise<any>;
/** @internal */
export interface MongoClientPrivate {
url: string;
sessions: Set<ClientSession>;
bsonOptions: BSONSerializeOptions;
namespace: MongoDBNamespace;
hasBeenClosed: boolean;
/**
* We keep a reference to the sessions that are acquired from the pool.
* - used to track and close all sessions in client.close() (which is non-standard behavior)
* - used to notify the leak checker in our tests if test author forgot to clean up explicit sessions
*/
readonly activeSessions: Set<ClientSession>;
readonly sessionPool: ServerSessionPool;
readonly options: MongoOptions;
readonly readConcern?: ReadConcern;
readonly writeConcern?: WriteConcern;
Expand Down Expand Up @@ -352,10 +358,11 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
// The internal state
this.s = {
url,
sessions: new Set(),
bsonOptions: resolveBSONOptions(this[kOptions]),
namespace: ns('admin'),
hasBeenClosed: false,
sessionPool: new ServerSessionPool(this),
activeSessions: new Set(),

get options() {
return client[kOptions];
Expand Down Expand Up @@ -470,23 +477,51 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {

return maybePromise(callback, callback => {
if (this.topology == null) {
// Do not connect just to end sessions
return callback();
}

// clear out references to old topology
const topology = this.topology;
this.topology = undefined;

topology.close({ force }, error => {
if (error) return callback(error);
const { encrypter } = this[kOptions];
if (encrypter) {
return encrypter.close(this, force, error => {
callback(error);
const activeSessionEnds = Array.from(this.s.activeSessions, session => session.endSession());
this.s.activeSessions.clear();

Promise.all(activeSessionEnds)
.then(() => {
const endSessions = Array.from(this.s.sessionPool.sessions, ({ id }) => id);
if (endSessions.length === 0) return;
return this.db('admin')
.command(
{ endSessions },
{ readPreference: ReadPreference.primaryPreferred, noResponse: true }
)
.then(() => null) // outcome does not matter
.catch(() => null); // outcome does not matter
})
.then(() => {
if (this.topology == null) {
return callback();
}
// clear out references to old topology
const topology = this.topology;
this.topology = undefined;

return new Promise<void>((resolve, reject) => {
topology.close({ force }, error => {
if (error) return reject(error);
const { encrypter } = this[kOptions];
if (encrypter) {
return encrypter.close(this, force, error => {
if (error) return reject(error);
resolve();
});
}
resolve();
});
});
}
callback();
});
})
.then(
() => callback(),
error => callback(error)
);
});
}

Expand Down Expand Up @@ -553,12 +588,17 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
startSession(): ClientSession;
startSession(options: ClientSessionOptions): ClientSession;
startSession(options?: ClientSessionOptions): ClientSession {
options = Object.assign({ explicit: true }, options);
if (!this.topology) {
throw new MongoNotConnectedError('MongoClient must be connected to start a session');
}

return this.topology.startSession(options, this.s.options);
const session = new ClientSession(
this,
this.s.sessionPool,
{ explicit: true, ...options },
this[kOptions]
);
this.s.activeSessions.add(session);
session.once('ended', () => {
this.s.activeSessions.delete(session);
});
return session;
}

/**
Expand Down
22 changes: 15 additions & 7 deletions src/operations/execute_operation.ts
Expand Up @@ -114,16 +114,24 @@ export function executeOperation<
if (topology.hasSessionSupport()) {
if (session == null) {
owner = Symbol();
session = topology.startSession({ owner, explicit: false });
session = client.startSession({ owner, explicit: false });
} else if (session.hasEnded) {
return callback(new MongoExpiredSessionError('Use of expired sessions is not permitted'));
} else if (session.snapshotEnabled && !topology.capabilities.supportsSnapshotReads) {
return callback(new MongoCompatibilityError('Snapshot reads require MongoDB 5.0 or later'));
}
} else if (session) {
// If the user passed an explicit session and we are still, after server selection,
// trying to run against a topology that doesn't support sessions we error out.
return callback(new MongoCompatibilityError('Current topology does not support sessions'));
} else {
// no session support
if (session && session.explicit) {
// If the user passed an explicit session and we are still, after server selection,
// trying to run against a topology that doesn't support sessions we error out.
return callback(new MongoCompatibilityError('Current topology does not support sessions'));
} else if (session && !session.explicit) {
// We do not have to worry about ending the session because the server session has not been acquired yet
delete operation.options.session;
operation.clearSession();
session = undefined;
}
}

try {
Expand Down Expand Up @@ -166,8 +174,8 @@ function executeWithServerSelection<TResult>(

let selector: ReadPreference | ServerSelector;

if (operation.hasAspect(Aspect.CURSOR_ITERATING)) {
// Get more operations must always select the same server, but run through
if (operation.hasAspect(Aspect.MUST_SELECT_SAME_SERVER)) {
// GetMore and KillCursor operations must always select the same server, but run through
// server selection to potentially force monitor checks if the server is
// in an unknown state.
selector = sameServerSelector(operation.server?.description);
Expand Down
2 changes: 1 addition & 1 deletion src/operations/get_more.ts
Expand Up @@ -60,4 +60,4 @@ export class GetMoreOperation extends AbstractOperation {
}
}

defineAspects(GetMoreOperation, [Aspect.READ_OPERATION, Aspect.CURSOR_ITERATING]);
defineAspects(GetMoreOperation, [Aspect.READ_OPERATION, Aspect.MUST_SELECT_SAME_SERVER]);
27 changes: 27 additions & 0 deletions src/operations/kill_cursors.ts
@@ -0,0 +1,27 @@
import type { Long } from '../bson';
import { MongoRuntimeError } from '../error';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import type { Callback, MongoDBNamespace } from '../utils';
import { AbstractOperation, Aspect, defineAspects, OperationOptions } from './operation';

export class KillCursorsOperation extends AbstractOperation {
cursorId: Long;
constructor(cursorId: Long, ns: MongoDBNamespace, server: Server, options: OperationOptions) {
super(options);
this.ns = ns;
this.cursorId = cursorId;
this.server = server;
}

execute(server: Server, session: ClientSession | undefined, callback: Callback<void>): void {
if (server !== this.server) {
return callback(
new MongoRuntimeError('Killcursor must run on the same server operation began on')
);
}
server.killCursors(this.ns, [this.cursorId], { session }, () => callback());
}
}

defineAspects(KillCursorsOperation, [Aspect.MUST_SELECT_SAME_SERVER]);
6 changes: 5 additions & 1 deletion src/operations/operation.ts
Expand Up @@ -11,7 +11,7 @@ export const Aspect = {
EXPLAINABLE: Symbol('EXPLAINABLE'),
SKIP_COLLATION: Symbol('SKIP_COLLATION'),
CURSOR_CREATING: Symbol('CURSOR_CREATING'),
CURSOR_ITERATING: Symbol('CURSOR_ITERATING')
MUST_SELECT_SAME_SERVER: Symbol('MUST_SELECT_SAME_SERVER')
} as const;

/** @public */
Expand Down Expand Up @@ -94,6 +94,10 @@ export abstract class AbstractOperation<TResult = any> {
return this[kSession];
}

clearSession() {
this[kSession] = undefined;
}

get canRetryRead(): boolean {
return true;
}
Expand Down

0 comments on commit 89ad7c3

Please sign in to comment.