Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow custom websocket close delay #627

Merged
merged 7 commits into from Aug 27, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 9 additions & 5 deletions auth/src/main/java/com/github/twitch4j/auth/TwitchAuth.java
Expand Up @@ -22,9 +22,9 @@ public class TwitchAuth {
* Twitch Identity Provider
*
* @param credentialManager Credential Manager
* @param clientId OAuth2 Client Id
* @param clientSecret OAuth2 Client Secret
* @param redirectUrl OAuth2 Redirect Url
* @param clientId OAuth2 Client Id
* @param clientSecret OAuth2 Client Secret
* @param redirectUrl OAuth2 Redirect Url
*/
public TwitchAuth(CredentialManager credentialManager, String clientId, String clientSecret, String redirectUrl) {
this.credentialManager = credentialManager;
Expand All @@ -37,9 +37,13 @@ public static void registerIdentityProvider(CredentialManager credentialManager,
if (!ip.isPresent()) {
// register
IdentityProvider identityProvider = new TwitchIdentityProvider(clientId, clientSecret, redirectUrl);
credentialManager.registerIdentityProvider(identityProvider);
try {
credentialManager.registerIdentityProvider(identityProvider);
} catch (Exception e) {
log.error("TwitchAuth: Encountered conflicting identity provider!", e);
}
} else {
log.warn("TwitchIdentityProvider was already registered, ignoring call to TwitchAuth.registerIdentityProvider!");
log.debug("TwitchIdentityProvider was already registered, ignoring call to TwitchAuth.registerIdentityProvider!");
}
}
}
8 changes: 6 additions & 2 deletions chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java
Expand Up @@ -31,6 +31,7 @@
import io.github.xanthic.cache.api.domain.ExpiryType;
import io.github.xanthic.cache.core.CacheApi;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -270,8 +271,9 @@ public class TwitchChat implements ITwitchChat {
* @param connectionBackoffStrategy WebSocket Connection Backoff Strategy
* @param perChannelRateLimit Per channel message limit
* @param validateOnConnect Whether token should be validated on connect
* @param wsCloseDelay Websocket Close Delay
*/
public TwitchChat(WebsocketConnection websocketConnection, EventManager eventManager, CredentialManager credentialManager, OAuth2Credential chatCredential, String baseUrl, boolean sendCredentialToThirdPartyHost, Collection<String> commandPrefixes, Integer chatQueueSize, Bucket ircMessageBucket, Bucket ircWhisperBucket, Bucket ircJoinBucket, Bucket ircAuthBucket, ScheduledThreadPoolExecutor taskExecutor, long chatQueueTimeout, ProxyConfig proxyConfig, boolean autoJoinOwnChannel, boolean enableMembershipEvents, Collection<String> botOwnerIds, boolean removeChannelOnJoinFailure, int maxJoinRetries, long chatJoinTimeout, int wsPingPeriod, IBackoffStrategy connectionBackoffStrategy, Bandwidth perChannelRateLimit, boolean validateOnConnect) {
public TwitchChat(WebsocketConnection websocketConnection, EventManager eventManager, CredentialManager credentialManager, OAuth2Credential chatCredential, String baseUrl, boolean sendCredentialToThirdPartyHost, Collection<String> commandPrefixes, Integer chatQueueSize, Bucket ircMessageBucket, Bucket ircWhisperBucket, Bucket ircJoinBucket, Bucket ircAuthBucket, ScheduledThreadPoolExecutor taskExecutor, long chatQueueTimeout, ProxyConfig proxyConfig, boolean autoJoinOwnChannel, boolean enableMembershipEvents, Collection<String> botOwnerIds, boolean removeChannelOnJoinFailure, int maxJoinRetries, long chatJoinTimeout, int wsPingPeriod, IBackoffStrategy connectionBackoffStrategy, Bandwidth perChannelRateLimit, boolean validateOnConnect, int wsCloseDelay) {
this.eventManager = eventManager;
this.credentialManager = credentialManager;
this.chatCredential = chatCredential;
Expand Down Expand Up @@ -304,6 +306,7 @@ public TwitchChat(WebsocketConnection websocketConnection, EventManager eventMan
if (websocketConnection == null) {
this.connection = new WebsocketConnection(spec -> {
spec.baseUrl(baseUrl);
spec.closeDelay(wsCloseDelay);
spec.wsPingPeriod(wsPingPeriod);
spec.onStateChanged((oldState, newState) -> eventManager.publish(new ChatConnectionStateEvent(oldState, newState, this)));
spec.onConnected(this::onConnected);
Expand Down Expand Up @@ -815,11 +818,12 @@ private void onChannelMessage(ChannelMessageEvent event) {
/**
* Close
*/
@SneakyThrows
@Override
public void close() {
this.stopQueueThread = true;
queueThread.cancel(false);
this.disconnect();
connection.close();
}

@Override
Expand Down
Expand Up @@ -11,6 +11,7 @@
import com.github.twitch4j.chat.events.channel.ChannelJoinFailureEvent;
import com.github.twitch4j.chat.util.TwitchChatLimitHelper;
import com.github.twitch4j.client.websocket.WebsocketConnection;
import com.github.twitch4j.client.websocket.WebsocketConnectionConfig;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.config.Twitch4JGlobal;
import com.github.twitch4j.common.enums.TwitchLimitType;
Expand Down Expand Up @@ -252,6 +253,14 @@ public class TwitchChatBuilder {
@With
private int wsPingPeriod = 15_000;

/**
* Websocket Close Delay in ms (0 = minimum)

* @see WebsocketConnectionConfig#closeDelay()
*/
@With
private int wsCloseDelay = 1_000;

/**
* WebSocket Connection Backoff Strategy
*/
Expand Down Expand Up @@ -327,7 +336,7 @@ public TwitchChat build() {
perChannelRateLimit = chatRateLimit;

log.debug("TwitchChat: Initializing Module ...");
return new TwitchChat(this.websocketConnection, this.eventManager, this.credentialManager, this.chatAccount, this.baseUrl, this.sendCredentialToThirdPartyHost, this.commandPrefixes, this.chatQueueSize, this.ircMessageBucket, this.ircWhisperBucket, this.ircJoinBucket, this.ircAuthBucket, this.scheduledThreadPoolExecutor, this.chatQueueTimeout, this.proxyConfig, this.autoJoinOwnChannel, this.enableMembershipEvents, this.botOwnerIds, this.removeChannelOnJoinFailure, this.maxJoinRetries, this.chatJoinTimeout, this.wsPingPeriod, this.connectionBackoffStrategy, this.perChannelRateLimit, this.verifyChatAccountOnReconnect);
return new TwitchChat(this.websocketConnection, this.eventManager, this.credentialManager, this.chatAccount, this.baseUrl, this.sendCredentialToThirdPartyHost, this.commandPrefixes, this.chatQueueSize, this.ircMessageBucket, this.ircWhisperBucket, this.ircJoinBucket, this.ircAuthBucket, this.scheduledThreadPoolExecutor, this.chatQueueTimeout, this.proxyConfig, this.autoJoinOwnChannel, this.enableMembershipEvents, this.botOwnerIds, this.removeChannelOnJoinFailure, this.maxJoinRetries, this.chatJoinTimeout, this.wsPingPeriod, this.connectionBackoffStrategy, this.perChannelRateLimit, this.verifyChatAccountOnReconnect, this.wsCloseDelay);
}

/**
Expand Down
Expand Up @@ -4,6 +4,9 @@
import com.github.twitch4j.common.util.ExponentialBackoffStrategy;
import com.neovisionaries.ws.client.WebSocket;
import com.neovisionaries.ws.client.WebSocketAdapter;
import com.neovisionaries.ws.client.WebSocketCloseCode;
import com.neovisionaries.ws.client.WebSocketError;
import com.neovisionaries.ws.client.WebSocketException;
import com.neovisionaries.ws.client.WebSocketFactory;
import com.neovisionaries.ws.client.WebSocketFrame;
import lombok.Getter;
Expand All @@ -13,8 +16,10 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -43,6 +48,11 @@ public class WebsocketConnection implements AutoCloseable {
*/
private volatile Future<?> backoffClearer;

/**
* Calls {@link #reconnect()} following a connection loss
*/
private final AtomicReference<Future<?>> reconnectTask = new AtomicReference<>();

/**
* WebSocket Factory
*/
Expand All @@ -61,6 +71,16 @@ public class WebsocketConnection implements AutoCloseable {
@Getter
protected volatile long latency = -1L;

/**
* Whether {@link #close()} has been called
*/
protected final AtomicBoolean closed = new AtomicBoolean(false);

/**
* Latch used to indicate that the underlying socket has fully disconnected following {@link #close()}.
*/
protected final CountDownLatch closeLatch = new CountDownLatch(1);

/**
* TwitchWebsocketConnection
*
Expand Down Expand Up @@ -117,11 +137,18 @@ public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame,
log.debug("Maximum retry count for websocket reconnection attempts was hit.");
config.backoffStrategy().reset(); // start fresh on the next manual connect() call
} else {
config.taskExecutor().schedule(() -> {
WebsocketConnectionState state = connectionState.get();
if (state != WebsocketConnectionState.CONNECTING && state != WebsocketConnectionState.CONNECTED)
reconnect();
}, reconnectDelay, TimeUnit.MILLISECONDS);
// Schedule the next reconnect according to the delay from the backoff strategy
Future<?> previousReconnection = reconnectTask.getAndSet(
config.taskExecutor().schedule(() -> {
WebsocketConnectionState state = connectionState.get();
if (state != WebsocketConnectionState.CONNECTING && state != WebsocketConnectionState.CONNECTED && !closed.get())
reconnect();
}, reconnectDelay, TimeUnit.MILLISECONDS)
);

// Cancel the previous reconnect task, if outstanding
if (previousReconnection != null)
previousReconnection.cancel(false);
}
} else {
setState(WebsocketConnectionState.DISCONNECTED);
Expand Down Expand Up @@ -149,6 +176,7 @@ public void onPongFrame(WebSocket websocket, WebSocketFrame frame) {

protected WebSocket createWebsocket() throws IOException {
WebSocket ws = webSocketFactory.createSocket(config.baseUrl());
ws.setMissingCloseFrameAllowed(true);
ws.setPingInterval(config.wsPingPeriod());
if (config.headers() != null)
config.headers().forEach(ws::addHeader);
Expand All @@ -172,6 +200,9 @@ protected void setState(WebsocketConnectionState newState) {
public void connect() {
WebsocketConnectionState connectionState = this.connectionState.get();
if (connectionState == WebsocketConnectionState.DISCONNECTED || connectionState == WebsocketConnectionState.RECONNECTING || connectionState == WebsocketConnectionState.LOST) {
if (closed.get())
throw new IllegalStateException("WebsocketConnection was already closed!");
PhilippHeuer marked this conversation as resolved.
Show resolved Hide resolved

try {
// avoid any resource leaks
this.closeSocket();
Expand Down Expand Up @@ -281,15 +312,82 @@ public WebsocketConnectionState getConnectionState() {

@Override
public void close() throws Exception {
disconnect();
if (closed.getAndSet(true))
return; // resource close was already requested

// Cancel backoff clear task, if outstanding
if (backoffClearer != null)
backoffClearer.cancel(false);

// Cancel any outstanding reconnect task
Future<?> reconnector = reconnectTask.getAndSet(null);
if (reconnector != null)
reconnector.cancel(false);

// Disconnect from socket
try {
// This call does not block, so we use CountdownLatch to block
// until the underlying socket fully closes.
disconnect();
} catch (Exception e) {
log.warn("Exception thrown from websocket disconnect attempt", e);
this.closeSocket(); // really make sure the resource was released
} finally {
// await the close of the underlying socket
try {
boolean completed = closeLatch.await(config.closeDelay() + 1000L, TimeUnit.MILLISECONDS);
if (completed) {
log.trace("Underlying websocket complete close was successful");
} else {
log.warn("Underlying websocket did not close within the expected delay");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

@Synchronized
private void closeSocket() {
// Clean up the socket
if (webSocket != null) {
this.webSocket.disconnect();
this.webSocket.clearListeners();
final WebSocket socket = this.webSocket;
if (socket != null) {
// The disconnecting socket no needs to invoke this.webSocketAdapter
socket.clearListeners();

// However, if a full close is requested, we should track when the underlying socket closes to release the latch.
if (closed.get()) {
socket.addListener(new WebSocketAdapter() {
@Override
public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) {
// The underlying java.net.Socket fully closed (e.g., after receiving close); release the latch.
socket.clearListeners();
closeLatch.countDown();
}

@Override
public void onSendError(WebSocket websocket, WebSocketException cause, WebSocketFrame frame) {
// Flushing (e.g., of the close frame) failed because the socket was already closed; release latch.
if (cause != null && cause.getError() == WebSocketError.FLUSH_ERROR) {
socket.clearListeners();
closeLatch.countDown();
}
}
});
}

// Similarly, this disconnect call is non-blocking.
// Under the hood, this queues a close frame to be sent to the server,
// and the WritingThread is otherwise stopped.
// This also schedules a task to forcibly close the underlying socket,
// after the close delay passes.
// If the close delay is set very low, the queued close frame may never
// successfully flush, triggering onSendError.
// Lastly, the ReadingThread starts to block, awaiting a close frame from the server.
// Upon receiving a close frame, the socket also closes, indicated by onDisconnected.
socket.disconnect(WebSocketCloseCode.NORMAL, null, config.closeDelay());

// Release the reference to the closing websocket.
this.webSocket = null;
}

Expand Down
Expand Up @@ -42,6 +42,9 @@ public void validate() {
if (socketTimeout < 0) {
throw new RuntimeException("socketTimeout must be 0 or greater, set to 0 to disable!");
}
if (closeDelay < 0) {
throw new RuntimeException("closeDelay must be 0 or greater!");
}
Objects.requireNonNull(taskExecutor, "taskExecutor may not be null!");
Objects.requireNonNull(backoffStrategy, "backoffStrategy may not be null!");
Objects.requireNonNull(onStateChanged, "onStateChanged may not be null!");
Expand Down Expand Up @@ -75,6 +78,14 @@ public void validate() {
*/
private int socketTimeout = 30_000;

/**
* The maximum number of milliseconds to wait after sending a close frame
* to receive confirmation from the server, before fully closing the socket.
* <p>
* This can be set as low as 0 for applications that require prompt socket closes upon disconnect calls.
*/
private int closeDelay = 1_000;

/**
* WebSocket Headers
*/
Expand Down
Expand Up @@ -34,7 +34,8 @@ class MockChat : TwitchChat(
0,
null,
TwitchChatLimitHelper.MOD_MESSAGE_LIMIT,
false
false,
0
) {
@Volatile
var isConnected = false
Expand Down
Expand Up @@ -127,6 +127,7 @@
import com.github.twitch4j.pubsub.events.VideoPlaybackEvent;
import com.github.twitch4j.util.IBackoffStrategy;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -245,8 +246,9 @@ public class TwitchPubSub implements ITwitchPubSub {
* @param botOwnerIds Bot Owner IDs
* @param wsPingPeriod WebSocket Ping Period
* @param connectionBackoffStrategy WebSocket Connection Backoff Strategy
* @param wsCloseDelay Websocket Close Delay
*/
public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventManager, ScheduledThreadPoolExecutor taskExecutor, ProxyConfig proxyConfig, Collection<String> botOwnerIds, int wsPingPeriod, IBackoffStrategy connectionBackoffStrategy) {
public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventManager, ScheduledThreadPoolExecutor taskExecutor, ProxyConfig proxyConfig, Collection<String> botOwnerIds, int wsPingPeriod, IBackoffStrategy connectionBackoffStrategy, int wsCloseDelay) {
this.eventManager = eventManager;
this.taskExecutor = taskExecutor;
this.botOwnerIds = botOwnerIds;
Expand All @@ -255,6 +257,7 @@ public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventM
if (websocketConnection == null) {
this.connection = new WebsocketConnection(spec -> {
spec.baseUrl(WEB_SOCKET_SERVER);
spec.closeDelay(wsCloseDelay);
spec.wsPingPeriod(wsPingPeriod);
spec.onStateChanged((oldState, newState) -> eventManager.publish(new PubSubConnectionStateEvent(oldState, newState, this)));
spec.onPreConnect(this::onPreConnect);
Expand Down Expand Up @@ -855,13 +858,14 @@ public long getLatency() {
/**
* Close
*/
@SneakyThrows
@Override
public void close() {
if (!isClosed) {
isClosed = true;
heartbeatTask.cancel(false);
queueTask.cancel(false);
disconnect();
connection.close();
}
}

Expand Down
Expand Up @@ -4,6 +4,7 @@
import com.github.philippheuer.events4j.core.EventManager;
import com.github.philippheuer.events4j.simple.SimpleEventHandler;
import com.github.twitch4j.client.websocket.WebsocketConnection;
import com.github.twitch4j.client.websocket.WebsocketConnectionConfig;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.util.EventManagerUtils;
import com.github.twitch4j.common.util.ThreadUtils;
Expand Down Expand Up @@ -71,6 +72,14 @@ public class TwitchPubSubBuilder {
@With
private int wsPingPeriod = 15_000;

/**
* Websocket Close Delay in ms (0 = minimum)

* @see WebsocketConnectionConfig#closeDelay()
*/
@With
private int wsCloseDelay = 1_000;

/**
* WebSocket Connection Backoff Strategy
*/
Expand Down Expand Up @@ -99,7 +108,7 @@ public TwitchPubSub build() {
// Initialize/Check EventManager
eventManager = EventManagerUtils.validateOrInitializeEventManager(eventManager, defaultEventHandler);

return new TwitchPubSub(this.websocketConnection, this.eventManager, scheduledThreadPoolExecutor, this.proxyConfig, this.botOwnerIds, this.wsPingPeriod, this.connectionBackoffStrategy);
return new TwitchPubSub(this.websocketConnection, this.eventManager, scheduledThreadPoolExecutor, this.proxyConfig, this.botOwnerIds, this.wsPingPeriod, this.connectionBackoffStrategy, this.wsCloseDelay);
}

/**
Expand Down