Skip to content

Commit

Permalink
feat: allow custom websocket close delay (#627)
Browse files Browse the repository at this point in the history
* feat: allow custom websocket close delay

* feat: sync WebsocketConnection#close with underlying socket
  • Loading branch information
iProdigy committed Aug 27, 2022
1 parent 10ce56e commit cb5ebdb
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 22 deletions.
14 changes: 9 additions & 5 deletions auth/src/main/java/com/github/twitch4j/auth/TwitchAuth.java
Expand Up @@ -22,9 +22,9 @@ public class TwitchAuth {
* Twitch Identity Provider
*
* @param credentialManager Credential Manager
* @param clientId OAuth2 Client Id
* @param clientSecret OAuth2 Client Secret
* @param redirectUrl OAuth2 Redirect Url
* @param clientId OAuth2 Client Id
* @param clientSecret OAuth2 Client Secret
* @param redirectUrl OAuth2 Redirect Url
*/
public TwitchAuth(CredentialManager credentialManager, String clientId, String clientSecret, String redirectUrl) {
this.credentialManager = credentialManager;
Expand All @@ -37,9 +37,13 @@ public static void registerIdentityProvider(CredentialManager credentialManager,
if (!ip.isPresent()) {
// register
IdentityProvider identityProvider = new TwitchIdentityProvider(clientId, clientSecret, redirectUrl);
credentialManager.registerIdentityProvider(identityProvider);
try {
credentialManager.registerIdentityProvider(identityProvider);
} catch (Exception e) {
log.error("TwitchAuth: Encountered conflicting identity provider!", e);
}
} else {
log.warn("TwitchIdentityProvider was already registered, ignoring call to TwitchAuth.registerIdentityProvider!");
log.debug("TwitchIdentityProvider was already registered, ignoring call to TwitchAuth.registerIdentityProvider!");
}
}
}
8 changes: 6 additions & 2 deletions chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java
Expand Up @@ -31,6 +31,7 @@
import io.github.xanthic.cache.api.domain.ExpiryType;
import io.github.xanthic.cache.core.CacheApi;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -270,8 +271,9 @@ public class TwitchChat implements ITwitchChat {
* @param connectionBackoffStrategy WebSocket Connection Backoff Strategy
* @param perChannelRateLimit Per channel message limit
* @param validateOnConnect Whether token should be validated on connect
* @param wsCloseDelay Websocket Close Delay
*/
public TwitchChat(WebsocketConnection websocketConnection, EventManager eventManager, CredentialManager credentialManager, OAuth2Credential chatCredential, String baseUrl, boolean sendCredentialToThirdPartyHost, Collection<String> commandPrefixes, Integer chatQueueSize, Bucket ircMessageBucket, Bucket ircWhisperBucket, Bucket ircJoinBucket, Bucket ircAuthBucket, ScheduledThreadPoolExecutor taskExecutor, long chatQueueTimeout, ProxyConfig proxyConfig, boolean autoJoinOwnChannel, boolean enableMembershipEvents, Collection<String> botOwnerIds, boolean removeChannelOnJoinFailure, int maxJoinRetries, long chatJoinTimeout, int wsPingPeriod, IBackoffStrategy connectionBackoffStrategy, Bandwidth perChannelRateLimit, boolean validateOnConnect) {
public TwitchChat(WebsocketConnection websocketConnection, EventManager eventManager, CredentialManager credentialManager, OAuth2Credential chatCredential, String baseUrl, boolean sendCredentialToThirdPartyHost, Collection<String> commandPrefixes, Integer chatQueueSize, Bucket ircMessageBucket, Bucket ircWhisperBucket, Bucket ircJoinBucket, Bucket ircAuthBucket, ScheduledThreadPoolExecutor taskExecutor, long chatQueueTimeout, ProxyConfig proxyConfig, boolean autoJoinOwnChannel, boolean enableMembershipEvents, Collection<String> botOwnerIds, boolean removeChannelOnJoinFailure, int maxJoinRetries, long chatJoinTimeout, int wsPingPeriod, IBackoffStrategy connectionBackoffStrategy, Bandwidth perChannelRateLimit, boolean validateOnConnect, int wsCloseDelay) {
this.eventManager = eventManager;
this.credentialManager = credentialManager;
this.chatCredential = chatCredential;
Expand Down Expand Up @@ -304,6 +306,7 @@ public TwitchChat(WebsocketConnection websocketConnection, EventManager eventMan
if (websocketConnection == null) {
this.connection = new WebsocketConnection(spec -> {
spec.baseUrl(baseUrl);
spec.closeDelay(wsCloseDelay);
spec.wsPingPeriod(wsPingPeriod);
spec.onStateChanged((oldState, newState) -> eventManager.publish(new ChatConnectionStateEvent(oldState, newState, this)));
spec.onConnected(this::onConnected);
Expand Down Expand Up @@ -815,11 +818,12 @@ private void onChannelMessage(ChannelMessageEvent event) {
/**
* Close
*/
@SneakyThrows
@Override
public void close() {
this.stopQueueThread = true;
queueThread.cancel(false);
this.disconnect();
connection.close();
}

@Override
Expand Down
Expand Up @@ -11,6 +11,7 @@
import com.github.twitch4j.chat.events.channel.ChannelJoinFailureEvent;
import com.github.twitch4j.chat.util.TwitchChatLimitHelper;
import com.github.twitch4j.client.websocket.WebsocketConnection;
import com.github.twitch4j.client.websocket.WebsocketConnectionConfig;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.config.Twitch4JGlobal;
import com.github.twitch4j.common.enums.TwitchLimitType;
Expand Down Expand Up @@ -252,6 +253,14 @@ public class TwitchChatBuilder {
@With
private int wsPingPeriod = 15_000;

/**
* Websocket Close Delay in ms (0 = minimum)
* @see WebsocketConnectionConfig#closeDelay()
*/
@With
private int wsCloseDelay = 1_000;

/**
* WebSocket Connection Backoff Strategy
*/
Expand Down Expand Up @@ -327,7 +336,7 @@ public TwitchChat build() {
perChannelRateLimit = chatRateLimit;

log.debug("TwitchChat: Initializing Module ...");
return new TwitchChat(this.websocketConnection, this.eventManager, this.credentialManager, this.chatAccount, this.baseUrl, this.sendCredentialToThirdPartyHost, this.commandPrefixes, this.chatQueueSize, this.ircMessageBucket, this.ircWhisperBucket, this.ircJoinBucket, this.ircAuthBucket, this.scheduledThreadPoolExecutor, this.chatQueueTimeout, this.proxyConfig, this.autoJoinOwnChannel, this.enableMembershipEvents, this.botOwnerIds, this.removeChannelOnJoinFailure, this.maxJoinRetries, this.chatJoinTimeout, this.wsPingPeriod, this.connectionBackoffStrategy, this.perChannelRateLimit, this.verifyChatAccountOnReconnect);
return new TwitchChat(this.websocketConnection, this.eventManager, this.credentialManager, this.chatAccount, this.baseUrl, this.sendCredentialToThirdPartyHost, this.commandPrefixes, this.chatQueueSize, this.ircMessageBucket, this.ircWhisperBucket, this.ircJoinBucket, this.ircAuthBucket, this.scheduledThreadPoolExecutor, this.chatQueueTimeout, this.proxyConfig, this.autoJoinOwnChannel, this.enableMembershipEvents, this.botOwnerIds, this.removeChannelOnJoinFailure, this.maxJoinRetries, this.chatJoinTimeout, this.wsPingPeriod, this.connectionBackoffStrategy, this.perChannelRateLimit, this.verifyChatAccountOnReconnect, this.wsCloseDelay);
}

/**
Expand Down
Expand Up @@ -4,6 +4,9 @@
import com.github.twitch4j.common.util.ExponentialBackoffStrategy;
import com.neovisionaries.ws.client.WebSocket;
import com.neovisionaries.ws.client.WebSocketAdapter;
import com.neovisionaries.ws.client.WebSocketCloseCode;
import com.neovisionaries.ws.client.WebSocketError;
import com.neovisionaries.ws.client.WebSocketException;
import com.neovisionaries.ws.client.WebSocketFactory;
import com.neovisionaries.ws.client.WebSocketFrame;
import lombok.Getter;
Expand All @@ -13,8 +16,10 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -43,6 +48,11 @@ public class WebsocketConnection implements AutoCloseable {
*/
private volatile Future<?> backoffClearer;

/**
* Calls {@link #reconnect()} following a connection loss
*/
private final AtomicReference<Future<?>> reconnectTask = new AtomicReference<>();

/**
* WebSocket Factory
*/
Expand All @@ -61,6 +71,16 @@ public class WebsocketConnection implements AutoCloseable {
@Getter
protected volatile long latency = -1L;

/**
* Whether {@link #close()} has been called
*/
protected final AtomicBoolean closed = new AtomicBoolean(false);

/**
* Latch used to indicate that the underlying socket has fully disconnected following {@link #close()}.
*/
protected final CountDownLatch closeLatch = new CountDownLatch(1);

/**
* TwitchWebsocketConnection
*
Expand Down Expand Up @@ -117,11 +137,18 @@ public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame,
log.debug("Maximum retry count for websocket reconnection attempts was hit.");
config.backoffStrategy().reset(); // start fresh on the next manual connect() call
} else {
config.taskExecutor().schedule(() -> {
WebsocketConnectionState state = connectionState.get();
if (state != WebsocketConnectionState.CONNECTING && state != WebsocketConnectionState.CONNECTED)
reconnect();
}, reconnectDelay, TimeUnit.MILLISECONDS);
// Schedule the next reconnect according to the delay from the backoff strategy
Future<?> previousReconnection = reconnectTask.getAndSet(
config.taskExecutor().schedule(() -> {
WebsocketConnectionState state = connectionState.get();
if (state != WebsocketConnectionState.CONNECTING && state != WebsocketConnectionState.CONNECTED && !closed.get())
reconnect();
}, reconnectDelay, TimeUnit.MILLISECONDS)
);

// Cancel the previous reconnect task, if outstanding
if (previousReconnection != null)
previousReconnection.cancel(false);
}
} else {
setState(WebsocketConnectionState.DISCONNECTED);
Expand Down Expand Up @@ -149,6 +176,7 @@ public void onPongFrame(WebSocket websocket, WebSocketFrame frame) {

protected WebSocket createWebsocket() throws IOException {
WebSocket ws = webSocketFactory.createSocket(config.baseUrl());
ws.setMissingCloseFrameAllowed(true);
ws.setPingInterval(config.wsPingPeriod());
if (config.headers() != null)
config.headers().forEach(ws::addHeader);
Expand All @@ -172,6 +200,9 @@ protected void setState(WebsocketConnectionState newState) {
public void connect() {
WebsocketConnectionState connectionState = this.connectionState.get();
if (connectionState == WebsocketConnectionState.DISCONNECTED || connectionState == WebsocketConnectionState.RECONNECTING || connectionState == WebsocketConnectionState.LOST) {
if (closed.get())
throw new IllegalStateException("WebsocketConnection was already closed!");

try {
// avoid any resource leaks
this.closeSocket();
Expand Down Expand Up @@ -281,15 +312,82 @@ public WebsocketConnectionState getConnectionState() {

@Override
public void close() throws Exception {
disconnect();
if (closed.getAndSet(true))
return; // resource close was already requested

// Cancel backoff clear task, if outstanding
if (backoffClearer != null)
backoffClearer.cancel(false);

// Cancel any outstanding reconnect task
Future<?> reconnector = reconnectTask.getAndSet(null);
if (reconnector != null)
reconnector.cancel(false);

// Disconnect from socket
try {
// This call does not block, so we use CountdownLatch to block
// until the underlying socket fully closes.
disconnect();
} catch (Exception e) {
log.warn("Exception thrown from websocket disconnect attempt", e);
this.closeSocket(); // really make sure the resource was released
} finally {
// await the close of the underlying socket
try {
boolean completed = closeLatch.await(config.closeDelay() + 1000L, TimeUnit.MILLISECONDS);
if (completed) {
log.trace("Underlying websocket complete close was successful");
} else {
log.warn("Underlying websocket did not close within the expected delay");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

@Synchronized
private void closeSocket() {
// Clean up the socket
if (webSocket != null) {
this.webSocket.disconnect();
this.webSocket.clearListeners();
final WebSocket socket = this.webSocket;
if (socket != null) {
// The disconnecting socket no longer needs to invoke this.webSocketAdapter
socket.clearListeners();

// However, if a full close is requested, we should track when the underlying socket closes to release the latch.
if (closed.get()) {
socket.addListener(new WebSocketAdapter() {
@Override
public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) {
// The underlying java.net.Socket fully closed (e.g., after receiving close); release the latch.
socket.clearListeners();
closeLatch.countDown();
}

@Override
public void onSendError(WebSocket websocket, WebSocketException cause, WebSocketFrame frame) {
// Flushing (e.g., of the close frame) failed because the socket was already closed; release latch.
if (cause != null && cause.getError() == WebSocketError.FLUSH_ERROR) {
socket.clearListeners();
closeLatch.countDown();
}
}
});
}

// Similarly, this disconnect call is non-blocking.
// Under the hood, this queues a close frame to be sent to the server,
// and the WritingThread is otherwise stopped.
// This also schedules a task to forcibly close the underlying socket,
// after the close delay passes.
// If the close delay is set very low, the queued close frame may never
// successfully flush, triggering onSendError.
// Lastly, the ReadingThread starts to block, awaiting a close frame from the server.
// Upon receiving a close frame, the socket also closes, indicated by onDisconnected.
socket.disconnect(WebSocketCloseCode.NORMAL, null, config.closeDelay());

// Release the reference to the closing websocket.
this.webSocket = null;
}

Expand Down
Expand Up @@ -42,6 +42,9 @@ public void validate() {
if (socketTimeout < 0) {
throw new RuntimeException("socketTimeout must be 0 or greater, set to 0 to disable!");
}
if (closeDelay < 0) {
throw new RuntimeException("closeDelay must be 0 or greater!");
}
Objects.requireNonNull(taskExecutor, "taskExecutor may not be null!");
Objects.requireNonNull(backoffStrategy, "backoffStrategy may not be null!");
Objects.requireNonNull(onStateChanged, "onStateChanged may not be null!");
Expand Down Expand Up @@ -75,6 +78,14 @@ public void validate() {
*/
private int socketTimeout = 30_000;

/**
* The maximum number of milliseconds to wait after sending a close frame
* to receive confirmation from the server, before fully closing the socket.
* <p>
* This can be set as low as 0 for applications that require prompt socket closes upon disconnect calls.
*/
private int closeDelay = 1_000;

/**
* WebSocket Headers
*/
Expand Down
Expand Up @@ -34,7 +34,8 @@ class MockChat : TwitchChat(
0,
null,
TwitchChatLimitHelper.MOD_MESSAGE_LIMIT,
false
false,
0
) {
@Volatile
var isConnected = false
Expand Down
Expand Up @@ -127,6 +127,7 @@
import com.github.twitch4j.pubsub.events.VideoPlaybackEvent;
import com.github.twitch4j.util.IBackoffStrategy;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -245,8 +246,9 @@ public class TwitchPubSub implements ITwitchPubSub {
* @param botOwnerIds Bot Owner IDs
* @param wsPingPeriod WebSocket Ping Period
* @param connectionBackoffStrategy WebSocket Connection Backoff Strategy
* @param wsCloseDelay Websocket Close Delay
*/
public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventManager, ScheduledThreadPoolExecutor taskExecutor, ProxyConfig proxyConfig, Collection<String> botOwnerIds, int wsPingPeriod, IBackoffStrategy connectionBackoffStrategy) {
public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventManager, ScheduledThreadPoolExecutor taskExecutor, ProxyConfig proxyConfig, Collection<String> botOwnerIds, int wsPingPeriod, IBackoffStrategy connectionBackoffStrategy, int wsCloseDelay) {
this.eventManager = eventManager;
this.taskExecutor = taskExecutor;
this.botOwnerIds = botOwnerIds;
Expand All @@ -255,6 +257,7 @@ public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventM
if (websocketConnection == null) {
this.connection = new WebsocketConnection(spec -> {
spec.baseUrl(WEB_SOCKET_SERVER);
spec.closeDelay(wsCloseDelay);
spec.wsPingPeriod(wsPingPeriod);
spec.onStateChanged((oldState, newState) -> eventManager.publish(new PubSubConnectionStateEvent(oldState, newState, this)));
spec.onPreConnect(this::onPreConnect);
Expand Down Expand Up @@ -855,13 +858,14 @@ public long getLatency() {
/**
* Close
*/
@SneakyThrows
@Override
public void close() {
if (!isClosed) {
isClosed = true;
heartbeatTask.cancel(false);
queueTask.cancel(false);
disconnect();
connection.close();
}
}

Expand Down
Expand Up @@ -4,6 +4,7 @@
import com.github.philippheuer.events4j.core.EventManager;
import com.github.philippheuer.events4j.simple.SimpleEventHandler;
import com.github.twitch4j.client.websocket.WebsocketConnection;
import com.github.twitch4j.client.websocket.WebsocketConnectionConfig;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.util.EventManagerUtils;
import com.github.twitch4j.common.util.ThreadUtils;
Expand Down Expand Up @@ -71,6 +72,14 @@ public class TwitchPubSubBuilder {
@With
private int wsPingPeriod = 15_000;

/**
* Websocket Close Delay in ms (0 = minimum)
* @see WebsocketConnectionConfig#closeDelay()
*/
@With
private int wsCloseDelay = 1_000;

/**
* WebSocket Connection Backoff Strategy
*/
Expand Down Expand Up @@ -99,7 +108,7 @@ public TwitchPubSub build() {
// Initialize/Check EventManager
eventManager = EventManagerUtils.validateOrInitializeEventManager(eventManager, defaultEventHandler);

return new TwitchPubSub(this.websocketConnection, this.eventManager, scheduledThreadPoolExecutor, this.proxyConfig, this.botOwnerIds, this.wsPingPeriod, this.connectionBackoffStrategy);
return new TwitchPubSub(this.websocketConnection, this.eventManager, scheduledThreadPoolExecutor, this.proxyConfig, this.botOwnerIds, this.wsPingPeriod, this.connectionBackoffStrategy, this.wsCloseDelay);
}

/**
Expand Down

0 comments on commit cb5ebdb

Please sign in to comment.