Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-3938): Add support for pre/post images in change streams #3250

Merged
merged 9 commits into from
May 18, 2022
63 changes: 56 additions & 7 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ const CHANGE_STREAM_OPTIONS = [
'resumeAfter',
'startAfter',
'startAtOperationTime',
'fullDocument'
'fullDocument',
'fullDocumentBeforeChange'
] as const;

const CURSOR_OPTIONS = [
Expand Down Expand Up @@ -131,11 +132,35 @@ export type ChangeStreamAggregateRawResult<TChange> = {
*/
export interface ChangeStreamOptions extends AggregateOptions {
/**
* Allowed values: 'updateLookup'. When set to 'updateLookup',
* the change stream will include both a delta describing the changes to the document,
* as well as a copy of the entire document that was changed from some time after the change occurred.
* Allowed values: 'updateLookup', 'whenAvailable', 'required'.
*
* When set to 'updateLookup', the change notification for partial updates
* will include both a delta describing the changes to the document as well
* as a copy of the entire document that was changed from some time after
* the change occurred.
*
* When set to 'whenAvailable', configures the change stream to return the
* post-image of the modified document for replace and update change events
* if the post-image for this event is available.
*
* When set to 'required', the same behavior as 'whenAvailable' except that
* an error is raised if the post-image is not available.
*/
fullDocument?: string;

/**
* Allowed values: 'whenAvailable', 'required', 'off'.
*
* The default is to not send a value, which is equivalent to 'off'.
*
* When set to 'whenAvailable', configures the change stream to return the
* pre-image of the modified document for replace, update, and delete change
* events if it is available.
*
* When set to 'required', the same behavior as 'whenAvailable' except that
* an error is raised if the pre-image is not available.
*/
fullDocumentBeforeChange?: string;
/** The maximum amount of time for the server to wait on new documents to satisfy a change stream query. */
maxAwaitTimeMS?: number;
/**
Expand Down Expand Up @@ -230,15 +255,23 @@ export interface ChangeStreamUpdateDocument<TSchema extends Document = Document>
operationType: 'update';
/**
* This is only set if `fullDocument` is set to `'updateLookup'`
* The fullDocument document represents the most current majority-committed version of the updated document.
* The fullDocument document may vary from the document at the time of the update operation depending on the
* number of interleaving majority-committed operations that occur between the update operation and the document lookup.
* Contains the point-in-time post-image of the modified document if the
* post-image is available and either 'required' or 'whenAvailable' was
* specified for the 'fullDocument' option when creating the change stream.
*/
fullDocument?: TSchema;
/** Contains a description of updated and removed fields in this operation */
updateDescription: UpdateDescription<TSchema>;
/** Namespace the update event occured on */
ns: ChangeStreamNameSpace;
/**
* Contains the pre-image of the modified or deleted document if the
* pre-image is available for the change event and either 'required' or
* 'whenAvailable' was specified for the 'fullDocumentBeforeChange' option
* when creating the change stream. If 'whenAvailable' was specified but the
* pre-image is unavailable, this will be explicitly set to null.
*/
fullDocumentBeforeChange?: TSchema;
}

/**
Expand All @@ -254,6 +287,14 @@ export interface ChangeStreamReplaceDocument<TSchema extends Document = Document
fullDocument: TSchema;
/** Namespace the replace event occured on */
ns: ChangeStreamNameSpace;
/**
* Contains the pre-image of the modified or deleted document if the
* pre-image is available for the change event and either 'required' or
* 'whenAvailable' was specified for the 'fullDocumentBeforeChange' option
* when creating the change stream. If 'whenAvailable' was specified but the
* pre-image is unavailable, this will be explicitly set to null.
*/
fullDocumentBeforeChange?: TSchema;
}

/**
Expand All @@ -267,6 +308,14 @@ export interface ChangeStreamDeleteDocument<TSchema extends Document = Document>
operationType: 'delete';
/** Namespace the delete event occured on */
ns: ChangeStreamNameSpace;
/**
* Contains the pre-image of the modified or deleted document if the
* pre-image is available for the change event and either 'required' or
* 'whenAvailable' was specified for the 'fullDocumentBeforeChange' option
* when creating the change stream. If 'whenAvailable' was specified but the
* pre-image is unavailable, this will be explicitly set to null.
*/
fullDocumentBeforeChange?: TSchema;
}

/**
Expand Down
5 changes: 5 additions & 0 deletions src/operations/create_collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ export interface CreateCollectionOptions extends CommandOperationOptions {
expireAfterSeconds?: number;
/** @experimental */
encryptedFields?: Document;
/**
* If set, enables pre-update and post-update document events to be included for any
* change streams that listen on this collection.
*/
changeStreamPreAndPostImages?: { enabled: boolean };
}

/** @internal */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
const { loadSpecTests } = require('../../spec/index');
const { runUnifiedSuite } = require('../../tools/unified-spec-runner/runner');

// The Node driver does not have a Collection.modifyCollection helper.
const SKIPPED_TESTS = ['modifyCollection to changeStreamPreAndPostImages enabled'];
durran marked this conversation as resolved.
Show resolved Hide resolved

describe('Collection management unified spec tests', function () {
runUnifiedSuite(loadSpecTests('collection-management'));
runUnifiedSuite(loadSpecTests('collection-management'), SKIPPED_TESTS);
});