Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow custom websocket close delay #627

Merged
merged 7 commits into from Aug 27, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 9 additions & 5 deletions auth/src/main/java/com/github/twitch4j/auth/TwitchAuth.java
Expand Up @@ -22,9 +22,9 @@ public class TwitchAuth {
* Twitch Identity Provider
*
* @param credentialManager Credential Manager
* @param clientId OAuth2 Client Id
* @param clientSecret OAuth2 Client Secret
* @param redirectUrl OAuth2 Redirect Url
* @param clientId OAuth2 Client Id
* @param clientSecret OAuth2 Client Secret
* @param redirectUrl OAuth2 Redirect Url
*/
public TwitchAuth(CredentialManager credentialManager, String clientId, String clientSecret, String redirectUrl) {
this.credentialManager = credentialManager;
Expand All @@ -37,9 +37,13 @@ public static void registerIdentityProvider(CredentialManager credentialManager,
if (!ip.isPresent()) {
// register
IdentityProvider identityProvider = new TwitchIdentityProvider(clientId, clientSecret, redirectUrl);
credentialManager.registerIdentityProvider(identityProvider);
try {
credentialManager.registerIdentityProvider(identityProvider);
} catch (Exception e) {
log.error("TwitchAuth: Encountered conflicting identity provider!", e);
}
} else {
log.warn("TwitchIdentityProvider was already registered, ignoring call to TwitchAuth.registerIdentityProvider!");
log.debug("TwitchIdentityProvider was already registered, ignoring call to TwitchAuth.registerIdentityProvider!");
}
}
}
8 changes: 6 additions & 2 deletions chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java
Expand Up @@ -31,6 +31,7 @@
import io.github.xanthic.cache.api.domain.ExpiryType;
import io.github.xanthic.cache.core.CacheApi;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -270,8 +271,9 @@ public class TwitchChat implements ITwitchChat {
* @param connectionBackoffStrategy WebSocket Connection Backoff Strategy
* @param perChannelRateLimit Per channel message limit
* @param validateOnConnect Whether token should be validated on connect
* @param wsCloseDelay Websocket Close Delay
*/
public TwitchChat(WebsocketConnection websocketConnection, EventManager eventManager, CredentialManager credentialManager, OAuth2Credential chatCredential, String baseUrl, boolean sendCredentialToThirdPartyHost, Collection<String> commandPrefixes, Integer chatQueueSize, Bucket ircMessageBucket, Bucket ircWhisperBucket, Bucket ircJoinBucket, Bucket ircAuthBucket, ScheduledThreadPoolExecutor taskExecutor, long chatQueueTimeout, ProxyConfig proxyConfig, boolean autoJoinOwnChannel, boolean enableMembershipEvents, Collection<String> botOwnerIds, boolean removeChannelOnJoinFailure, int maxJoinRetries, long chatJoinTimeout, int wsPingPeriod, IBackoffStrategy connectionBackoffStrategy, Bandwidth perChannelRateLimit, boolean validateOnConnect) {
public TwitchChat(WebsocketConnection websocketConnection, EventManager eventManager, CredentialManager credentialManager, OAuth2Credential chatCredential, String baseUrl, boolean sendCredentialToThirdPartyHost, Collection<String> commandPrefixes, Integer chatQueueSize, Bucket ircMessageBucket, Bucket ircWhisperBucket, Bucket ircJoinBucket, Bucket ircAuthBucket, ScheduledThreadPoolExecutor taskExecutor, long chatQueueTimeout, ProxyConfig proxyConfig, boolean autoJoinOwnChannel, boolean enableMembershipEvents, Collection<String> botOwnerIds, boolean removeChannelOnJoinFailure, int maxJoinRetries, long chatJoinTimeout, int wsPingPeriod, IBackoffStrategy connectionBackoffStrategy, Bandwidth perChannelRateLimit, boolean validateOnConnect, int wsCloseDelay) {
this.eventManager = eventManager;
this.credentialManager = credentialManager;
this.chatCredential = chatCredential;
Expand Down Expand Up @@ -304,6 +306,7 @@ public TwitchChat(WebsocketConnection websocketConnection, EventManager eventMan
if (websocketConnection == null) {
this.connection = new WebsocketConnection(spec -> {
spec.baseUrl(baseUrl);
spec.closeDelay(wsCloseDelay);
spec.wsPingPeriod(wsPingPeriod);
spec.onStateChanged((oldState, newState) -> eventManager.publish(new ChatConnectionStateEvent(oldState, newState, this)));
spec.onConnected(this::onConnected);
Expand Down Expand Up @@ -815,11 +818,12 @@ private void onChannelMessage(ChannelMessageEvent event) {
/**
* Close
*/
@SneakyThrows
@Override
public void close() {
this.stopQueueThread = true;
queueThread.cancel(false);
this.disconnect();
connection.close();
}

@Override
Expand Down
Expand Up @@ -11,6 +11,7 @@
import com.github.twitch4j.chat.events.channel.ChannelJoinFailureEvent;
import com.github.twitch4j.chat.util.TwitchChatLimitHelper;
import com.github.twitch4j.client.websocket.WebsocketConnection;
import com.github.twitch4j.client.websocket.WebsocketConnectionConfig;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.config.Twitch4JGlobal;
import com.github.twitch4j.common.enums.TwitchLimitType;
Expand Down Expand Up @@ -252,6 +253,14 @@ public class TwitchChatBuilder {
@With
private int wsPingPeriod = 15_000;

/**
* Websocket Close Delay in ms (0 = minimum)

* @see WebsocketConnectionConfig#closeDelay()
*/
@With
private int wsCloseDelay = 1_000;

/**
* WebSocket Connection Backoff Strategy
*/
Expand Down Expand Up @@ -327,7 +336,7 @@ public TwitchChat build() {
perChannelRateLimit = chatRateLimit;

log.debug("TwitchChat: Initializing Module ...");
return new TwitchChat(this.websocketConnection, this.eventManager, this.credentialManager, this.chatAccount, this.baseUrl, this.sendCredentialToThirdPartyHost, this.commandPrefixes, this.chatQueueSize, this.ircMessageBucket, this.ircWhisperBucket, this.ircJoinBucket, this.ircAuthBucket, this.scheduledThreadPoolExecutor, this.chatQueueTimeout, this.proxyConfig, this.autoJoinOwnChannel, this.enableMembershipEvents, this.botOwnerIds, this.removeChannelOnJoinFailure, this.maxJoinRetries, this.chatJoinTimeout, this.wsPingPeriod, this.connectionBackoffStrategy, this.perChannelRateLimit, this.verifyChatAccountOnReconnect);
return new TwitchChat(this.websocketConnection, this.eventManager, this.credentialManager, this.chatAccount, this.baseUrl, this.sendCredentialToThirdPartyHost, this.commandPrefixes, this.chatQueueSize, this.ircMessageBucket, this.ircWhisperBucket, this.ircJoinBucket, this.ircAuthBucket, this.scheduledThreadPoolExecutor, this.chatQueueTimeout, this.proxyConfig, this.autoJoinOwnChannel, this.enableMembershipEvents, this.botOwnerIds, this.removeChannelOnJoinFailure, this.maxJoinRetries, this.chatJoinTimeout, this.wsPingPeriod, this.connectionBackoffStrategy, this.perChannelRateLimit, this.verifyChatAccountOnReconnect, this.wsCloseDelay);
}

/**
Expand Down
Expand Up @@ -4,6 +4,9 @@
import com.github.twitch4j.common.util.ExponentialBackoffStrategy;
import com.neovisionaries.ws.client.WebSocket;
import com.neovisionaries.ws.client.WebSocketAdapter;
import com.neovisionaries.ws.client.WebSocketCloseCode;
import com.neovisionaries.ws.client.WebSocketError;
import com.neovisionaries.ws.client.WebSocketException;
import com.neovisionaries.ws.client.WebSocketFactory;
import com.neovisionaries.ws.client.WebSocketFrame;
import lombok.Getter;
Expand All @@ -13,8 +16,10 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -61,6 +66,10 @@ public class WebsocketConnection implements AutoCloseable {
@Getter
protected volatile long latency = -1L;

protected final AtomicBoolean closed = new AtomicBoolean(false);

protected final CountDownLatch closeLatch = new CountDownLatch(1);

/**
* TwitchWebsocketConnection
*
Expand Down Expand Up @@ -149,6 +158,7 @@ public void onPongFrame(WebSocket websocket, WebSocketFrame frame) {

protected WebSocket createWebsocket() throws IOException {
WebSocket ws = webSocketFactory.createSocket(config.baseUrl());
ws.setMissingCloseFrameAllowed(true);
ws.setPingInterval(config.wsPingPeriod());
if (config.headers() != null)
config.headers().forEach(ws::addHeader);
Expand All @@ -172,6 +182,9 @@ protected void setState(WebsocketConnectionState newState) {
public void connect() {
WebsocketConnectionState connectionState = this.connectionState.get();
if (connectionState == WebsocketConnectionState.DISCONNECTED || connectionState == WebsocketConnectionState.RECONNECTING || connectionState == WebsocketConnectionState.LOST) {
if (closed.get())
throw new IllegalStateException("WebsocketConnection was already closed!");
PhilippHeuer marked this conversation as resolved.
Show resolved Hide resolved

try {
// avoid any resource leaks
this.closeSocket();
Expand Down Expand Up @@ -281,15 +294,54 @@ public WebsocketConnectionState getConnectionState() {

@Override
public void close() throws Exception {
disconnect();
if (closed.getAndSet(true))
return; // resource close was already requested

try {
disconnect();
} catch (Exception e) {
log.warn("Exception thrown from websocket disconnect attempt", e);
this.closeSocket(); // really make sure the resource was released
} finally {
// await the close of the underlying socket
try {
boolean completed = closeLatch.await(Math.max(1000, config.closeDelay()) * 2L, TimeUnit.MILLISECONDS);
if (completed) {
log.trace("Underlying websocket complete close was successful");
} else {
log.warn("Underlying websocket did not close within the expected delay");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

@Synchronized
private void closeSocket() {
// Clean up the socket
if (webSocket != null) {
this.webSocket.disconnect();
this.webSocket.clearListeners();
final WebSocket socket = this.webSocket;
if (socket != null) {
socket.clearListeners();
if (closed.get()) {
socket.addListener(new WebSocketAdapter() {
@Override
public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) {
socket.clearListeners();
closeLatch.countDown();
}

@Override
public void onSendError(WebSocket websocket, WebSocketException cause, WebSocketFrame frame) {
if (cause != null && cause.getError() == WebSocketError.FLUSH_ERROR) {
socket.clearListeners();
closeLatch.countDown();
}
}
});
}
socket.disconnect(WebSocketCloseCode.NORMAL, null, config.closeDelay());

this.webSocket = null;
}

Expand Down
Expand Up @@ -42,6 +42,9 @@ public void validate() {
if (socketTimeout < 0) {
throw new RuntimeException("socketTimeout must be 0 or greater, set to 0 to disable!");
}
if (closeDelay < 0) {
throw new RuntimeException("closeDelay must be 0 or greater!");
}
Objects.requireNonNull(taskExecutor, "taskExecutor may not be null!");
Objects.requireNonNull(backoffStrategy, "backoffStrategy may not be null!");
Objects.requireNonNull(onStateChanged, "onStateChanged may not be null!");
Expand Down Expand Up @@ -75,6 +78,14 @@ public void validate() {
*/
private int socketTimeout = 30_000;

/**
* The maximum number of milliseconds to wait after sending a close frame
* to receive confirmation from the server, before fully closing the socket.
* <p>
* This can be set as low as 0 for applications that require prompt socket closes upon disconnect calls.
*/
private int closeDelay = 1_000;

/**
* WebSocket Headers
*/
Expand Down
Expand Up @@ -34,7 +34,8 @@ class MockChat : TwitchChat(
0,
null,
TwitchChatLimitHelper.MOD_MESSAGE_LIMIT,
false
false,
0
) {
@Volatile
var isConnected = false
Expand Down
Expand Up @@ -127,6 +127,7 @@
import com.github.twitch4j.pubsub.events.VideoPlaybackEvent;
import com.github.twitch4j.util.IBackoffStrategy;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -245,8 +246,9 @@ public class TwitchPubSub implements ITwitchPubSub {
* @param botOwnerIds Bot Owner IDs
* @param wsPingPeriod WebSocket Ping Period
* @param connectionBackoffStrategy WebSocket Connection Backoff Strategy
* @param wsCloseDelay Websocket Close Delay
*/
public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventManager, ScheduledThreadPoolExecutor taskExecutor, ProxyConfig proxyConfig, Collection<String> botOwnerIds, int wsPingPeriod, IBackoffStrategy connectionBackoffStrategy) {
public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventManager, ScheduledThreadPoolExecutor taskExecutor, ProxyConfig proxyConfig, Collection<String> botOwnerIds, int wsPingPeriod, IBackoffStrategy connectionBackoffStrategy, int wsCloseDelay) {
this.eventManager = eventManager;
this.taskExecutor = taskExecutor;
this.botOwnerIds = botOwnerIds;
Expand All @@ -255,6 +257,7 @@ public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventM
if (websocketConnection == null) {
this.connection = new WebsocketConnection(spec -> {
spec.baseUrl(WEB_SOCKET_SERVER);
spec.closeDelay(wsCloseDelay);
spec.wsPingPeriod(wsPingPeriod);
spec.onStateChanged((oldState, newState) -> eventManager.publish(new PubSubConnectionStateEvent(oldState, newState, this)));
spec.onPreConnect(this::onPreConnect);
Expand Down Expand Up @@ -855,13 +858,14 @@ public long getLatency() {
/**
* Close
*/
@SneakyThrows
@Override
public void close() {
if (!isClosed) {
isClosed = true;
heartbeatTask.cancel(false);
queueTask.cancel(false);
disconnect();
connection.close();
}
}

Expand Down
Expand Up @@ -4,6 +4,7 @@
import com.github.philippheuer.events4j.core.EventManager;
import com.github.philippheuer.events4j.simple.SimpleEventHandler;
import com.github.twitch4j.client.websocket.WebsocketConnection;
import com.github.twitch4j.client.websocket.WebsocketConnectionConfig;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.util.EventManagerUtils;
import com.github.twitch4j.common.util.ThreadUtils;
Expand Down Expand Up @@ -71,6 +72,14 @@ public class TwitchPubSubBuilder {
@With
private int wsPingPeriod = 15_000;

/**
* Websocket Close Delay in ms (0 = minimum)

* @see WebsocketConnectionConfig#closeDelay()
*/
@With
private int wsCloseDelay = 1_000;

/**
* WebSocket Connection Backoff Strategy
*/
Expand Down Expand Up @@ -99,7 +108,7 @@ public TwitchPubSub build() {
// Initialize/Check EventManager
eventManager = EventManagerUtils.validateOrInitializeEventManager(eventManager, defaultEventHandler);

return new TwitchPubSub(this.websocketConnection, this.eventManager, scheduledThreadPoolExecutor, this.proxyConfig, this.botOwnerIds, this.wsPingPeriod, this.connectionBackoffStrategy);
return new TwitchPubSub(this.websocketConnection, this.eventManager, scheduledThreadPoolExecutor, this.proxyConfig, this.botOwnerIds, this.wsPingPeriod, this.connectionBackoffStrategy, this.wsCloseDelay);
}

/**
Expand Down
Expand Up @@ -10,6 +10,7 @@
import com.github.twitch4j.chat.TwitchChat;
import com.github.twitch4j.chat.TwitchChatBuilder;
import com.github.twitch4j.chat.util.TwitchChatLimitHelper;
import com.github.twitch4j.client.websocket.WebsocketConnectionConfig;
import com.github.twitch4j.common.annotation.Unofficial;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.config.Twitch4JGlobal;
Expand Down Expand Up @@ -285,6 +286,15 @@ public class TwitchClientBuilder {
@With
private int wsPingPeriod = 15_000;

/**
* Websocket Close Delay in ms (0 = minimum)

* @see WebsocketConnectionConfig#closeDelay()
*/
@With
private int wsCloseDelay = 1_000;


/**
* With a Bot Owner's User ID
*
Expand Down Expand Up @@ -440,6 +450,7 @@ public TwitchClient build() {
.setBotOwnerIds(botOwnerIds)
.setCommandPrefixes(commandPrefixes)
.withWsPingPeriod(wsPingPeriod)
.withWsCloseDelay(wsCloseDelay)
.build();
}

Expand All @@ -452,6 +463,7 @@ public TwitchClient build() {
.withProxyConfig(proxyConfig)
.setBotOwnerIds(botOwnerIds)
.withWsPingPeriod(wsPingPeriod)
.withWsCloseDelay(wsCloseDelay)
.build();
}

Expand Down