Skip to content

Commit

Permalink
feat: split the events of the Manager and Socket
Browse files Browse the repository at this point in the history
Previously, most of the events emitted by the Manager were also emitted
by the Socket instances, but it was somehow misleading for the end
users because they don't have the same meaning:

- Manager: the state of the low-level connection (with connection and reconnection events)
- Socket: the state of the connection to the Namespace (only 'connect', 'disconnect' and 'error')

For example, the `reconnect` event:

```js
socket.on("reconnect", () => {
  console.log(socket.connected); // might be false, which is a bit surprising
});
```

Breaking change: the Socket instance will no longer forward the events
of its Manager

Those events can still be accessed on the Manager instance though:

```js
socket.io.on("reconnect", () => {
  // ...
});
```
  • Loading branch information
darrachequesne committed Oct 12, 2020
1 parent 6cd2e4e commit 132f8ec
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 227 deletions.
10 changes: 2 additions & 8 deletions build/manager.d.ts
Expand Up @@ -27,12 +27,6 @@ export declare class Manager extends Emitter {
* @api public
*/
constructor(uri: any, opts: any);
/**
* Propagate given event to sockets and emit on `this`
*
* @api private
*/
emitAll(event: string, arg?: any): void;
/**
* Sets the `reconnection` config.
*
Expand Down Expand Up @@ -87,8 +81,8 @@ export declare class Manager extends Emitter {
* @return {Manager} self
* @api public
*/
open(fn?: any, opts?: any): this;
connect(fn: any, opts: any): this;
open(fn?: any, opts?: any): Manager;
connect(fn: any, opts: any): Manager;
/**
* Called upon transport open.
*
Expand Down
94 changes: 14 additions & 80 deletions build/manager.js
Expand Up @@ -76,19 +76,6 @@ class Manager extends component_emitter_1.default {
if (this.autoConnect)
this.open();
}
/**
* Propagate given event to sockets and emit on `this`
*
* @api private
*/
emitAll(event, arg) {
super.emit(event, arg);
for (let nsp in this.nsps) {
if (has.call(this.nsps, nsp)) {
this.nsps[nsp].emit(event, arg);
}
}
}
/**
* Sets the `reconnection` config.
*
Expand Down Expand Up @@ -200,11 +187,11 @@ class Manager extends component_emitter_1.default {
fn && fn();
});
// emit `connect_error`
const errorSub = on_1.on(socket, "error", function (data) {
const errorSub = on_1.on(socket, "error", (data) => {
debug("connect_error");
self.cleanup();
self.readyState = "closed";
self.emitAll("connect_error", data);
super.emit("connect_error", data);
if (fn) {
const err = new Error("Connection error");
// err.data = data;
Expand All @@ -223,12 +210,12 @@ class Manager extends component_emitter_1.default {
openSub.destroy(); // prevents a race condition with the 'open' event
}
// set timer
const timer = setTimeout(function () {
const timer = setTimeout(() => {
debug("connect attempt timed out after %d", timeout);
openSub.destroy();
socket.close();
socket.emit("error", "timeout");
self.emitAll("connect_timeout", timeout);
super.emit("connect_timeout", "timeout");
}, timeout);
this.subs.push({
destroy: function () {
Expand All @@ -241,60 +228,7 @@ class Manager extends component_emitter_1.default {
return this;
}
connect(fn, opts) {
debug("readyState %s", this.readyState);
if (~this.readyState.indexOf("open"))
return this;
debug("opening %s", this.uri);
this.engine = engine_io_client_1.default(this.uri, this.opts);
const socket = this.engine;
const self = this;
this.readyState = "opening";
this.skipReconnect = false;
// emit `open`
const openSub = on_1.on(socket, "open", function () {
self.onopen();
fn && fn();
});
// emit `connect_error`
const errorSub = on_1.on(socket, "error", function (data) {
debug("connect_error");
self.cleanup();
self.readyState = "closed";
self.emitAll("connect_error", data);
if (fn) {
const err = new Error("Connection error");
// err.data = data;
fn(err);
}
else {
// Only do this if there is no fn to handle the error
self.maybeReconnectOnOpen();
}
});
// emit `connect_timeout`
if (false !== this._timeout) {
const timeout = this._timeout;
debug("connect attempt will timeout after %d", timeout);
if (timeout === 0) {
openSub.destroy(); // prevents a race condition with the 'open' event
}
// set timer
const timer = setTimeout(function () {
debug("connect attempt timed out after %d", timeout);
openSub.destroy();
socket.close();
socket.emit("error", "timeout");
self.emitAll("connect_timeout", timeout);
}, timeout);
this.subs.push({
destroy: function () {
clearTimeout(timer);
},
});
}
this.subs.push(openSub);
this.subs.push(errorSub);
return this;
return this.open(fn, opts);
}
/**
* Called upon transport open.
Expand Down Expand Up @@ -322,7 +256,7 @@ class Manager extends component_emitter_1.default {
* @api private
*/
onping() {
this.emitAll("ping");
super.emit("ping");
}
/**
* Called with data.
Expand All @@ -347,7 +281,7 @@ class Manager extends component_emitter_1.default {
*/
onerror(err) {
debug("error", err);
this.emitAll("error", err);
super.emit("error", err);
}
/**
* Creates a new socket for the given `nsp`.
Expand Down Expand Up @@ -476,28 +410,28 @@ class Manager extends component_emitter_1.default {
if (this.backoff.attempts >= this._reconnectionAttempts) {
debug("reconnect failed");
this.backoff.reset();
this.emitAll("reconnect_failed");
super.emit("reconnect_failed");
this.reconnecting = false;
}
else {
const delay = this.backoff.duration();
debug("will wait %dms before reconnect attempt", delay);
this.reconnecting = true;
const timer = setTimeout(function () {
const timer = setTimeout(() => {
if (self.skipReconnect)
return;
debug("attempting reconnect");
self.emitAll("reconnect_attempt", self.backoff.attempts);
self.emitAll("reconnecting", self.backoff.attempts);
super.emit("reconnect_attempt", self.backoff.attempts);
super.emit("reconnecting", self.backoff.attempts);
// check again for the case socket closed in above events
if (self.skipReconnect)
return;
self.open(function (err) {
self.open((err) => {
if (err) {
debug("reconnect attempt error");
self.reconnecting = false;
self.reconnect();
self.emitAll("reconnect_error", err.data);
super.emit("reconnect_error", err.data);
}
else {
debug("reconnect success");
Expand All @@ -521,7 +455,7 @@ class Manager extends component_emitter_1.default {
const attempt = this.backoff.attempts;
this.reconnecting = false;
this.backoff.reset();
this.emitAll("reconnect", attempt);
super.emit("reconnect", attempt);
}
}
exports.Manager = Manager;
106 changes: 16 additions & 90 deletions lib/manager.ts
Expand Up @@ -76,20 +76,6 @@ export class Manager extends Emitter {
if (this.autoConnect) this.open();
}

/**
* Propagate given event to sockets and emit on `this`
*
* @api private
*/
emitAll(event: string, arg?) {
super.emit(event, arg);
for (let nsp in this.nsps) {
if (has.call(this.nsps, nsp)) {
this.nsps[nsp].emit(event, arg);
}
}
}

/**
* Sets the `reconnection` config.
*
Expand Down Expand Up @@ -188,7 +174,7 @@ export class Manager extends Emitter {
* @return {Manager} self
* @api public
*/
open(fn?, opts?) {
open(fn?, opts?): Manager {
debug("readyState %s", this.readyState);
if (~this.readyState.indexOf("open")) return this;

Expand All @@ -206,11 +192,11 @@ export class Manager extends Emitter {
});

// emit `connect_error`
const errorSub = on(socket, "error", function (data) {
const errorSub = on(socket, "error", (data) => {
debug("connect_error");
self.cleanup();
self.readyState = "closed";
self.emitAll("connect_error", data);
super.emit("connect_error", data);
if (fn) {
const err = new Error("Connection error");
// err.data = data;
Expand All @@ -231,12 +217,12 @@ export class Manager extends Emitter {
}

// set timer
const timer = setTimeout(function () {
const timer = setTimeout(() => {
debug("connect attempt timed out after %d", timeout);
openSub.destroy();
socket.close();
socket.emit("error", "timeout");
self.emitAll("connect_timeout", timeout);
super.emit("connect_timeout", "timeout");
}, timeout);

this.subs.push({
Expand All @@ -252,68 +238,8 @@ export class Manager extends Emitter {
return this;
}

connect(fn, opts) {
debug("readyState %s", this.readyState);
if (~this.readyState.indexOf("open")) return this;

debug("opening %s", this.uri);
this.engine = eio(this.uri, this.opts);
const socket = this.engine;
const self = this;
this.readyState = "opening";
this.skipReconnect = false;

// emit `open`
const openSub = on(socket, "open", function () {
self.onopen();
fn && fn();
});

// emit `connect_error`
const errorSub = on(socket, "error", function (data) {
debug("connect_error");
self.cleanup();
self.readyState = "closed";
self.emitAll("connect_error", data);
if (fn) {
const err = new Error("Connection error");
// err.data = data;
fn(err);
} else {
// Only do this if there is no fn to handle the error
self.maybeReconnectOnOpen();
}
});

// emit `connect_timeout`
if (false !== this._timeout) {
const timeout = this._timeout;
debug("connect attempt will timeout after %d", timeout);

if (timeout === 0) {
openSub.destroy(); // prevents a race condition with the 'open' event
}

// set timer
const timer = setTimeout(function () {
debug("connect attempt timed out after %d", timeout);
openSub.destroy();
socket.close();
socket.emit("error", "timeout");
self.emitAll("connect_timeout", timeout);
}, timeout);

this.subs.push({
destroy: function () {
clearTimeout(timer);
},
});
}

this.subs.push(openSub);
this.subs.push(errorSub);

return this;
connect(fn, opts): Manager {
return this.open(fn, opts);
}

/**
Expand Down Expand Up @@ -346,7 +272,7 @@ export class Manager extends Emitter {
* @api private
*/
onping() {
this.emitAll("ping");
super.emit("ping");
}

/**
Expand Down Expand Up @@ -374,7 +300,7 @@ export class Manager extends Emitter {
*/
onerror(err) {
debug("error", err);
this.emitAll("error", err);
super.emit("error", err);
}

/**
Expand Down Expand Up @@ -516,29 +442,29 @@ export class Manager extends Emitter {
if (this.backoff.attempts >= this._reconnectionAttempts) {
debug("reconnect failed");
this.backoff.reset();
this.emitAll("reconnect_failed");
super.emit("reconnect_failed");
this.reconnecting = false;
} else {
const delay = this.backoff.duration();
debug("will wait %dms before reconnect attempt", delay);

this.reconnecting = true;
const timer = setTimeout(function () {
const timer = setTimeout(() => {
if (self.skipReconnect) return;

debug("attempting reconnect");
self.emitAll("reconnect_attempt", self.backoff.attempts);
self.emitAll("reconnecting", self.backoff.attempts);
super.emit("reconnect_attempt", self.backoff.attempts);
super.emit("reconnecting", self.backoff.attempts);

// check again for the case socket closed in above events
if (self.skipReconnect) return;

self.open(function (err) {
self.open((err) => {
if (err) {
debug("reconnect attempt error");
self.reconnecting = false;
self.reconnect();
self.emitAll("reconnect_error", err.data);
super.emit("reconnect_error", err.data);
} else {
debug("reconnect success");
self.onreconnect();
Expand All @@ -563,6 +489,6 @@ export class Manager extends Emitter {
const attempt = this.backoff.attempts;
this.reconnecting = false;
this.backoff.reset();
this.emitAll("reconnect", attempt);
super.emit("reconnect", attempt);
}
}

0 comments on commit 132f8ec

Please sign in to comment.