Skip to content

Commit

Permalink
fix(NODE-4139): set flag on all monitor connections
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed May 23, 2022
1 parent 859d9e0 commit 9621129
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 23 deletions.
11 changes: 5 additions & 6 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,14 +293,13 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this[kHello] = response;
}

// Set the whether the message stream is for a monitoring connection using the
// streaming protocol.
set isStreamingProtocol(value: boolean) {
this[kMessageStream].isStreamingProtocol = value;
// Set the whether the message stream is for a monitoring connection.
set isMonitoringConnection(value: boolean) {
this[kMessageStream].isMonitoringConnection = value;
}

get isStreamingProtocol(): boolean {
return this[kMessageStream].isStreamingProtocol;
get isMonitoringConnection(): boolean {
return this[kMessageStream].isMonitoringConnection;
}

get serviceId(): ObjectId | undefined {
Expand Down
15 changes: 7 additions & 8 deletions src/cmap/message_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export class MessageStream extends Duplex {
/** @internal */
[kBuffer]: BufferPool;
/** @internal */
isStreamingProtocol = false;
isMonitoringConnection = false;

constructor(options: MessageStreamOptions = {}) {
super(options);
Expand Down Expand Up @@ -168,15 +168,14 @@ function processIncomingData(stream: MessageStream, callback: Callback<Buffer>)
opCode: message.readInt32LE(12)
};

const streamingProtocolHasAnotherHello = () => {
if (stream.isStreamingProtocol) {
const monitorHasAnotherHello = () => {
if (stream.isMonitoringConnection) {
// Can we read the next message size?
if (buffer.length >= 4) {
const sizeOfMessage = buffer.peek(4).readInt32LE();
if (sizeOfMessage < buffer.length) {
return true;
}
return false;
}
}
return false;
Expand All @@ -186,10 +185,10 @@ function processIncomingData(stream: MessageStream, callback: Callback<Buffer>)
if (messageHeader.opCode !== OP_COMPRESSED) {
const messageBody = message.slice(MESSAGE_HEADER_SIZE);

// If we are a monitoring message stream using the streaming protocol and
// If we are a monitoring connection message stream and
// there is more in the buffer that can be read, skip processing since we
// want the last hello command response that is in the buffer.
if (streamingProtocolHasAnotherHello()) {
if (monitorHasAnotherHello()) {
processIncomingData(stream, callback);
} else {
stream.emit('message', new ResponseType(message, messageHeader, messageBody));
Expand Down Expand Up @@ -226,10 +225,10 @@ function processIncomingData(stream: MessageStream, callback: Callback<Buffer>)
return;
}

// If we are a monitoring message stream using the streaming protocol and
// If we are a monitoring connection message stream and
// there is more in the buffer that can be read, skip processing since we
// want the last hello command response that is in the buffer.
if (streamingProtocolHasAnotherHello()) {
if (monitorHasAnotherHello()) {
processIncomingData(stream, callback);
} else {
stream.emit('message', new ResponseType(message, messageHeader, messageBody));
Expand Down
12 changes: 8 additions & 4 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
[kMonitorId]?: InterruptibleAsyncInterval;
[kRTTPinger]?: RTTPinger;

get connection(): Connection | undefined {
return this[kConnection];
}

constructor(server: Server, options: MonitorOptions) {
super();

Expand Down Expand Up @@ -279,10 +283,6 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
// if we are using the streaming protocol then we immediately issue another `started`
// event, otherwise the "check" is complete and return to the main monitor loop
if (isAwaitable && hello.topologyVersion) {
// Tell the connection that we are using the streaming protocol so that the
// connection's message stream will only read the last hello on the buffer.
connection.isStreamingProtocol = true;

monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address)
Expand Down Expand Up @@ -314,6 +314,10 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
}

if (conn) {
// Tell the connection that we are using the streaming protocol so that the
// connection's message stream will only read the last hello on the buffer.
conn.isMonitoringConnection = true;

if (isInCloseState(monitor)) {
conn.destroy({ force: true });
return;
Expand Down
8 changes: 4 additions & 4 deletions test/unit/cmap/message_stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ function bufferToStream(buffer) {
}

describe('MessageStream', function () {
context('when the stream uses the streaming protocol', function () {
context('when the stream is for a monitoring connection', function () {
const response = { isWritablePrimary: true };
let firstHello;
let secondHello;
Expand All @@ -39,7 +39,7 @@ describe('MessageStream', function () {
it('only reads the last message in the buffer', async function () {
const inputStream = bufferToStream(Buffer.concat([firstHello, secondHello, thirdHello]));
const messageStream = new MessageStream();
messageStream.isStreamingProtocol = true;
messageStream.isMonitoringConnection = true;

inputStream.pipe(messageStream);
const messages = await once(messageStream, 'message');
Expand All @@ -55,7 +55,7 @@ describe('MessageStream', function () {
Buffer.concat([firstHello, secondHello, thirdHello, partial])
);
const messageStream = new MessageStream();
messageStream.isStreamingProtocol = true;
messageStream.isMonitoringConnection = true;

inputStream.pipe(messageStream);
const messages = await once(messageStream, 'message');
Expand All @@ -67,7 +67,7 @@ describe('MessageStream', function () {
});
});

context('when the stream is not using the streaming protocol', function () {
context('when the stream is not for a monitoring connection', function () {
context('when the messages are valid', function () {
const response = { isWritablePrimary: true };
let firstHello;
Expand Down
5 changes: 4 additions & 1 deletion test/unit/sdam/monitor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ describe('monitoring', function () {
monitor = new Monitor(server, {});

monitor.on('serverHeartbeatFailed', () => done(new Error('unexpected heartbeat failure')));
monitor.on('serverHeartbeatSucceeded', () => done());
monitor.on('serverHeartbeatSucceeded', () => {
expect(monitor.connection.isMonitoringConnection).to.be.true;
done();
});
monitor.connect();
});

Expand Down

0 comments on commit 9621129

Please sign in to comment.