Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: websockets/ws
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 7.0.1
Choose a base ref
...
head repository: websockets/ws
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 7.1.0
Choose a head ref
  • 7 commits
  • 7 files changed
  • 4 contributors

Commits on Jun 19, 2019

  1. Copy the full SHA
    d9b5562 View commit details

Commits on Jun 22, 2019

  1. Copy the full SHA
    47e7d64 View commit details

Commits on Jun 23, 2019

  1. Copy the full SHA
    c62ea9f View commit details

Commits on Jun 25, 2019

  1. Copy the full SHA
    db14864 View commit details

Commits on Jul 5, 2019

  1. Revert "[minor] Remove unneeded if statement"

    This reverts commit 297f56d.
    lpinca committed Jul 5, 2019
    Copy the full SHA
    dbacf58 View commit details

Commits on Jul 8, 2019

  1. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    a49a827 View commit details
  2. [dist] 7.1.0

    lpinca committed Jul 8, 2019
    Copy the full SHA
    dd42c8b View commit details
Showing with 629 additions and 20 deletions.
  1. +42 −12 README.md
  2. +15 −3 doc/ws.md
  3. +1 −0 index.js
  4. +150 −0 lib/stream.js
  5. +4 −2 lib/websocket.js
  6. +3 −3 package.json
  7. +414 −0 test/create-websocket-stream.test.js
