Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add websocket connection state events / new connection lost state #597

Merged
merged 11 commits into from Jul 11, 2022
2 changes: 2 additions & 0 deletions chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java
Expand Up @@ -22,6 +22,7 @@
import com.github.twitch4j.chat.events.channel.UserStateEvent;
import com.github.twitch4j.client.websocket.WebsocketConnection;
import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import com.github.twitch4j.client.websocket.events.WebsocketStateEvent;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.util.BucketUtils;
import com.github.twitch4j.common.util.CryptoUtils;
Expand Down Expand Up @@ -288,6 +289,7 @@ public TwitchChat(WebsocketConnection websocketConnection, EventManager eventMan
this.connection = new WebsocketConnection(spec -> {
spec.baseUrl(baseUrl);
spec.wsPingPeriod(wsPingPeriod);
spec.onStateChanged((oldState, newState) -> eventManager.publish(new WebsocketStateEvent(newState, this)));
spec.onConnected(this::onConnected);
spec.onTextMessage(this::onTextMessage);
spec.onDisconnecting(this::onDisconnecting);
Expand Down
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

@Slf4j
Expand All @@ -35,8 +36,7 @@ public class WebsocketConnection implements AutoCloseable {
/**
* connection state
*/
@Getter
private volatile WebsocketConnectionState connectionState = WebsocketConnectionState.DISCONNECTED;
private final AtomicReference<WebsocketConnectionState> connectionState = new AtomicReference<>(WebsocketConnectionState.DISCONNECTED);

/**
* Calls {@link ExponentialBackoffStrategy#reset()} upon a successful websocket connection
Expand Down Expand Up @@ -87,9 +87,9 @@ public void onConnected(WebSocket ws, Map<String, List<String>> headers) {
config.onConnected().run();

// Connection Success
connectionState = WebsocketConnectionState.CONNECTED;
setState(WebsocketConnectionState.CONNECTED);
backoffClearer = config.taskExecutor().schedule(() -> {
if (connectionState == WebsocketConnectionState.CONNECTED)
if (connectionState.get() == WebsocketConnectionState.CONNECTED)
config.backoffStrategy().reset();
}, 30, TimeUnit.SECONDS);
}
Expand All @@ -102,7 +102,8 @@ public void onTextMessage(WebSocket ws, String text) {

@Override
public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) {
if (!connectionState.equals(WebsocketConnectionState.DISCONNECTING)) {
if (connectionState.get() != WebsocketConnectionState.DISCONNECTING) {
setState(WebsocketConnectionState.LOST);
log.info("Connection to WebSocket [{}] lost! Retrying soon ...", config.baseUrl());

// connection lost - reconnecting
Expand All @@ -115,7 +116,7 @@ public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame,
config.taskExecutor().schedule(() -> reconnect(), reconnectDelay, TimeUnit.MILLISECONDS);
}
} else {
connectionState = WebsocketConnectionState.DISCONNECTED;
setState(WebsocketConnectionState.DISCONNECTED);
log.info("Disconnected from WebSocket [{}]!", config.baseUrl());
}
}
Expand Down Expand Up @@ -149,18 +150,26 @@ protected WebSocket createWebsocket() throws IOException {
return ws;
}

protected void setState(WebsocketConnectionState newState) {
WebsocketConnectionState oldState = connectionState.getAndSet(newState);
if (oldState != newState) {
config.onStateChanged().accept(oldState, newState);
}
}

/**
* Connect to the WebSocket
*/
@Synchronized
public void connect() {
WebsocketConnectionState connectionState = this.connectionState.get();
if (connectionState.equals(WebsocketConnectionState.DISCONNECTED) || connectionState.equals(WebsocketConnectionState.RECONNECTING)) {
try {
// hook: on pre connect
config.onPreConnect().run();

// Change Connection State
connectionState = WebsocketConnectionState.CONNECTING;
setState(WebsocketConnectionState.CONNECTING);

// init websocket
webSocket = createWebsocket();
Expand Down Expand Up @@ -197,17 +206,18 @@ public void connect() {
*/
@Synchronized
public void disconnect() {
if (connectionState.equals(WebsocketConnectionState.CONNECTED)) {
WebsocketConnectionState connectionState = this.connectionState.get();
if (connectionState.equals(WebsocketConnectionState.CONNECTED) || connectionState.equals(WebsocketConnectionState.LOST)) {
// hook: disconnecting
config.onDisconnecting().run();

connectionState = WebsocketConnectionState.DISCONNECTING;
setState(WebsocketConnectionState.DISCONNECTING);
}

// hook: pre disconnect
config.onPreDisconnect().run();

connectionState = WebsocketConnectionState.DISCONNECTED;
setState(WebsocketConnectionState.DISCONNECTED);

// CleanUp
this.webSocket.disconnect();
Expand All @@ -223,7 +233,7 @@ public void disconnect() {
*/
@Synchronized
public void reconnect() {
connectionState = WebsocketConnectionState.RECONNECTING;
setState(WebsocketConnectionState.RECONNECTING);
disconnect();
connect();
}
Expand All @@ -236,6 +246,7 @@ public void reconnect() {
*/
public boolean sendText(String message) {
// only send if state is CONNECTING or CONNECTED
WebsocketConnectionState connectionState = this.connectionState.get();
if (!connectionState.equals(WebsocketConnectionState.CONNECTED) && !connectionState.equals(WebsocketConnectionState.CONNECTING)) {
return false;
}
Expand All @@ -245,6 +256,13 @@ public boolean sendText(String message) {
return true;
}

/**
* @return the socket's connection state
*/
public WebsocketConnectionState getConnectionState() {
return connectionState.get();
}

@Override
public void close() throws Exception {
disconnect();
Expand Down
@@ -1,5 +1,6 @@
package com.github.twitch4j.client.websocket;

import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.util.ExponentialBackoffStrategy;
import com.github.twitch4j.util.IBackoffStrategy;
Expand All @@ -12,6 +13,7 @@
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

@Accessors(chain = true, fluent = true, prefix = "")
Expand All @@ -36,6 +38,7 @@ public void validate() {
}
Objects.requireNonNull(taskExecutor, "taskExecutor may not be null!");
Objects.requireNonNull(backoffStrategy, "backoffStrategy may not be null!");
Objects.requireNonNull(onStateChanged, "onStateChanged may not be null!");
Objects.requireNonNull(onPreConnect, "onPreConnect may not be null!");
Objects.requireNonNull(onPostConnect, "onPostConnect may not be null!");
Objects.requireNonNull(onConnected, "onConnected may not be null!");
Expand Down Expand Up @@ -77,6 +80,11 @@ public void validate() {
.maximumBackoff(Duration.ofMinutes(5).toMillis())
.build();

/**
* called when the websocket's state changes
*/
private BiConsumer<WebsocketConnectionState, WebsocketConnectionState> onStateChanged = (oldState, newState) -> {};

/**
* called before connecting
*/
Expand Down
Expand Up @@ -5,5 +5,6 @@ public enum WebsocketConnectionState {
RECONNECTING,
DISCONNECTED,
CONNECTING,
CONNECTED
CONNECTED,
LOST
}
@@ -0,0 +1,27 @@
package com.github.twitch4j.client.websocket.events;

import com.github.philippheuer.events4j.core.domain.Event;
import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import lombok.Data;
import lombok.EqualsAndHashCode;

/**
* Websocket State Event
* <p>
* Called when the websocket changes its connection state.
*/
@Data
@EqualsAndHashCode(callSuper = false)
public class WebsocketStateEvent extends Event {
iProdigy marked this conversation as resolved.
Show resolved Hide resolved

/**
* The state of the websocket
*/
private final WebsocketConnectionState state;

/**
* The parent of the websocket
*/
private final AutoCloseable parent;

}
Expand Up @@ -20,6 +20,7 @@ public void testFullConfiguration() throws IOException {
spec.onPreConnect(() -> System.out.println("on-pre-connect"));
spec.onPostConnect(() -> System.out.println("on-post-connect"));
spec.onConnected(() -> System.out.println("on-connected"));
spec.onStateChanged((oldState, newState) -> System.out.println("on-state-change"));
spec.onTextMessage((text) -> System.out.println("on-text-message"));
spec.onDisconnecting(() -> System.out.println("on-disconnecting"));
spec.onPreDisconnect(() -> System.out.println("on-pre-disconnect"));
Expand Down
Expand Up @@ -5,6 +5,7 @@
import com.github.philippheuer.events4j.core.EventManager;
import com.github.twitch4j.client.websocket.WebsocketConnection;
import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import com.github.twitch4j.client.websocket.events.WebsocketStateEvent;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.enums.CommandPermission;
import com.github.twitch4j.common.events.domain.EventUser;
Expand Down Expand Up @@ -255,6 +256,7 @@ public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventM
this.connection = new WebsocketConnection(spec -> {
spec.baseUrl(WEB_SOCKET_SERVER);
spec.wsPingPeriod(wsPingPeriod);
spec.onStateChanged((oldState, newState) -> eventManager.publish(new WebsocketStateEvent(newState, this)));
spec.onConnected(this::onConnected);
spec.onTextMessage(this::onTextMessage);
spec.taskExecutor(taskExecutor);
Expand Down