Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-4139): streaming protocol message changes #3256

Merged
merged 7 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,15 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this[kHello] = response;
}

// Set the whether the message stream is for a monitoring connection.
set isMonitoringConnection(value: boolean) {
this[kMessageStream].isMonitoringConnection = value;
}

get isMonitoringConnection(): boolean {
return this[kMessageStream].isMonitoringConnection;
}

get serviceId(): ObjectId | undefined {
return this.hello?.serviceId;
}
Expand Down
48 changes: 41 additions & 7 deletions src/cmap/message_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,19 @@ export class MessageStream extends Duplex {
maxBsonMessageSize: number;
/** @internal */
[kBuffer]: BufferPool;
/** @internal */
isMonitoringConnection = false;

constructor(options: MessageStreamOptions = {}) {
super(options);
this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize;
this[kBuffer] = new BufferPool();
}

get buffer(): BufferPool {
return this[kBuffer];
}

override _write(chunk: Buffer, _: unknown, callback: Callback<Buffer>): void {
this[kBuffer].append(chunk);
processIncomingData(this, callback);
Expand Down Expand Up @@ -162,15 +168,36 @@ function processIncomingData(stream: MessageStream, callback: Callback<Buffer>)
opCode: message.readInt32LE(12)
};

const monitorHasAnotherHello = () => {
if (stream.isMonitoringConnection) {
// Can we read the next message size?
if (buffer.length >= 4) {
const sizeOfMessage = buffer.peek(4).readInt32LE();
if (sizeOfMessage < buffer.length) {
return true;
}
}
}
return false;
};

let ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response;
if (messageHeader.opCode !== OP_COMPRESSED) {
const messageBody = message.slice(MESSAGE_HEADER_SIZE);
stream.emit('message', new ResponseType(message, messageHeader, messageBody));

if (buffer.length >= 4) {
// If we are a monitoring connection message stream and
// there is more in the buffer that can be read, skip processing since we
// want the last hello command response that is in the buffer.
if (monitorHasAnotherHello()) {
processIncomingData(stream, callback);
} else {
callback();
stream.emit('message', new ResponseType(message, messageHeader, messageBody));

if (buffer.length >= 4) {
processIncomingData(stream, callback);
dariakp marked this conversation as resolved.
Show resolved Hide resolved
} else {
callback();
}
}

return;
Expand Down Expand Up @@ -198,12 +225,19 @@ function processIncomingData(stream: MessageStream, callback: Callback<Buffer>)
return;
}

stream.emit('message', new ResponseType(message, messageHeader, messageBody));

if (buffer.length >= 4) {
// If we are a monitoring connection message stream and
// there is more in the buffer that can be read, skip processing since we
// want the last hello command response that is in the buffer.
if (monitorHasAnotherHello()) {
processIncomingData(stream, callback);
} else {
callback();
stream.emit('message', new ResponseType(message, messageHeader, messageBody));

if (buffer.length >= 4) {
processIncomingData(stream, callback);
} else {
callback();
}
}
});
}
8 changes: 8 additions & 0 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
[kMonitorId]?: InterruptibleAsyncInterval;
[kRTTPinger]?: RTTPinger;

get connection(): Connection | undefined {
return this[kConnection];
}

constructor(server: Server, options: MonitorOptions) {
super();

Expand Down Expand Up @@ -310,6 +314,10 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
}

