Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add websocket connection state events / new connection lost state #597

Merged
merged 11 commits into from Jul 11, 2022
2 changes: 2 additions & 0 deletions chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java
Expand Up @@ -12,6 +12,7 @@
import com.github.twitch4j.chat.enums.NoticeTag;
import com.github.twitch4j.chat.enums.TMIConnectionState;
import com.github.twitch4j.chat.events.AbstractChannelEvent;
import com.github.twitch4j.chat.events.ChatConnectionStateEvent;
import com.github.twitch4j.chat.events.CommandEvent;
import com.github.twitch4j.chat.events.IRCEventHandler;
import com.github.twitch4j.chat.events.channel.ChannelJoinFailureEvent;
Expand Down Expand Up @@ -288,6 +289,7 @@ public TwitchChat(WebsocketConnection websocketConnection, EventManager eventMan
this.connection = new WebsocketConnection(spec -> {
spec.baseUrl(baseUrl);
spec.wsPingPeriod(wsPingPeriod);
spec.onStateChanged((oldState, newState) -> eventManager.publish(new ChatConnectionStateEvent(oldState, newState, this)));
spec.onConnected(this::onConnected);
spec.onTextMessage(this::onTextMessage);
spec.onDisconnecting(this::onDisconnecting);
Expand Down
@@ -0,0 +1,33 @@
package com.github.twitch4j.chat.events;

import com.github.philippheuer.events4j.core.domain.Event;
import com.github.twitch4j.chat.TwitchChat;
import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import lombok.Data;
import lombok.EqualsAndHashCode;

/**
* Chat Connection Event
* <p>
* Called when a chat socket's connection status changes.
*/
@Data
@EqualsAndHashCode(callSuper = false)
public class ChatConnectionStateEvent extends Event {

/**
* The previous state of the websocket.
*/
private final WebsocketConnectionState previousState;

/**
* The updated state of the websocket.
*/
private final WebsocketConnectionState state;

/**
* The chat instance whose connection status changed.
*/
private final TwitchChat connection;

}
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

@Slf4j
Expand All @@ -35,8 +36,7 @@ public class WebsocketConnection implements AutoCloseable {
/**
* connection state
*/
@Getter
private volatile WebsocketConnectionState connectionState = WebsocketConnectionState.DISCONNECTED;
private final AtomicReference<WebsocketConnectionState> connectionState = new AtomicReference<>(WebsocketConnectionState.DISCONNECTED);

/**
* Calls {@link ExponentialBackoffStrategy#reset()} upon a successful websocket connection
Expand Down Expand Up @@ -87,9 +87,9 @@ public void onConnected(WebSocket ws, Map<String, List<String>> headers) {
config.onConnected().run();

// Connection Success
connectionState = WebsocketConnectionState.CONNECTED;
setState(WebsocketConnectionState.CONNECTED);
backoffClearer = config.taskExecutor().schedule(() -> {
if (connectionState == WebsocketConnectionState.CONNECTED)
if (connectionState.get() == WebsocketConnectionState.CONNECTED)
config.backoffStrategy().reset();
}, 30, TimeUnit.SECONDS);
}
Expand All @@ -102,7 +102,9 @@ public void onTextMessage(WebSocket ws, String text) {

@Override
public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) {
if (!connectionState.equals(WebsocketConnectionState.DISCONNECTING)) {
if (connectionState.get() != WebsocketConnectionState.DISCONNECTING) {
closeSocket(); // avoid possible resource leak
setState(WebsocketConnectionState.LOST);
log.info("Connection to WebSocket [{}] lost! Retrying soon ...", config.baseUrl());

// connection lost - reconnecting
Expand All @@ -112,10 +114,14 @@ public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame,
log.debug("Maximum retry count for websocket reconnection attempts was hit.");
config.backoffStrategy().reset(); // start fresh on the next manual connect() call
} else {
config.taskExecutor().schedule(() -> reconnect(), reconnectDelay, TimeUnit.MILLISECONDS);
config.taskExecutor().schedule(() -> {
WebsocketConnectionState state = connectionState.get();
if (state != WebsocketConnectionState.CONNECTING && state != WebsocketConnectionState.CONNECTED)
reconnect();
}, reconnectDelay, TimeUnit.MILLISECONDS);
}
} else {
connectionState = WebsocketConnectionState.DISCONNECTED;
setState(WebsocketConnectionState.DISCONNECTED);
log.info("Disconnected from WebSocket [{}]!", config.baseUrl());
}
}
Expand Down Expand Up @@ -149,18 +155,29 @@ protected WebSocket createWebsocket() throws IOException {
return ws;
}

protected void setState(WebsocketConnectionState newState) {
WebsocketConnectionState oldState = connectionState.getAndSet(newState);
if (oldState != newState) {
config.onStateChanged().accept(oldState, newState);
}
}

/**
* Connect to the WebSocket
*/
@Synchronized
public void connect() {
if (connectionState.equals(WebsocketConnectionState.DISCONNECTED) || connectionState.equals(WebsocketConnectionState.RECONNECTING)) {
WebsocketConnectionState connectionState = this.connectionState.get();
if (connectionState == WebsocketConnectionState.DISCONNECTED || connectionState == WebsocketConnectionState.RECONNECTING || connectionState == WebsocketConnectionState.LOST) {
try {
// avoid any resource leaks
this.closeSocket();

// hook: on pre connect
config.onPreConnect().run();

// Change Connection State
connectionState = WebsocketConnectionState.CONNECTING;
setState(WebsocketConnectionState.CONNECTING);

// init websocket
webSocket = createWebsocket();
Expand Down Expand Up @@ -197,22 +214,28 @@ public void connect() {
*/
@Synchronized
public void disconnect() {
if (connectionState.equals(WebsocketConnectionState.CONNECTED)) {
WebsocketConnectionState connectionState = this.connectionState.get();

if (connectionState == WebsocketConnectionState.DISCONNECTED) {
// have already disconnected
return;
}

if (connectionState == WebsocketConnectionState.CONNECTED || connectionState == WebsocketConnectionState.LOST) {
// hook: disconnecting
config.onDisconnecting().run();

connectionState = WebsocketConnectionState.DISCONNECTING;
setState(WebsocketConnectionState.DISCONNECTING);
}

// hook: pre disconnect
config.onPreDisconnect().run();

connectionState = WebsocketConnectionState.DISCONNECTED;

// CleanUp
this.webSocket.disconnect();
this.webSocket.clearListeners();
this.webSocket = null;
this.closeSocket();

// update state
setState(WebsocketConnectionState.DISCONNECTED);

// hook: post disconnect
config.onPostDisconnect().run();
Expand All @@ -223,7 +246,7 @@ public void disconnect() {
*/
@Synchronized
public void reconnect() {
connectionState = WebsocketConnectionState.RECONNECTING;
setState(WebsocketConnectionState.RECONNECTING);
disconnect();
connect();
}
Expand All @@ -236,7 +259,8 @@ public void reconnect() {
*/
public boolean sendText(String message) {
// only send if state is CONNECTING or CONNECTED
if (!connectionState.equals(WebsocketConnectionState.CONNECTED) && !connectionState.equals(WebsocketConnectionState.CONNECTING)) {
WebsocketConnectionState connectionState = this.connectionState.get();
if (connectionState != WebsocketConnectionState.CONNECTED && connectionState != WebsocketConnectionState.CONNECTING) {
return false;
}

Expand All @@ -245,8 +269,25 @@ public boolean sendText(String message) {
return true;
}

/**
* @return the socket's connection state
*/
public WebsocketConnectionState getConnectionState() {
return connectionState.get();
}

@Override
public void close() throws Exception {
disconnect();
}

@Synchronized
private void closeSocket() {
if (webSocket != null) {
this.webSocket.disconnect();
this.webSocket.clearListeners();
this.webSocket = null;
}
}

}
@@ -1,5 +1,6 @@
package com.github.twitch4j.client.websocket;

import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.util.ExponentialBackoffStrategy;
import com.github.twitch4j.util.IBackoffStrategy;
Expand All @@ -12,6 +13,7 @@
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

@Accessors(chain = true, fluent = true, prefix = "")
Expand All @@ -36,6 +38,7 @@ public void validate() {
}
Objects.requireNonNull(taskExecutor, "taskExecutor may not be null!");
Objects.requireNonNull(backoffStrategy, "backoffStrategy may not be null!");
Objects.requireNonNull(onStateChanged, "onStateChanged may not be null!");
Objects.requireNonNull(onPreConnect, "onPreConnect may not be null!");
Objects.requireNonNull(onPostConnect, "onPostConnect may not be null!");
Objects.requireNonNull(onConnected, "onConnected may not be null!");
Expand Down Expand Up @@ -77,6 +80,11 @@ public void validate() {
.maximumBackoff(Duration.ofMinutes(5).toMillis())
.build();

/**
* called when the websocket's state changes
*/
private BiConsumer<WebsocketConnectionState, WebsocketConnectionState> onStateChanged = (oldState, newState) -> {};

/**
* called before connecting
*/
Expand Down
@@ -1,9 +1,45 @@
package com.github.twitch4j.client.websocket.domain;

import com.github.twitch4j.client.websocket.WebsocketConnection;

/**
* The state of a {@link WebsocketConnection}.
*/
public enum WebsocketConnectionState {

/**
* The websocket is in the process of disconnecting after a call to {@link WebsocketConnection#disconnect()}.
*/
DISCONNECTING,

/**
* The websocket has started its reconnection procedure after a call to {@link WebsocketConnection#reconnect()}.
*/
RECONNECTING,

/**
* The websocket is (deliberately) fully disconnected.
* <p>
* This state will eventually be set after a call to {@link WebsocketConnection#disconnect()}.
* This is also the initial state upon creation of {@link WebsocketConnection}.
*/
DISCONNECTED,

/**
* The websocket has started its connection procedure after a call to {@link WebsocketConnection#connect()}.
*/
CONNECTING,
CONNECTED

/**
* The websocket has established a connection after completing the connection handshake procedure.
*/
CONNECTED,

/**
* The websocket inadvertently lost connection.
* <p>
* This could occur for a variety of reasons including: network issues, firewall changes, or even a server-side crash.
*/
LOST

}
Expand Up @@ -20,6 +20,7 @@ public void testFullConfiguration() throws IOException {
spec.onPreConnect(() -> System.out.println("on-pre-connect"));
spec.onPostConnect(() -> System.out.println("on-post-connect"));
spec.onConnected(() -> System.out.println("on-connected"));
spec.onStateChanged((oldState, newState) -> System.out.println("on-state-change"));
spec.onTextMessage((text) -> System.out.println("on-text-message"));
spec.onDisconnecting(() -> System.out.println("on-disconnecting"));
spec.onPreDisconnect(() -> System.out.println("on-pre-disconnect"));
Expand Down
Expand Up @@ -104,6 +104,7 @@
import com.github.twitch4j.pubsub.events.PredictionCreatedEvent;
import com.github.twitch4j.pubsub.events.PredictionUpdatedEvent;
import com.github.twitch4j.pubsub.events.PresenceSettingsEvent;
import com.github.twitch4j.pubsub.events.PubSubConnectionStateEvent;
import com.github.twitch4j.pubsub.events.PubSubListenResponseEvent;
import com.github.twitch4j.pubsub.events.RadioEvent;
import com.github.twitch4j.pubsub.events.RaidCancelEvent;
Expand Down Expand Up @@ -255,6 +256,7 @@ public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventM
this.connection = new WebsocketConnection(spec -> {
spec.baseUrl(WEB_SOCKET_SERVER);
spec.wsPingPeriod(wsPingPeriod);
spec.onStateChanged((oldState, newState) -> eventManager.publish(new PubSubConnectionStateEvent(oldState, newState, this)));
spec.onPreConnect(this::onPreConnect);
spec.onConnected(this::onConnected);
spec.onTextMessage(this::onTextMessage);
Expand Down
@@ -0,0 +1,33 @@
package com.github.twitch4j.pubsub.events;

import com.github.philippheuer.events4j.core.domain.Event;
import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import com.github.twitch4j.pubsub.TwitchPubSub;
import lombok.Data;
import lombok.EqualsAndHashCode;

/**
* PubSub Connection Event
* <p>
* Called when a PubSub socket's connection status changes.
*/
@Data
@EqualsAndHashCode(callSuper = false)
public class PubSubConnectionStateEvent extends Event {

/**
* The previous state of the websocket.
*/
private final WebsocketConnectionState previousState;

/**
* The updated state of the websocket.
*/
private final WebsocketConnectionState state;

/**
* The PubSub instance whose connection status changed.
*/
private final TwitchPubSub connection;

}