diff --git a/src/change_stream.ts b/src/change_stream.ts index 5ce07fda95..e268097bd6 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -2,7 +2,7 @@ import Denque = require('denque'); import type { Readable } from 'stream'; import { setTimeout } from 'timers'; -import type { Document, Long, Timestamp } from './bson'; +import type { Binary, Document, Long, Timestamp } from './bson'; import { Collection } from './collection'; import { CHANGE, CLOSE, END, ERROR, INIT, MORE, RESPONSE, RESUME_TOKEN_CHANGED } from './constants'; import { @@ -52,7 +52,8 @@ const CHANGE_STREAM_OPTIONS = [ 'startAfter', 'startAtOperationTime', 'fullDocument', - 'fullDocumentBeforeChange' + 'fullDocumentBeforeChange', + 'showExpandedEvents' ] as const; const CHANGE_DOMAIN_TYPES = { @@ -176,6 +177,19 @@ export interface ChangeStreamOptions extends AggregateOptions { * @see https://docs.mongodb.com/manual/reference/command/aggregate */ batchSize?: number; + + /** + * When enabled, configures the change stream to include extra change events. + * + * - createIndexes + * - dropIndexes + * - modify + * - create + * - shardCollection + * - reshardCollection + * - refineCollectionShardKey + */ + showExpandedEvents?: boolean; } /** @public */ @@ -225,13 +239,41 @@ export interface ChangeStreamDocumentCommon { lsid?: ServerSessionId; } +/** @public */ +export interface ChangeStreamDocumentCollectionUUID { + /** + * The UUID (Binary subtype 4) of the collection that the operation was performed on. + * + * Only present when the `showExpandedEvents` flag is enabled. + * + * **NOTE:** collectionUUID will be converted to a NodeJS Buffer if the promoteBuffers + * flag is enabled. + * + * @since 6.1.0 + */ + collectionUUID: Binary; +} + +/** @public */ +export interface ChangeStreamDocumentOperationDescription { + /** + * An description of the operation. + * + * Only present when the `showExpandedEvents` flag is enabled. + * + * @since 6.1.0 + */ + operationDescription?: Document; +} + /** * @public * @see https://www.mongodb.com/docs/manual/reference/change-events/#insert-event */ export interface ChangeStreamInsertDocument extends ChangeStreamDocumentCommon, - ChangeStreamDocumentKey { + ChangeStreamDocumentKey, + ChangeStreamDocumentCollectionUUID { /** Describes the type of operation represented in this change notification */ operationType: 'insert'; /** This key will contain the document being inserted */ @@ -246,7 +288,8 @@ export interface ChangeStreamInsertDocument */ export interface ChangeStreamUpdateDocument extends ChangeStreamDocumentCommon, - ChangeStreamDocumentKey { + ChangeStreamDocumentKey, + ChangeStreamDocumentCollectionUUID { /** Describes the type of operation represented in this change notification */ operationType: 'update'; /** @@ -299,7 +342,8 @@ export interface ChangeStreamReplaceDocument extends ChangeStreamDocumentCommon, - ChangeStreamDocumentKey { + ChangeStreamDocumentKey, + ChangeStreamDocumentCollectionUUID { /** Describes the type of operation represented in this change notification */ operationType: 'delete'; /** Namespace the delete event occured on */ @@ -318,7 +362,9 @@ export interface ChangeStreamDeleteDocument * @public * @see https://www.mongodb.com/docs/manual/reference/change-events/#drop-event */ -export interface ChangeStreamDropDocument extends ChangeStreamDocumentCommon { +export interface ChangeStreamDropDocument + extends ChangeStreamDocumentCommon, + ChangeStreamDocumentCollectionUUID { /** Describes the type of operation represented in this change notification */ operationType: 'drop'; /** Namespace the drop event occured on */ @@ -329,7 +375,9 @@ export interface ChangeStreamDropDocument extends ChangeStreamDocumentCommon { * @public * @see https://www.mongodb.com/docs/manual/reference/change-events/#rename-event */ -export interface ChangeStreamRenameDocument extends ChangeStreamDocumentCommon { +export interface ChangeStreamRenameDocument + extends ChangeStreamDocumentCommon, + ChangeStreamDocumentCollectionUUID { /** Describes the type of operation represented in this change notification */ operationType: 'rename'; /** The new name for the `ns.coll` collection */ @@ -358,6 +406,91 @@ export interface ChangeStreamInvalidateDocument extends ChangeStreamDocumentComm operationType: 'invalidate'; } +/** + * Only present when the `showExpandedEvents` flag is enabled. + * @public + * @see https://www.mongodb.com/docs/manual/reference/change-events/ + */ +export interface ChangeStreamCreateIndexDocument + extends ChangeStreamDocumentCommon, + ChangeStreamDocumentCollectionUUID, + ChangeStreamDocumentOperationDescription { + /** Describes the type of operation represented in this change notification */ + operationType: 'createIndexes'; +} + +/** + * Only present when the `showExpandedEvents` flag is enabled. + * @public + * @see https://www.mongodb.com/docs/manual/reference/change-events/ + */ +export interface ChangeStreamDropIndexDocument + extends ChangeStreamDocumentCommon, + ChangeStreamDocumentCollectionUUID, + ChangeStreamDocumentOperationDescription { + /** Describes the type of operation represented in this change notification */ + operationType: 'dropIndexes'; +} + +/** + * Only present when the `showExpandedEvents` flag is enabled. + * @public + * @see https://www.mongodb.com/docs/manual/reference/change-events/ + */ +export interface ChangeStreamCollModDocument + extends ChangeStreamDocumentCommon, + ChangeStreamDocumentCollectionUUID { + /** Describes the type of operation represented in this change notification */ + operationType: 'modify'; +} + +/** + * @public + * @see https://www.mongodb.com/docs/manual/reference/change-events/ + */ +export interface ChangeStreamCreateDocument + extends ChangeStreamDocumentCommon, + ChangeStreamDocumentCollectionUUID { + /** Describes the type of operation represented in this change notification */ + operationType: 'create'; +} + +/** + * @public + * @see https://www.mongodb.com/docs/manual/reference/change-events/ + */ +export interface ChangeStreamShardCollectionDocument + extends ChangeStreamDocumentCommon, + ChangeStreamDocumentCollectionUUID, + ChangeStreamDocumentOperationDescription { + /** Describes the type of operation represented in this change notification */ + operationType: 'shardCollection'; +} + +/** + * @public + * @see https://www.mongodb.com/docs/manual/reference/change-events/ + */ +export interface ChangeStreamReshardCollectionDocument + extends ChangeStreamDocumentCommon, + ChangeStreamDocumentCollectionUUID, + ChangeStreamDocumentOperationDescription { + /** Describes the type of operation represented in this change notification */ + operationType: 'reshardCollection'; +} + +/** + * @public + * @see https://www.mongodb.com/docs/manual/reference/change-events/ + */ +export interface ChangeStreamRefineCollectionShardKeyDocument + extends ChangeStreamDocumentCommon, + ChangeStreamDocumentCollectionUUID, + ChangeStreamDocumentOperationDescription { + /** Describes the type of operation represented in this change notification */ + operationType: 'refineCollectionShardKey'; +} + /** @public */ export type ChangeStreamDocument = | ChangeStreamInsertDocument @@ -367,7 +500,14 @@ export type ChangeStreamDocument = | ChangeStreamDropDocument | ChangeStreamRenameDocument | ChangeStreamDropDatabaseDocument - | ChangeStreamInvalidateDocument; + | ChangeStreamInvalidateDocument + | ChangeStreamCreateIndexDocument + | ChangeStreamCreateDocument + | ChangeStreamCollModDocument + | ChangeStreamDropIndexDocument + | ChangeStreamShardCollectionDocument + | ChangeStreamReshardCollectionDocument + | ChangeStreamRefineCollectionShardKeyDocument; /** @public */ export interface UpdateDescription { diff --git a/src/index.ts b/src/index.ts index b074b2f8d0..410af2ee8d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -171,21 +171,30 @@ export type { UnorderedBulkOperation } from './bulk/unordered'; export type { ChangeStream, ChangeStreamAggregateRawResult, + ChangeStreamCollModDocument, + ChangeStreamCreateDocument, + ChangeStreamCreateIndexDocument, ChangeStreamCursor, ChangeStreamCursorOptions, ChangeStreamDeleteDocument, ChangeStreamDocument, + ChangeStreamDocumentCollectionUUID, ChangeStreamDocumentCommon, ChangeStreamDocumentKey, + ChangeStreamDocumentOperationDescription, ChangeStreamDropDatabaseDocument, ChangeStreamDropDocument, + ChangeStreamDropIndexDocument, ChangeStreamEvents, ChangeStreamInsertDocument, ChangeStreamInvalidateDocument, ChangeStreamNameSpace, ChangeStreamOptions, + ChangeStreamRefineCollectionShardKeyDocument, ChangeStreamRenameDocument, ChangeStreamReplaceDocument, + ChangeStreamReshardCollectionDocument, + ChangeStreamShardCollectionDocument, ChangeStreamUpdateDocument, OperationTime, PipeOptions, diff --git a/test/spec/change-streams/unified/change-streams-showExpandedEvents.json b/test/spec/change-streams/unified/change-streams-showExpandedEvents.json new file mode 100644 index 0000000000..9dd6f4d04d --- /dev/null +++ b/test/spec/change-streams/unified/change-streams-showExpandedEvents.json @@ -0,0 +1,507 @@ +{ + "description": "change-streams-showExpandedEvents", + "schemaVersion": "1.7", + "runOnRequirements": [ + { + "minServerVersion": "6.0.0", + "topologies": [ + "replicaset", + "sharded-replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client0", + "observeEvents": [ + "commandStartedEvent" + ], + "ignoreCommandMonitoringEvents": [ + "killCursors" + ], + "useMultipleMongoses": false + } + }, + { + "database": { + "id": "database0", + "client": "client0", + "databaseName": "database0" + } + }, + { + "collection": { + "id": "collection0", + "database": "database0", + "collectionName": "collection0" + } + }, + { + "database": { + "id": "database1", + "client": "client0", + "databaseName": "database1" + } + }, + { + "collection": { + "id": "collection1", + "database": "database1", + "collectionName": "collection1" + } + }, + { + "database": { + "id": "shardedDb", + "client": "client0", + "databaseName": "shardedDb" + } + }, + { + "database": { + "id": "adminDb", + "client": "client0", + "databaseName": "admin" + } + }, + { + "collection": { + "id": "shardedCollection", + "database": "shardedDb", + "collectionName": "shardedCollection" + } + } + ], + "initialData": [ + { + "collectionName": "collection0", + "databaseName": "database0", + "documents": [] + } + ], + "tests": [ + { + "description": "when provided, showExpandedEvents is sent as a part of the aggregate command", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + } + ], + "expectEvents": [ + { + "client": "client0", + "ignoreExtraEvents": true, + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + "showExpandedEvents": true + } + } + ] + }, + "commandName": "aggregate", + "databaseName": "database0" + } + } + ] + } + ] + }, + { + "description": "when omitted, showExpandedEvents is not sent as a part of the aggregate command", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [] + }, + "saveResultAsEntity": "changeStream0" + } + ], + "expectEvents": [ + { + "client": "client0", + "ignoreExtraEvents": true, + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + "showExpandedEvents": { + "$$exists": false + } + } + } + ] + }, + "commandName": "aggregate", + "databaseName": "database0" + } + } + ] + } + ] + }, + { + "description": "when showExpandedEvents is true, new fields on change stream events are handled appropriately", + "operations": [ + { + "name": "dropCollection", + "object": "database0", + "arguments": { + "collection": "foo" + } + }, + { + "name": "createCollection", + "object": "database0", + "arguments": { + "collection": "foo" + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "a": 1 + } + } + }, + { + "name": "createIndex", + "object": "collection0", + "arguments": { + "keys": { + "x": 1 + }, + "name": "x_1" + } + }, + { + "name": "rename", + "object": "collection0", + "arguments": { + "to": "foo", + "dropTarget": true + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "insert", + "ns": { + "db": "database0", + "coll": "collection0" + }, + "collectionUUID": { + "$$exists": true + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "createIndexes", + "ns": { + "db": "database0", + "coll": "collection0" + }, + "operationDescription": { + "$$exists": true + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "rename", + "ns": { + "db": "database0", + "coll": "collection0" + }, + "to": { + "db": "database0", + "coll": "foo" + }, + "operationDescription": { + "dropTarget": { + "$$exists": true + }, + "to": { + "db": "database0", + "coll": "foo" + } + } + } + } + ] + }, + { + "description": "when showExpandedEvents is true, createIndex events are reported", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "createIndex", + "object": "collection0", + "arguments": { + "keys": { + "x": 1 + }, + "name": "x_1" + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "createIndexes" + } + } + ] + }, + { + "description": "when showExpandedEvents is true, dropIndexes events are reported", + "operations": [ + { + "name": "createIndex", + "object": "collection0", + "arguments": { + "keys": { + "x": 1 + }, + "name": "x_1" + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "dropIndex", + "object": "collection0", + "arguments": { + "name": "x_1" + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "dropIndexes" + } + } + ] + }, + { + "description": "when showExpandedEvents is true, create events are reported", + "operations": [ + { + "name": "dropCollection", + "object": "database0", + "arguments": { + "collection": "foo" + } + }, + { + "name": "createChangeStream", + "object": "database0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "createCollection", + "object": "database0", + "arguments": { + "collection": "foo" + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "create" + } + } + ] + }, + { + "description": "when showExpandedEvents is true, create events on views are reported", + "operations": [ + { + "name": "dropCollection", + "object": "database0", + "arguments": { + "collection": "foo" + } + }, + { + "name": "createChangeStream", + "object": "database0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "createCollection", + "object": "database0", + "arguments": { + "collection": "foo", + "viewOn": "testName" + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "create" + } + } + ] + }, + { + "description": "when showExpandedEvents is true, modify events are reported", + "operations": [ + { + "name": "createIndex", + "object": "collection0", + "arguments": { + "keys": { + "x": 1 + }, + "name": "x_2" + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "runCommand", + "object": "database0", + "arguments": { + "command": { + "collMod": "collection0" + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "modify" + } + } + ] + }, + { + "description": "when showExpandedEvents is true, shardCollection events are reported", + "runOnRequirements": [ + { + "topologies": [ + "sharded-replicaset", + "sharded" + ] + } + ], + "operations": [ + { + "name": "dropCollection", + "object": "shardedDb", + "arguments": { + "collection": "shardedCollection" + } + }, + { + "name": "createCollection", + "object": "shardedDb", + "arguments": { + "collection": "shardedCollection" + } + }, + { + "name": "createChangeStream", + "object": "shardedCollection", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "runCommand", + "object": "adminDb", + "arguments": { + "command": { + "shardCollection": "shardedDb.shardedCollection", + "key": { + "_id": 1 + } + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "shardCollection" + } + } + ] + } + ] +} diff --git a/test/spec/change-streams/unified/change-streams-showExpandedEvents.yml b/test/spec/change-streams/unified/change-streams-showExpandedEvents.yml new file mode 100644 index 0000000000..18f5fe6394 --- /dev/null +++ b/test/spec/change-streams/unified/change-streams-showExpandedEvents.yml @@ -0,0 +1,295 @@ +description: "change-streams-showExpandedEvents" +schemaVersion: "1.7" +runOnRequirements: + - minServerVersion: "6.0.0" + topologies: [ replicaset, sharded-replicaset, sharded ] +createEntities: + - client: + id: &client0 client0 + observeEvents: [ commandStartedEvent ] + ignoreCommandMonitoringEvents: [ killCursors ] + useMultipleMongoses: false + - database: + id: &database0 database0 + client: *client0 + databaseName: *database0 + - collection: + id: &collection0 collection0 + database: *database0 + collectionName: *collection0 + - database: + id: &database1 database1 + client: *client0 + databaseName: *database1 + - collection: + id: &collection1 collection1 + database: *database1 + collectionName: *collection1 + - database: + id: &shardedDb shardedDb + client: *client0 + databaseName: *shardedDb + - database: + id: &adminDb adminDb + client: *client0 + databaseName: admin + - collection: + id: &shardedCollection shardedCollection + database: *shardedDb + collectionName: *shardedCollection + +initialData: + - collectionName: *collection0 + databaseName: *database0 + documents: [] + +tests: + - description: "when provided, showExpandedEvents is sent as a part of the aggregate command" + operations: + - name: createChangeStream + object: *collection0 + arguments: + pipeline: [] + showExpandedEvents: true + saveResultAsEntity: &changeStream0 changeStream0 + expectEvents: + - client: *client0 + ignoreExtraEvents: true + events: + - commandStartedEvent: + command: + aggregate: *collection0 + cursor: {} + pipeline: + - $changeStream: + showExpandedEvents: true + commandName: aggregate + databaseName: *database0 + + - description: "when omitted, showExpandedEvents is not sent as a part of the aggregate command" + operations: + - name: createChangeStream + object: *collection0 + arguments: + pipeline: [] + saveResultAsEntity: &changeStream0 changeStream0 + expectEvents: + - client: *client0 + ignoreExtraEvents: true + events: + - commandStartedEvent: + command: + aggregate: *collection0 + cursor: {} + pipeline: + - $changeStream: + showExpandedEvents: + $$exists: false + commandName: aggregate + databaseName: *database0 + + - description: "when showExpandedEvents is true, new fields on change stream events are handled appropriately" + operations: + - name: dropCollection + object: *database0 + arguments: + collection: &existing-collection foo + - name: createCollection + object: *database0 + arguments: + collection: *existing-collection + - name: createChangeStream + object: *collection0 + arguments: + pipeline: [] + showExpandedEvents: true + saveResultAsEntity: &changeStream0 changeStream0 + - name: insertOne + object: *collection0 + arguments: + document: + a: 1 + - name: createIndex + object: *collection0 + arguments: + keys: + x: 1 + name: x_1 + - name: rename + object: *collection0 + arguments: + to: *existing-collection + dropTarget: true + - name: iterateUntilDocumentOrError + object: *changeStream0 + expectResult: + operationType: insert + ns: + db: *database0 + coll: *collection0 + collectionUUID: + $$exists: true + - name: iterateUntilDocumentOrError + object: *changeStream0 + expectResult: + operationType: createIndexes + ns: + db: *database0 + coll: *collection0 + operationDescription: + $$exists: true + - name: iterateUntilDocumentOrError + object: *changeStream0 + expectResult: + operationType: rename + ns: + db: *database0 + coll: *collection0 + to: + db: *database0 + coll: *existing-collection + operationDescription: + dropTarget: + $$exists: true + to: + db: *database0 + coll: *existing-collection + + - description: "when showExpandedEvents is true, createIndex events are reported" + operations: + - name: createChangeStream + object: *collection0 + arguments: + pipeline: [] + showExpandedEvents: true + saveResultAsEntity: &changeStream0 changeStream0 + - name: createIndex + object: *collection0 + arguments: + keys: + x: 1 + name: x_1 + - name: iterateUntilDocumentOrError + object: *changeStream0 + expectResult: + operationType: createIndexes + + - description: "when showExpandedEvents is true, dropIndexes events are reported" + operations: + - name: createIndex + object: *collection0 + arguments: + keys: + x: 1 + name: &index1 x_1 + - name: createChangeStream + object: *collection0 + arguments: + pipeline: [] + showExpandedEvents: true + saveResultAsEntity: &changeStream0 changeStream0 + - name: dropIndex + object: *collection0 + arguments: + name: *index1 + - name: iterateUntilDocumentOrError + object: *changeStream0 + expectResult: + operationType: dropIndexes + + - description: "when showExpandedEvents is true, create events are reported" + operations: + - name: dropCollection + object: *database0 + arguments: + collection: &collection1 foo + - name: createChangeStream + object: *database0 + arguments: + pipeline: [] + showExpandedEvents: true + saveResultAsEntity: &changeStream0 changeStream0 + - name: createCollection + object: *database0 + arguments: + collection: *collection1 + - name: iterateUntilDocumentOrError + object: *changeStream0 + expectResult: + operationType: create + + - description: "when showExpandedEvents is true, create events on views are reported" + operations: + - name: dropCollection + object: *database0 + arguments: + collection: &collection1 foo + - name: createChangeStream + object: *database0 + arguments: + pipeline: [] + showExpandedEvents: true + saveResultAsEntity: &changeStream0 changeStream0 + - name: createCollection + object: *database0 + arguments: + collection: *collection1 + viewOn: testName + - name: iterateUntilDocumentOrError + object: *changeStream0 + expectResult: + operationType: create + + - description: "when showExpandedEvents is true, modify events are reported" + operations: + - name: createIndex + object: *collection0 + arguments: + keys: + x: 1 + name: &index2 x_2 + - name: createChangeStream + object: *collection0 + arguments: + pipeline: [] + showExpandedEvents: true + saveResultAsEntity: &changeStream0 changeStream0 + - name: runCommand + object: *database0 + arguments: + command: + collMod: *collection0 + - name: iterateUntilDocumentOrError + object: *changeStream0 + expectResult: + operationType: modify + + - description: "when showExpandedEvents is true, shardCollection events are reported" + runOnRequirements: + - topologies: [ sharded-replicaset, sharded ] + operations: + - name: dropCollection + object: *shardedDb + arguments: + collection: *shardedCollection + - name: createCollection + object: *shardedDb + arguments: + collection: *shardedCollection + - name: createChangeStream + object: *shardedCollection + arguments: + pipeline: [] + showExpandedEvents: true + saveResultAsEntity: &changeStream0 changeStream0 + - name: runCommand + object: *adminDb + arguments: + command: + shardCollection: shardedDb.shardedCollection + key: + _id: 1 + - name: iterateUntilDocumentOrError + object: *changeStream0 + expectResult: + operationType: shardCollection diff --git a/test/spec/change-streams/unified/change-streams.json b/test/spec/change-streams/unified/change-streams.json index eb8dd73af7..599905aaaa 100644 --- a/test/spec/change-streams/unified/change-streams.json +++ b/test/spec/change-streams/unified/change-streams.json @@ -5,8 +5,7 @@ { "minServerVersion": "3.6", "topologies": [ - "replicaset", - "sharded-replicaset" + "replicaset" ] } ], @@ -313,10 +312,7 @@ "description": "Test that comment is set on getMore", "runOnRequirements": [ { - "minServerVersion": "4.4.0", - "topologies": [ - "replicaset" - ] + "minServerVersion": "4.4.0" } ], "operations": [ @@ -404,10 +400,7 @@ "description": "Test that comment is not set on getMore - pre 4.4", "runOnRequirements": [ { - "maxServerVersion": "4.3.99", - "topologies": [ - "replicaset" - ] + "maxServerVersion": "4.3.99" } ], "operations": [ @@ -796,10 +789,7 @@ "description": "$changeStream must be the first stage in a change stream pipeline sent to the server", "runOnRequirements": [ { - "minServerVersion": "3.6.0", - "topologies": [ - "replicaset" - ] + "minServerVersion": "3.6.0" } ], "operations": [ @@ -872,10 +862,7 @@ "description": "The server returns change stream responses in the specified server response format", "runOnRequirements": [ { - "minServerVersion": "3.6.0", - "topologies": [ - "replicaset" - ] + "minServerVersion": "3.6.0" } ], "operations": [ @@ -925,10 +912,7 @@ "description": "Executing a watch helper on a Collection results in notifications for changes to the specified collection", "runOnRequirements": [ { - "minServerVersion": "3.6.0", - "topologies": [ - "replicaset" - ] + "minServerVersion": "3.6.0" } ], "operations": [ @@ -1013,10 +997,7 @@ "description": "Change Stream should allow valid aggregate pipeline stages", "runOnRequirements": [ { - "minServerVersion": "3.6.0", - "topologies": [ - "replicaset" - ] + "minServerVersion": "3.6.0" } ], "operations": [ @@ -1103,10 +1084,7 @@ "description": "Executing a watch helper on a Database results in notifications for changes to all collections in the specified database.", "runOnRequirements": [ { - "minServerVersion": "3.8.0", - "topologies": [ - "replicaset" - ] + "minServerVersion": "3.8.0" } ], "operations": [ @@ -1208,10 +1186,7 @@ "description": "Executing a watch helper on a MongoClient results in notifications for changes to all collections in all databases in the cluster.", "runOnRequirements": [ { - "minServerVersion": "3.8.0", - "topologies": [ - "replicaset" - ] + "minServerVersion": "3.8.0" } ], "operations": [ @@ -1332,10 +1307,7 @@ "description": "Test insert, update, replace, and delete event types", "runOnRequirements": [ { - "minServerVersion": "3.6.0", - "topologies": [ - "replicaset" - ] + "minServerVersion": "3.6.0" } ], "operations": [ @@ -1487,10 +1459,7 @@ "description": "Test rename and invalidate event types", "runOnRequirements": [ { - "minServerVersion": "4.0.1", - "topologies": [ - "replicaset" - ] + "minServerVersion": "4.0.1" } ], "operations": [ @@ -1567,10 +1536,7 @@ "description": "Test drop and invalidate event types", "runOnRequirements": [ { - "minServerVersion": "4.0.1", - "topologies": [ - "replicaset" - ] + "minServerVersion": "4.0.1" } ], "operations": [ @@ -1636,10 +1602,7 @@ "description": "Test consecutive resume", "runOnRequirements": [ { - "minServerVersion": "4.1.7", - "topologies": [ - "replicaset" - ] + "minServerVersion": "4.1.7" } ], "operations": [ @@ -1775,6 +1738,48 @@ ] } ] + }, + { + "description": "Test wallTime field is set in a change event", + "runOnRequirements": [ + { + "minServerVersion": "6.0.0" + } + ], + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [] + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": 1 + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "insert", + "ns": { + "db": "database0", + "coll": "collection0" + }, + "wallTime": { + "$$exists": true + } + } + } + ] } ] } diff --git a/test/spec/change-streams/unified/change-streams.yml b/test/spec/change-streams/unified/change-streams.yml index 65554a285a..3f0d14c096 100644 --- a/test/spec/change-streams/unified/change-streams.yml +++ b/test/spec/change-streams/unified/change-streams.yml @@ -1,8 +1,13 @@ description: "change-streams" + schemaVersion: "1.7" + runOnRequirements: - minServerVersion: "3.6" - topologies: [ replicaset, sharded-replicaset ] + # TODO(DRIVERS-2323): Run all possible tests against sharded clusters once we know the + # cause of unexpected command monitoring events. + topologies: [ replicaset ] + createEntities: - client: id: &client0 client0 @@ -175,10 +180,6 @@ tests: - description: "Test that comment is set on getMore" runOnRequirements: - minServerVersion: "4.4.0" - # Topologies are limited because of potentially empty getMore responses - # on sharded clusters. - # See https://jira.mongodb.org/browse/DRIVERS-2248 for more details. - topologies: [ replicaset ] operations: - name: createChangeStream object: *collection0 @@ -220,10 +221,6 @@ tests: - description: "Test that comment is not set on getMore - pre 4.4" runOnRequirements: - maxServerVersion: "4.3.99" - # Topologies are limited because of potentially empty getMore responses - # on sharded clusters. - # See https://jira.mongodb.org/browse/DRIVERS-2248 for more details. - topologies: [ replicaset ] operations: - name: createChangeStream object: *collection0 @@ -330,6 +327,10 @@ tests: newField: "newFieldValue" - description: "Test new structure in ns document MUST NOT err" + runOnRequirements: + - minServerVersion: "3.6" + maxServerVersion: "5.2.99" + - minServerVersion: "6.0" operations: - name: createChangeStream object: *collection0 @@ -413,7 +414,6 @@ tests: - description: $changeStream must be the first stage in a change stream pipeline sent to the server runOnRequirements: - minServerVersion: "3.6.0" - topologies: [ replicaset ] operations: - name: createChangeStream object: *collection0 @@ -450,7 +450,6 @@ tests: - description: The server returns change stream responses in the specified server response format runOnRequirements: - minServerVersion: "3.6.0" - topologies: [ replicaset ] operations: - name: createChangeStream object: *collection0 @@ -476,7 +475,6 @@ tests: - description: Executing a watch helper on a Collection results in notifications for changes to the specified collection runOnRequirements: - minServerVersion: "3.6.0" - topologies: [ replicaset ] operations: - name: createChangeStream object: *collection0 @@ -519,7 +517,6 @@ tests: - description: Change Stream should allow valid aggregate pipeline stages runOnRequirements: - minServerVersion: "3.6.0" - topologies: [ replicaset ] operations: - name: createChangeStream object: *collection0 @@ -564,7 +561,6 @@ tests: - description: Executing a watch helper on a Database results in notifications for changes to all collections in the specified database. runOnRequirements: - minServerVersion: "3.8.0" - topologies: [ replicaset ] operations: - name: createChangeStream object: *database0 @@ -617,7 +613,6 @@ tests: - description: Executing a watch helper on a MongoClient results in notifications for changes to all collections in all databases in the cluster. runOnRequirements: - minServerVersion: "3.8.0" - topologies: [ replicaset ] operations: - name: createChangeStream object: *client0 @@ -681,7 +676,6 @@ tests: - description: "Test insert, update, replace, and delete event types" runOnRequirements: - minServerVersion: "3.6.0" - topologies: [ replicaset ] operations: - name: createChangeStream object: *collection0 @@ -759,7 +753,6 @@ tests: - description: Test rename and invalidate event types runOnRequirements: - minServerVersion: "4.0.1" - topologies: [ replicaset ] operations: - name: createChangeStream object: *collection0 @@ -802,7 +795,6 @@ tests: - description: Test drop and invalidate event types runOnRequirements: - minServerVersion: "4.0.1" - topologies: [ replicaset ] operations: - name: createChangeStream object: *collection0 @@ -836,11 +828,10 @@ tests: databaseName: *database0 # Test that resume logic works correctly even after consecutive retryable failures of a getMore command, - # with no intervening events. This is ensured by setting the batch size of the change stream to 1, + # with no intervening events. This is ensured by setting the batch size of the change stream to 1, - description: Test consecutive resume runOnRequirements: - minServerVersion: "4.1.7" - topologies: [ replicaset ] operations: - name: failPoint object: testRunner @@ -912,3 +903,24 @@ tests: pipeline: [ { $changeStream: {} } ] commandName: aggregate databaseName: *database0 + + - description: "Test wallTime field is set in a change event" + runOnRequirements: + - minServerVersion: "6.0.0" + operations: + - name: createChangeStream + object: *collection0 + arguments: { pipeline: [] } + saveResultAsEntity: &changeStream0 changeStream0 + - name: insertOne + object: *collection0 + arguments: + document: { "_id": 1, "a": 1 } + - name: iterateUntilDocumentOrError + object: *changeStream0 + expectResult: + operationType: "insert" + ns: + db: *database0 + coll: *collection0 + wallTime: { $$exists: true } diff --git a/test/tools/unified-spec-runner/operations.ts b/test/tools/unified-spec-runner/operations.ts index 8b5ec24b57..a3aa393d73 100644 --- a/test/tools/unified-spec-runner/operations.ts +++ b/test/tools/unified-spec-runner/operations.ts @@ -236,6 +236,12 @@ operations.set('createIndex', async ({ entities, operation }) => { await collection.createIndex(keys, opts); }); +operations.set('dropIndex', async ({ entities, operation }) => { + const collection = entities.getEntity('collection', operation.object); + const { name, ...opts } = operation.arguments; + await collection.dropIndex(name, opts); +}); + operations.set('deleteOne', async ({ entities, operation }) => { const collection = entities.getEntity('collection', operation.object); const { filter, ...options } = operation.arguments; diff --git a/test/types/change_stream.test-d.ts b/test/types/change_stream.test-d.ts index 996c4d41b0..5f36d10d99 100644 --- a/test/types/change_stream.test-d.ts +++ b/test/types/change_stream.test-d.ts @@ -1,12 +1,16 @@ import { expectError, expectType } from 'tsd'; import type { + ChangeStreamCollModDocument, + ChangeStreamCreateDocument, + ChangeStreamCreateIndexDocument, ChangeStreamDeleteDocument, ChangeStreamDocument, ChangeStreamDocumentCommon, ChangeStreamDocumentKey, ChangeStreamDropDatabaseDocument, ChangeStreamDropDocument, + ChangeStreamDropIndexDocument, ChangeStreamInsertDocument, ChangeStreamInvalidateDocument, ChangeStreamNameSpace, @@ -21,6 +25,11 @@ import type { Timestamp, UpdateDescription } from '../../src'; +import type { + ChangeStreamRefineCollectionShardKeyDocument, + ChangeStreamReshardCollectionDocument, + ChangeStreamShardCollectionDocument +} from '../../src/change_stream'; declare const changeStreamOptions: ChangeStreamOptions; type ChangeStreamOperationType = @@ -31,7 +40,14 @@ type ChangeStreamOperationType = | 'invalidate' | 'drop' | 'dropDatabase' - | 'rename'; + | 'rename' + | 'create' + | 'modify' + | 'createIndexes' + | 'dropIndexes' + | 'shardCollection' + | 'reshardCollection' + | 'refineCollectionShardKey'; // The change stream spec says that we cannot throw an error for invalid values to `fullDocument` // for future compatibility. This means we must leave `fullDocument` as type string. @@ -122,6 +138,41 @@ switch (change.operationType) { expectType<'invalidate'>(change.operationType); break; } + case 'create': { + expectType(change); + expectType<'create'>(change.operationType); + break; + } + case 'modify': { + expectType(change); + expectType<'modify'>(change.operationType); + break; + } + case 'createIndexes': { + expectType(change); + expectType<'createIndexes'>(change.operationType); + break; + } + case 'dropIndexes': { + expectType(change); + expectType<'dropIndexes'>(change.operationType); + break; + } + case 'shardCollection': { + expectType(change); + expectType<'shardCollection'>(change.operationType); + break; + } + case 'reshardCollection': { + expectType(change); + expectType<'reshardCollection'>(change.operationType); + break; + } + case 'refineCollectionShardKey': { + expectType(change); + expectType<'refineCollectionShardKey'>(change.operationType); + break; + } default: { expectType(change); }