Skip to content

Commit

Permalink
feat(NODE-4196): add support for showExpandedEvents in change streams (
Browse files Browse the repository at this point in the history
  • Loading branch information
baileympearson committed Jun 3, 2022
1 parent ed50ef5 commit 9c1782e
Show file tree
Hide file tree
Showing 8 changed files with 1,104 additions and 79 deletions.
156 changes: 148 additions & 8 deletions src/change_stream.ts
Expand Up @@ -2,7 +2,7 @@ import Denque = require('denque');
import type { Readable } from 'stream';
import { setTimeout } from 'timers';

import type { Document, Long, Timestamp } from './bson';
import type { Binary, Document, Long, Timestamp } from './bson';
import { Collection } from './collection';
import { CHANGE, CLOSE, END, ERROR, INIT, MORE, RESPONSE, RESUME_TOKEN_CHANGED } from './constants';
import {
Expand Down Expand Up @@ -52,7 +52,8 @@ const CHANGE_STREAM_OPTIONS = [
'startAfter',
'startAtOperationTime',
'fullDocument',
'fullDocumentBeforeChange'
'fullDocumentBeforeChange',
'showExpandedEvents'
] as const;

const CHANGE_DOMAIN_TYPES = {
Expand Down Expand Up @@ -176,6 +177,19 @@ export interface ChangeStreamOptions extends AggregateOptions {
* @see https://docs.mongodb.com/manual/reference/command/aggregate
*/
batchSize?: number;

/**
* When enabled, configures the change stream to include extra change events.
*
* - createIndexes
* - dropIndexes
* - modify
* - create
* - shardCollection
* - reshardCollection
* - refineCollectionShardKey
*/
showExpandedEvents?: boolean;
}

/** @public */
Expand Down Expand Up @@ -225,13 +239,41 @@ export interface ChangeStreamDocumentCommon {
lsid?: ServerSessionId;
}

/** @public */
export interface ChangeStreamDocumentCollectionUUID {
/**
* The UUID (Binary subtype 4) of the collection that the operation was performed on.
*
* Only present when the `showExpandedEvents` flag is enabled.
*
* **NOTE:** collectionUUID will be converted to a NodeJS Buffer if the promoteBuffers
* flag is enabled.
*
* @since 6.1.0
*/
collectionUUID: Binary;
}

/** @public */
export interface ChangeStreamDocumentOperationDescription {
/**
* An description of the operation.
*
* Only present when the `showExpandedEvents` flag is enabled.
*
* @since 6.1.0
*/
operationDescription?: Document;
}

/**
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/#insert-event
*/
export interface ChangeStreamInsertDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema> {
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'insert';
/** This key will contain the document being inserted */
Expand All @@ -246,7 +288,8 @@ export interface ChangeStreamInsertDocument<TSchema extends Document = Document>
*/
export interface ChangeStreamUpdateDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema> {
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'update';
/**
Expand Down Expand Up @@ -299,7 +342,8 @@ export interface ChangeStreamReplaceDocument<TSchema extends Document = Document
*/
export interface ChangeStreamDeleteDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema> {
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'delete';
/** Namespace the delete event occured on */
Expand All @@ -318,7 +362,9 @@ export interface ChangeStreamDeleteDocument<TSchema extends Document = Document>
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/#drop-event
*/
export interface ChangeStreamDropDocument extends ChangeStreamDocumentCommon {
export interface ChangeStreamDropDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'drop';
/** Namespace the drop event occured on */
Expand All @@ -329,7 +375,9 @@ export interface ChangeStreamDropDocument extends ChangeStreamDocumentCommon {
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/#rename-event
*/
export interface ChangeStreamRenameDocument extends ChangeStreamDocumentCommon {
export interface ChangeStreamRenameDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'rename';
/** The new name for the `ns.coll` collection */
Expand Down Expand Up @@ -358,6 +406,91 @@ export interface ChangeStreamInvalidateDocument extends ChangeStreamDocumentComm
operationType: 'invalidate';
}

/**
* Only present when the `showExpandedEvents` flag is enabled.
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamCreateIndexDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
/** Describes the type of operation represented in this change notification */
operationType: 'createIndexes';
}

/**
* Only present when the `showExpandedEvents` flag is enabled.
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamDropIndexDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
/** Describes the type of operation represented in this change notification */
operationType: 'dropIndexes';
}

/**
* Only present when the `showExpandedEvents` flag is enabled.
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamCollModDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'modify';
}

/**
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamCreateDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'create';
}

/**
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamShardCollectionDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
/** Describes the type of operation represented in this change notification */
operationType: 'shardCollection';
}

/**
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamReshardCollectionDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
/** Describes the type of operation represented in this change notification */
operationType: 'reshardCollection';
}

/**
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/
*/
export interface ChangeStreamRefineCollectionShardKeyDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
/** Describes the type of operation represented in this change notification */
operationType: 'refineCollectionShardKey';
}

/** @public */
export type ChangeStreamDocument<TSchema extends Document = Document> =
| ChangeStreamInsertDocument<TSchema>
Expand All @@ -367,7 +500,14 @@ export type ChangeStreamDocument<TSchema extends Document = Document> =
| ChangeStreamDropDocument
| ChangeStreamRenameDocument
| ChangeStreamDropDatabaseDocument
| ChangeStreamInvalidateDocument;
| ChangeStreamInvalidateDocument
| ChangeStreamCreateIndexDocument
| ChangeStreamCreateDocument
| ChangeStreamCollModDocument
| ChangeStreamDropIndexDocument
| ChangeStreamShardCollectionDocument
| ChangeStreamReshardCollectionDocument
| ChangeStreamRefineCollectionShardKeyDocument;

/** @public */
export interface UpdateDescription<TSchema extends Document = Document> {
Expand Down
9 changes: 9 additions & 0 deletions src/index.ts
Expand Up @@ -171,21 +171,30 @@ export type { UnorderedBulkOperation } from './bulk/unordered';
export type {
ChangeStream,
ChangeStreamAggregateRawResult,
ChangeStreamCollModDocument,
ChangeStreamCreateDocument,
ChangeStreamCreateIndexDocument,
ChangeStreamCursor,
ChangeStreamCursorOptions,
ChangeStreamDeleteDocument,
ChangeStreamDocument,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentCommon,
ChangeStreamDocumentKey,
ChangeStreamDocumentOperationDescription,
ChangeStreamDropDatabaseDocument,
ChangeStreamDropDocument,
ChangeStreamDropIndexDocument,
ChangeStreamEvents,
ChangeStreamInsertDocument,
ChangeStreamInvalidateDocument,
ChangeStreamNameSpace,
ChangeStreamOptions,
ChangeStreamRefineCollectionShardKeyDocument,
ChangeStreamRenameDocument,
ChangeStreamReplaceDocument,
ChangeStreamReshardCollectionDocument,
ChangeStreamShardCollectionDocument,
ChangeStreamUpdateDocument,
OperationTime,
PipeOptions,
Expand Down

0 comments on commit 9c1782e

Please sign in to comment.