Skip to content

Commit

Permalink
feat(NODE-4081): fix and deprecate change stream resume options (#3270)
Browse files Browse the repository at this point in the history
  • Loading branch information
baileympearson committed May 31, 2022
1 parent cb3e860 commit 47adfb3
Show file tree
Hide file tree
Showing 2 changed files with 328 additions and 25 deletions.
49 changes: 24 additions & 25 deletions src/change_stream.ts
Expand Up @@ -54,15 +54,6 @@ const CHANGE_STREAM_OPTIONS = [
'fullDocumentBeforeChange'
] as const;

const CURSOR_OPTIONS = [
'batchSize',
'maxAwaitTimeMS',
'collation',
'readPreference',
'comment',
...CHANGE_STREAM_OPTIONS
] as const;

const CHANGE_DOMAIN_TYPES = {
COLLECTION: Symbol('Collection'),
DATABASE: Symbol('Database'),
Expand All @@ -84,7 +75,10 @@ const NO_RESUME_TOKEN_ERROR =
const NO_CURSOR_ERROR = 'ChangeStream has no cursor';
const CHANGESTREAM_CLOSED_ERROR = 'ChangeStream is closed';

/** @public */
/**
* @public
* @deprecated Please use the ChangeStreamCursorOptions type instead.
*/
export interface ResumeOptions {
startAtOperationTime?: Timestamp;
batchSize?: number;
Expand All @@ -93,6 +87,7 @@ export interface ResumeOptions {
readPreference?: ReadPreference;
resumeAfter?: ResumeToken;
startAfter?: ResumeToken;
fullDocument?: string;
}

/**
Expand Down Expand Up @@ -639,7 +634,7 @@ export class ChangeStream<
* @internal
*/
private _createChangeStreamCursor(
options: ChangeStreamOptions | ResumeOptions
options: ChangeStreamOptions | ChangeStreamCursorOptions
): ChangeStreamCursor<TSchema, TChange> {
const changeStreamStageOptions = filterOptions(options, CHANGE_STREAM_OPTIONS);
if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
Expand Down Expand Up @@ -880,6 +875,9 @@ export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
startAtOperationTime?: OperationTime;
resumeAfter?: ResumeToken;
startAfter?: ResumeToken;
maxAwaitTimeMS?: number;
collation?: CollationOptions;
fullDocument?: string;
}

/** @internal */
Expand Down Expand Up @@ -926,25 +924,26 @@ export class ChangeStreamCursor<
return this._resumeToken;
}

get resumeOptions(): ResumeOptions {
const result: ResumeOptions = filterOptions(this.options, CURSOR_OPTIONS);

if (this.resumeToken || this.startAtOperationTime) {
for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime']) {
Reflect.deleteProperty(result, key);
}
get resumeOptions(): ChangeStreamCursorOptions {
const options: ChangeStreamCursorOptions = {
...this.options
};

if (this.resumeToken) {
const resumeKey =
this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter';
for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime'] as const) {
delete options[key];
}

result[resumeKey] = this.resumeToken;
} else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
result.startAtOperationTime = this.startAtOperationTime;
if (this.resumeToken != null) {
if (this.options.startAfter && !this.hasReceived) {
options.startAfter = this.resumeToken;
} else {
options.resumeAfter = this.resumeToken;
}
} else if (this.startAtOperationTime != null && maxWireVersion(this.server) >= 7) {
options.startAtOperationTime = this.startAtOperationTime;
}

return result;
return options;
}

cacheResumeToken(resumeToken: ResumeToken): void {
Expand Down

0 comments on commit 47adfb3

Please sign in to comment.