Skip to content

Commit

Permalink
WebSocket: enable support for per-message compression (#2029)
Browse files Browse the repository at this point in the history
# Description

Closes #1734 

This PR enables per-message compression for websocket connections.

To do this, this PR replaces the [ws](https://www.npmjs.com/package/ws) module with the [uWebSockets.js](https://github.com/uNetworking/uWebSockets.js) one: the latter is faster, more stable, and not prone to the catastrophic memory fragmentation issue with Node.js (mitigated in later Node.js 12.x  versions).

Since µwebsockets uses C sockets, we cannot replace the WebSocket layer without replacing the HTTP one too: we cannot reuse the Node.js HTTP socket for WebSocket, and we don't want Kuzzle to have 2 different network ports for HTTP and WebSocket.
So this PR also lets µws handle HTTP connections, hence the sizeable refactor.

# Enhancements

* The `allow-encoding` HTTP header is now properly interpreted: before this PR, the priority argument `<algorithm>;q=<priority>` was ignored
* Content parsing for multipart form requests is now done by µws, letting us remove the `busboy` dependency
* Due to how µws handles idle client connections and heartbeat, I had to rework the websocket options in our RC file:
  * the `heartbeat` option is now deprecated and ignored
  * the `idleTimeout` now cannot be deactivated, and its value cannot be set lower than 1000ms. To prevent breaking changes with existing installs, this option is defaulted to 60000ms if set to a value lower than 1000.
  * a new `compression` option has been added. It takes a boolean (`false` by default): setting this option to `true` enables per-message compression
* Add a configurable rate limiter for websocket connections (disabled by default)

# Other changes

* Debug domain for the network layer has been renamed `kuzzle:network:*` (from `kuzzle:entry-point:*`)
* The `server:getConfig` removed the entire `http` configuration from its output, instead of just the HTTP routes one (`http.routes`). This has been fixed, as not returning the http config is a bit confusing (I know I was perplexed when trying to figure why I didn't have HTTP config in that route output)
* Default timeout of cucumber.js has been increased from 5000ms to 30000ms, to prevent functional tests to fail in github actions


# Bug fixes

* Numbers as raw request results are now correctly handled (trying to send a number as a raw response results in an error without this PR)
* Make a realtime functional test (using the functional tests plugin) reentrant, allowing it to be run multiple times on the same node
  • Loading branch information
scottinet committed Mar 26, 2021
1 parent e8d582a commit 306fb46
Show file tree
Hide file tree
Showing 38 changed files with 7,798 additions and 7,068 deletions.
6 changes: 3 additions & 3 deletions .github/actions/es-lint/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ description: Run ESLint

runs:
using: "composite"
steps:
- run: npm ci --silent
steps:
- run: npm ci
shell: bash
- run: npm run --silent test:lint
- run: npm run test:lint
shell: bash
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ lib/core/backend/backendPlugin.js
lib/core/shared/sdk/embeddedSdk.js
lib/core/plugin/pluginContext.js
lib/api/request/index.js
lib/api/request/kuzzleRequest.js
lib/api/request/kuzzle-request.js
lib/api/request/requestContext.js
lib/api/request/requestInput.js
Expand Down
51 changes: 25 additions & 26 deletions .kuzzlerc.sample
Original file line number Diff line number Diff line change
Expand Up @@ -372,27 +372,27 @@
// protocols can be extended and configured in this section.
"protocols": {
"http": {
// * allowCompression:
// Enable support for compressed requests, using the
// Content-Encoding header
// Currently supported compression algorithms:
// gzip, deflate, identity
// Note: "identity" is always an accepted value, even if
// compression support is disabled
// * enabled:
// Set to "false" to disable HTTP support
// * maxFormFileSize:
// Maximum size of requests sent via http forms
// * maxEncodingLayers:
// Maximum number of encoding layers that can be applied
// to an http message, using the Content-Encoding header.
// This parameter is meant to prevent abuses by setting an
// abnormally large number of encodings, forcing Kuzzle to
// allocate as many decoders to handle the incoming request.
// * allowCompression:
// Enable support for compressed requests, using the
// Content-Encoding header
// Currently supported compression algorithms:
// gzip, deflate, identity
// Note: "identity" is always an accepted value, even if
// compression support is disabled
// * maxFormFileSize:
// Maximum size of requests sent via http forms
"allowCompression": true,
"enabled": true,
"maxFormFileSize": "1mb",
"maxEncodingLayers": 3,
"allowCompression": true
"maxFormFileSize": "1mb"
},
"mqtt": {
// * enabled:
Expand Down Expand Up @@ -422,28 +422,27 @@
}
},
"websocket": {
// * compression:
// Enable/Disable per message compression
// * enabled:
// Set to true to enable WebSocket support
// * idleTimeout:
// The maximum time (in milliseconds) without sending or receiving a
// message from a client. Once reached, the client's socket is
// forcibly closed.
// Contrary to heartbeats (see below), this is a passive check,
// forcing all clients to actively send either PINGs or messages to
// maintain their connection active.
// Set the value to 0 to disable this feature (should only be
// activated if heartbeat is disabled)
// * heartbeat:
// The time, in milliseconds, between the server's PING requests to
// clients, to make sure they are still active.
// Setting this value to 0 disables PING requests from the server
// (it will still respond with a PONG to PING requests from clients).
// If heartbeat is deactivated, then setting a non-zero value to
// idleTimeout is strongly recommended to detect and remove
// dead sockets.
// If a client socket is inactive for too long, the server will send
// a PING request before closing the socket.
// Minimum value: 1000 (but it's strongly advised to not set a value
// this low to forcibly close idle client sockets)
// * rateLimit:
// The maximum number of messages per second a single socket can
// submit to the server.
// Requests exceeding that rate limit are rejected.
// Disabled if set to 0.
"compression": false,
"enabled": true,
"idleTimeout": 0,
"heartbeat": 60000
"idleTimeout": 60000,
"rateLimit": 0
}
}
},
Expand Down
1 change: 1 addition & 0 deletions doc/2/api/errors/error-codes/network/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ description: Error codes definitions
| id / code | class / status | message | description |
| --------- | -------------- | --------| ----------- |
| network.websocket.unexpected_error<br/><pre>0x03030001</pre> | [BadRequestError](/core/2/api/errors/error-codes#badrequesterror) <pre>(400)</pre> | Caught an unexpected WebSocket error: %s | Caught an unexpected WebSocket error |
| network.websocket.ratelimit_exceeded<br/><pre>0x03030002</pre> | [TooManyRequestsError](/core/2/api/errors/error-codes#toomanyrequestserror) <pre>(429)</pre> | Rejected: too many requests received from that socket | Too many requests received from a client socket |

---

Expand Down
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ x-kuzzle-config: &kuzzle-config
- kuzzle_services__memoryStorage__node__host=redis
- kuzzle_server__protocols__mqtt__enabled=true
- kuzzle_server__protocols__mqtt__developmentMode=false
- kuzzle_http__accessControlAllowOrigin=localhost
- kuzzle_limits__loginsPerSecond=50
- NODE_ENV=${NODE_ENV:-development}
- DEBUG=${DEBUG:-kuzzle:*,-kuzzle:entry-point:protocols:websocket,-kuzzle:cluster:heartbeat}
- DEBUG=${DEBUG:-kuzzle:*,-kuzzle:network:protocols:websocket,-kuzzle:cluster:heartbeat}
# - DEBUG=${DEBUG:-none}
- DEBUG_DEPTH=${DEBUG_DEPTH:-0}
- DEBUG_MAX_ARRAY_LENGTH=${DEBUG_MAX_ARRAY:-100}
Expand Down
1 change: 1 addition & 0 deletions features/Api.feature
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Feature: API

@mappings
@http
Scenario: Send Request to the HTTP JSON endpoint
Given an existing collection "nyc-open-data":"yellow-taxi"
When I send a HTTP "post" request with:
Expand Down
7 changes: 4 additions & 3 deletions features/PluginContext.feature
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ Feature: Plugin context

@realtime
Scenario: Subscribe and unsubscribe to realtime notifications
Given I successfully execute the action "functional-test-plugin/realtime":"subscribeOnce"
Given I subscribe to "test":"answer" notifications
When I successfully execute the action "realtime":"publish" with args:
| index | "test" |
| collection | "question" |
| body | {} |
Then I should have receive "1" notifications for "test":"answer"
# should not be subscribed anymore
# (see the hook 'kuzzle:state:live' in the plugin)
# should not be subscribed anymore: the plugin unsubscribes after sending
# a single notification
When I successfully execute the action "realtime":"publish" with args:
| index | "test" |
| collection | "question" |
Expand Down Expand Up @@ -84,4 +85,4 @@ Feature: Plugin context
Then I should receive an error matching:
| id | "security.rights.forbidden" |

# @todo add cluster tests
# @todo add cluster tests
14 changes: 10 additions & 4 deletions features/step_definitions/realtime-steps.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ Then('I subscribe to {string}:{string} notifications', async function (index, co
Then('I unsubscribe from the current room via the plugin', async function () {
const roomId = this.props.result.roomId;
const connectionId = this.props.result.connectionId;
const response = await this.sdk.query({

const response = await this.sdk.query({
controller: 'functional-test-plugin/accessors',
action: 'unregisterSubscription',
body: {
Expand All @@ -38,8 +38,14 @@ Then('I unsubscribe from the current room via the plugin', async function () {
this.props.result = response.result;
});

Then('I should have receive {string} notifications for {string}:{string}', function (rawNumber, index, collection) {
Then('I should have receive {string} notifications for {string}:{string}', async function (rawNumber, index, collection) {
const expectedCount = parseInt(rawNumber, 10);
const notifications = this.props.subscriptions[`${index}:${collection}`].notifications;

// retry
for (let i = 0; notifications.length < expectedCount && i < 10; i++) {
await new Promise(resolve => setTimeout(resolve, 200));
}

should(this.props.subscriptions[`${index}:${collection}`].notifications)
.have.length(expectedCount);
Expand Down Expand Up @@ -75,4 +81,4 @@ Then('I should receive realtime notifications for {string}:{string} matching:',
}, 500);
}
}, 100);
});
});
3 changes: 2 additions & 1 deletion features/support/world.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

const { setWorldConstructor } = require('cucumber');
const { setDefaultTimeout, setWorldConstructor } = require('cucumber');
const { Kuzzle, WebSocket, Http } = require('kuzzle-sdk');

const config = require('../../lib/config');
Expand Down Expand Up @@ -127,5 +127,6 @@ class KuzzleWorld {
}

setWorldConstructor(KuzzleWorld);
setDefaultTimeout(30000);

module.exports = KuzzleWorld;
4 changes: 2 additions & 2 deletions lib/api/controllers/serverController.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ class ServerController extends NativeController {
* @returns {Promise<Object>}
*/
async getConfig () {
const config = Object.assign({}, global.kuzzle.config);
const config = JSON.parse(JSON.stringify(global.kuzzle.config));

// Already and more appropriately returned by server:info
config.http = undefined;
config.http.routes = undefined;

// Not a good idea to export Kuzzle's salt, hash algorithm
// and default rights
Expand Down
1 change: 0 additions & 1 deletion lib/api/funnel.js
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,6 @@ class Funnel {
* @returns {Promise<Request>}
*/
async checkRights (request) {

if ( global.kuzzle.config.http.accessControlAllowOrigin === '*'
&& request.input.args.cookieAuth
) {
Expand Down
9 changes: 5 additions & 4 deletions lib/config/default.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,10 @@ module.exports = {
port: 7512,
protocols: {
http: {
allowCompression: true,
enabled: true,
maxFormFileSize: '1MB',
maxEncodingLayers: 3,
allowCompression: true
maxFormFileSize: '1MB'
},
mqtt: {
enabled: false,
Expand All @@ -223,8 +223,9 @@ module.exports = {
},
websocket: {
enabled: true,
idleTimeout: 0,
heartbeat: 60000
idleTimeout: 60000,
compression: false,
rateLimit: 0,
}
}
},
Expand Down
34 changes: 34 additions & 0 deletions lib/config/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@

'use strict';

const assert = require('assert');

const rc = require('rc');

const packageJson = require('../../package.json');
const kerror = require('../kerror').wrap('core', 'configuration');
const { isPlainObject } = require('../util/safeObject');
const bytes = require('../util/bytes');

/**
* Loads, interprets and checks configuration files
Expand All @@ -44,6 +47,8 @@ function load () {
config = unstringify(config);

checkLimitsConfig(config);
checkHttpOptions(config);
checkWebSocketOptions(config);

config.internal = {
hash: {
Expand Down Expand Up @@ -158,4 +163,33 @@ function checkLimitsConfig (cfg) {
}
}

function checkWebSocketOptions (config) {
const cfg = config.server.protocols.websocket;

if (cfg === undefined) {
return;
}

assert(typeof cfg.enabled === 'boolean', `[websocket] "enabled" parameter: invalid value "${cfg.enabled}" (boolean expected)`);
assert(Number.isInteger(cfg.idleTimeout) && cfg.idleTimeout >= 0, `[websocket] "idleTimeout" parameter: invalid value "${cfg.idleTimeout}" (integer >= 1000 expected)`);
assert(Number.isInteger(cfg.rateLimit) && cfg.rateLimit >= 0, `[websocket] "rateLimit" parameter: invalid value "${cfg.rateLimit}" (integer >= 0 expected)`);
assert(typeof cfg.compression === 'boolean', `[websocket] "compression" parameter: invalid value "${cfg.compression}" (boolean value expected)`);
}

function checkHttpOptions (config) {
const cfg = config.server.protocols.http;

if (cfg === undefined) {
return;
}

assert(typeof cfg.enabled === 'boolean', `[http] "enabled" parameter: invalid value "${cfg.enabled}" (boolean expected)`);
assert(typeof cfg.allowCompression === 'boolean', `[http] "allowCompression" parameter: invalid value "${cfg.allowCompression}" (boolean expected)`);
assert(Number.isInteger(cfg.maxEncodingLayers) && cfg.maxEncodingLayers >= 1, `[http] "maxEncodingLayers" parameter: invalid value "${cfg.maxEncodingLayers}" (integer >= 1 expected)`);

const maxFormFileSize = bytes(cfg.maxFormFileSize);
assert(Number.isInteger(maxFormFileSize) && maxFormFileSize >= 0, `[http] "maxFormFileSize" parameter: cannot parse "${cfg.maxFormFileSize}"`);
cfg.maxFormFileSize = maxFormFileSize;
}

module.exports = { load };
2 changes: 1 addition & 1 deletion lib/core/network/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const {
const ClientConnection = require('./clientConnection');
const ProtocolBase = require('./protocols/protocol');

const debug = require('../../util/debug')('kuzzle:entry-point:protocols');
const debug = require('../../util/debug')('kuzzle:network:protocols');

class Context {
constructor () {
Expand Down
28 changes: 9 additions & 19 deletions lib/core/network/entryPoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
'use strict';

const fs = require('fs');
const http = require('http');
const path = require('path');

const Bluebird = require('bluebird');
Expand All @@ -33,11 +32,10 @@ const WinstonSyslog = require('winston-syslog');

const { RequestContext } = require('../../api/request');
const Context = require('./context');
const debug = require('../../util/debug')('kuzzle:entry-point:embedded');
const { HttpProtocol } = require('./protocols/http');
const MqttProtocol = require('./protocols/mqtt');
const WebSocketProtocol = require('./protocols/websocket');
const InternalProtocol = require('./protocols/internal');
const debug = require('../../util/debug')('kuzzle:network:embedded');
const MqttProtocol = require('./protocols/mqttProtocol');
const InternalProtocol = require('./protocols/internalProtocol');
const HttpWsProtocol = require('./protocols/httpwsProtocol');
const Manifest = require('./protocolManifest');
const ClientConnection = require('./clientConnection');
const removeErrorStack = require('./removeErrorStack');
Expand All @@ -46,18 +44,15 @@ const kerror = require('../../kerror');
const networkError = kerror.wrap('network', 'entrypoint');

const DEFAULT_PROTOCOLS = [
HttpProtocol,
HttpWsProtocol,
MqttProtocol,
WebSocketProtocol,
InternalProtocol,
];

class EntryPoint {
constructor () {
this.config = global.kuzzle.config.server;

this.httpServer = null;

this.protocols = new Map();

this._clients = new Map();
Expand Down Expand Up @@ -116,13 +111,6 @@ class EntryPoint {

this.initLogger();

await new Promise((resolve, reject) => {
this.httpServer = http.createServer();
this.httpServer.on('error', reject);
this.httpServer.on('listening', resolve);
this.httpServer.listen(this.config.port, this.config.host);
});

const initPromises = [];

for (const protocol of this.protocols.values()) {
Expand Down Expand Up @@ -552,11 +540,13 @@ class EntryPoint {
// --------------------------------------------------------------------

_broadcast (data) {
debug('[server] broadcasting data through all protocols: %a', data);
const sanitized = removeErrorStack(data);

debug('[server] broadcasting data through all protocols: %a', sanitized);

for (const [name, protocol] of this.protocols.entries()) {
try {
protocol.broadcast(removeErrorStack(data));
protocol.broadcast(sanitized);
}
catch (e) {
global.kuzzle.log.error(`[broadcast] protocol ${name} failed: ${e.message}\n${e.stack}`);
Expand Down

0 comments on commit 306fb46

Please sign in to comment.