Skip to content

Commit

Permalink
test(NODE-4139): make all tests async
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed May 23, 2022
1 parent ca6e0aa commit 859d9e0
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/cmap/message_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ function processIncomingData(stream: MessageStream, callback: Callback<Buffer>)
}
}
return false;
}
};

let ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response;
if (messageHeader.opCode !== OP_COMPRESSED) {
Expand Down
57 changes: 36 additions & 21 deletions test/unit/cmap/message_stream.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use strict';
const { once } = require('events');
const { on, once } = require('events');
const { Readable, Writable } = require('stream');

const { MessageStream } = require('../../../src/cmap/message_stream');
Expand All @@ -26,11 +26,14 @@ describe('MessageStream', function () {
let firstHello;
let secondHello;
let thirdHello;
let partial;

beforeEach(function () {
firstHello = generateOpMsgBuffer(response);
secondHello = generateOpMsgBuffer(response);
thirdHello = generateOpMsgBuffer(response);
partial = Buffer.alloc(5);
partial.writeInt32LE(100, 0);
});

it('only reads the last message in the buffer', async function () {
Expand All @@ -46,6 +49,22 @@ describe('MessageStream', function () {
// Make sure there is nothing left in the buffer.
expect(messageStream.buffer.length).to.equal(0);
});

it('does not read partial messages', async function () {
const inputStream = bufferToStream(
Buffer.concat([firstHello, secondHello, thirdHello, partial])
);
const messageStream = new MessageStream();
messageStream.isStreamingProtocol = true;

inputStream.pipe(messageStream);
const messages = await once(messageStream, 'message');
const msg = messages[0];
msg.parse();
expect(msg).to.have.property('documents').that.deep.equals([response]);
// Make sure the buffer wasn't read to the end.
expect(messageStream.buffer.length).to.equal(5);
});
});

context('when the stream is not using the streaming protocol', function () {
Expand All @@ -62,52 +81,48 @@ describe('MessageStream', function () {
thirdHello = generateOpMsgBuffer(response);
});

it('reads all messages in the buffer', function (done) {
it('reads all messages in the buffer', async function () {
const inputStream = bufferToStream(Buffer.concat([firstHello, secondHello, thirdHello]));
const messageStream = new MessageStream();

messageStream.on('message', msg => {
inputStream.pipe(messageStream);
for await (const messages of on(messageStream, 'message')) {
messageCount++;
const msg = messages[0];
msg.parse();
expect(msg).to.have.property('documents').that.deep.equals([response]);
// Test will not complete until 3 messages processed.
if (messageCount === 3) {
done();
return;
}
});

inputStream.pipe(messageStream);
}
});
});

context('when the messages are invalid', function () {
context('when the message size is negative', function () {
it('emits an error', function (done) {
it('emits an error', async function () {
const inputStream = bufferToStream(Buffer.from('ffffffff', 'hex'));
const messageStream = new MessageStream();

messageStream.on('error', err => {
expect(err).to.have.property('message').that.equals('Invalid message size: -1');
done();
});

inputStream.pipe(messageStream);
const errors = await once(messageStream, 'error');
const err = errors[0];
expect(err).to.have.property('message').that.equals('Invalid message size: -1');
});
});

context('when the message size exceeds the bson maximum', function () {
it('emits an error', function (done) {
it('emits an error', async function () {
const inputStream = bufferToStream(Buffer.from('01000004', 'hex'));
const messageStream = new MessageStream();

messageStream.on('error', err => {
expect(err)
.to.have.property('message')
.that.equals('Invalid message size: 67108865, max allowed: 67108864');
done();
});

inputStream.pipe(messageStream);
const errors = await once(messageStream, 'error');
const err = errors[0];
expect(err)
.to.have.property('message')
.that.equals('Invalid message size: 67108865, max allowed: 67108864');
});
});
});
Expand Down

0 comments on commit 859d9e0

Please sign in to comment.