Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-3515): do proper opTime merging in bulk results #3012

Merged
merged 5 commits into from Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
73 changes: 40 additions & 33 deletions src/bulk/common.ts
@@ -1,5 +1,12 @@
import { PromiseProvider } from '../promise_provider';
import { Long, ObjectId, Document, BSONSerializeOptions, resolveBSONOptions } from '../bson';
import {
Long,
ObjectId,
Document,
BSONSerializeOptions,
resolveBSONOptions,
Timestamp
} from '../bson';
import {
MongoWriteConcernError,
AnyError,
Expand Down Expand Up @@ -433,8 +440,13 @@ export class WriteError {
}
}

/** Converts the number to a Long or returns it. */
function longOrConvert(value: number | Long | Timestamp): Long | Timestamp {
return typeof value === 'number' ? Long.fromNumber(value) : value;
}

/** Merges results into shared data structure */
function mergeBatchResults(
export function mergeBatchResults(
batch: Batch,
bulkResult: BulkResult,
err?: AnyError,
Expand Down Expand Up @@ -469,42 +481,37 @@ function mergeBatchResults(
return;
}

// Deal with opTime if available
// The server write command specification states that lastOp is an optional
// mongod only field that has a type of timestamp. Across various scarce specs
// where opTime is mentioned, it is an "opaque" object that can have a "ts" and
// "t" field with Timestamp and Long as their types respectively.
// The "lastOp" field of the bulk write result is never mentioned in the driver
// specifications or the bulk write spec, so we should probably just keep its
// value consistent since it seems to vary.
// See: https://github.com/mongodb/specifications/blob/master/source/driver-bulk-update.rst#results-object
if (result.opTime || result.lastOp) {
const opTime = result.lastOp || result.opTime;
let lastOpTS = null;
let lastOpT = null;

// We have a time stamp
if (opTime && opTime._bsontype === 'Timestamp') {
if (bulkResult.opTime == null) {
bulkResult.opTime = opTime;
} else if (opTime.greaterThan(bulkResult.opTime)) {
bulkResult.opTime = opTime;
}
} else {
// Existing TS
if (bulkResult.opTime) {
lastOpTS =
typeof bulkResult.opTime.ts === 'number'
? Long.fromNumber(bulkResult.opTime.ts)
: bulkResult.opTime.ts;
lastOpT =
typeof bulkResult.opTime.t === 'number'
? Long.fromNumber(bulkResult.opTime.t)
: bulkResult.opTime.t;
}
let opTime = result.lastOp || result.opTime;

// Current OpTime TS
const opTimeTS = typeof opTime.ts === 'number' ? Long.fromNumber(opTime.ts) : opTime.ts;
const opTimeT = typeof opTime.t === 'number' ? Long.fromNumber(opTime.t) : opTime.t;
// If the opTime is a Timestamp, convert it to a consistent format to be
// able to compare easily. Converting to the object from a timestamp is
// much more straightforward than the other direction.
if (opTime._bsontype === 'Timestamp') {
opTime = { ts: opTime, t: Long.ZERO };
}

// Compare the opTime's
if (bulkResult.opTime == null) {
bulkResult.opTime = opTime;
} else if (opTimeTS.greaterThan(lastOpTS)) {
// If there's no lastOp, just set it.
if (!bulkResult.opTime) {
bulkResult.opTime = opTime;
} else {
// First compare the ts values and set if the opTimeTS value is greater.
const lastOpTS = longOrConvert(bulkResult.opTime.ts);
const opTimeTS = longOrConvert(opTime.ts);
if (opTimeTS.greaterThan(lastOpTS)) {
bulkResult.opTime = opTime;
} else if (opTimeTS.equals(lastOpTS)) {
// If the ts values are equal, then compare using the t values.
const lastOpT = longOrConvert(bulkResult.opTime.t);
const opTimeT = longOrConvert(opTime.t);
if (opTimeT.greaterThan(lastOpT)) {
bulkResult.opTime = opTime;
}
Expand Down
129 changes: 129 additions & 0 deletions test/unit/bulk/common.test.js
@@ -0,0 +1,129 @@
'use strict';

const { expect } = require('chai');
const { mergeBatchResults } = require('../../../src/bulk/common');
const { Timestamp, Long } = require('../../../src/bson');

describe('bulk/common', function () {
describe.only('#mergeBatchResults', function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
let opTime;
let lastOp;
const bulkResult = {
ok: 1,
writeErrors: [],
writeConcernErrors: [],
insertedIds: [],
nInserted: 0,
nUpserted: 0,
nMatched: 0,
nModified: 0,
nRemoved: 1,
upserted: []
};
const result = {
n: 8,
nModified: 8,
electionId: '7fffffff0000000000000028',
ok: 1,
$clusterTime: {
clusterTime: '7020546605669417498',
signature: {
hash: 'AAAAAAAAAAAAAAAAAAAAAAAAAAA=',
keyId: 0
}
},
operationTime: '7020546605669417498'
};
const batch = [];

context('when lastOp is an object', function () {
context('when the opTime is a Timestamp', function () {
before(function () {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = Timestamp.fromNumber(8020546605669417496);
bulkResult.opTime = lastOp;
result.opTime = opTime;
});

it('replaces the opTime with the properly formatted object', function () {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.opTime).to.deep.equal({ ts: opTime, t: Long.ZERO });
});
});

context('when the opTime is an object', function () {
context('when the ts is greater', function () {
before(function () {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417497, t: 10 };
bulkResult.opTime = lastOp;
result.opTime = opTime;
});

it('replaces the opTime with the new opTime', function () {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.opTime).to.deep.equal(opTime);
});
});

context('when the ts is equal', function () {
context('when the t is greater', function () {
before(function () {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417496, t: 20 };
bulkResult.opTime = lastOp;
result.opTime = opTime;
});

it('replaces the opTime with the new opTime', function () {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.opTime).to.deep.equal(opTime);
});
});

context('when the t is equal', function () {
before(function () {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417496, t: 10 };
bulkResult.opTime = lastOp;
result.opTime = opTime;
});

it('does not replace the opTime with the new opTime', function () {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.opTime).to.deep.equal(lastOp);
});
});

context('when the t is less', function () {
before(function () {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417496, t: 5 };
bulkResult.opTime = lastOp;
result.opTime = opTime;
});

it('does not replace the opTime with the new opTime', function () {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.opTime).to.deep.equal(lastOp);
});
});
});

context('when the ts is less', function () {
before(function () {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417495, t: 10 };
bulkResult.opTime = lastOp;
result.opTime = opTime;
});

it('does not replace the opTime with the new opTime', function () {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.opTime).to.deep.equal(lastOp);
});
});
});
});
});
});