Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-4081): fix and deprecate change stream resume options #3270

Merged
merged 12 commits into from
May 31, 2022
49 changes: 24 additions & 25 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,6 @@ const CHANGE_STREAM_OPTIONS = [
'fullDocumentBeforeChange'
] as const;

const CURSOR_OPTIONS = [
'batchSize',
'maxAwaitTimeMS',
'collation',
'readPreference',
'comment',
...CHANGE_STREAM_OPTIONS
] as const;

const CHANGE_DOMAIN_TYPES = {
COLLECTION: Symbol('Collection'),
DATABASE: Symbol('Database'),
Expand All @@ -84,7 +75,10 @@ const NO_RESUME_TOKEN_ERROR =
const NO_CURSOR_ERROR = 'ChangeStream has no cursor';
const CHANGESTREAM_CLOSED_ERROR = 'ChangeStream is closed';

/** @public */
/**
* @public
* @deprecated Please use the ChangeStreamCursorOptions type instead.
*/
export interface ResumeOptions {
startAtOperationTime?: Timestamp;
batchSize?: number;
Expand All @@ -93,6 +87,7 @@ export interface ResumeOptions {
readPreference?: ReadPreference;
resumeAfter?: ResumeToken;
startAfter?: ResumeToken;
fullDocument?: string;
}

/**
Expand Down Expand Up @@ -639,7 +634,7 @@ export class ChangeStream<
* @internal
*/
private _createChangeStreamCursor(
options: ChangeStreamOptions | ResumeOptions
options: ChangeStreamOptions | ChangeStreamCursorOptions
): ChangeStreamCursor<TSchema, TChange> {
const changeStreamStageOptions = filterOptions(options, CHANGE_STREAM_OPTIONS);
if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
Expand Down Expand Up @@ -880,6 +875,9 @@ export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
startAtOperationTime?: OperationTime;
resumeAfter?: ResumeToken;
startAfter?: ResumeToken;
maxAwaitTimeMS?: number;
collation?: CollationOptions;
fullDocument?: string;
}

/** @internal */
Expand Down Expand Up @@ -926,25 +924,26 @@ export class ChangeStreamCursor<
return this._resumeToken;
}

get resumeOptions(): ResumeOptions {
const result: ResumeOptions = filterOptions(this.options, CURSOR_OPTIONS);

if (this.resumeToken || this.startAtOperationTime) {
for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime']) {
Reflect.deleteProperty(result, key);
}
get resumeOptions(): ChangeStreamCursorOptions {
const options: ChangeStreamCursorOptions = {
...this.options
};

if (this.resumeToken) {
const resumeKey =
this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter';
for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime'] as const) {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
delete options[key];
}

result[resumeKey] = this.resumeToken;
} else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
result.startAtOperationTime = this.startAtOperationTime;
if (this.resumeToken != null) {
if (this.options.startAfter && !this.hasReceived) {
options.startAfter = this.resumeToken;
} else {
options.resumeAfter = this.resumeToken;
}
} else if (this.startAtOperationTime != null && maxWireVersion(this.server) >= 7) {
options.startAtOperationTime = this.startAtOperationTime;
}

return result;
return options;
}

cacheResumeToken(resumeToken: ResumeToken): void {
Expand Down