Skip to content

Commit

Permalink
feat: emit websocket events
Browse files Browse the repository at this point in the history
  • Loading branch information
samfundev committed Jul 7, 2022
1 parent bec475b commit 0debe56
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 8 deletions.
2 changes: 2 additions & 0 deletions chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java
Expand Up @@ -22,6 +22,7 @@
import com.github.twitch4j.chat.events.channel.UserStateEvent;
import com.github.twitch4j.client.websocket.WebsocketConnection;
import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import com.github.twitch4j.client.websocket.events.WebsocketStateEvent;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.util.BucketUtils;
import com.github.twitch4j.common.util.CryptoUtils;
Expand Down Expand Up @@ -288,6 +289,7 @@ public TwitchChat(WebsocketConnection websocketConnection, EventManager eventMan
this.connection = new WebsocketConnection(spec -> {
spec.baseUrl(baseUrl);
spec.wsPingPeriod(wsPingPeriod);
spec.onStateChanged((state) -> eventManager.publish(new WebsocketStateEvent(state, this)));
spec.onConnected(this::onConnected);
spec.onTextMessage(this::onTextMessage);
spec.onDisconnecting(this::onDisconnecting);
Expand Down
Expand Up @@ -87,7 +87,7 @@ public void onConnected(WebSocket ws, Map<String, List<String>> headers) {
config.onConnected().run();

// Connection Success
connectionState = WebsocketConnectionState.CONNECTED;
setState(WebsocketConnectionState.CONNECTED);
backoffClearer = config.taskExecutor().schedule(() -> {
if (connectionState == WebsocketConnectionState.CONNECTED)
config.backoffStrategy().reset();
Expand All @@ -103,6 +103,7 @@ public void onTextMessage(WebSocket ws, String text) {
@Override
public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) {
if (!connectionState.equals(WebsocketConnectionState.DISCONNECTING)) {
setState(WebsocketConnectionState.LOST);
log.info("Connection to WebSocket [{}] lost! Retrying soon ...", config.baseUrl());

// connection lost - reconnecting
Expand All @@ -115,7 +116,7 @@ public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame,
config.taskExecutor().schedule(() -> reconnect(), reconnectDelay, TimeUnit.MILLISECONDS);
}
} else {
connectionState = WebsocketConnectionState.DISCONNECTED;
setState(WebsocketConnectionState.DISCONNECTED);
log.info("Disconnected from WebSocket [{}]!", config.baseUrl());
}
}
Expand Down Expand Up @@ -149,6 +150,11 @@ protected WebSocket createWebsocket() throws IOException {
return ws;
}

protected void setState(WebsocketConnectionState state) {
connectionState = state;
config.onStateChanged().accept(state);
}

/**
* Connect to the WebSocket
*/
Expand All @@ -160,7 +166,7 @@ public void connect() {
config.onPreConnect().run();

// Change Connection State
connectionState = WebsocketConnectionState.CONNECTING;
setState(WebsocketConnectionState.CONNECTING);

// init websocket
webSocket = createWebsocket();
Expand Down Expand Up @@ -197,17 +203,17 @@ public void connect() {
*/
@Synchronized
public void disconnect() {
if (connectionState.equals(WebsocketConnectionState.CONNECTED)) {
if (connectionState.equals(WebsocketConnectionState.CONNECTED) || connectionState.equals(WebsocketConnectionState.LOST)) {
// hook: disconnecting
config.onDisconnecting().run();

connectionState = WebsocketConnectionState.DISCONNECTING;
setState(WebsocketConnectionState.DISCONNECTING);
}

// hook: pre disconnect
config.onPreDisconnect().run();

connectionState = WebsocketConnectionState.DISCONNECTED;
setState(WebsocketConnectionState.DISCONNECTED);

// CleanUp
this.webSocket.disconnect();
Expand All @@ -223,7 +229,7 @@ public void disconnect() {
*/
@Synchronized
public void reconnect() {
connectionState = WebsocketConnectionState.RECONNECTING;
setState(WebsocketConnectionState.RECONNECTING);
disconnect();
connect();
}
Expand Down
@@ -1,5 +1,6 @@
package com.github.twitch4j.client.websocket;

import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.util.ExponentialBackoffStrategy;
import com.github.twitch4j.util.IBackoffStrategy;
Expand Down Expand Up @@ -36,6 +37,7 @@ public void validate() {
}
Objects.requireNonNull(taskExecutor, "taskExecutor may not be null!");
Objects.requireNonNull(backoffStrategy, "backoffStrategy may not be null!");
Objects.requireNonNull(onStateChanged, "onStateChanged may not be null!");
Objects.requireNonNull(onPreConnect, "onPreConnect may not be null!");
Objects.requireNonNull(onPostConnect, "onPostConnect may not be null!");
Objects.requireNonNull(onConnected, "onConnected may not be null!");
Expand Down Expand Up @@ -77,6 +79,11 @@ public void validate() {
.maximumBackoff(Duration.ofMinutes(5).toMillis())
.build();

/**
* called when the websocket's state changes
*/
private Consumer<WebsocketConnectionState> onStateChanged = (state) -> {};

/**
* called before connecting
*/
Expand Down
Expand Up @@ -5,5 +5,6 @@ public enum WebsocketConnectionState {
RECONNECTING,
DISCONNECTED,
CONNECTING,
CONNECTED
CONNECTED,
LOST
}
@@ -0,0 +1,26 @@
package com.github.twitch4j.client.websocket.events;

import com.github.philippheuer.events4j.core.domain.Event;
import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import lombok.Data;
import lombok.EqualsAndHashCode;

/**
* Websocket State Event
*
* Called when the websocket changes it's connection state.
*/
@Data
@EqualsAndHashCode(callSuper=false)
public class WebsocketStateEvent extends Event {

/**
* The state of the websocket
*/
private final WebsocketConnectionState state;

/**
* The parent of the websocket
*/
private final AutoCloseable parent;
}
Expand Up @@ -5,6 +5,7 @@
import com.github.philippheuer.events4j.core.EventManager;
import com.github.twitch4j.client.websocket.WebsocketConnection;
import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import com.github.twitch4j.client.websocket.events.WebsocketStateEvent;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.enums.CommandPermission;
import com.github.twitch4j.common.events.domain.EventUser;
Expand Down Expand Up @@ -255,6 +256,7 @@ public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventM
this.connection = new WebsocketConnection(spec -> {
spec.baseUrl(WEB_SOCKET_SERVER);
spec.wsPingPeriod(wsPingPeriod);
spec.onStateChanged((state) -> eventManager.publish(new WebsocketStateEvent(state, this)));
spec.onConnected(this::onConnected);
spec.onTextMessage(this::onTextMessage);
spec.taskExecutor(taskExecutor);
Expand Down

0 comments on commit 0debe56

Please sign in to comment.