54 changes: 42 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@ can use one of the many wrappers available on npm, like
- [Multiple servers sharing a single HTTP/S server](#multiple-servers-sharing-a-single-https-server)
- [Server broadcast](#server-broadcast)
- [echo.websocket.org demo](#echowebsocketorg-demo)
- [Use the Node.js streams API](#use-the-nodejs-streams-api)
- [Other examples](#other-examples)
- [FAQ](#faq)
- [How to get the IP address of the client?](#how-to-get-the-ip-address-of-the-client)
@@ -69,7 +70,8 @@ necessarily need to have a C++ compiler installed on your machine.

## API docs

See [`/doc/ws.md`](./doc/ws.md) for Node.js-like docs for the ws classes.
See [`/doc/ws.md`](./doc/ws.md) for Node.js-like documentation of ws classes and
utility functions.

## WebSocket compression

@@ -249,23 +251,35 @@ server.listen(8080);

### Server broadcast

A client WebSocket broadcasting to all connected WebSocket clients, including
itself.

```js
const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

// Broadcast to all.
wss.broadcast = function broadcast(data) {
wss.clients.forEach(function each(client) {
if (client.readyState === WebSocket.OPEN) {
client.send(data);
}
wss.on('connection', function connection(ws) {
ws.on('message', function incoming(data) {
wss.clients.forEach(function each(client) {
if (client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
});
};
});
```

A client WebSocket broadcasting to every other connected WebSocket clients,
excluding itself.

```js
const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', function connection(ws) {
ws.on('message', function incoming(data) {
// Broadcast to everyone else.
wss.clients.forEach(function each(client) {
if (client !== ws && client.readyState === WebSocket.OPEN) {
client.send(data);
@@ -302,6 +316,21 @@ ws.on('message', function incoming(data) {
});
```

### Use the Node.js streams API

```js
const WebSocket = require('ws');

const ws = new WebSocket('wss://echo.websocket.org/', {
origin: 'https://websocket.org'
});

const duplex = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' });

duplex.pipe(process.stdout);
process.stdin.pipe(duplex);
```

### Other examples

For a full example with a browser client communicating with a ws server, see the
@@ -382,9 +411,10 @@ const WebSocket = require('ws');
function heartbeat() {
clearTimeout(this.pingTimeout);

// Use `WebSocket#terminate()` and not `WebSocket#close()`. Delay should be
// equal to the interval at which your server sends out pings plus a
// conservative assumption of the latency.
// Use `WebSocket#terminate()`, which immediately destroys the connection,
// instead of `WebSocket#close()`, which waits for the close timer.
// Delay should be equal to the interval at which your server
// sends out pings plus a conservative assumption of the latency.
this.pingTimeout = setTimeout(() => {
this.terminate();
}, 30000 + 1000);
18 changes: 15 additions & 3 deletions doc/ws.md
Original file line number Diff line number Diff line change
@@ -43,6 +43,7 @@
- [websocket.send(data[, options][, callback])](#websocketsenddata-options-callback)
- [websocket.terminate()](#websocketterminate)
- [websocket.url](#websocketurl)
- [WebSocket.createWebSocketStream(websocket[, options])](#websocketcreatewebsocketstreamwebsocket-options)

## Class: WebSocket.Server

@@ -463,11 +464,22 @@ Forcibly close the connection.

The URL of the WebSocket server. Server clients don't have this attribute.

## WebSocket.createWebSocketStream(websocket[, options])

- `websocket` {WebSocket} A `WebSocket` object.
- `options` {Object} [Options][duplex-options] to pass to the `Duplex`
constructor.

Returns a `Duplex` stream that allows to use the Node.js streams API on top of a
given `WebSocket`.

[concurrency-limit]: https://github.com/websockets/ws/issues/1202
[permessage-deflate]:
https://tools.ietf.org/html/draft-ietf-hybi-permessage-compression-19
[zlib-options]: https://nodejs.org/api/zlib.html#zlib_class_options
[duplex-options]:
https://nodejs.org/api/stream.html#stream_new_stream_duplex_options
[http.request()]:
https://nodejs.org/api/http.html#http_http_request_options_callback
[https.request()]:
https://nodejs.org/api/https.html#https_https_request_options_callback
[permessage-deflate]:
https://tools.ietf.org/html/draft-ietf-hybi-permessage-compression-19
[zlib-options]: https://nodejs.org/api/zlib.html#zlib_class_options
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@

const WebSocket = require('./lib/websocket');

WebSocket.createWebSocketStream = require('./lib/stream');
WebSocket.Server = require('./lib/websocket-server');
WebSocket.Receiver = require('./lib/receiver');
WebSocket.Sender = require('./lib/sender');
150 changes: 150 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
'use strict';

const { Duplex } = require('stream');

/**
* Emits the `'close'` event on a stream.
*
* @param {stream.Duplex} The stream.
* @private
*/
function emitClose(stream) {
stream.emit('close');
}

/**
* The listener of the `'end'` event.
*
* @private
*/
function duplexOnEnd() {
if (!this.destroyed && this._writableState.finished) {
this.destroy();
}
}

/**
* The listener of the `'error'` event.
*
* @private
*/
function duplexOnError(err) {
this.removeListener('error', duplexOnError);
this.destroy();
if (this.listenerCount('error') === 0) {
// Do not suppress the throwing behavior.
this.emit('error', err);
}
}

/**
* Wraps a `WebSocket` in a duplex stream.
*
* @param {WebSocket} ws The `WebSocket` to wrap
* @param {Object} options The options for the `Duplex` constructor
* @return {stream.Duplex} The duplex stream
* @public
*/
function createWebSocketStream(ws, options) {
let resumeOnReceiverDrain = true;

function receiverOnDrain() {
if (resumeOnReceiverDrain) ws._socket.resume();
}

if (ws.readyState === ws.CONNECTING) {
ws.once('open', function open() {
ws._receiver.removeAllListeners('drain');
ws._receiver.on('drain', receiverOnDrain);
});
} else {
ws._receiver.removeAllListeners('drain');
ws._receiver.on('drain', receiverOnDrain);
}

const duplex = new Duplex({
...options,
autoDestroy: false,
emitClose: false,
objectMode: false,
readableObjectMode: false,
writableObjectMode: false
});

ws.on('message', function message(msg) {
if (!duplex.push(msg)) {
resumeOnReceiverDrain = false;
ws._socket.pause();
}
});

ws.once('error', function error(err) {
duplex.destroy(err);
});

ws.once('close', function close() {
if (duplex.destroyed) return;

duplex.push(null);
});

duplex._destroy = function(err, callback) {
if (ws.readyState === ws.CLOSED) {
callback(err);
process.nextTick(emitClose, duplex);
return;
}

ws.once('close', function close() {
callback(err);
process.nextTick(emitClose, duplex);
});
ws.terminate();
};

duplex._final = function(callback) {
if (ws.readyState === ws.CONNECTING) {
ws.once('open', function open() {
duplex._final(callback);
});
return;
}

if (ws._socket._writableState.finished) {
if (duplex._readableState.endEmitted) duplex.destroy();
callback();
} else {
ws._socket.once('finish', function finish() {
// `duplex` is not destroyed here because the `'end'` event will be
// emitted on `duplex` after this `'finish'` event. The EOF signaling
// `null` chunk is, in fact, pushed when the WebSocket emits `'close'`.
callback();
});
ws.close();
}
};

duplex._read = function() {
if (ws.readyState === ws.OPEN && !resumeOnReceiverDrain) {
resumeOnReceiverDrain = true;
if (!ws._receiver._writableState.needDrain) ws._socket.resume();
}
};

duplex._write = function(chunk, encoding, callback) {
if (ws.readyState === ws.CONNECTING) {
ws.once('open', function open() {
duplex._write(chunk, encoding, callback);
});
return;
}

ws.send(chunk, callback);
};

duplex.on('end', duplexOnEnd);
duplex.on('error', duplexOnError);
return duplex;
}

module.exports = createWebSocketStream;
6 changes: 4 additions & 2 deletions lib/websocket.js
Original file line number Diff line number Diff line change
@@ -901,6 +901,8 @@ function socketOnError() {
this.removeListener('error', socketOnError);
this.on('error', NOOP);

websocket.readyState = WebSocket.CLOSING;
this.destroy();
if (websocket) {
websocket.readyState = WebSocket.CLOSING;
this.destroy();
}
}
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ws",
"version": "7.0.1",
"version": "7.1.0",
"description": "Simple to use, blazing fast and thoroughly tested websocket client and server for Node.js",
"keywords": [
"HyBi",
@@ -34,8 +34,8 @@
"benchmark": "^2.1.4",
"bufferutil": "^4.0.1",
"coveralls": "^3.0.3",
"eslint": "^5.16.0",
"eslint-config-prettier": "^5.0.0",
"eslint": "^6.0.0",
"eslint-config-prettier": "^6.0.0",
"eslint-plugin-prettier": "^3.0.1",
"mocha": "^6.1.3",
"nyc": "^14.0.0",
414 changes: 414 additions & 0 deletions test/create-websocket-stream.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,414 @@
'use strict';

const assert = require('assert');
const EventEmitter = require('events');
const { Duplex } = require('stream');
const { randomBytes } = require('crypto');

const createWebSocketStream = require('../lib/stream');
const Sender = require('../lib/sender');
const WebSocket = require('..');

describe('createWebSocketStream', () => {
it('is exposed as a property of the `WebSocket` class', () => {
assert.strictEqual(WebSocket.createWebSocketStream, createWebSocketStream);
});

it('returns a `Duplex` stream', () => {
const duplex = createWebSocketStream(new EventEmitter());

assert.ok(duplex instanceof Duplex);
});

it('passes the options object to the `Duplex` constructor', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws, {
allowHalfOpen: false,
encoding: 'utf8'
});

duplex.on('data', (chunk) => {
assert.strictEqual(chunk, 'hi');

duplex.on('close', () => {
wss.close(done);
});
});
});

wss.on('connection', (ws) => {
ws.send(Buffer.from('hi'));
ws.close();
});
});

describe('The returned stream', () => {
it('buffers writes if `readyState` is `CONNECTING`', (done) => {
const chunk = randomBytes(1024);
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);

assert.strictEqual(ws.readyState, 0);

const duplex = createWebSocketStream(ws);

duplex.write(chunk);
});

wss.on('connection', (ws) => {
ws.on('message', (message) => {
ws.on('close', (code, reason) => {
assert.ok(message.equals(chunk));
assert.strictEqual(code, 1005);
assert.strictEqual(reason, '');
wss.close(done);
});
});

ws.close();
});
});

it('errors if a write occurs when `readyState` is `CLOSING`', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);

duplex.on('error', (err) => {
assert.ok(duplex.destroyed);
assert.ok(err instanceof Error);
assert.strictEqual(
err.message,
'WebSocket is not open: readyState 2 (CLOSING)'
);

duplex.on('close', () => {
wss.close(done);
});
});

ws.on('open', () => {
ws._receiver.on('conclude', () => {
duplex.write('hi');
});
});
});

wss.on('connection', (ws) => {
ws.close();
});
});

it('errors if a write occurs when `readyState` is `CLOSED`', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);

duplex.on('error', (err) => {
assert.ok(duplex.destroyed);
assert.ok(err instanceof Error);
assert.strictEqual(
err.message,
'WebSocket is not open: readyState 3 (CLOSED)'
);

duplex.on('close', () => {
wss.close(done);
});
});

ws.on('close', () => {
duplex.write('hi');
});
});

wss.on('connection', (ws) => {
ws.close();
});
});

it('does not error if `_final()` is called while connecting', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);

assert.strictEqual(ws.readyState, 0);

const duplex = createWebSocketStream(ws);

duplex.on('close', () => {
wss.close(done);
});

duplex.resume();
duplex.end();
});
});

