From 78a1cc0b89df77026abb9b7cf2c9ccf6e2cffee3 Mon Sep 17 00:00:00 2001 From: samfundev Date: Wed, 6 Jul 2022 21:11:45 -0400 Subject: [PATCH] feat: emit websocket events --- .../com/github/twitch4j/chat/TwitchChat.java | 2 ++ .../client/websocket/WebsocketConnection.java | 20 +++++++---- .../websocket/WebsocketConnectionConfig.java | 7 ++++ .../domain/WebsocketConnectionState.java | 3 +- .../websocket/events/WebsocketStateEvent.java | 34 +++++++++++++++++++ .../github/twitch4j/pubsub/TwitchPubSub.java | 2 ++ 6 files changed, 60 insertions(+), 8 deletions(-) create mode 100644 client-websocket/src/main/java/com/github/twitch4j/client/websocket/events/WebsocketStateEvent.java diff --git a/chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java b/chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java index c7ab4f0d1..7c30e15b0 100644 --- a/chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java +++ b/chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java @@ -22,6 +22,7 @@ import com.github.twitch4j.chat.events.channel.UserStateEvent; import com.github.twitch4j.client.websocket.WebsocketConnection; import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState; +import com.github.twitch4j.client.websocket.events.WebsocketStateEvent; import com.github.twitch4j.common.config.ProxyConfig; import com.github.twitch4j.common.util.BucketUtils; import com.github.twitch4j.common.util.CryptoUtils; @@ -288,6 +289,7 @@ public TwitchChat(WebsocketConnection websocketConnection, EventManager eventMan this.connection = new WebsocketConnection(spec -> { spec.baseUrl(baseUrl); spec.wsPingPeriod(wsPingPeriod); + spec.onStateChanged((state) -> eventManager.publish(new WebsocketStateEvent(state, this))); spec.onConnected(this::onConnected); spec.onTextMessage(this::onTextMessage); spec.onDisconnecting(this::onDisconnecting); diff --git a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java index 0d28a825c..2638fb477 100644 --- a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java +++ b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java @@ -87,7 +87,7 @@ public void onConnected(WebSocket ws, Map> headers) { config.onConnected().run(); // Connection Success - connectionState = WebsocketConnectionState.CONNECTED; + setState(WebsocketConnectionState.CONNECTED); backoffClearer = config.taskExecutor().schedule(() -> { if (connectionState == WebsocketConnectionState.CONNECTED) config.backoffStrategy().reset(); @@ -103,6 +103,7 @@ public void onTextMessage(WebSocket ws, String text) { @Override public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) { if (!connectionState.equals(WebsocketConnectionState.DISCONNECTING)) { + setState(WebsocketConnectionState.LOST); log.info("Connection to WebSocket [{}] lost! Retrying soon ...", config.baseUrl()); // connection lost - reconnecting @@ -115,7 +116,7 @@ public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, config.taskExecutor().schedule(() -> reconnect(), reconnectDelay, TimeUnit.MILLISECONDS); } } else { - connectionState = WebsocketConnectionState.DISCONNECTED; + setState(WebsocketConnectionState.DISCONNECTED); log.info("Disconnected from WebSocket [{}]!", config.baseUrl()); } } @@ -149,6 +150,11 @@ protected WebSocket createWebsocket() throws IOException { return ws; } + protected void setState(WebsocketConnectionState state) { + connectionState = state; + config.onStateChanged().accept(state); + } + /** * Connect to the WebSocket */ @@ -160,7 +166,7 @@ public void connect() { config.onPreConnect().run(); // Change Connection State - connectionState = WebsocketConnectionState.CONNECTING; + setState(WebsocketConnectionState.CONNECTING); // init websocket webSocket = createWebsocket(); @@ -197,17 +203,17 @@ public void connect() { */ @Synchronized public void disconnect() { - if (connectionState.equals(WebsocketConnectionState.CONNECTED)) { + if (connectionState.equals(WebsocketConnectionState.CONNECTED) || connectionState.equals(WebsocketConnectionState.LOST)) { // hook: disconnecting config.onDisconnecting().run(); - connectionState = WebsocketConnectionState.DISCONNECTING; + setState(WebsocketConnectionState.DISCONNECTING); } // hook: pre disconnect config.onPreDisconnect().run(); - connectionState = WebsocketConnectionState.DISCONNECTED; + setState(WebsocketConnectionState.DISCONNECTED); // CleanUp this.webSocket.disconnect(); @@ -223,7 +229,7 @@ public void disconnect() { */ @Synchronized public void reconnect() { - connectionState = WebsocketConnectionState.RECONNECTING; + setState(WebsocketConnectionState.RECONNECTING); disconnect(); connect(); } diff --git a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnectionConfig.java b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnectionConfig.java index 2abb0a84a..d1fbf2cd2 100644 --- a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnectionConfig.java +++ b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnectionConfig.java @@ -1,5 +1,6 @@ package com.github.twitch4j.client.websocket; +import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState; import com.github.twitch4j.common.config.ProxyConfig; import com.github.twitch4j.common.util.ExponentialBackoffStrategy; import com.github.twitch4j.util.IBackoffStrategy; @@ -36,6 +37,7 @@ public void validate() { } Objects.requireNonNull(taskExecutor, "taskExecutor may not be null!"); Objects.requireNonNull(backoffStrategy, "backoffStrategy may not be null!"); + Objects.requireNonNull(onStateChanged, "onStateChanged may not be null!"); Objects.requireNonNull(onPreConnect, "onPreConnect may not be null!"); Objects.requireNonNull(onPostConnect, "onPostConnect may not be null!"); Objects.requireNonNull(onConnected, "onConnected may not be null!"); @@ -77,6 +79,11 @@ public void validate() { .maximumBackoff(Duration.ofMinutes(5).toMillis()) .build(); + /** + * called when the websocket's state changes + */ + private Consumer onStateChanged = (state) -> {}; + /** * called before connecting */ diff --git a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/domain/WebsocketConnectionState.java b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/domain/WebsocketConnectionState.java index 50d5ceb0d..ba6c5a9e0 100644 --- a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/domain/WebsocketConnectionState.java +++ b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/domain/WebsocketConnectionState.java @@ -5,5 +5,6 @@ public enum WebsocketConnectionState { RECONNECTING, DISCONNECTED, CONNECTING, - CONNECTED + CONNECTED, + LOST } diff --git a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/events/WebsocketStateEvent.java b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/events/WebsocketStateEvent.java new file mode 100644 index 000000000..6e50cc763 --- /dev/null +++ b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/events/WebsocketStateEvent.java @@ -0,0 +1,34 @@ +package com.github.twitch4j.client.websocket.events; + +import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState; +import lombok.Getter; + +/** + * Websocket State Event + * + * Called when the websocket changes it's connection state. + */ +@Getter +public class WebsocketStateEvent { + + /** + * Websocket State + */ + private final WebsocketConnectionState state; + + /** + * The parent of the websocket + */ + private final AutoCloseable parent; + + /** + * Constructor + * + * @param state The state of the websocket + * @param parent The parent of the websocket + */ + public WebsocketStateEvent(WebsocketConnectionState state, AutoCloseable parent) { + this.state = state; + this.parent = parent; + } +} diff --git a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java index 4b984c24a..08c38062c 100644 --- a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java +++ b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java @@ -5,6 +5,7 @@ import com.github.philippheuer.events4j.core.EventManager; import com.github.twitch4j.client.websocket.WebsocketConnection; import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState; +import com.github.twitch4j.client.websocket.events.WebsocketStateEvent; import com.github.twitch4j.common.config.ProxyConfig; import com.github.twitch4j.common.enums.CommandPermission; import com.github.twitch4j.common.events.domain.EventUser; @@ -255,6 +256,7 @@ public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventM this.connection = new WebsocketConnection(spec -> { spec.baseUrl(WEB_SOCKET_SERVER); spec.wsPingPeriod(wsPingPeriod); + spec.onStateChanged((state) -> eventManager.publish(new WebsocketStateEvent(state, this))); spec.onConnected(this::onConnected); spec.onTextMessage(this::onTextMessage); spec.taskExecutor(taskExecutor);