Skip to content

Commit

Permalink
feat(NODE-3699): add support for comment field (#3167)
Browse files Browse the repository at this point in the history
  • Loading branch information
baileympearson committed Apr 1, 2022
1 parent 43ba9fc commit 4e2f9bf
Show file tree
Hide file tree
Showing 57 changed files with 6,619 additions and 344 deletions.
5 changes: 3 additions & 2 deletions src/change_stream.ts
Expand Up @@ -57,6 +57,7 @@ const CURSOR_OPTIONS = [
'maxAwaitTimeMS',
'collation',
'readPreference',
'comment',
...CHANGE_STREAM_OPTIONS
] as const;

Expand Down Expand Up @@ -410,7 +411,7 @@ export class ChangeStream<TSchema extends Document = Document> extends TypedEven
export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
startAtOperationTime?: OperationTime;
resumeAfter?: ResumeToken;
startAfter?: boolean;
startAfter?: ResumeToken;
}

/** @internal */
Expand Down Expand Up @@ -617,7 +618,7 @@ function applyKnownOptions(source: Document, options: ReadonlyArray<string>) {
const result: Document = {};

for (const option of options) {
if (source[option]) {
if (option in source) {
result[option] = source[option];
}
}
Expand Down
15 changes: 14 additions & 1 deletion src/cmap/connection.ts
Expand Up @@ -124,7 +124,15 @@ export interface GetMoreOptions extends CommandOptions {
batchSize?: number;
maxTimeMS?: number;
maxAwaitTimeMS?: number;
comment?: Document | string;
/**
* Comment to apply to the operation.
*
* In server versions pre-4.4, 'comment' must be string. A server
* error will be thrown if any other type is provided.
*
* In server versions 4.4 and above, 'comment' can be any valid BSON type.
*/
comment?: unknown;
}

/** @public */
Expand Down Expand Up @@ -574,6 +582,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
if (typeof options.maxAwaitTimeMS === 'number') {
getMoreCmd.maxTimeMS = options.maxAwaitTimeMS;
}
// we check for undefined specifically here to allow falsy values
// eslint-disable-next-line no-restricted-syntax
if (options.comment !== undefined) {
getMoreCmd.comment = options.comment;
}

const commandOptions = Object.assign(
{
Expand Down
134 changes: 78 additions & 56 deletions src/cursor/abstract_cursor.ts
Expand Up @@ -42,6 +42,8 @@ const kInitialized = Symbol('initialized');
const kClosed = Symbol('closed');
/** @internal */
const kKilled = Symbol('killed');
/** @internal */
const kInit = Symbol('kInit');

/** @public */
export const CURSOR_FLAGS = [
Expand Down Expand Up @@ -77,7 +79,15 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
readConcern?: ReadConcernLike;
batchSize?: number;
maxTimeMS?: number;
comment?: Document | string;
/**
* Comment to apply to the operation.
*
* In server versions pre-4.4, 'comment' must be string. A server
* error will be thrown if any other type is provided.
*
* In server versions 4.4 and above, 'comment' can be any valid BSON type.
*/
comment?: unknown;
tailable?: boolean;
awaitData?: boolean;
noCursorTimeout?: boolean;
Expand Down Expand Up @@ -162,7 +172,9 @@ export abstract class AbstractCursor<
this[kOptions].batchSize = options.batchSize;
}

if (options.comment != null) {
// we check for undefined specifically here to allow falsy values
// eslint-disable-next-line no-restricted-syntax
if (options.comment !== undefined) {
this[kOptions].comment = options.comment;
}

Expand Down Expand Up @@ -620,6 +632,65 @@ export abstract class AbstractCursor<

executeOperation(this, getMoreOperation, callback);
}

/**
* @internal
*
* This function is exposed for the unified test runner's createChangeStream
* operation. We cannot refactor to use the abstract _initialize method without
* a significant refactor.
*/
[kInit](callback: Callback<TSchema | null>): void {
if (this[kSession] == null) {
if (this[kTopology].shouldCheckForSessionSupport()) {
return this[kTopology].selectServer(ReadPreference.primaryPreferred, {}, err => {
if (err) return callback(err);
return this[kInit](callback);
});
} else if (this[kTopology].hasSessionSupport()) {
this[kSession] = this[kTopology].startSession({ owner: this, explicit: false });
}
}

this._initialize(this[kSession], (err, state) => {
if (state) {
const response = state.response;
this[kServer] = state.server;
this[kSession] = state.session;

if (response.cursor) {
this[kId] =
typeof response.cursor.id === 'number'
? Long.fromNumber(response.cursor.id)
: response.cursor.id;

if (response.cursor.ns) {
this[kNamespace] = ns(response.cursor.ns);
}

this[kDocuments] = response.cursor.firstBatch;
}

// When server responses return without a cursor document, we close this cursor
// and return the raw server response. This is often the case for explain commands
// for example
if (this[kId] == null) {
this[kId] = Long.ZERO;
// TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
this[kDocuments] = [state.response as TODO_NODE_3286];
}
}

// the cursor is now initialized, even if an error occurred or it is dead
this[kInitialized] = true;

if (err || cursorIsDead(this)) {
return cleanupCursor(this, { error: err }, () => callback(err, nextDocument(this)));
}

callback();
});
}
}

function nextDocument<T>(cursor: AbstractCursor): T | null | undefined {
Expand Down Expand Up @@ -653,61 +724,12 @@ function next<T>(cursor: AbstractCursor, blocking: boolean, callback: Callback<T

if (cursorId == null) {
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
if (cursor[kSession] == null) {
if (cursor[kTopology].shouldCheckForSessionSupport()) {
return cursor[kTopology].selectServer(ReadPreference.primaryPreferred, {}, err => {
if (err) return callback(err);
return next(cursor, blocking, callback);
});
} else if (cursor[kTopology].hasSessionSupport()) {
cursor[kSession] = cursor[kTopology].startSession({ owner: cursor, explicit: false });
}
}

cursor._initialize(cursor[kSession], (err, state) => {
if (state) {
const response = state.response;
cursor[kServer] = state.server;
cursor[kSession] = state.session;

if (response.cursor) {
cursor[kId] =
typeof response.cursor.id === 'number'
? Long.fromNumber(response.cursor.id)
: response.cursor.id;

if (response.cursor.ns) {
cursor[kNamespace] = ns(response.cursor.ns);
}

cursor[kDocuments] = response.cursor.firstBatch;
} else {
// NOTE: This is for support of older servers (<3.2) which do not use commands
cursor[kId] =
typeof response.cursorId === 'number'
? Long.fromNumber(response.cursorId)
: response.cursorId;
cursor[kDocuments] = response.documents;
}

// When server responses return without a cursor document, we close this cursor
// and return the raw server response. This is often the case for explain commands
// for example
if (cursor[kId] == null) {
cursor[kId] = Long.ZERO;
// TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
cursor[kDocuments] = [state.response as TODO_NODE_3286];
}
}

// the cursor is now initialized, even if an error occurred or it is dead
cursor[kInitialized] = true;

if (err || cursorIsDead(cursor)) {
return cleanupCursor(cursor, { error: err }, () => callback(err, nextDocument(cursor)));
cursor[kInit]((err, value) => {
if (err) return callback(err);
if (value) {
return callback(undefined, value);
}

next(cursor, blocking, callback);
return next(cursor, blocking, callback);
});

return;
Expand Down
10 changes: 8 additions & 2 deletions src/operations/aggregate.ts
Expand Up @@ -2,8 +2,7 @@ import type { Document } from '../bson';
import { MongoInvalidArgumentError } from '../error';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import type { Callback } from '../utils';
import { maxWireVersion, MongoDBNamespace } from '../utils';
import { Callback, maxWireVersion, MongoDBNamespace } from '../utils';
import { CollationOptions, CommandOperation, CommandOperationOptions } from './command';
import { Aspect, defineAspects, Hint } from './operation';

Expand Down Expand Up @@ -31,6 +30,7 @@ export interface AggregateOptions extends CommandOperationOptions {
hint?: Hint;
/** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */
let?: Document;

out?: string;
}

Expand Down Expand Up @@ -121,6 +121,12 @@ export class AggregateOperation<T = Document> extends CommandOperation<T> {
command.let = options.let;
}

// we check for undefined specifically here to allow falsy values
// eslint-disable-next-line no-restricted-syntax
if (options.comment !== undefined) {
command.comment = options.comment;
}

command.cursor = options.cursor || {};
if (options.batchSize && !this.hasWriteStage) {
command.cursor.batchSize = options.batchSize;
Expand Down
11 changes: 9 additions & 2 deletions src/operations/command.ts
Expand Up @@ -45,8 +45,15 @@ export interface CommandOperationOptions
/** Collation */
collation?: CollationOptions;
maxTimeMS?: number;
/** A user-provided comment to attach to this command */
comment?: string | Document;
/**
* Comment to apply to the operation.
*
* In server versions pre-4.4, 'comment' must be string. A server
* error will be thrown if any other type is provided.
*
* In server versions 4.4 and above, 'comment' can be any valid BSON type.
*/
comment?: unknown;
/** Should retry failed writes */
retryWrites?: boolean;

Expand Down
14 changes: 6 additions & 8 deletions src/operations/delete.ts
Expand Up @@ -12,8 +12,6 @@ import { Aspect, defineAspects, Hint } from './operation';
export interface DeleteOptions extends CommandOperationOptions, WriteConcernOptions {
/** If true, when an insert fails, don't execute the remaining writes. If false, continue with remaining inserts when one fails. */
ordered?: boolean;
/** A user-provided comment to attach to this command */
comment?: string | Document;
/** Specifies the collation to use for the operation */
collation?: CollationOptions;
/** Specify that the update query should only consider plans using the hinted index */
Expand Down Expand Up @@ -43,8 +41,6 @@ export interface DeleteStatement {
collation?: CollationOptions;
/** A document or string that specifies the index to use to support the query predicate. */
hint?: Hint;
/** A user-provided comment to attach to this command */
comment?: string | Document;
}

/** @internal */
Expand Down Expand Up @@ -80,6 +76,12 @@ export class DeleteOperation extends CommandOperation<Document> {
command.let = options.let;
}

// we check for undefined specifically here to allow falsy values
// eslint-disable-next-line no-restricted-syntax
if (options.comment !== undefined) {
command.comment = options.comment;
}

if (options.explain != null && maxWireVersion(server) < 3) {
return callback
? callback(
Expand Down Expand Up @@ -175,10 +177,6 @@ export function makeDeleteStatement(
op.hint = options.hint;
}

if (options.comment) {
op.comment = options.comment;
}

return op;
}

Expand Down
6 changes: 3 additions & 3 deletions src/operations/find.ts
Expand Up @@ -46,8 +46,6 @@ export interface FindOptions<TSchema extends Document = Document> extends Comman
min?: Document;
/** The exclusive upper bound for a specific index */
max?: Document;
/** You can put a $comment field on a query to make looking in the profiler logs simpler. */
comment?: string | Document;
/** Number of milliseconds to wait before aborting the query. */
maxTimeMS?: number;
/** The maximum amount of time for the server to wait on new documents to satisfy a tailable cursor query. Requires `tailable` and `awaitData` to be true */
Expand Down Expand Up @@ -241,7 +239,9 @@ function makeFindCommand(ns: MongoDBNamespace, filter: Document, options: FindOp
findCommand.singleBatch = options.singleBatch;
}

if (options.comment) {
// we check for undefined specifically here to allow falsy values
// eslint-disable-next-line no-restricted-syntax
if (options.comment !== undefined) {
findCommand.comment = options.comment;
}

Expand Down
15 changes: 15 additions & 0 deletions src/operations/find_and_modify.ts
Expand Up @@ -82,6 +82,15 @@ interface FindAndModifyCmdBase {
maxTimeMS?: number;
let?: Document;
writeConcern?: WriteConcern | WriteConcernSettings;
/**
* Comment to apply to the operation.
*
* In server versions pre-4.4, 'comment' must be string. A server
* error will be thrown if any other type is provided.
*
* In server versions 4.4 and above, 'comment' can be any valid BSON type.
*/
comment?: unknown;
}

function configureFindAndModifyCmdBaseUpdateOpts(
Expand Down Expand Up @@ -140,6 +149,12 @@ class FindAndModifyOperation extends CommandOperation<Document> {
this.cmdBase.let = options.let;
}

// we check for undefined specifically here to allow falsy values
// eslint-disable-next-line no-restricted-syntax
if (options.comment !== undefined) {
this.cmdBase.comment = options.comment;
}

// force primary read preference
this.readPreference = ReadPreference.primary;

Expand Down

0 comments on commit 4e2f9bf

Please sign in to comment.