fix(NODE-3810): delay timeout errors by one event loop tick (#3180)
nbbeeken committed Apr 1, 2022
1 parent 4e2f9bf commit 0ed7cbf
Showing 7 changed files with 411 additions and 260 deletions.
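The core change in src/cmap/connection.ts replaces the shared handleIssue helper with dedicated onError, onClose, onTimeout, and onMessage handlers, and defers the socket-timeout error by one event-loop tick: if a server reply is already queued in the same tick, onMessage cancels the pending timer and the operation completes normally instead of failing. A minimal sketch of that pattern outside the driver, using only Node's built-in net and timer APIs (class and callback names here are illustrative, not the driver's real API):

import { Socket } from 'net';

class DelayedTimeoutConnection {
  private delayedTimeoutId: NodeJS.Timeout | null = null;

  constructor(private socket: Socket, private fail: (error: Error) => void) {
    socket.setTimeout(30_000);
    socket.on('timeout', () => this.onTimeout());
    socket.on('data', chunk => this.onMessage(chunk));
  }

  private onTimeout(): void {
    // Defer the error by one tick so a reply already sitting in this tick can cancel it.
    this.delayedTimeoutId = setTimeout(() => {
      this.socket.destroy();
      this.fail(new Error('connection timed out'));
    }, 1).unref(); // no need for this timer to hold the event loop open
  }

  private onMessage(_chunk: Buffer): void {
    // A reply arrived in the same tick as the 'timeout' event: cancel the pending error.
    if (this.delayedTimeoutId != null) {
      clearTimeout(this.delayedTimeoutId);
      this.delayedTimeoutId = null;
    }
    // ...hand the bytes to whichever operation is waiting...
  }
}

In the actual patch the same idea lives behind the kDelayedTimeoutId symbol on Connection: onTimeout arms the unref'd 1 ms timer, and onMessage clears it before dispatching the reply.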
2 changes: 1 addition & 1 deletion .gitignore
@@ -77,4 +77,4 @@ etc/docs/build
!docs/*/lib
!docs/**/*.png
!docs/**/*.css
!docs/**/*.js
!docs/**/*.js
3 changes: 3 additions & 0 deletions src/cmap/commands.ts
@@ -458,6 +458,7 @@ export class KillCursor {
}
}

/** @internal */
export interface MessageHeader {
length: number;
requestId: number;
@@ -466,6 +467,7 @@ export interface MessageHeader {
fromCompressed?: boolean;
}

/** @internal */
export interface OpResponseOptions extends BSONSerializeOptions {
raw?: boolean;
documentsReturnedIn?: string | null;
@@ -640,6 +642,7 @@ const OPTS_CHECKSUM_PRESENT = 1;
const OPTS_MORE_TO_COME = 2;
const OPTS_EXHAUST_ALLOWED = 1 << 16;

/** @internal */
export interface OpMsgOptions {
requestId: number;
serializeFunctions: boolean;
228 changes: 128 additions & 100 deletions src/cmap/connection.ts
@@ -75,6 +75,8 @@ const kHello = Symbol('hello');
const kAutoEncrypter = Symbol('autoEncrypter');
/** @internal */
const kFullResult = Symbol('fullResult');
/** @internal */
const kDelayedTimeoutId = Symbol('delayedTimeoutId');

/** @internal */
export interface QueryOptions extends BSONSerializeOptions {
@@ -199,6 +201,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
lastHelloMS?: number;
serverApi?: ServerApi;
helloOk?: boolean;

/** @internal */
[kDelayedTimeoutId]: NodeJS.Timeout | null;
/** @internal */
[kDescription]: StreamDescription;
/** @internal */
@@ -253,19 +258,21 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
...options,
maxBsonMessageSize: this.hello?.maxBsonMessageSize
});
this[kMessageStream].on('message', messageHandler(this));
this[kStream] = stream;
stream.on('error', () => {

this[kDelayedTimeoutId] = null;

this[kMessageStream].on('message', message => this.onMessage(message));
this[kMessageStream].on('error', error => this.onError(error));
this[kStream].on('close', () => this.onClose());
this[kStream].on('timeout', () => this.onTimeout());
this[kStream].on('error', () => {
/* ignore errors, listen to `close` instead */
});

this[kMessageStream].on('error', error => this.handleIssue({ destroy: error }));
stream.on('close', () => this.handleIssue({ isClose: true }));
stream.on('timeout', () => this.handleIssue({ isTimeout: true, destroy: true }));

// hook the message stream up to the passed in stream
stream.pipe(this[kMessageStream]);
this[kMessageStream].pipe(stream);
this[kStream].pipe(this[kMessageStream]);
this[kMessageStream].pipe(this[kStream]);
}

get description(): StreamDescription {
@@ -317,40 +324,133 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this[kLastUseTime] = now();
}

handleIssue(issue: { isTimeout?: boolean; isClose?: boolean; destroy?: boolean | Error }): void {
onError(error: Error) {
if (this.closed) {
return;
}

if (issue.destroy) {
this[kStream].destroy(typeof issue.destroy === 'boolean' ? undefined : issue.destroy);
this[kStream].destroy(error);

this.closed = true;

for (const op of this[kQueue].values()) {
op.cb(error);
}

this[kQueue].clear();
this.emit(Connection.CLOSE);
}

onClose() {
if (this.closed) {
return;
}

this.closed = true;

for (const [, op] of this[kQueue]) {
if (issue.isTimeout) {
op.cb(
new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, {
beforeHandshake: this.hello == null
})
);
} else if (issue.isClose) {
op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`));
} else {
op.cb(typeof issue.destroy === 'boolean' ? undefined : issue.destroy);
}
const message = `connection ${this.id} to ${this.address} closed`;
for (const op of this[kQueue].values()) {
op.cb(new MongoNetworkError(message));
}

this[kQueue].clear();
this.emit(Connection.CLOSE);
}

destroy(): void;
destroy(callback: Callback): void;
destroy(options: DestroyOptions): void;
destroy(options: DestroyOptions, callback: Callback): void;
destroy(options?: DestroyOptions | Callback, callback?: Callback): void {
onTimeout() {
if (this.closed) {
return;
}

this[kDelayedTimeoutId] = setTimeout(() => {
this[kStream].destroy();

this.closed = true;

const message = `connection ${this.id} to ${this.address} timed out`;
const beforeHandshake = this.hello == null;
for (const op of this[kQueue].values()) {
op.cb(new MongoNetworkTimeoutError(message, { beforeHandshake }));
}

this[kQueue].clear();
this.emit(Connection.CLOSE);
}, 1).unref(); // No need for this timer to hold the event loop open
}

onMessage(message: BinMsg | Response) {
const delayedTimeoutId = this[kDelayedTimeoutId];
if (delayedTimeoutId != null) {
clearTimeout(delayedTimeoutId);
this[kDelayedTimeoutId] = null;
}

// always emit the message, in case we are streaming
this.emit('message', message);
const operationDescription = this[kQueue].get(message.responseTo);
if (!operationDescription) {
return;
}

const callback = operationDescription.cb;

// SERVER-45775: For exhaust responses we should be able to use the same requestId to
// track response, however the server currently synthetically produces remote requests
// making the `responseTo` change on each response
this[kQueue].delete(message.responseTo);
if ('moreToCome' in message && message.moreToCome) {
// requeue the callback for next synthetic request
this[kQueue].set(message.requestId, operationDescription);
} else if (operationDescription.socketTimeoutOverride) {
this[kStream].setTimeout(this.socketTimeoutMS);
}

try {
// Pass in the entire description because it has BSON parsing options
message.parse(operationDescription);
} catch (err) {
// If this error is generated by our own code, it will already have the correct class applied
// if it is not, then it is coming from a catastrophic data parse failure or the BSON library
// in either case, it should not be wrapped
callback(err);
return;
}

if (message.documents[0]) {
const document: Document = message.documents[0];
const session = operationDescription.session;
if (session) {
updateSessionFromResponse(session, document);
}

if (document.$clusterTime) {
this[kClusterTime] = document.$clusterTime;
this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime);
}

if (operationDescription.command) {
if (document.writeConcernError) {
callback(new MongoWriteConcernError(document.writeConcernError, document));
return;
}

if (document.ok === 0 || document.$err || document.errmsg || document.code) {
callback(new MongoServerError(document));
return;
}
} else {
// Pre 3.2 support
if (document.ok === 0 || document.$err || document.errmsg) {
callback(new MongoServerError(document));
return;
}
}
}

callback(undefined, operationDescription.fullResult ? message : message.documents[0]);
}

destroy(options?: DestroyOptions, callback?: Callback): void {
if (typeof options === 'function') {
callback = options;
options = { force: false };
@@ -387,7 +487,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
});
}

/** @internal */
command(
ns: MongoDBNamespace,
cmd: Document,
@@ -464,7 +563,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}
}

/** @internal */
query(ns: MongoDBNamespace, cmd: Document, options: QueryOptions, callback: Callback): void {
const isExplain = cmd.$explain != null;
const readPreference = options.readPreference ?? ReadPreference.primary;
@@ -537,7 +635,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
);
}

/** @internal */
getMore(
ns: MongoDBNamespace,
cursorId: Long,
@@ -599,7 +696,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.command(ns, getMoreCmd, commandOptions, callback);
}

/** @internal */
killCursors(
ns: MongoDBNamespace,
cursorIds: Long[],
@@ -719,74 +815,6 @@ function supportsOpMsg(conn: Connection) {
return maxWireVersion(conn) >= 6 && !description.__nodejs_mock_server__;
}

function messageHandler(conn: Connection) {
return function messageHandler(message: BinMsg | Response) {
// always emit the message, in case we are streaming
conn.emit('message', message);
const operationDescription = conn[kQueue].get(message.responseTo);
if (!operationDescription) {
return;
}

const callback = operationDescription.cb;

// SERVER-45775: For exhaust responses we should be able to use the same requestId to
// track response, however the server currently synthetically produces remote requests
// making the `responseTo` change on each response
conn[kQueue].delete(message.responseTo);
if ('moreToCome' in message && message.moreToCome) {
// requeue the callback for next synthetic request
conn[kQueue].set(message.requestId, operationDescription);
} else if (operationDescription.socketTimeoutOverride) {
conn[kStream].setTimeout(conn.socketTimeoutMS);
}

try {
// Pass in the entire description because it has BSON parsing options
message.parse(operationDescription);
} catch (err) {
// If this error is generated by our own code, it will already have the correct class applied
// if it is not, then it is coming from a catastrophic data parse failure or the BSON library
// in either case, it should not be wrapped
callback(err);
return;
}

if (message.documents[0]) {
const document: Document = message.documents[0];
const session = operationDescription.session;
if (session) {
updateSessionFromResponse(session, document);
}

if (document.$clusterTime) {
conn[kClusterTime] = document.$clusterTime;
conn.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime);
}

if (operationDescription.command) {
if (document.writeConcernError) {
callback(new MongoWriteConcernError(document.writeConcernError, document));
return;
}

if (document.ok === 0 || document.$err || document.errmsg || document.code) {
callback(new MongoServerError(document));
return;
}
} else {
// Pre 3.2 support
if (document.ok === 0 || document.$err || document.errmsg) {
callback(new MongoServerError(document));
return;
}
}
}

callback(undefined, operationDescription.fullResult ? message : message.documents[0]);
};
}

function streamIdentifier(stream: Stream, options: ConnectionOptions): string {
if (options.proxyHost) {
// If proxy options are specified, the properties of `stream` itself
5 changes: 5 additions & 0 deletions src/index.ts
@@ -186,12 +186,17 @@ export type {
MongoCredentialsOptions
} from './cmap/auth/mongo_credentials';
export type {
BinMsg,
GetMore,
KillCursor,
MessageHeader,
Msg,
OpGetMoreOptions,
OpMsgOptions,
OpQueryOptions,
OpResponseOptions,
Query,
Response,
WriteProtocolMessageType
} from './cmap/commands';
export type { LEGAL_TCP_SOCKET_OPTIONS, LEGAL_TLS_SOCKET_OPTIONS, Stream } from './cmap/connect';
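Because src/index.ts now re-exports the wire-protocol message types, downstream TypeScript code can reference them with type-only imports. A small illustration, assuming a project that depends on the driver built from this commit (these interfaces are marked @internal, so they are useful for typing but not supported API):

import type { BinMsg, MessageHeader, OpMsgOptions, Response } from 'mongodb';

// Example: the shape of a listener that inspects raw server replies.
type RawReplyListener = (message: BinMsg | Response, header?: MessageHeader) => void;

// The OpMsgOptions fields shown in this commit's hunk (requestId, serializeFunctions)
// can now be referenced by type without reaching into ./cmap/commands.
const describeOptions = (options: OpMsgOptions): string =>
  `requestId=${options.requestId} serializeFunctions=${options.serializeFunctions}`;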
7 changes: 3 additions & 4 deletions test/tools/runner/hooks/configuration.js
@@ -66,15 +66,14 @@ async function initializeFilters(client) {
}

const testSkipBeforeEachHook = async function () {
// `metadata` always exists, `requires` is optional
const requires = this.currentTest.metadata.requires;
const metadata = this.currentTest.metadata;

if (requires && Object.keys(requires).length > 0) {
if (metadata && metadata.requires && Object.keys(metadata.requires).length > 0) {
const failedFilter = filters.find(filter => !filter.filter(this.currentTest));

if (failedFilter) {
const filterName = failedFilter.constructor.name;
const metadataString = inspect(requires, {
const metadataString = inspect(metadata.requires, {
colors: true,
compact: true,
depth: 10,
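The hook change in test/tools/runner/hooks/configuration.js makes the skip logic tolerate tests that declare no metadata at all, instead of assuming this.currentTest.metadata is always present. A hypothetical pair of test declarations the hook now has to handle, assuming the repository's mocha extension that lets it() accept a { metadata, test } object (test names here are made up):

describe('delayed timeout errors', function () {
  // Declares requirements; the beforeEach hook compares metadata.requires against the filters.
  it('only runs against replica sets', {
    metadata: { requires: { topology: ['replicaset'] } },
    test: async function () {
      // ...test body...
    }
  });

  // Declares no metadata at all; the hook must not throw when
  // this.currentTest.metadata is undefined.
  it('runs everywhere', async function () {
    // ...test body...
  });
});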
