Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-4196): add support for showExpandedEvents in change streams #3254

Merged
156 changes: 148 additions & 8 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Denque = require('denque');
import type { Readable } from 'stream';
import { setTimeout } from 'timers';

import type { Document, Long, Timestamp } from './bson';
import type { Binary, Document, Long, Timestamp } from './bson';
import { Collection } from './collection';
import { CHANGE, CLOSE, END, ERROR, INIT, MORE, RESPONSE, RESUME_TOKEN_CHANGED } from './constants';
import {
Expand Down Expand Up @@ -52,7 +52,8 @@ const CHANGE_STREAM_OPTIONS = [
'startAfter',
'startAtOperationTime',
'fullDocument',
'fullDocumentBeforeChange'
'fullDocumentBeforeChange',
'showExpandedEvents'
] as const;

const CHANGE_DOMAIN_TYPES = {
Expand Down Expand Up @@ -176,6 +177,19 @@ export interface ChangeStreamOptions extends AggregateOptions {
* @see https://docs.mongodb.com/manual/reference/command/aggregate
*/
batchSize?: number;

/**
* When enabled, configures the change stream to include extra change events.
*
* - createIndexes
* - dropIndexes
* - modify
* - create
* - shardCollection
* - reshardCollection
* - refineCollectionShardKey
*/
showExpandedEvents?: boolean;
}

/** @public */
Expand Down Expand Up @@ -225,13 +239,41 @@ export interface ChangeStreamDocumentCommon {
lsid?: ServerSessionId;
}

/** @public */
export interface ChangeStreamDocumentCollectionUUID {
/**
* The UUID (Binary subtype 4) of the collection that the operation was performed on.
*
* Only present when the `showExpandedEvents` flag is enabled.
*
* **NOTE:** collectionUUID will be converted to a NodeJS Buffer if the promoteBuffers
* flag is enabled.
*
* @since 6.1.0
*/
collectionUUID: Binary;
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
}

/** @public */
export interface ChangeStreamDocumentOperationDescription {
/**
* An description of the operation.
*
* Only present when the `showExpandedEvents` flag is enabled.
*
* @since 6.1.0
*/
operationDescription?: Document;
}

/**
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/#insert-event
*/
export interface ChangeStreamInsertDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema> {
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'insert';
/** This key will contain the document being inserted */
Expand All @@ -246,7 +288,8 @@ export interface ChangeStreamInsertDocument<TSchema extends Document = Document>
*/
export interface ChangeStreamUpdateDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema> {
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'update';
/**
Expand Down Expand Up @@ -299,7 +342,8 @@ export interface ChangeStreamReplaceDocument<TSchema extends Document = Document
*/
export interface ChangeStreamDeleteDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema> {
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'delete';
/** Namespace the delete event occured on */
Expand All @@ -318,7 +362,9 @@ export interface ChangeStreamDeleteDocument<TSchema extends Document = Document>
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/#drop-event
*/
export interface ChangeStreamDropDocument extends ChangeStreamDocumentCommon {
export interface ChangeStreamDropDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'drop';
/** Namespace the drop event occured on */
Expand All @@ -329,7 +375,9 @@ export interface ChangeStreamDropDocument extends ChangeStreamDocumentCommon {
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/#rename-event
*/
export interface ChangeStreamRenameDocument extends ChangeStreamDocumentCommon {
export interface ChangeStreamRenameDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'rename';
/** The new name for the `ns.coll` collection */
Expand Down Expand Up @@ -358,6 +406,91 @@ export interface ChangeStreamInvalidateDocument extends ChangeStreamDocumentComm
operationType: 'invalidate';
}

/**
* Only present when the `showExpandedEvents` flag is enabled.
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamCreateIndexDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
/** Describes the type of operation represented in this change notification */
operationType: 'createIndexes';
}

/**
* Only present when the `showExpandedEvents` flag is enabled.
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamDropIndexDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
/** Describes the type of operation represented in this change notification */
operationType: 'dropIndexes';
}

/**
* Only present when the `showExpandedEvents` flag is enabled.
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamCollModDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'modify';
}

/**
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamCreateDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'create';
}

/**
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamShardCollectionDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
/** Describes the type of operation represented in this change notification */
operationType: 'shardCollection';
}

/**
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamReshardCollectionDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
/** Describes the type of operation represented in this change notification */
operationType: 'reshardCollection';
}

/**
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamRefineCollectionShardKeyDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
/** Describes the type of operation represented in this change notification */
operationType: 'refineCollectionShardKey';
}

/** @public */
export type ChangeStreamDocument<TSchema extends Document = Document> =
| ChangeStreamInsertDocument<TSchema>
Expand All @@ -367,7 +500,14 @@ export type ChangeStreamDocument<TSchema extends Document = Document> =
| ChangeStreamDropDocument
| ChangeStreamRenameDocument
| ChangeStreamDropDatabaseDocument
| ChangeStreamInvalidateDocument;
| ChangeStreamInvalidateDocument
| ChangeStreamCreateIndexDocument
| ChangeStreamCreateDocument
| ChangeStreamCollModDocument
| ChangeStreamDropIndexDocument
| ChangeStreamShardCollectionDocument
| ChangeStreamReshardCollectionDocument
| ChangeStreamRefineCollectionShardKeyDocument;

/** @public */
export interface UpdateDescription<TSchema extends Document = Document> {
Expand Down
9 changes: 9 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,21 +171,30 @@ export type { UnorderedBulkOperation } from './bulk/unordered';
export type {
ChangeStream,
ChangeStreamAggregateRawResult,
ChangeStreamCollModDocument,
ChangeStreamCreateDocument,
ChangeStreamCreateIndexDocument,
ChangeStreamCursor,
ChangeStreamCursorOptions,
ChangeStreamDeleteDocument,
ChangeStreamDocument,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentCommon,
ChangeStreamDocumentKey,
ChangeStreamDocumentOperationDescription,
ChangeStreamDropDatabaseDocument,
ChangeStreamDropDocument,
ChangeStreamDropIndexDocument,
ChangeStreamEvents,
ChangeStreamInsertDocument,
ChangeStreamInvalidateDocument,
ChangeStreamNameSpace,
ChangeStreamOptions,
ChangeStreamRefineCollectionShardKeyDocument,
ChangeStreamRenameDocument,
ChangeStreamReplaceDocument,
ChangeStreamReshardCollectionDocument,
ChangeStreamShardCollectionDocument,
ChangeStreamUpdateDocument,
OperationTime,
PipeOptions,
Expand Down