Skip to content

Commit

Permalink
feat(NODE-2843): implement sessions advanceClusterTime method (#2920)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariakp committed Aug 2, 2021
1 parent a9c0de8 commit 1fd0244
Show file tree
Hide file tree
Showing 5 changed files with 502 additions and 190 deletions.
14 changes: 7 additions & 7 deletions src/sdam/common.ts
Expand Up @@ -66,16 +66,16 @@ export interface ClusterTime {
};
}

/** Shared function to determine clusterTime for a given topology */
export function resolveClusterTime(
topology: Topology | ClientSession,
/** Shared function to determine clusterTime for a given topology or session */
export function _advanceClusterTime(
entity: Topology | ClientSession,
$clusterTime: ClusterTime
): void {
if (topology.clusterTime == null) {
topology.clusterTime = $clusterTime;
if (entity.clusterTime == null) {
entity.clusterTime = $clusterTime;
} else {
if ($clusterTime.clusterTime.greaterThan(topology.clusterTime.clusterTime)) {
topology.clusterTime = $clusterTime;
if ($clusterTime.clusterTime.greaterThan(entity.clusterTime.clusterTime)) {
entity.clusterTime = $clusterTime;
}
}
}
4 changes: 2 additions & 2 deletions src/sdam/topology.ts
Expand Up @@ -28,7 +28,7 @@ import {
ServerType,
ClusterTime,
TimerQueue,
resolveClusterTime,
_advanceClusterTime,
drainTimerQueue,
clearAndRemoveTimerFrom,
STATE_CLOSED,
Expand Down Expand Up @@ -681,7 +681,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
// value of the clusterTime embedded field."
const clusterTime = serverDescription.$clusterTime;
if (clusterTime) {
resolveClusterTime(this, clusterTime);
_advanceClusterTime(this, clusterTime);
}

// If we already know all the information contained in this updated description, then
Expand Down
32 changes: 30 additions & 2 deletions src/sessions.ts
Expand Up @@ -2,7 +2,7 @@ import { PromiseProvider } from './promise_provider';
import { Binary, Long, Timestamp, Document } from './bson';
import { ReadPreference } from './read_preference';
import { isTransactionCommand, TxnState, Transaction, TransactionOptions } from './transactions';
import { resolveClusterTime, ClusterTime } from './sdam/common';
import { _advanceClusterTime, ClusterTime } from './sdam/common';
import { isSharded } from './cmap/wire_protocol/shared';
import {
MongoError,
Expand Down Expand Up @@ -249,6 +249,34 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
}
}

/**
* Advances the clusterTime for a ClientSession to the provided clusterTime of another ClientSession
*
* @param clusterTime - the $clusterTime returned by the server from another session in the form of a document containing the `BSON.Timestamp` clusterTime and signature
*/
advanceClusterTime(clusterTime: ClusterTime): void {
if (!clusterTime || typeof clusterTime !== 'object') {
throw new MongoInvalidArgumentError('input cluster time must be an object');
}
if (!clusterTime.clusterTime || clusterTime.clusterTime._bsontype !== 'Timestamp') {
throw new MongoInvalidArgumentError(
'input cluster time "clusterTime" property must be a valid BSON Timestamp'
);
}
if (
!clusterTime.signature ||
clusterTime.signature.hash?._bsontype !== 'Binary' ||
(typeof clusterTime.signature.keyId !== 'number' &&
clusterTime.signature.keyId?._bsontype !== 'Long') // apparently we decode the key to number?
) {
throw new MongoInvalidArgumentError(
'input cluster time must have a valid "signature" property with BSON Binary hash and BSON Long keyId'
);
}

_advanceClusterTime(this, clusterTime);
}

/**
* Used to determine if this session equals another
*
Expand Down Expand Up @@ -886,7 +914,7 @@ export function applySession(

export function updateSessionFromResponse(session: ClientSession, document: Document): void {
if (document.$clusterTime) {
resolveClusterTime(session, document.$clusterTime);
_advanceClusterTime(session, document.$clusterTime);
}

if (document.operationTime && session && session.supports.causalConsistency) {
Expand Down

0 comments on commit 1fd0244

Please sign in to comment.