it('reemits errors', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);

duplex.on('error', (err) => {
assert.ok(err instanceof RangeError);
assert.strictEqual(
err.message,
'Invalid WebSocket frame: invalid opcode 5'
);

duplex.on('close', () => {
wss.close(done);
});
});
});

wss.on('connection', (ws) => {
ws._socket.write(Buffer.from([0x85, 0x00]));
});
});

it("does not suppress the throwing behavior of 'error' events", (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
createWebSocketStream(ws);
});

wss.on('connection', (ws) => {
ws._socket.write(Buffer.from([0x85, 0x00]));
});

assert.strictEqual(process.listenerCount('uncaughtException'), 1);

const [listener] = process.listeners('uncaughtException');

process.removeAllListeners('uncaughtException');
process.once('uncaughtException', (err) => {
assert.ok(err instanceof Error);
assert.strictEqual(
err.message,
'Invalid WebSocket frame: invalid opcode 5'
);

process.on('uncaughtException', listener);
wss.close(done);
});
});

it("is destroyed after 'end' and 'finish' are emitted (1/2)", (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const events = [];
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);

duplex.on('end', () => {
events.push('end');
assert.ok(duplex.destroyed);
});

duplex.on('close', () => {
assert.deepStrictEqual(events, ['finish', 'end']);
wss.close(done);
});

duplex.on('finish', () => {
events.push('finish');
assert.ok(!duplex.destroyed);
assert.ok(duplex.readable);

duplex.resume();
});

ws.on('close', () => {
duplex.end();
});
});

