Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-3810): delay timeout errors by one event loop tick #3180

Merged
merged 18 commits into from Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Expand Up @@ -77,4 +77,4 @@ etc/docs/build
!docs/*/lib
!docs/**/*.png
!docs/**/*.css
!docs/**/*.js
!docs/**/*.js
3 changes: 3 additions & 0 deletions src/cmap/commands.ts
Expand Up @@ -458,6 +458,7 @@ export class KillCursor {
}
}

/** @internal */
export interface MessageHeader {
length: number;
requestId: number;
Expand All @@ -466,6 +467,7 @@ export interface MessageHeader {
fromCompressed?: boolean;
}

/** @internal */
export interface OpResponseOptions extends BSONSerializeOptions {
raw?: boolean;
documentsReturnedIn?: string | null;
Expand Down Expand Up @@ -640,6 +642,7 @@ const OPTS_CHECKSUM_PRESENT = 1;
const OPTS_MORE_TO_COME = 2;
const OPTS_EXHAUST_ALLOWED = 1 << 16;

/** @internal */
export interface OpMsgOptions {
requestId: number;
serializeFunctions: boolean;
Expand Down
228 changes: 128 additions & 100 deletions src/cmap/connection.ts
Expand Up @@ -75,6 +75,8 @@ const kHello = Symbol('hello');
const kAutoEncrypter = Symbol('autoEncrypter');
/** @internal */
const kFullResult = Symbol('fullResult');
/** @internal */
const kDelayedTimeoutId = Symbol('delayedTimeoutId');

/** @internal */
export interface QueryOptions extends BSONSerializeOptions {
Expand Down Expand Up @@ -191,6 +193,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
lastHelloMS?: number;
serverApi?: ServerApi;
helloOk?: boolean;

/**@internal */
[kDelayedTimeoutId]: NodeJS.Timeout | null;
/** @internal */
[kDescription]: StreamDescription;
/** @internal */
Expand Down Expand Up @@ -245,19 +250,21 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
...options,
maxBsonMessageSize: this.hello?.maxBsonMessageSize
});
this[kMessageStream].on('message', messageHandler(this));
this[kStream] = stream;
stream.on('error', () => {

this[kDelayedTimeoutId] = null;

this[kMessageStream].on('message', message => this.onMessage(message));
this[kMessageStream].on('error', error => this.onError(error));
this[kStream].on('close', () => this.onClose());
this[kStream].on('timeout', () => this.onTimeout());
this[kStream].on('error', () => {
/* ignore errors, listen to `close` instead */
});

this[kMessageStream].on('error', error => this.handleIssue({ destroy: error }));
stream.on('close', () => this.handleIssue({ isClose: true }));
stream.on('timeout', () => this.handleIssue({ isTimeout: true, destroy: true }));

// hook the message stream up to the passed in stream
stream.pipe(this[kMessageStream]);
this[kMessageStream].pipe(stream);
this[kStream].pipe(this[kMessageStream]);
this[kMessageStream].pipe(this[kStream]);
}

get description(): StreamDescription {
Expand Down Expand Up @@ -309,40 +316,133 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this[kLastUseTime] = now();
}

handleIssue(issue: { isTimeout?: boolean; isClose?: boolean; destroy?: boolean | Error }): void {
onError(error: Error) {
if (this.closed) {
return;
}

if (issue.destroy) {
this[kStream].destroy(typeof issue.destroy === 'boolean' ? undefined : issue.destroy);
this[kStream].destroy(error);

this.closed = true;

for (const op of this[kQueue].values()) {
op.cb(error);
}

this[kQueue].clear();
this.emit(Connection.CLOSE);
}

onClose() {
if (this.closed) {
return;
}

this.closed = true;

for (const [, op] of this[kQueue]) {
if (issue.isTimeout) {
op.cb(
new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, {
beforeHandshake: this.hello == null
})
);
} else if (issue.isClose) {
op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`));
} else {
op.cb(typeof issue.destroy === 'boolean' ? undefined : issue.destroy);
}
const message = `connection ${this.id} to ${this.address} closed`;
for (const op of this[kQueue].values()) {
op.cb(new MongoNetworkError(message));
}

this[kQueue].clear();
this.emit(Connection.CLOSE);
}

destroy(): void;
destroy(callback: Callback): void;
destroy(options: DestroyOptions): void;
destroy(options: DestroyOptions, callback: Callback): void;
destroy(options?: DestroyOptions | Callback, callback?: Callback): void {
onTimeout() {
if (this.closed) {
return;
}

this[kDelayedTimeoutId] = setTimeout(() => {
this[kStream].destroy();

this.closed = true;

const message = `connection ${this.id} to ${this.address} timed out`;
const beforeHandshake = this.hello == null;
for (const op of this[kQueue].values()) {
op.cb(new MongoNetworkTimeoutError(message, { beforeHandshake }));
}

this[kQueue].clear();
this.emit(Connection.CLOSE);
}, 1).unref(); // No need for this timer to hold the event loop open
}

onMessage(message: BinMsg | Response) {
const delayedTimeoutId = this[kDelayedTimeoutId];
if (delayedTimeoutId != null) {
clearTimeout(delayedTimeoutId);
this[kDelayedTimeoutId] = null;
}

// always emit the message, in case we are streaming
this.emit('message', message);
const operationDescription = this[kQueue].get(message.responseTo);
if (!operationDescription) {
return;
}

const callback = operationDescription.cb;

// SERVER-45775: For exhaust responses we should be able to use the same requestId to
// track response, however the server currently synthetically produces remote requests
// making the `responseTo` change on each response
this[kQueue].delete(message.responseTo);
if ('moreToCome' in message && message.moreToCome) {
// requeue the callback for next synthetic request
this[kQueue].set(message.requestId, operationDescription);
} else if (operationDescription.socketTimeoutOverride) {
this[kStream].setTimeout(this.socketTimeoutMS);
}

try {
// Pass in the entire description because it has BSON parsing options
message.parse(operationDescription);
} catch (err) {
// If this error is generated by our own code, it will already have the correct class applied
// if it is not, then it is coming from a catastrophic data parse failure or the BSON library
// in either case, it should not be wrapped
callback(err);
return;
}

if (message.documents[0]) {
const document: Document = message.documents[0];
const session = operationDescription.session;
if (session) {
updateSessionFromResponse(session, document);
}

if (document.$clusterTime) {
this[kClusterTime] = document.$clusterTime;
this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime);
}

if (operationDescription.command) {
if (document.writeConcernError) {
callback(new MongoWriteConcernError(document.writeConcernError, document));
return;
}

if (document.ok === 0 || document.$err || document.errmsg || document.code) {
callback(new MongoServerError(document));
return;
}
} else {
// Pre 3.2 support
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
if (document.ok === 0 || document.$err || document.errmsg) {
callback(new MongoServerError(document));
return;
}
}
}

callback(undefined, operationDescription.fullResult ? message : message.documents[0]);
}

destroy(options?: DestroyOptions, callback?: Callback): void {
if (typeof options === 'function') {
callback = options;
options = { force: false };
Expand Down Expand Up @@ -379,7 +479,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
});
}

/** @internal */
command(
ns: MongoDBNamespace,
cmd: Document,
Expand Down Expand Up @@ -456,7 +555,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}
}

/** @internal */
query(ns: MongoDBNamespace, cmd: Document, options: QueryOptions, callback: Callback): void {
const isExplain = cmd.$explain != null;
const readPreference = options.readPreference ?? ReadPreference.primary;
Expand Down Expand Up @@ -529,7 +627,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
);
}

/** @internal */
getMore(
ns: MongoDBNamespace,
cursorId: Long,
Expand Down Expand Up @@ -586,7 +683,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.command(ns, getMoreCmd, commandOptions, callback);
}

/** @internal */
killCursors(
ns: MongoDBNamespace,
cursorIds: Long[],
Expand Down Expand Up @@ -706,74 +802,6 @@ function supportsOpMsg(conn: Connection) {
return maxWireVersion(conn) >= 6 && !description.__nodejs_mock_server__;
}

function messageHandler(conn: Connection) {
return function messageHandler(message: BinMsg | Response) {
// always emit the message, in case we are streaming
conn.emit('message', message);
const operationDescription = conn[kQueue].get(message.responseTo);
if (!operationDescription) {
return;
}

const callback = operationDescription.cb;

// SERVER-45775: For exhaust responses we should be able to use the same requestId to
// track response, however the server currently synthetically produces remote requests
// making the `responseTo` change on each response
conn[kQueue].delete(message.responseTo);
if ('moreToCome' in message && message.moreToCome) {
// requeue the callback for next synthetic request
conn[kQueue].set(message.requestId, operationDescription);
} else if (operationDescription.socketTimeoutOverride) {
conn[kStream].setTimeout(conn.socketTimeoutMS);
}

try {
// Pass in the entire description because it has BSON parsing options
message.parse(operationDescription);
} catch (err) {
// If this error is generated by our own code, it will already have the correct class applied
// if it is not, then it is coming from a catastrophic data parse failure or the BSON library
// in either case, it should not be wrapped
callback(err);
return;
}

if (message.documents[0]) {
const document: Document = message.documents[0];
const session = operationDescription.session;
if (session) {
updateSessionFromResponse(session, document);
}

if (document.$clusterTime) {
conn[kClusterTime] = document.$clusterTime;
conn.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime);
}

if (operationDescription.command) {
if (document.writeConcernError) {
callback(new MongoWriteConcernError(document.writeConcernError, document));
return;
}

if (document.ok === 0 || document.$err || document.errmsg || document.code) {
callback(new MongoServerError(document));
return;
}
} else {
// Pre 3.2 support
if (document.ok === 0 || document.$err || document.errmsg) {
callback(new MongoServerError(document));
return;
}
}
}

callback(undefined, operationDescription.fullResult ? message : message.documents[0]);
};
}

function streamIdentifier(stream: Stream, options: ConnectionOptions): string {
if (options.proxyHost) {
// If proxy options are specified, the properties of `stream` itself
Expand Down
5 changes: 5 additions & 0 deletions src/index.ts
Expand Up @@ -186,12 +186,17 @@ export type {
MongoCredentialsOptions
} from './cmap/auth/mongo_credentials';
export type {
BinMsg,
GetMore,
KillCursor,
MessageHeader,
Msg,
OpGetMoreOptions,
OpMsgOptions,
OpQueryOptions,
OpResponseOptions,
Query,
Response,
WriteProtocolMessageType
} from './cmap/commands';
export type { LEGAL_TCP_SOCKET_OPTIONS, LEGAL_TLS_SOCKET_OPTIONS, Stream } from './cmap/connect';
Expand Down
7 changes: 3 additions & 4 deletions test/tools/runner/hooks/configuration.js
Expand Up @@ -66,15 +66,14 @@ async function initializeFilters(client) {
}

const testSkipBeforeEachHook = async function () {
// `metadata` always exists, `requires` is optional
const requires = this.currentTest.metadata.requires;
const metadata = this.currentTest.metadata;
baileympearson marked this conversation as resolved.
Show resolved Hide resolved

if (requires && Object.keys(requires).length > 0) {
if (metadata && metadata.requires && Object.keys(metadata.requires).length > 0) {
const failedFilter = filters.find(filter => !filter.filter(this.currentTest));

if (failedFilter) {
const filterName = failedFilter.constructor.name;
const metadataString = inspect(requires, {
const metadataString = inspect(metadata.requires, {
colors: true,
compact: true,
depth: 10,
Expand Down