Skip to content

Commit

Permalink
feat: add websocket connection state events / new connection lost sta…
Browse files Browse the repository at this point in the history
…te (#597)

* feat: emit websocket events

* feat: include old value in onStateChanged

* feat: split event into chat and pubsub events

* feat: include old state in fired event

* refactor: use equality operator on enums

* fix: allow early manual connect after connection lost

* refactor: rename chat var in connection event

* fix: avoid leak from early connect after lost

* refactor: rename connection events

* docs: add javadocs for WebsocketConnectionState

Co-authored-by: Sidd <iProdigy@users.noreply.github.com>
  • Loading branch information
samfundev and iProdigy committed Jul 11, 2022
1 parent 7e44087 commit 6080cde
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 19 deletions.
2 changes: 2 additions & 0 deletions chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java
Expand Up @@ -12,6 +12,7 @@
import com.github.twitch4j.chat.enums.NoticeTag;
import com.github.twitch4j.chat.enums.TMIConnectionState;
import com.github.twitch4j.chat.events.AbstractChannelEvent;
import com.github.twitch4j.chat.events.ChatConnectionStateEvent;
import com.github.twitch4j.chat.events.CommandEvent;
import com.github.twitch4j.chat.events.IRCEventHandler;
import com.github.twitch4j.chat.events.channel.ChannelJoinFailureEvent;
Expand Down Expand Up @@ -288,6 +289,7 @@ public TwitchChat(WebsocketConnection websocketConnection, EventManager eventMan
this.connection = new WebsocketConnection(spec -> {
spec.baseUrl(baseUrl);
spec.wsPingPeriod(wsPingPeriod);
spec.onStateChanged((oldState, newState) -> eventManager.publish(new ChatConnectionStateEvent(oldState, newState, this)));
spec.onConnected(this::onConnected);
spec.onTextMessage(this::onTextMessage);
spec.onDisconnecting(this::onDisconnecting);
Expand Down
@@ -0,0 +1,33 @@
package com.github.twitch4j.chat.events;

import com.github.philippheuer.events4j.core.domain.Event;
import com.github.twitch4j.chat.TwitchChat;
import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import lombok.Data;
import lombok.EqualsAndHashCode;

/**
* Chat Connection Event
* <p>
* Called when a chat socket's connection status changes.
*/
@Data
@EqualsAndHashCode(callSuper = false)
public class ChatConnectionStateEvent extends Event {

/**
* The previous state of the websocket.
*/
private final WebsocketConnectionState previousState;

/**
* The updated state of the websocket.
*/
private final WebsocketConnectionState state;

/**
* The chat instance whose connection status changed.
*/
private final TwitchChat connection;

}
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

@Slf4j
Expand All @@ -35,8 +36,7 @@ public class WebsocketConnection implements AutoCloseable {
/**
* connection state
*/
@Getter
private volatile WebsocketConnectionState connectionState = WebsocketConnectionState.DISCONNECTED;
private final AtomicReference<WebsocketConnectionState> connectionState = new AtomicReference<>(WebsocketConnectionState.DISCONNECTED);

/**
* Calls {@link ExponentialBackoffStrategy#reset()} upon a successful websocket connection
Expand Down Expand Up @@ -87,9 +87,9 @@ public void onConnected(WebSocket ws, Map<String, List<String>> headers) {
config.onConnected().run();

// Connection Success
connectionState = WebsocketConnectionState.CONNECTED;
setState(WebsocketConnectionState.CONNECTED);
backoffClearer = config.taskExecutor().schedule(() -> {
if (connectionState == WebsocketConnectionState.CONNECTED)
if (connectionState.get() == WebsocketConnectionState.CONNECTED)
config.backoffStrategy().reset();
}, 30, TimeUnit.SECONDS);
}
Expand All @@ -102,7 +102,9 @@ public void onTextMessage(WebSocket ws, String text) {

@Override
public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) {
if (!connectionState.equals(WebsocketConnectionState.DISCONNECTING)) {
if (connectionState.get() != WebsocketConnectionState.DISCONNECTING) {
closeSocket(); // avoid possible resource leak
setState(WebsocketConnectionState.LOST);
log.info("Connection to WebSocket [{}] lost! Retrying soon ...", config.baseUrl());

// connection lost - reconnecting
Expand All @@ -112,10 +114,14 @@ public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame,
log.debug("Maximum retry count for websocket reconnection attempts was hit.");
config.backoffStrategy().reset(); // start fresh on the next manual connect() call
} else {
config.taskExecutor().schedule(() -> reconnect(), reconnectDelay, TimeUnit.MILLISECONDS);
config.taskExecutor().schedule(() -> {
WebsocketConnectionState state = connectionState.get();
if (state != WebsocketConnectionState.CONNECTING && state != WebsocketConnectionState.CONNECTED)
reconnect();
}, reconnectDelay, TimeUnit.MILLISECONDS);
}
} else {
connectionState = WebsocketConnectionState.DISCONNECTED;
setState(WebsocketConnectionState.DISCONNECTED);
log.info("Disconnected from WebSocket [{}]!", config.baseUrl());
}
}
Expand Down Expand Up @@ -149,18 +155,29 @@ protected WebSocket createWebsocket() throws IOException {
return ws;
}

protected void setState(WebsocketConnectionState newState) {
WebsocketConnectionState oldState = connectionState.getAndSet(newState);
if (oldState != newState) {
config.onStateChanged().accept(oldState, newState);
}
}

/**
* Connect to the WebSocket
*/
@Synchronized
public void connect() {
if (connectionState.equals(WebsocketConnectionState.DISCONNECTED) || connectionState.equals(WebsocketConnectionState.RECONNECTING)) {
WebsocketConnectionState connectionState = this.connectionState.get();
if (connectionState == WebsocketConnectionState.DISCONNECTED || connectionState == WebsocketConnectionState.RECONNECTING || connectionState == WebsocketConnectionState.LOST) {
try {
// avoid any resource leaks
this.closeSocket();

// hook: on pre connect
config.onPreConnect().run();

// Change Connection State
connectionState = WebsocketConnectionState.CONNECTING;
setState(WebsocketConnectionState.CONNECTING);

// init websocket
webSocket = createWebsocket();
Expand Down Expand Up @@ -197,22 +214,28 @@ public void connect() {
*/
@Synchronized
public void disconnect() {
if (connectionState.equals(WebsocketConnectionState.CONNECTED)) {
WebsocketConnectionState connectionState = this.connectionState.get();

if (connectionState == WebsocketConnectionState.DISCONNECTED) {
// have already disconnected
return;
}

if (connectionState == WebsocketConnectionState.CONNECTED || connectionState == WebsocketConnectionState.LOST) {
// hook: disconnecting
config.onDisconnecting().run();

connectionState = WebsocketConnectionState.DISCONNECTING;
setState(WebsocketConnectionState.DISCONNECTING);
}

// hook: pre disconnect
config.onPreDisconnect().run();

connectionState = WebsocketConnectionState.DISCONNECTED;

// CleanUp
this.webSocket.disconnect();
this.webSocket.clearListeners();
this.webSocket = null;
this.closeSocket();

// update state
setState(WebsocketConnectionState.DISCONNECTED);

// hook: post disconnect
config.onPostDisconnect().run();
Expand All @@ -223,7 +246,7 @@ public void disconnect() {
*/
@Synchronized
public void reconnect() {
connectionState = WebsocketConnectionState.RECONNECTING;
setState(WebsocketConnectionState.RECONNECTING);
disconnect();
connect();
}
Expand All @@ -236,7 +259,8 @@ public void reconnect() {
*/
public boolean sendText(String message) {
// only send if state is CONNECTING or CONNECTED
if (!connectionState.equals(WebsocketConnectionState.CONNECTED) && !connectionState.equals(WebsocketConnectionState.CONNECTING)) {
WebsocketConnectionState connectionState = this.connectionState.get();
if (connectionState != WebsocketConnectionState.CONNECTED && connectionState != WebsocketConnectionState.CONNECTING) {
return false;
}

Expand All @@ -245,8 +269,25 @@ public boolean sendText(String message) {
return true;
}

/**
* @return the socket's connection state
*/
public WebsocketConnectionState getConnectionState() {
return connectionState.get();
}

@Override
public void close() throws Exception {
disconnect();
}

@Synchronized
private void closeSocket() {
if (webSocket != null) {
this.webSocket.disconnect();
this.webSocket.clearListeners();
this.webSocket = null;
}
}

}
@@ -1,5 +1,6 @@
package com.github.twitch4j.client.websocket;

import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.util.ExponentialBackoffStrategy;
import com.github.twitch4j.util.IBackoffStrategy;
Expand All @@ -12,6 +13,7 @@
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

@Accessors(chain = true, fluent = true, prefix = "")
Expand All @@ -36,6 +38,7 @@ public void validate() {
}
Objects.requireNonNull(taskExecutor, "taskExecutor may not be null!");
Objects.requireNonNull(backoffStrategy, "backoffStrategy may not be null!");
Objects.requireNonNull(onStateChanged, "onStateChanged may not be null!");
Objects.requireNonNull(onPreConnect, "onPreConnect may not be null!");
Objects.requireNonNull(onPostConnect, "onPostConnect may not be null!");
Objects.requireNonNull(onConnected, "onConnected may not be null!");
Expand Down Expand Up @@ -77,6 +80,11 @@ public void validate() {
.maximumBackoff(Duration.ofMinutes(5).toMillis())
.build();

/**
* called when the websocket's state changes
*/
private BiConsumer<WebsocketConnectionState, WebsocketConnectionState> onStateChanged = (oldState, newState) -> {};

/**
* called before connecting
*/
Expand Down
@@ -1,9 +1,45 @@
package com.github.twitch4j.client.websocket.domain;

import com.github.twitch4j.client.websocket.WebsocketConnection;

/**
* The state of a {@link WebsocketConnection}.
*/
public enum WebsocketConnectionState {

/**
* The websocket is in the process of disconnecting after a call to {@link WebsocketConnection#disconnect()}.
*/
DISCONNECTING,

/**
* The websocket has started its reconnection procedure after a call to {@link WebsocketConnection#reconnect()}.
*/
RECONNECTING,

/**
* The websocket is (deliberately) fully disconnected.
* <p>
* This state will eventually be set after a call to {@link WebsocketConnection#disconnect()}.
* This is also the initial state upon creation of {@link WebsocketConnection}.
*/
DISCONNECTED,

/**
* The websocket has started its connection procedure after a call to {@link WebsocketConnection#connect()}.
*/
CONNECTING,
CONNECTED

/**
* The websocket has established a connection after completing the connection handshake procedure.
*/
CONNECTED,

/**
* The websocket inadvertently lost connection.
* <p>
* This could occur for a variety of reasons including: network issues, firewall changes, or even a server-side crash.
*/
LOST

}
Expand Up @@ -20,6 +20,7 @@ public void testFullConfiguration() throws IOException {
spec.onPreConnect(() -> System.out.println("on-pre-connect"));
spec.onPostConnect(() -> System.out.println("on-post-connect"));
spec.onConnected(() -> System.out.println("on-connected"));
spec.onStateChanged((oldState, newState) -> System.out.println("on-state-change"));
spec.onTextMessage((text) -> System.out.println("on-text-message"));
spec.onDisconnecting(() -> System.out.println("on-disconnecting"));
spec.onPreDisconnect(() -> System.out.println("on-pre-disconnect"));
Expand Down
Expand Up @@ -104,6 +104,7 @@
import com.github.twitch4j.pubsub.events.PredictionCreatedEvent;
import com.github.twitch4j.pubsub.events.PredictionUpdatedEvent;
import com.github.twitch4j.pubsub.events.PresenceSettingsEvent;
import com.github.twitch4j.pubsub.events.PubSubConnectionStateEvent;
import com.github.twitch4j.pubsub.events.PubSubListenResponseEvent;
import com.github.twitch4j.pubsub.events.RadioEvent;
import com.github.twitch4j.pubsub.events.RaidCancelEvent;
Expand Down Expand Up @@ -255,6 +256,7 @@ public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventM
this.connection = new WebsocketConnection(spec -> {
spec.baseUrl(WEB_SOCKET_SERVER);
spec.wsPingPeriod(wsPingPeriod);
spec.onStateChanged((oldState, newState) -> eventManager.publish(new PubSubConnectionStateEvent(oldState, newState, this)));
spec.onPreConnect(this::onPreConnect);
spec.onConnected(this::onConnected);
spec.onTextMessage(this::onTextMessage);
Expand Down
@@ -0,0 +1,33 @@
package com.github.twitch4j.pubsub.events;

import com.github.philippheuer.events4j.core.domain.Event;
import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import com.github.twitch4j.pubsub.TwitchPubSub;
import lombok.Data;
import lombok.EqualsAndHashCode;

/**
* PubSub Connection Event
* <p>
* Called when a PubSub socket's connection status changes.
*/
@Data
@EqualsAndHashCode(callSuper = false)
public class PubSubConnectionStateEvent extends Event {

/**
* The previous state of the websocket.
*/
private final WebsocketConnectionState previousState;

/**
* The updated state of the websocket.
*/
private final WebsocketConnectionState state;

/**
* The PubSub instance whose connection status changed.
*/
private final TwitchPubSub connection;

}

0 comments on commit 6080cde

Please sign in to comment.