if (conn) {
// Tell the connection that we are using the streaming protocol so that the
// connection's message stream will only read the last hello on the buffer.
conn.isMonitoringConnection = true;

if (isInCloseState(monitor)) {
conn.destroy({ force: true });
return;
Expand Down
21 changes: 21 additions & 0 deletions test/tools/utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { EJSON } from 'bson';
import * as BSON from 'bson';
import { expect } from 'chai';
import { inspect, promisify } from 'util';

import { OP_MSG } from '../../src/cmap/wire_protocol/constants';
import { Document } from '../../src/index';
import { Logger } from '../../src/logger';
import { deprecateOptions, DeprecateOptionsConfig } from '../../src/utils';
import { runUnifiedSuite } from './unified-spec-runner/runner';
Expand Down Expand Up @@ -343,6 +346,24 @@ export class TestBuilder {
}
}

export function generateOpMsgBuffer(document: Document): Buffer {
const header = Buffer.alloc(4 * 4 + 4);

const typeBuffer = Buffer.alloc(1);
typeBuffer[0] = 0;

const docBuffer = BSON.serialize(document);

const totalLength = header.length + typeBuffer.length + docBuffer.length;

header.writeInt32LE(totalLength, 0);
header.writeInt32LE(0, 4);
header.writeInt32LE(0, 8);
header.writeInt32LE(OP_MSG, 12);
header.writeUInt32LE(0, 16);
return Buffer.concat([header, typeBuffer, docBuffer]);
}

export class UnifiedTestSuiteBuilder {
private _description = 'Default Description';
private _schemaVersion = '1.0';
Expand Down
197 changes: 99 additions & 98 deletions test/unit/cmap/message_stream.test.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
'use strict';
const Readable = require('stream').Readable;
const Writable = require('stream').Writable;
const { on, once } = require('events');
const { Readable, Writable } = require('stream');

const { MessageStream } = require('../../../src/cmap/message_stream');
const { Msg } = require('../../../src/cmap/commands');
const expect = require('chai').expect;
const { LEGACY_HELLO_COMMAND } = require('../../../src/constants');
const { generateOpMsgBuffer } = require('../../tools/utils');

function bufferToStream(buffer) {
const stream = new Readable();
Expand All @@ -18,117 +20,116 @@ function bufferToStream(buffer) {
return stream;
}

describe('Message Stream', function () {
describe('reading', function () {
[
{
description: 'valid OP_REPLY',
data: Buffer.from(
'370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000',
'hex'
),
documents: [{ [LEGACY_HELLO_COMMAND]: 1 }]
},
{
description: 'valid multiple OP_REPLY',
expectedMessageCount: 4,
data: Buffer.from(
'370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' +
'370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' +
'370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' +
'370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000',
'hex'
),
documents: [{ [LEGACY_HELLO_COMMAND]: 1 }]
},
{
description: 'valid OP_REPLY (partial)',
data: [
Buffer.from('37', 'hex'),
Buffer.from('0000', 'hex'),
Buffer.from(
'000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000',
'hex'
)
],
documents: [{ [LEGACY_HELLO_COMMAND]: 1 }]
},

{
description: 'valid OP_MSG',
data: Buffer.from(
'370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000',
'hex'
),
documents: [{ $db: 'admin', [LEGACY_HELLO_COMMAND]: 1 }]
},
{
description: 'valid multiple OP_MSG',
expectedMessageCount: 4,
data: Buffer.from(
'370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' +
'370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' +
'370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' +
'370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000',
'hex'
),
documents: [{ $db: 'admin', [LEGACY_HELLO_COMMAND]: 1 }]
},

{
description: 'Invalid message size (negative)',
data: Buffer.from('ffffffff', 'hex'),
error: 'Invalid message size: -1'
},
{
description: 'Invalid message size (exceeds maximum)',
data: Buffer.from('01000004', 'hex'),
error: 'Invalid message size: 67108865, max allowed: 67108864'
}
].forEach(test => {
it(test.description, function (done) {
const error = test.error;
const expectedMessageCount = test.expectedMessageCount || 1;
const inputStream = bufferToStream(test.data);
const messageStream = new MessageStream();
describe('MessageStream', function () {
context('when the stream is for a monitoring connection', function () {
const response = { isWritablePrimary: true };
let firstHello;
let secondHello;
let thirdHello;
let partial;

beforeEach(function () {
firstHello = generateOpMsgBuffer(response);
secondHello = generateOpMsgBuffer(response);
thirdHello = generateOpMsgBuffer(response);
partial = Buffer.alloc(5);
partial.writeInt32LE(100, 0);
});

let messageCount = 0;
messageStream.on('message', msg => {
messageCount++;
if (error) {
done(new Error(`expected error: ${error}`));
return;
}
it('only reads the last message in the buffer', async function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const inputStream = bufferToStream(Buffer.concat([firstHello, secondHello, thirdHello]));
const messageStream = new MessageStream();
messageStream.isMonitoringConnection = true;

inputStream.pipe(messageStream);
const messages = await once(messageStream, 'message');
const msg = messages[0];
msg.parse();
expect(msg).to.have.property('documents').that.deep.equals([response]);
// Make sure there is nothing left in the buffer.
expect(messageStream.buffer.length).to.equal(0);
});

msg.parse();
it('does not read partial messages', async function () {
const inputStream = bufferToStream(
Buffer.concat([firstHello, secondHello, thirdHello, partial])
);
const messageStream = new MessageStream();
messageStream.isMonitoringConnection = true;

inputStream.pipe(messageStream);
const messages = await once(messageStream, 'message');
const msg = messages[0];
msg.parse();
expect(msg).to.have.property('documents').that.deep.equals([response]);
// Make sure the buffer wasn't read to the end.
expect(messageStream.buffer.length).to.equal(5);
});
});

if (test.documents) {
expect(msg).to.have.property('documents').that.deep.equals(test.documents);
}
context('when the stream is not for a monitoring connection', function () {
context('when the messages are valid', function () {
const response = { isWritablePrimary: true };
let firstHello;
let secondHello;
let thirdHello;
let messageCount = 0;

beforeEach(function () {
firstHello = generateOpMsgBuffer(response);
secondHello = generateOpMsgBuffer(response);
thirdHello = generateOpMsgBuffer(response);
});

if (messageCount === expectedMessageCount) {
done();
}
});
it('reads all messages in the buffer', async function () {
const inputStream = bufferToStream(Buffer.concat([firstHello, secondHello, thirdHello]));
const messageStream = new MessageStream();

messageStream.on('error', err => {
if (error == null) {
done(err);
inputStream.pipe(messageStream);
for await (const messages of on(messageStream, 'message')) {
messageCount++;
const msg = messages[0];
msg.parse();
expect(msg).to.have.property('documents').that.deep.equals([response]);
// Test will not complete until 3 messages processed.
if (messageCount === 3) {
return;
}
}
});
});

expect(err).to.have.property('message').that.equals(error);
context('when the messages are invalid', function () {
context('when the message size is negative', function () {
it('emits an error', async function () {
const inputStream = bufferToStream(Buffer.from('ffffffff', 'hex'));
const messageStream = new MessageStream();

done();
inputStream.pipe(messageStream);
const errors = await once(messageStream, 'error');
const err = errors[0];
expect(err).to.have.property('message').that.equals('Invalid message size: -1');
});
});

inputStream.pipe(messageStream);
context('when the message size exceeds the bson maximum', function () {
it('emits an error', async function () {
const inputStream = bufferToStream(Buffer.from('01000004', 'hex'));
const messageStream = new MessageStream();

inputStream.pipe(messageStream);
const errors = await once(messageStream, 'error');
const err = errors[0];
expect(err)
.to.have.property('message')
.that.equals('Invalid message size: 67108865, max allowed: 67108864');
});
});
});
});

describe('writing', function () {
it('should write a message to the stream', function (done) {
context('when writing to the message stream', function () {
it('pushes the message', function (done) {
const readableStream = new Readable({ read() {} });
const writeableStream = new Writable({
write: (chunk, _, callback) => {
Expand Down
5 changes: 4 additions & 1 deletion test/unit/sdam/monitor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ describe('monitoring', function () {
monitor = new Monitor(server, {});

monitor.on('serverHeartbeatFailed', () => done(new Error('unexpected heartbeat failure')));
monitor.on('serverHeartbeatSucceeded', () => done());
monitor.on('serverHeartbeatSucceeded', () => {
expect(monitor.connection.isMonitoringConnection).to.be.true;
done();
});
monitor.connect();
});

Expand Down