wss.on('connection', (ws) => {
ws.send('foo');
ws.close();
});
});

it("is destroyed after 'end' and 'finish' are emitted (2/2)", (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const events = [];
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);

duplex.on('end', () => {
events.push('end');
assert.ok(!duplex.destroyed);
assert.ok(duplex.writable);

duplex.end();
});

duplex.on('close', () => {
assert.deepStrictEqual(events, ['end', 'finish']);
wss.close(done);
});

duplex.on('finish', () => {
events.push('finish');
assert.ok(duplex.destroyed);
});

duplex.resume();
});

wss.on('connection', (ws) => {
ws.close();
});
});

it('handles backpressure (1/3)', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
// eslint-disable-next-line no-unused-vars
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
});

wss.on('connection', (ws) => {
const duplex = createWebSocketStream(ws);

duplex.resume();

duplex.on('drain', () => {
duplex.on('close', () => {
wss.close(done);
});

duplex.end();
});

const chunk = randomBytes(1024);
let ret;

do {
ret = duplex.write(chunk);
} while (ret !== false);
});
});

it('handles backpressure (2/3)', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const called = [];
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);
const read = duplex._read;

duplex._read = () => {
called.push('read');
assert.ok(ws._receiver._writableState.needDrain);
read();
assert.ok(ws._socket.isPaused());
};

ws.on('open', () => {
ws._socket.on('pause', () => {
duplex.resume();
});

ws._receiver.on('drain', () => {
called.push('drain');
assert.ok(!ws._socket.isPaused());
});

const list = Sender.frame(randomBytes(16 * 1024), {
fin: true,
rsv1: false,
opcode: 0x02,
mask: false,
readOnly: false
});

// This hack is used because there is no guarantee that more than
// 16KiB will be sent as a single TCP packet.
ws._socket.push(Buffer.concat(list));
});

duplex.on('resume', duplex.end);
duplex.on('close', () => {
assert.deepStrictEqual(called, ['read', 'drain']);
wss.close(done);
});
});
});

it('handles backpressure (3/3)', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const called = [];
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);

const read = duplex._read;

duplex._read = () => {
called.push('read');
assert.ok(!ws._receiver._writableState.needDrain);
read();
assert.ok(!ws._socket.isPaused());
};

ws.on('open', () => {
ws._receiver.on('drain', () => {
called.push('drain');
assert.ok(ws._socket.isPaused());
duplex.resume();
});

const list = Sender.frame(randomBytes(16 * 1024), {
fin: true,
rsv1: false,
opcode: 0x02,
mask: false,
readOnly: false
});

ws._socket.push(Buffer.concat(list));
});

duplex.on('resume', duplex.end);
duplex.on('close', () => {
assert.deepStrictEqual(called, ['drain', 'read']);
wss.close(done);
});
});
});

it('can be destroyed (1/2)', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const error = new Error('Oops');
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);

duplex.on('error', (err) => {
assert.strictEqual(err, error);

duplex.on('close', () => {
wss.close(done);
});
});

ws.on('open', () => {
duplex.destroy(error);
});
});
});

it('can be destroyed (2/2)', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);

duplex.on('close', () => {
wss.close(done);
});

ws.on('open', () => {
duplex.destroy();
});
});
});
});
});