diff --git a/src/bulk/common.ts b/src/bulk/common.ts index 4afb066622..077fd698f3 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -1,5 +1,12 @@ import { PromiseProvider } from '../promise_provider'; -import { Long, ObjectId, Document, BSONSerializeOptions, resolveBSONOptions } from '../bson'; +import { + Long, + ObjectId, + Document, + BSONSerializeOptions, + resolveBSONOptions, + Timestamp +} from '../bson'; import { MongoWriteConcernError, AnyError, @@ -433,8 +440,13 @@ export class WriteError { } } +/** Converts the number to a Long or returns it. */ +function longOrConvert(value: number | Long | Timestamp): Long | Timestamp { + return typeof value === 'number' ? Long.fromNumber(value) : value; +} + /** Merges results into shared data structure */ -function mergeBatchResults( +export function mergeBatchResults( batch: Batch, bulkResult: BulkResult, err?: AnyError, @@ -469,42 +481,37 @@ function mergeBatchResults( return; } - // Deal with opTime if available + // The server write command specification states that lastOp is an optional + // mongod only field that has a type of timestamp. Across various scarce specs + // where opTime is mentioned, it is an "opaque" object that can have a "ts" and + // "t" field with Timestamp and Long as their types respectively. + // The "lastOp" field of the bulk write result is never mentioned in the driver + // specifications or the bulk write spec, so we should probably just keep its + // value consistent since it seems to vary. + // See: https://github.com/mongodb/specifications/blob/master/source/driver-bulk-update.rst#results-object if (result.opTime || result.lastOp) { - const opTime = result.lastOp || result.opTime; - let lastOpTS = null; - let lastOpT = null; - - // We have a time stamp - if (opTime && opTime._bsontype === 'Timestamp') { - if (bulkResult.opTime == null) { - bulkResult.opTime = opTime; - } else if (opTime.greaterThan(bulkResult.opTime)) { - bulkResult.opTime = opTime; - } - } else { - // Existing TS - if (bulkResult.opTime) { - lastOpTS = - typeof bulkResult.opTime.ts === 'number' - ? Long.fromNumber(bulkResult.opTime.ts) - : bulkResult.opTime.ts; - lastOpT = - typeof bulkResult.opTime.t === 'number' - ? Long.fromNumber(bulkResult.opTime.t) - : bulkResult.opTime.t; - } + let opTime = result.lastOp || result.opTime; - // Current OpTime TS - const opTimeTS = typeof opTime.ts === 'number' ? Long.fromNumber(opTime.ts) : opTime.ts; - const opTimeT = typeof opTime.t === 'number' ? Long.fromNumber(opTime.t) : opTime.t; + // If the opTime is a Timestamp, convert it to a consistent format to be + // able to compare easily. Converting to the object from a timestamp is + // much more straightforward than the other direction. + if (opTime._bsontype === 'Timestamp') { + opTime = { ts: opTime, t: Long.ZERO }; + } - // Compare the opTime's - if (bulkResult.opTime == null) { - bulkResult.opTime = opTime; - } else if (opTimeTS.greaterThan(lastOpTS)) { + // If there's no lastOp, just set it. + if (!bulkResult.opTime) { + bulkResult.opTime = opTime; + } else { + // First compare the ts values and set if the opTimeTS value is greater. + const lastOpTS = longOrConvert(bulkResult.opTime.ts); + const opTimeTS = longOrConvert(opTime.ts); + if (opTimeTS.greaterThan(lastOpTS)) { bulkResult.opTime = opTime; } else if (opTimeTS.equals(lastOpTS)) { + // If the ts values are equal, then compare using the t values. + const lastOpT = longOrConvert(bulkResult.opTime.t); + const opTimeT = longOrConvert(opTime.t); if (opTimeT.greaterThan(lastOpT)) { bulkResult.opTime = opTime; } diff --git a/test/unit/bulk/common.test.js b/test/unit/bulk/common.test.js new file mode 100644 index 0000000000..5e50a5a202 --- /dev/null +++ b/test/unit/bulk/common.test.js @@ -0,0 +1,129 @@ +'use strict'; + +const { expect } = require('chai'); +const { mergeBatchResults } = require('../../../src/bulk/common'); +const { Timestamp, Long } = require('../../../src/bson'); + +describe('bulk/common', function () { + describe('#mergeBatchResults', function () { + let opTime; + let lastOp; + const bulkResult = { + ok: 1, + writeErrors: [], + writeConcernErrors: [], + insertedIds: [], + nInserted: 0, + nUpserted: 0, + nMatched: 0, + nModified: 0, + nRemoved: 1, + upserted: [] + }; + const result = { + n: 8, + nModified: 8, + electionId: '7fffffff0000000000000028', + ok: 1, + $clusterTime: { + clusterTime: '7020546605669417498', + signature: { + hash: 'AAAAAAAAAAAAAAAAAAAAAAAAAAA=', + keyId: 0 + } + }, + operationTime: '7020546605669417498' + }; + const batch = []; + + context('when lastOp is an object', function () { + context('when the opTime is a Timestamp', function () { + before(function () { + lastOp = { ts: 7020546605669417496, t: 10 }; + opTime = Timestamp.fromNumber(8020546605669417496); + bulkResult.opTime = lastOp; + result.opTime = opTime; + }); + + it('replaces the opTime with the properly formatted object', function () { + mergeBatchResults(batch, bulkResult, null, result); + expect(bulkResult.opTime).to.deep.equal({ ts: opTime, t: Long.ZERO }); + }); + }); + + context('when the opTime is an object', function () { + context('when the ts is greater', function () { + before(function () { + lastOp = { ts: 7020546605669417496, t: 10 }; + opTime = { ts: 7020546605669417497, t: 10 }; + bulkResult.opTime = lastOp; + result.opTime = opTime; + }); + + it('replaces the opTime with the new opTime', function () { + mergeBatchResults(batch, bulkResult, null, result); + expect(bulkResult.opTime).to.deep.equal(opTime); + }); + }); + + context('when the ts is equal', function () { + context('when the t is greater', function () { + before(function () { + lastOp = { ts: 7020546605669417496, t: 10 }; + opTime = { ts: 7020546605669417496, t: 20 }; + bulkResult.opTime = lastOp; + result.opTime = opTime; + }); + + it('replaces the opTime with the new opTime', function () { + mergeBatchResults(batch, bulkResult, null, result); + expect(bulkResult.opTime).to.deep.equal(opTime); + }); + }); + + context('when the t is equal', function () { + before(function () { + lastOp = { ts: 7020546605669417496, t: 10 }; + opTime = { ts: 7020546605669417496, t: 10 }; + bulkResult.opTime = lastOp; + result.opTime = opTime; + }); + + it('does not replace the opTime with the new opTime', function () { + mergeBatchResults(batch, bulkResult, null, result); + expect(bulkResult.opTime).to.deep.equal(lastOp); + }); + }); + + context('when the t is less', function () { + before(function () { + lastOp = { ts: 7020546605669417496, t: 10 }; + opTime = { ts: 7020546605669417496, t: 5 }; + bulkResult.opTime = lastOp; + result.opTime = opTime; + }); + + it('does not replace the opTime with the new opTime', function () { + mergeBatchResults(batch, bulkResult, null, result); + expect(bulkResult.opTime).to.deep.equal(lastOp); + }); + }); + }); + + context('when the ts is less', function () { + before(function () { + lastOp = { ts: 7020546605669417496, t: 10 }; + opTime = { ts: 7020546605669417495, t: 10 }; + bulkResult.opTime = lastOp; + result.opTime = opTime; + }); + + it('does not replace the opTime with the new opTime', function () { + mergeBatchResults(batch, bulkResult, null, result); + expect(bulkResult.opTime).to.deep.equal(lastOp); + }); + }); + }); + }); + }); +});