Skip to content

Commit

Permalink
feat: common websocket client (#552)
Browse files Browse the repository at this point in the history
* feat: add websocket client module and use it in twitch chat

* feat: use new websocket-client module in pubsub

* chore: keep the dependency on common for client-websocket for now

* fix: apply pr feedback

* chore: use proxyConfig in websocket-common

* fix: honor ExponentialBackoffStrategy maxRetries

* fix: update tests for the latest changes

* feat: move some util classes into a util module / add IBackoffStrategy

* refactor: make IBackoffStrategy#sleep default

* chore: switch nv-websocket-client dep from api to implementation

Co-authored-by: Sidd <iProdigy@users.noreply.github.com>

* feat: allow setting headers for the ws connection

* feat: customizable websocket connection backoff strategy for chat/pubsub

* fix: set default of ws conn retry immediateFirst to false

Co-authored-by: Sidd <iProdigy@users.noreply.github.com>

Co-authored-by: Sidd <iProdigy@users.noreply.github.com>
  • Loading branch information
PhilippHeuer and iProdigy committed May 5, 2022
1 parent 91788a0 commit 8e0c28f
Show file tree
Hide file tree
Showing 29 changed files with 1,622 additions and 985 deletions.
7 changes: 3 additions & 4 deletions chat/build.gradle.kts
@@ -1,8 +1,5 @@
// In this section you declare the dependencies for your production and test code
dependencies {
// WebSocket
api(group = "com.neovisionaries", name = "nv-websocket-client")

// Rate Limiting
api(group = "com.github.vladimir-bukhtoyarov", name = "bucket4j-core")

Expand All @@ -12,10 +9,12 @@ dependencies {
// Twitch4J Modules
api(project(":twitch4j-common"))
api(project(":twitch4j-auth"))
api(project(":twitch4j-client-websocket"))

// Mocking
// Testing
testImplementation(group = "org.mockito", name = "mockito-core")
testImplementation(group = "org.mockito", name = "mockito-junit-jupiter")
testImplementation(group = "org.awaitility", name = "awaitility")
}

tasks.javadoc {
Expand Down
408 changes: 152 additions & 256 deletions chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java

Large diffs are not rendered by default.

27 changes: 24 additions & 3 deletions chat/src/main/java/com/github/twitch4j/chat/TwitchChatBuilder.java
Expand Up @@ -5,20 +5,27 @@
import com.github.philippheuer.credentialmanager.domain.OAuth2Credential;
import com.github.philippheuer.events4j.api.service.IEventHandler;
import com.github.philippheuer.events4j.core.EventManager;
import com.github.philippheuer.events4j.simple.SimpleEventHandler;
import com.github.twitch4j.auth.providers.TwitchIdentityProvider;
import com.github.twitch4j.chat.events.channel.ChannelJoinFailureEvent;
import com.github.twitch4j.chat.util.TwitchChatLimitHelper;
import com.github.philippheuer.events4j.simple.SimpleEventHandler;
import com.github.twitch4j.client.websocket.WebsocketConnection;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.config.Twitch4JGlobal;
import com.github.twitch4j.common.enums.TwitchLimitType;
import com.github.twitch4j.common.util.BucketUtils;
import com.github.twitch4j.common.util.EventManagerUtils;
import com.github.twitch4j.common.util.ThreadUtils;
import com.github.twitch4j.common.util.TwitchLimitRegistry;
import com.github.twitch4j.util.IBackoffStrategy;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import lombok.*;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.With;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
Expand All @@ -42,6 +49,14 @@
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class TwitchChatBuilder {

/**
* WebsocketConnection
* <p>
* can be used to inject a mocked connection into the TwitchChat instance
*/
@With(AccessLevel.PROTECTED)
private WebsocketConnection websocketConnection = null;

/**
* Client Id
*/
Expand Down Expand Up @@ -228,6 +243,12 @@ public class TwitchChatBuilder {
@With
private int wsPingPeriod = 15_000;

/**
* WebSocket Connection Backoff Strategy
*/
@With
private IBackoffStrategy connectionBackoffStrategy = null;

/**
* Initialize the builder
*
Expand Down Expand Up @@ -282,7 +303,7 @@ public TwitchChat build() {
ircAuthBucket = userId == null ? BucketUtils.createBucket(this.authRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_AUTH_LIMIT, Collections.singletonList(authRateLimit));

log.debug("TwitchChat: Initializing Module ...");
return new TwitchChat(this.eventManager, this.credentialManager, this.chatAccount, this.baseUrl, this.sendCredentialToThirdPartyHost, this.commandPrefixes, this.chatQueueSize, this.ircMessageBucket, this.ircWhisperBucket, this.ircJoinBucket, this.ircAuthBucket, this.scheduledThreadPoolExecutor, this.chatQueueTimeout, this.proxyConfig, this.autoJoinOwnChannel, this.enableMembershipEvents, this.botOwnerIds, this.removeChannelOnJoinFailure, this.maxJoinRetries, this.chatJoinTimeout, this.wsPingPeriod);
return new TwitchChat(this.websocketConnection, this.eventManager, this.credentialManager, this.chatAccount, this.baseUrl, this.sendCredentialToThirdPartyHost, this.commandPrefixes, this.chatQueueSize, this.ircMessageBucket, this.ircWhisperBucket, this.ircJoinBucket, this.ircAuthBucket, this.scheduledThreadPoolExecutor, this.chatQueueTimeout, this.proxyConfig, this.autoJoinOwnChannel, this.enableMembershipEvents, this.botOwnerIds, this.removeChannelOnJoinFailure, this.maxJoinRetries, this.chatJoinTimeout, this.wsPingPeriod, this.connectionBackoffStrategy);
}

/**
Expand Down
Expand Up @@ -9,6 +9,7 @@
import com.github.twitch4j.common.annotation.Unofficial;
import com.github.twitch4j.common.pool.TwitchModuleConnectionPool;
import com.github.twitch4j.common.util.ChatReply;
import com.github.twitch4j.util.IBackoffStrategy;
import io.github.bucket4j.Bandwidth;
import lombok.Builder;
import lombok.NonNull;
Expand Down Expand Up @@ -95,6 +96,12 @@ public class TwitchChatConnectionPool extends TwitchModuleConnectionPool<TwitchC
@Builder.Default
protected Bandwidth authRateLimit = TwitchChatLimitHelper.USER_AUTH_LIMIT;

/**
* WebSocket Connection Backoff Strategy
*/
@Builder.Default
private IBackoffStrategy connectionBackoffStrategy = null;

@Override
public boolean sendMessage(String channel, String message, @Nullable Map<String, Object> tags) {
return this.sendMessage(channel, channel, message, tags);
Expand Down Expand Up @@ -257,6 +264,7 @@ protected TwitchChat createConnection() {
.withJoinRateLimit(joinRateLimit)
.withAuthRateLimit(authRateLimit)
.withAutoJoinOwnChannel(false) // user will have to manually send a subscribe call to enable whispers. this avoids duplicating whisper events
.withConnectionBackoffStrategy(connectionBackoffStrategy)
).build();

// Reclaim channel headroom upon generic join failures
Expand Down
@@ -1,12 +1,9 @@
package com.github.twitch4j.chat.enums;

/**
* WebSocket - Connection State
*/
public enum TMIConnectionState {
DISCONNECTING,
RECONNECTING,
DISCONNECTED,
CONNECTING,
CONNECTED
DISCONNECTING,
RECONNECTING,
DISCONNECTED,
CONNECTING,
CONNECTED
}
@@ -0,0 +1,88 @@
package com.github.twitch4j.chat;

import com.github.twitch4j.chat.util.TestUtils;
import com.github.twitch4j.client.websocket.WebsocketConnection;
import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.time.Duration;

import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.startsWith;
import static org.mockito.Mockito.times;

@Slf4j
public class TwitchChatRateLimitTest {

private WebsocketConnection connection;

private TwitchChat twitchChat;

@BeforeEach
void init() {
// mock connection
connection = Mockito.mock(WebsocketConnection.class);

// construct twitchChat
twitchChat = TwitchChatBuilder.builder()
.withWebsocketConnection(connection)
.withChatAccount(TestUtils.getCredential())
.withCommandTrigger("!")
.build();
}

@Test
public void testJoinRateLimit() {
// fake connected state
Mockito.when(connection.getConnectionState()).thenReturn(WebsocketConnectionState.CONNECTED);

// join a bunch of channels
for (int i = 1; i <= 100; i++) {
twitchChat.joinChannel("twitch4j" + i);
}

// wait
await().atMost(Duration.ofSeconds(1)).until(() -> twitchChat.ircCommandQueue.size() <= 80);

// verify
Mockito.verify(connection, times(20)).sendText(startsWith("JOIN #twitch4j"));
}

@Test
public void testChannelMessageRateLimit() {
// fake connected state
Mockito.when(connection.getConnectionState()).thenReturn(WebsocketConnectionState.CONNECTED);

// join a bunch of messages
for (int i = 1; i <= 100; i++) {
twitchChat.sendMessage("twitch4j", "Hello @twitch4j");
}

// wait for all commands to be processed
await().atMost(Duration.ofSeconds(1)).until(() -> twitchChat.ircCommandQueue.size() <= 80);

// verify
Assertions.assertEquals(0, twitchChat.ircCommandQueue.size(), "there shouldn't be any queued messages left");
Mockito.verify(connection, times(20)).sendText(eq("PRIVMSG #twitch4j :Hello @twitch4j"));
}

@Test
public void testAuthRateLimit() {
// fake disconnected state
Mockito.when(connection.getConnectionState()).thenReturn(WebsocketConnectionState.DISCONNECTED);

// connect a few times
for (int i = 1; i <= 20; i++) {
twitchChat.connect();
}

// verify
Assertions.assertEquals(0, twitchChat.ircAuthBucket.getAvailableTokens(), "we should have used up all auth tokens");
}

}
127 changes: 71 additions & 56 deletions chat/src/test/java/com/github/twitch4j/chat/TwitchChatTest.java
@@ -1,86 +1,101 @@
package com.github.twitch4j.chat;

import com.github.philippheuer.credentialmanager.CredentialManager;
import com.github.philippheuer.credentialmanager.CredentialManagerBuilder;
import com.github.philippheuer.events4j.core.EventManager;
import com.github.twitch4j.auth.providers.TwitchIdentityProvider;
import com.github.twitch4j.chat.events.CommandEvent;
import com.github.twitch4j.chat.events.channel.ChannelMessageEvent;
import com.github.twitch4j.chat.events.channel.IRCMessageEvent;
import com.github.twitch4j.chat.util.TestUtils;
import com.github.twitch4j.client.websocket.WebsocketConnection;
import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import com.github.twitch4j.common.enums.CommandPermission;
import com.github.twitch4j.common.test.TestEventManager;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.List;
import java.time.Duration;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;

@Slf4j
@Tag("integration")
public class TwitchChatTest {

private static TwitchChat twitchChat;
private WebsocketConnection connection;

@BeforeAll
public static void connectToChat() {
// event manager
EventManager eventManager = new EventManager();
eventManager.autoDiscovery();
private TestEventManager eventManager;

// credential manager
CredentialManager credentialManager = CredentialManagerBuilder.builder().build();
credentialManager.registerIdentityProvider(new TwitchIdentityProvider("jzkbprff40iqj646a697cyrvl0zt2m6", "**SECRET**", ""));
private TwitchChat twitchChat;

@BeforeEach
void init() {
// mock connection
connection = Mockito.mock(WebsocketConnection.class);

// mock eventManager
eventManager = new TestEventManager();

// construct twitchChat
twitchChat = TwitchChatBuilder.builder()
.withEventManager(eventManager)
.withCredentialManager(credentialManager)
.withChatAccount(TestUtils.getCredential())
.withCommandTrigger("!")
.build();

// sleep for a few seconds so that we're connected
TestUtils.sleepFor(5000);
.withWebsocketConnection(connection)
.withEventManager(eventManager)
.withChatAccount(TestUtils.getCredential())
.withCommandTrigger("!")
.build();
}

@Test
@DisplayName("Tests sending and receiving channel messages")
public void sendTwitchChannelMessage() {
// listen for events in channel
List<ChannelMessageEvent> channelMessages = new ArrayList<>();
twitchChat.joinChannel("twitch4j");
twitchChat.getEventManager().onEvent(ChannelMessageEvent.class, event -> {
channelMessages.add(event);
log.debug(event.toString());
});
public void testJoinChannel() {
// fake connected state
Mockito.when(connection.getConnectionState()).thenReturn(WebsocketConnectionState.CONNECTED);

// send message to channel
twitchChat.sendMessage("twitch4j", "Hello @twitch4j");
// join channel
twitchChat.joinChannel("twitch4j");

// sleep a second and look of the message was sended
TestUtils.sleepFor(1000);
// wait for all commands to be processed
await().atMost(Duration.ofSeconds(1)).until(() -> twitchChat.ircCommandQueue.size() == 0);

// check if the message was send and received
assertTrue(twitchChat.ircCommandQueue.size() == 0, "Can't find the message we send in the received messages!");
// verify
Assertions.assertEquals(WebsocketConnectionState.CONNECTED, connection.getConnectionState(), "should be CONNECTED");
Assertions.assertEquals(0, twitchChat.ircCommandQueue.size(), "there shouldn't be any queued messages left");
Mockito.verify(connection, times(1)).sendText(eq("JOIN #twitch4j"));
}

@Test
@DisplayName("Local test to keep it running for debugging")
@Disabled
public void localTestRun() {
// listen for events in channel
List<ChannelMessageEvent> channelMessages = new ArrayList<>();
twitchChat.joinChannel("twitch4j");
twitchChat.getEventManager().onEvent(ChannelMessageEvent.class, event -> {
log.debug(event.toString());
});
public void testSendChannelMessage() {
// fake connected state
Mockito.when(connection.getConnectionState()).thenReturn(WebsocketConnectionState.CONNECTED);

twitchChat.getEventManager().onEvent(CommandEvent.class, event -> {
log.debug(event.toString());
});
// send messages
twitchChat.sendMessage("twitch4j", "Hello @twitch4j");

// wait for all commands to be processed
await().atMost(Duration.ofSeconds(1)).until(() -> twitchChat.ircCommandQueue.size() == 0);

// sleep a second and look of the message was sended
TestUtils.sleepFor(120000);
// verify
Assertions.assertEquals(WebsocketConnectionState.CONNECTED, connection.getConnectionState(), "should be CONNECTED");
Assertions.assertEquals(0, twitchChat.ircCommandQueue.size(), "there shouldn't be any queued messages left");
Mockito.verify(connection, times(1)).sendText(eq("PRIVMSG #twitch4j :Hello @twitch4j"));
}

@Test
public void testReceiveChannelMessage() {
// simulate a message
twitchChat.onTextMessage("@badge-info=;badges=moments/1;client-nonce=2a752cf1b27d354c11cbc1b845229091;color=#00FF7F;display-name=Twitch4J;emotes=;first-msg=0;flags=;id=7bb22cd5-4882-4d79-b12f-8c9473004542;mod=0;room-id=149223493;subscriber=0;tmi-sent-ts=1647099473133;turbo=0;user-id=149223493;user-type= :twitch4j!twitch4j@twitch4j.tmi.twitch.tv PRIVMSG #twitch4j :hello world");

// expect a IRCMessageEvent and ChannelMessageEvent
Assertions.assertEquals(2, eventManager.getPublishedEvents().size());
Assertions.assertTrue(eventManager.getPublishedEvents().get(0) instanceof IRCMessageEvent);
Assertions.assertTrue(eventManager.getPublishedEvents().get(1) instanceof ChannelMessageEvent);
ChannelMessageEvent event = ((ChannelMessageEvent) eventManager.getPublishedEvents().get(1));
Assertions.assertEquals("2a752cf1b27d354c11cbc1b845229091", event.getNonce());
Assertions.assertEquals(false, event.isDesignatedFirstMessage());
Assertions.assertTrue(event.getPermissions().contains(CommandPermission.EVERYONE));
Assertions.assertEquals("twitch4j", event.getChannel().getName());
Assertions.assertEquals("twitch4j", event.getUser().getName());
Assertions.assertEquals("Twitch4J", event.getMessageEvent().getTags().get("display-name"));
Assertions.assertEquals("hello world", event.getMessage());
}

}

0 comments on commit 8e0c28f

Please sign in to comment.