From 32bbd33698fa65c903c3a2e5e6eee950188dd6df Mon Sep 17 00:00:00 2001 From: Sidd Date: Wed, 24 Aug 2022 23:33:39 -0700 Subject: [PATCH 1/7] feat: allow custom websocket close delay --- .../com/github/twitch4j/auth/TwitchAuth.java | 14 +++++++++----- .../com/github/twitch4j/chat/TwitchChat.java | 4 +++- .../twitch4j/chat/TwitchChatBuilder.java | 11 ++++++++++- .../client/websocket/WebsocketConnection.java | 4 +++- .../websocket/WebsocketConnectionConfig.java | 11 +++++++++++ .../github/twitch4j/kotlin/mock/MockChat.kt | 3 ++- .../github/twitch4j/pubsub/TwitchPubSub.java | 4 +++- .../twitch4j/pubsub/TwitchPubSubBuilder.java | 11 ++++++++++- .../github/twitch4j/TwitchClientBuilder.java | 12 ++++++++++++ .../twitch4j/TwitchClientPoolBuilder.java | 18 +++++++++++++++++- 10 files changed, 80 insertions(+), 12 deletions(-) diff --git a/auth/src/main/java/com/github/twitch4j/auth/TwitchAuth.java b/auth/src/main/java/com/github/twitch4j/auth/TwitchAuth.java index f13d69c5f..4ceb45583 100644 --- a/auth/src/main/java/com/github/twitch4j/auth/TwitchAuth.java +++ b/auth/src/main/java/com/github/twitch4j/auth/TwitchAuth.java @@ -22,9 +22,9 @@ public class TwitchAuth { * Twitch Identity Provider * * @param credentialManager Credential Manager - * @param clientId OAuth2 Client Id - * @param clientSecret OAuth2 Client Secret - * @param redirectUrl OAuth2 Redirect Url + * @param clientId OAuth2 Client Id + * @param clientSecret OAuth2 Client Secret + * @param redirectUrl OAuth2 Redirect Url */ public TwitchAuth(CredentialManager credentialManager, String clientId, String clientSecret, String redirectUrl) { this.credentialManager = credentialManager; @@ -37,9 +37,13 @@ public static void registerIdentityProvider(CredentialManager credentialManager, if (!ip.isPresent()) { // register IdentityProvider identityProvider = new TwitchIdentityProvider(clientId, clientSecret, redirectUrl); - credentialManager.registerIdentityProvider(identityProvider); + try { + credentialManager.registerIdentityProvider(identityProvider); + } catch (Exception e) { + log.error("TwitchAuth: Encountered conflicting identity provider!", e); + } } else { - log.warn("TwitchIdentityProvider was already registered, ignoring call to TwitchAuth.registerIdentityProvider!"); + log.debug("TwitchIdentityProvider was already registered, ignoring call to TwitchAuth.registerIdentityProvider!"); } } } diff --git a/chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java b/chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java index 1720f147f..b5cc67d74 100644 --- a/chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java +++ b/chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java @@ -270,8 +270,9 @@ public class TwitchChat implements ITwitchChat { * @param connectionBackoffStrategy WebSocket Connection Backoff Strategy * @param perChannelRateLimit Per channel message limit * @param validateOnConnect Whether token should be validated on connect + * @param wsCloseDelay Websocket Close Delay */ - public TwitchChat(WebsocketConnection websocketConnection, EventManager eventManager, CredentialManager credentialManager, OAuth2Credential chatCredential, String baseUrl, boolean sendCredentialToThirdPartyHost, Collection commandPrefixes, Integer chatQueueSize, Bucket ircMessageBucket, Bucket ircWhisperBucket, Bucket ircJoinBucket, Bucket ircAuthBucket, ScheduledThreadPoolExecutor taskExecutor, long chatQueueTimeout, ProxyConfig proxyConfig, boolean autoJoinOwnChannel, boolean enableMembershipEvents, Collection botOwnerIds, boolean removeChannelOnJoinFailure, int maxJoinRetries, long chatJoinTimeout, int wsPingPeriod, IBackoffStrategy connectionBackoffStrategy, Bandwidth perChannelRateLimit, boolean validateOnConnect) { + public TwitchChat(WebsocketConnection websocketConnection, EventManager eventManager, CredentialManager credentialManager, OAuth2Credential chatCredential, String baseUrl, boolean sendCredentialToThirdPartyHost, Collection commandPrefixes, Integer chatQueueSize, Bucket ircMessageBucket, Bucket ircWhisperBucket, Bucket ircJoinBucket, Bucket ircAuthBucket, ScheduledThreadPoolExecutor taskExecutor, long chatQueueTimeout, ProxyConfig proxyConfig, boolean autoJoinOwnChannel, boolean enableMembershipEvents, Collection botOwnerIds, boolean removeChannelOnJoinFailure, int maxJoinRetries, long chatJoinTimeout, int wsPingPeriod, IBackoffStrategy connectionBackoffStrategy, Bandwidth perChannelRateLimit, boolean validateOnConnect, int wsCloseDelay) { this.eventManager = eventManager; this.credentialManager = credentialManager; this.chatCredential = chatCredential; @@ -304,6 +305,7 @@ public TwitchChat(WebsocketConnection websocketConnection, EventManager eventMan if (websocketConnection == null) { this.connection = new WebsocketConnection(spec -> { spec.baseUrl(baseUrl); + spec.closeDelay(wsCloseDelay); spec.wsPingPeriod(wsPingPeriod); spec.onStateChanged((oldState, newState) -> eventManager.publish(new ChatConnectionStateEvent(oldState, newState, this))); spec.onConnected(this::onConnected); diff --git a/chat/src/main/java/com/github/twitch4j/chat/TwitchChatBuilder.java b/chat/src/main/java/com/github/twitch4j/chat/TwitchChatBuilder.java index 10d0f63e0..6550c218f 100644 --- a/chat/src/main/java/com/github/twitch4j/chat/TwitchChatBuilder.java +++ b/chat/src/main/java/com/github/twitch4j/chat/TwitchChatBuilder.java @@ -11,6 +11,7 @@ import com.github.twitch4j.chat.events.channel.ChannelJoinFailureEvent; import com.github.twitch4j.chat.util.TwitchChatLimitHelper; import com.github.twitch4j.client.websocket.WebsocketConnection; +import com.github.twitch4j.client.websocket.WebsocketConnectionConfig; import com.github.twitch4j.common.config.ProxyConfig; import com.github.twitch4j.common.config.Twitch4JGlobal; import com.github.twitch4j.common.enums.TwitchLimitType; @@ -252,6 +253,14 @@ public class TwitchChatBuilder { @With private int wsPingPeriod = 15_000; + /** + * Websocket Close Delay in ms (0 = minimum) + + * @see WebsocketConnectionConfig#closeDelay() + */ + @With + private int wsCloseDelay = 1_000; + /** * WebSocket Connection Backoff Strategy */ @@ -327,7 +336,7 @@ public TwitchChat build() { perChannelRateLimit = chatRateLimit; log.debug("TwitchChat: Initializing Module ..."); - return new TwitchChat(this.websocketConnection, this.eventManager, this.credentialManager, this.chatAccount, this.baseUrl, this.sendCredentialToThirdPartyHost, this.commandPrefixes, this.chatQueueSize, this.ircMessageBucket, this.ircWhisperBucket, this.ircJoinBucket, this.ircAuthBucket, this.scheduledThreadPoolExecutor, this.chatQueueTimeout, this.proxyConfig, this.autoJoinOwnChannel, this.enableMembershipEvents, this.botOwnerIds, this.removeChannelOnJoinFailure, this.maxJoinRetries, this.chatJoinTimeout, this.wsPingPeriod, this.connectionBackoffStrategy, this.perChannelRateLimit, this.verifyChatAccountOnReconnect); + return new TwitchChat(this.websocketConnection, this.eventManager, this.credentialManager, this.chatAccount, this.baseUrl, this.sendCredentialToThirdPartyHost, this.commandPrefixes, this.chatQueueSize, this.ircMessageBucket, this.ircWhisperBucket, this.ircJoinBucket, this.ircAuthBucket, this.scheduledThreadPoolExecutor, this.chatQueueTimeout, this.proxyConfig, this.autoJoinOwnChannel, this.enableMembershipEvents, this.botOwnerIds, this.removeChannelOnJoinFailure, this.maxJoinRetries, this.chatJoinTimeout, this.wsPingPeriod, this.connectionBackoffStrategy, this.perChannelRateLimit, this.verifyChatAccountOnReconnect, this.wsCloseDelay); } /** diff --git a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java index 167e2d2fc..da10f8816 100644 --- a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java +++ b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java @@ -4,6 +4,7 @@ import com.github.twitch4j.common.util.ExponentialBackoffStrategy; import com.neovisionaries.ws.client.WebSocket; import com.neovisionaries.ws.client.WebSocketAdapter; +import com.neovisionaries.ws.client.WebSocketCloseCode; import com.neovisionaries.ws.client.WebSocketFactory; import com.neovisionaries.ws.client.WebSocketFrame; import lombok.Getter; @@ -149,6 +150,7 @@ public void onPongFrame(WebSocket websocket, WebSocketFrame frame) { protected WebSocket createWebsocket() throws IOException { WebSocket ws = webSocketFactory.createSocket(config.baseUrl()); + ws.setMissingCloseFrameAllowed(true); ws.setPingInterval(config.wsPingPeriod()); if (config.headers() != null) config.headers().forEach(ws::addHeader); @@ -288,7 +290,7 @@ public void close() throws Exception { private void closeSocket() { // Clean up the socket if (webSocket != null) { - this.webSocket.disconnect(); + this.webSocket.disconnect(WebSocketCloseCode.NORMAL, null, config.closeDelay()); this.webSocket.clearListeners(); this.webSocket = null; } diff --git a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnectionConfig.java b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnectionConfig.java index 05fe76ac3..00e9c2a5a 100644 --- a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnectionConfig.java +++ b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnectionConfig.java @@ -42,6 +42,9 @@ public void validate() { if (socketTimeout < 0) { throw new RuntimeException("socketTimeout must be 0 or greater, set to 0 to disable!"); } + if (closeDelay < 0) { + throw new RuntimeException("closeDelay must be 0 or greater!"); + } Objects.requireNonNull(taskExecutor, "taskExecutor may not be null!"); Objects.requireNonNull(backoffStrategy, "backoffStrategy may not be null!"); Objects.requireNonNull(onStateChanged, "onStateChanged may not be null!"); @@ -75,6 +78,14 @@ public void validate() { */ private int socketTimeout = 30_000; + /** + * The maximum number of milliseconds to wait after sending a close frame + * to receive confirmation from the server, before fully closing the socket. + *

+ * This can be set as low as 0 for applications that require prompt socket closes upon disconnect calls. + */ + private int closeDelay = 1_000; + /** * WebSocket Headers */ diff --git a/kotlin/src/test/kotlin/com/github/twitch4j/kotlin/mock/MockChat.kt b/kotlin/src/test/kotlin/com/github/twitch4j/kotlin/mock/MockChat.kt index c51fd91db..95d31cf6f 100644 --- a/kotlin/src/test/kotlin/com/github/twitch4j/kotlin/mock/MockChat.kt +++ b/kotlin/src/test/kotlin/com/github/twitch4j/kotlin/mock/MockChat.kt @@ -34,7 +34,8 @@ class MockChat : TwitchChat( 0, null, TwitchChatLimitHelper.MOD_MESSAGE_LIMIT, - false + false, + 0 ) { @Volatile var isConnected = false diff --git a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java index eac3f6405..ce87a88f5 100644 --- a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java +++ b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java @@ -245,8 +245,9 @@ public class TwitchPubSub implements ITwitchPubSub { * @param botOwnerIds Bot Owner IDs * @param wsPingPeriod WebSocket Ping Period * @param connectionBackoffStrategy WebSocket Connection Backoff Strategy + * @param wsCloseDelay Websocket Close Delay */ - public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventManager, ScheduledThreadPoolExecutor taskExecutor, ProxyConfig proxyConfig, Collection botOwnerIds, int wsPingPeriod, IBackoffStrategy connectionBackoffStrategy) { + public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventManager, ScheduledThreadPoolExecutor taskExecutor, ProxyConfig proxyConfig, Collection botOwnerIds, int wsPingPeriod, IBackoffStrategy connectionBackoffStrategy, int wsCloseDelay) { this.eventManager = eventManager; this.taskExecutor = taskExecutor; this.botOwnerIds = botOwnerIds; @@ -255,6 +256,7 @@ public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventM if (websocketConnection == null) { this.connection = new WebsocketConnection(spec -> { spec.baseUrl(WEB_SOCKET_SERVER); + spec.closeDelay(wsCloseDelay); spec.wsPingPeriod(wsPingPeriod); spec.onStateChanged((oldState, newState) -> eventManager.publish(new PubSubConnectionStateEvent(oldState, newState, this))); spec.onPreConnect(this::onPreConnect); diff --git a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSubBuilder.java b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSubBuilder.java index 804fdf55b..4972854e4 100644 --- a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSubBuilder.java +++ b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSubBuilder.java @@ -4,6 +4,7 @@ import com.github.philippheuer.events4j.core.EventManager; import com.github.philippheuer.events4j.simple.SimpleEventHandler; import com.github.twitch4j.client.websocket.WebsocketConnection; +import com.github.twitch4j.client.websocket.WebsocketConnectionConfig; import com.github.twitch4j.common.config.ProxyConfig; import com.github.twitch4j.common.util.EventManagerUtils; import com.github.twitch4j.common.util.ThreadUtils; @@ -71,6 +72,14 @@ public class TwitchPubSubBuilder { @With private int wsPingPeriod = 15_000; + /** + * Websocket Close Delay in ms (0 = minimum) + + * @see WebsocketConnectionConfig#closeDelay() + */ + @With + private int wsCloseDelay = 1_000; + /** * WebSocket Connection Backoff Strategy */ @@ -99,7 +108,7 @@ public TwitchPubSub build() { // Initialize/Check EventManager eventManager = EventManagerUtils.validateOrInitializeEventManager(eventManager, defaultEventHandler); - return new TwitchPubSub(this.websocketConnection, this.eventManager, scheduledThreadPoolExecutor, this.proxyConfig, this.botOwnerIds, this.wsPingPeriod, this.connectionBackoffStrategy); + return new TwitchPubSub(this.websocketConnection, this.eventManager, scheduledThreadPoolExecutor, this.proxyConfig, this.botOwnerIds, this.wsPingPeriod, this.connectionBackoffStrategy, this.wsCloseDelay); } /** diff --git a/twitch4j/src/main/java/com/github/twitch4j/TwitchClientBuilder.java b/twitch4j/src/main/java/com/github/twitch4j/TwitchClientBuilder.java index eb2694df0..37291b0ab 100644 --- a/twitch4j/src/main/java/com/github/twitch4j/TwitchClientBuilder.java +++ b/twitch4j/src/main/java/com/github/twitch4j/TwitchClientBuilder.java @@ -10,6 +10,7 @@ import com.github.twitch4j.chat.TwitchChat; import com.github.twitch4j.chat.TwitchChatBuilder; import com.github.twitch4j.chat.util.TwitchChatLimitHelper; +import com.github.twitch4j.client.websocket.WebsocketConnectionConfig; import com.github.twitch4j.common.annotation.Unofficial; import com.github.twitch4j.common.config.ProxyConfig; import com.github.twitch4j.common.config.Twitch4JGlobal; @@ -285,6 +286,15 @@ public class TwitchClientBuilder { @With private int wsPingPeriod = 15_000; + /** + * Websocket Close Delay in ms (0 = minimum) + + * @see WebsocketConnectionConfig#closeDelay() + */ + @With + private int wsCloseDelay = 1_000; + + /** * With a Bot Owner's User ID * @@ -440,6 +450,7 @@ public TwitchClient build() { .setBotOwnerIds(botOwnerIds) .setCommandPrefixes(commandPrefixes) .withWsPingPeriod(wsPingPeriod) + .withWsCloseDelay(wsCloseDelay) .build(); } @@ -452,6 +463,7 @@ public TwitchClient build() { .withProxyConfig(proxyConfig) .setBotOwnerIds(botOwnerIds) .withWsPingPeriod(wsPingPeriod) + .withWsCloseDelay(wsCloseDelay) .build(); } diff --git a/twitch4j/src/main/java/com/github/twitch4j/TwitchClientPoolBuilder.java b/twitch4j/src/main/java/com/github/twitch4j/TwitchClientPoolBuilder.java index bfff092e3..cf6cb74b8 100644 --- a/twitch4j/src/main/java/com/github/twitch4j/TwitchClientPoolBuilder.java +++ b/twitch4j/src/main/java/com/github/twitch4j/TwitchClientPoolBuilder.java @@ -12,6 +12,7 @@ import com.github.twitch4j.chat.TwitchChatBuilder; import com.github.twitch4j.chat.TwitchChatConnectionPool; import com.github.twitch4j.chat.util.TwitchChatLimitHelper; +import com.github.twitch4j.client.websocket.WebsocketConnectionConfig; import com.github.twitch4j.common.annotation.Unofficial; import com.github.twitch4j.common.config.ProxyConfig; import com.github.twitch4j.common.config.Twitch4JGlobal; @@ -302,6 +303,14 @@ public class TwitchClientPoolBuilder { @With private int wsPingPeriod = 15_000; + /** + * Websocket Close Delay in ms (0 = minimum) + + * @see WebsocketConnectionConfig#closeDelay() + */ + @With + private int wsCloseDelay = 1_000; + /** * With a Bot Owner's User ID * @@ -458,6 +467,7 @@ public TwitchClientPool build() { .withChatQueueTimeout(chatQueueTimeout) .withMaxJoinRetries(chatMaxJoinRetries) .withWsPingPeriod(wsPingPeriod) + .withWsCloseDelay(wsCloseDelay) .setCommandPrefixes(commandPrefixes) .setBotOwnerIds(botOwnerIds) ) @@ -481,6 +491,7 @@ public TwitchClientPool build() { .setBotOwnerIds(botOwnerIds) .setCommandPrefixes(commandPrefixes) .withWsPingPeriod(wsPingPeriod) + .withWsCloseDelay(wsCloseDelay) .build(); } @@ -491,7 +502,11 @@ public TwitchClientPool build() { .eventManager(eventManager) .executor(() -> scheduledThreadPoolExecutor) .proxyConfig(() -> proxyConfig) - .advancedConfiguration(builder -> builder.withWsPingPeriod(wsPingPeriod).setBotOwnerIds(botOwnerIds)) + .advancedConfiguration(builder -> + builder.withWsPingPeriod(wsPingPeriod) + .withWsCloseDelay(wsCloseDelay) + .setBotOwnerIds(botOwnerIds) + ) .build(); } else if (this.enablePubSub) { pubSub = TwitchPubSubBuilder.builder() @@ -499,6 +514,7 @@ public TwitchClientPool build() { .withScheduledThreadPoolExecutor(scheduledThreadPoolExecutor) .withProxyConfig(proxyConfig) .withWsPingPeriod(wsPingPeriod) + .withWsCloseDelay(wsCloseDelay) .setBotOwnerIds(botOwnerIds) .build(); } From d9e08a88c18c8be6b633d7cdd3760da3f08ae344 Mon Sep 17 00:00:00 2001 From: Sidd Date: Thu, 25 Aug 2022 01:22:34 -0700 Subject: [PATCH 2/7] feat: sync WebsocketConnection#close with underlying socket --- .../com/github/twitch4j/chat/TwitchChat.java | 4 +- .../client/websocket/WebsocketConnection.java | 58 +++++++++++++++++-- .../github/twitch4j/pubsub/TwitchPubSub.java | 4 +- 3 files changed, 60 insertions(+), 6 deletions(-) diff --git a/chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java b/chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java index b5cc67d74..2fc238d2b 100644 --- a/chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java +++ b/chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java @@ -31,6 +31,7 @@ import io.github.xanthic.cache.api.domain.ExpiryType; import io.github.xanthic.cache.core.CacheApi; import lombok.Getter; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.NotNull; @@ -817,11 +818,12 @@ private void onChannelMessage(ChannelMessageEvent event) { /** * Close */ + @SneakyThrows @Override public void close() { this.stopQueueThread = true; queueThread.cancel(false); - this.disconnect(); + connection.close(); } @Override diff --git a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java index da10f8816..26d3c1979 100644 --- a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java +++ b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java @@ -5,6 +5,8 @@ import com.neovisionaries.ws.client.WebSocket; import com.neovisionaries.ws.client.WebSocketAdapter; import com.neovisionaries.ws.client.WebSocketCloseCode; +import com.neovisionaries.ws.client.WebSocketError; +import com.neovisionaries.ws.client.WebSocketException; import com.neovisionaries.ws.client.WebSocketFactory; import com.neovisionaries.ws.client.WebSocketFrame; import lombok.Getter; @@ -14,8 +16,10 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -62,6 +66,10 @@ public class WebsocketConnection implements AutoCloseable { @Getter protected volatile long latency = -1L; + protected final AtomicBoolean closed = new AtomicBoolean(false); + + protected final CountDownLatch closeLatch = new CountDownLatch(1); + /** * TwitchWebsocketConnection * @@ -174,6 +182,9 @@ protected void setState(WebsocketConnectionState newState) { public void connect() { WebsocketConnectionState connectionState = this.connectionState.get(); if (connectionState == WebsocketConnectionState.DISCONNECTED || connectionState == WebsocketConnectionState.RECONNECTING || connectionState == WebsocketConnectionState.LOST) { + if (closed.get()) + throw new IllegalStateException("WebsocketConnection was already closed!"); + try { // avoid any resource leaks this.closeSocket(); @@ -283,15 +294,54 @@ public WebsocketConnectionState getConnectionState() { @Override public void close() throws Exception { - disconnect(); + if (closed.getAndSet(true)) + return; // resource close was already requested + + try { + disconnect(); + } catch (Exception e) { + log.warn("Exception thrown from websocket disconnect attempt", e); + this.closeSocket(); // really make sure the resource was released + } finally { + // await the close of the underlying socket + try { + boolean completed = closeLatch.await(Math.max(1000, config.closeDelay()) * 2L, TimeUnit.MILLISECONDS); + if (completed) { + log.trace("Underlying websocket complete close was successful"); + } else { + log.warn("Underlying websocket did not close within the expected delay"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } @Synchronized private void closeSocket() { // Clean up the socket - if (webSocket != null) { - this.webSocket.disconnect(WebSocketCloseCode.NORMAL, null, config.closeDelay()); - this.webSocket.clearListeners(); + final WebSocket socket = this.webSocket; + if (socket != null) { + socket.clearListeners(); + if (closed.get()) { + socket.addListener(new WebSocketAdapter() { + @Override + public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) { + socket.clearListeners(); + closeLatch.countDown(); + } + + @Override + public void onSendError(WebSocket websocket, WebSocketException cause, WebSocketFrame frame) { + if (cause != null && cause.getError() == WebSocketError.FLUSH_ERROR) { + socket.clearListeners(); + closeLatch.countDown(); + } + } + }); + } + socket.disconnect(WebSocketCloseCode.NORMAL, null, config.closeDelay()); + this.webSocket = null; } diff --git a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java index ce87a88f5..da8757eb1 100644 --- a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java +++ b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java @@ -127,6 +127,7 @@ import com.github.twitch4j.pubsub.events.VideoPlaybackEvent; import com.github.twitch4j.util.IBackoffStrategy; import lombok.Getter; +import lombok.SneakyThrows; import lombok.Synchronized; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -857,13 +858,14 @@ public long getLatency() { /** * Close */ + @SneakyThrows @Override public void close() { if (!isClosed) { isClosed = true; heartbeatTask.cancel(false); queueTask.cancel(false); - disconnect(); + connection.close(); } } From 6a7663626c73e990480457c83cd2e8e8058006ab Mon Sep 17 00:00:00 2001 From: Sidd Date: Thu, 25 Aug 2022 13:58:15 -0700 Subject: [PATCH 3/7] fix: cancel backoffClearer on WebsocketConnection#close --- .../github/twitch4j/client/websocket/WebsocketConnection.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java index 26d3c1979..1de7b4d20 100644 --- a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java +++ b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java @@ -297,6 +297,9 @@ public void close() throws Exception { if (closed.getAndSet(true)) return; // resource close was already requested + if (backoffClearer != null) + backoffClearer.cancel(false); + try { disconnect(); } catch (Exception e) { From 6b335808a9928094393576f81a0405121191505e Mon Sep 17 00:00:00 2001 From: Sidd Date: Thu, 25 Aug 2022 14:42:25 -0700 Subject: [PATCH 4/7] refactor: simplify await timeout --- .../github/twitch4j/client/websocket/WebsocketConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java index 1de7b4d20..ae4baca5c 100644 --- a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java +++ b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java @@ -308,7 +308,7 @@ public void close() throws Exception { } finally { // await the close of the underlying socket try { - boolean completed = closeLatch.await(Math.max(1000, config.closeDelay()) * 2L, TimeUnit.MILLISECONDS); + boolean completed = closeLatch.await(config.closeDelay() + 1000L, TimeUnit.MILLISECONDS); if (completed) { log.trace("Underlying websocket complete close was successful"); } else { From 801596ba918c58b421514aa4b1b76ab239077b76 Mon Sep 17 00:00:00 2001 From: Sidd Date: Thu, 25 Aug 2022 15:27:48 -0700 Subject: [PATCH 5/7] fix: cancel reconnect task on close --- .../client/websocket/WebsocketConnection.java | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java index ae4baca5c..ccf1e3ed5 100644 --- a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java +++ b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java @@ -48,6 +48,11 @@ public class WebsocketConnection implements AutoCloseable { */ private volatile Future backoffClearer; + /** + * Calls {@link #reconnect()} following a connection loss + */ + private final AtomicReference> reconnectTask = new AtomicReference<>(); + /** * WebSocket Factory */ @@ -126,11 +131,18 @@ public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, log.debug("Maximum retry count for websocket reconnection attempts was hit."); config.backoffStrategy().reset(); // start fresh on the next manual connect() call } else { - config.taskExecutor().schedule(() -> { - WebsocketConnectionState state = connectionState.get(); - if (state != WebsocketConnectionState.CONNECTING && state != WebsocketConnectionState.CONNECTED) - reconnect(); - }, reconnectDelay, TimeUnit.MILLISECONDS); + // Schedule the next reconnect according to the delay from the backoff strategy + Future previousReconnection = reconnectTask.getAndSet( + config.taskExecutor().schedule(() -> { + WebsocketConnectionState state = connectionState.get(); + if (state != WebsocketConnectionState.CONNECTING && state != WebsocketConnectionState.CONNECTED && !closed.get()) + reconnect(); + }, reconnectDelay, TimeUnit.MILLISECONDS) + ); + + // Cancel the previous reconnect task, if outstanding + if (previousReconnection != null) + previousReconnection.cancel(false); } } else { setState(WebsocketConnectionState.DISCONNECTED); @@ -300,6 +312,10 @@ public void close() throws Exception { if (backoffClearer != null) backoffClearer.cancel(false); + Future reconnector = reconnectTask.getAndSet(null); + if (reconnector != null) + reconnector.cancel(false); + try { disconnect(); } catch (Exception e) { From 2c9db1316af61fcc26ea2c5023b4e0d334b0bf5b Mon Sep 17 00:00:00 2001 From: Sidd Date: Thu, 25 Aug 2022 16:34:04 -0700 Subject: [PATCH 6/7] chore: explain underlying close logic --- .../client/websocket/WebsocketConnection.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java index ccf1e3ed5..843ed04f8 100644 --- a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java +++ b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java @@ -71,8 +71,14 @@ public class WebsocketConnection implements AutoCloseable { @Getter protected volatile long latency = -1L; + /** + * Whether {@link #close()} has been called + */ protected final AtomicBoolean closed = new AtomicBoolean(false); + /** + * Latch used to indicate that the underlying socket has fully disconnected following {@link #close()}. + */ protected final CountDownLatch closeLatch = new CountDownLatch(1); /** @@ -309,14 +315,19 @@ public void close() throws Exception { if (closed.getAndSet(true)) return; // resource close was already requested + // Cancel backoff clear task, if outstanding if (backoffClearer != null) backoffClearer.cancel(false); + // Cancel any outstanding reconnect task Future reconnector = reconnectTask.getAndSet(null); if (reconnector != null) reconnector.cancel(false); + // Disconnect from socket try { + // This call does not block, so we use CountdownLatch to block + // until the underlying socket fully closes. disconnect(); } catch (Exception e) { log.warn("Exception thrown from websocket disconnect attempt", e); @@ -341,17 +352,22 @@ private void closeSocket() { // Clean up the socket final WebSocket socket = this.webSocket; if (socket != null) { + // The disconnecting socket no needs to invoke this.webSocketAdapter socket.clearListeners(); + + // However, if a full close is requested, we should track when the underlying socket closes to release the latch. if (closed.get()) { socket.addListener(new WebSocketAdapter() { @Override public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) { + // The underlying java.net.Socket fully closed (e.g., after receiving close); release the latch. socket.clearListeners(); closeLatch.countDown(); } @Override public void onSendError(WebSocket websocket, WebSocketException cause, WebSocketFrame frame) { + // Flushing (e.g., of the close frame) failed because the socket was already closed; release latch. if (cause != null && cause.getError() == WebSocketError.FLUSH_ERROR) { socket.clearListeners(); closeLatch.countDown(); @@ -359,8 +375,19 @@ public void onSendError(WebSocket websocket, WebSocketException cause, WebSocket } }); } + + // Similarly, this disconnect call is non-blocking. + // Under the hood, this queues a close frame to be sent to the server, + // and the WritingThread is otherwise stopped. + // This also schedules a task to forcibly close the underlying socket, + // after the close delay passes. + // If the close delay is set very low, the queued close frame may never + // successfully flush, triggering onSendError. + // Lastly, the ReadingThread starts to block, awaiting a close frame from the server. + // Upon receiving a close frame, the socket also closes, indicated by onDisconnected. socket.disconnect(WebSocketCloseCode.NORMAL, null, config.closeDelay()); + // Release the reference to the closing websocket. this.webSocket = null; } From 6d0e3f7ac648b768f35b8c8e9f8fee3ee0657c9b Mon Sep 17 00:00:00 2001 From: iProdigy Date: Sat, 27 Aug 2022 02:07:07 -0700 Subject: [PATCH 7/7] chore: comment grammar --- .../github/twitch4j/client/websocket/WebsocketConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java index 843ed04f8..e08a7fdcc 100644 --- a/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java +++ b/client-websocket/src/main/java/com/github/twitch4j/client/websocket/WebsocketConnection.java @@ -352,7 +352,7 @@ private void closeSocket() { // Clean up the socket final WebSocket socket = this.webSocket; if (socket != null) { - // The disconnecting socket no needs to invoke this.webSocketAdapter + // The disconnecting socket no longer needs to invoke this.webSocketAdapter socket.clearListeners(); // However, if a full close is requested, we should track when the underlying socket closes to release the latch.