/
TwitchChat.java
790 lines (691 loc) · 31.3 KB
/
TwitchChat.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
package com.github.twitch4j.chat;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.philippheuer.credentialmanager.CredentialManager;
import com.github.philippheuer.credentialmanager.domain.OAuth2Credential;
import com.github.philippheuer.events4j.core.EventManager;
import com.github.twitch4j.auth.providers.TwitchIdentityProvider;
import com.github.twitch4j.chat.enums.CommandSource;
import com.github.twitch4j.chat.enums.NoticeTag;
import com.github.twitch4j.chat.events.AbstractChannelEvent;
import com.github.twitch4j.chat.events.CommandEvent;
import com.github.twitch4j.chat.events.IRCEventHandler;
import com.github.twitch4j.chat.events.channel.ChannelJoinFailureEvent;
import com.github.twitch4j.chat.events.channel.ChannelMessageEvent;
import com.github.twitch4j.chat.events.channel.ChannelNoticeEvent;
import com.github.twitch4j.chat.events.channel.ChannelStateEvent;
import com.github.twitch4j.chat.events.channel.IRCMessageEvent;
import com.github.twitch4j.chat.events.channel.UserStateEvent;
import com.github.twitch4j.client.websocket.WebsocketConnection;
import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.util.CryptoUtils;
import com.github.twitch4j.common.util.EscapeUtils;
import io.github.bucket4j.Bucket;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@Slf4j
public class TwitchChat implements ITwitchChat {
/**
* NOTE(review): not referenced anywhere in this class; presumably the minimum number of threads the
* supplied task executor should provide - confirm against the code that constructs this client.
*/
public static final int REQUIRED_THREAD_COUNT = 2;
/**
* Event manager on which this client registers its listeners and publishes chat events
*/
@Getter
private final EventManager eventManager;
/**
* Credential manager; consulted in the constructor to enrich a chat credential that lacks a user name
*/
@Getter
private final CredentialManager credentialManager;
/**
* WebSocket connection to the IRC server (either supplied by the caller or created in the constructor)
*/
private final WebsocketConnection connection;
/**
* OAuth2Credential, used to sign in to twitch chat; {@code null} means the chat is joined anonymously
*/
private OAuth2Credential chatCredential;
/**
* Twitch's official WebSocket Server
*/
public static final String TWITCH_WEB_SOCKET_SERVER = "wss://irc-ws.chat.twitch.tv:443";
/**
* ThirdParty WebSocket Server for Testing
*/
public static final String FDGT_TEST_SOCKET_SERVER = "wss://irc.fdgt.dev";
/**
* Whether the {@link OAuth2Credential} password should be sent when the baseUrl does not
* match the official twitch websocket server, thus bypassing a security check in the library.
*/
protected final boolean sendCredentialToThirdPartyHost;
/**
* Lock held while {@link #currentChannels} and the two channel id/name caches are updated together
*/
private final ReentrantLock channelCacheLock = new ReentrantLock();
/**
* Lower-cased names of the channels this client has been asked to join (re-joined on every connect)
*/
protected final Set<String> currentChannels = ConcurrentHashMap.newKeySet();
/**
* Cache: ChannelId to ChannelName (populated from ROOMSTATE messages)
*/
protected final Map<String, String> channelIdToChannelName = new ConcurrentHashMap<>();
/**
* Cache: ChannelName to ChannelId (populated from ROOMSTATE messages)
*/
protected final Map<String, String> channelNameToChannelId = new ConcurrentHashMap<>();
/**
* IRC Message Bucket: rate limit consumed by {@link #sendRaw(String)} and by direct socket writes
*/
protected final Bucket ircMessageBucket;
/**
* IRC Whisper Bucket: rate limit consumed by {@link #sendPrivateMessage(String, String)}
*/
protected final Bucket ircWhisperBucket;
/**
* IRC Join Bucket: rate limit consumed before each JOIN and PART command is queued
*/
protected final Bucket ircJoinBucket;
/**
* IRC Auth Bucket: rate limit consumed by {@link #connect()} before an authenticated connection is opened
*/
protected final Bucket ircAuthBucket;
/**
* IRC Command Queue: bounded queue of raw IRC commands waiting to be written to the socket
*/
protected final BlockingQueue<String> ircCommandQueue;
/**
* Handle of the periodic queue-flush task scheduled on {@link #taskExecutor}; cancelled in {@link #close()}
*/
protected final ScheduledFuture<?> queueThread;
/**
* Whether {@link #flushCommand} is currently executing
*/
private final AtomicBoolean flushing = new AtomicBoolean();
/**
* Whether an expedited flush has already been submitted
*/
private final AtomicBoolean flushRequested = new AtomicBoolean();
/**
* The {@link Runnable} for flushing the {@link #ircCommandQueue}
*/
private final Runnable flushCommand;
/**
* Stop flag checked by the queue-flush task; set to {@code true} in {@link #close()}
*/
protected volatile boolean stopQueueThread = false;
/**
* Bot Owner IDs, handed to every {@link IRCMessageEvent} that is parsed
*/
protected final Collection<String> botOwnerIds;
/**
* Message prefixes that mark a channel message as a command (see the command trigger listener)
*/
protected final List<String> commandPrefixes;
/**
* Executor used for the queue-flush task, rate-limited sends and join retries
*/
protected final ScheduledExecutorService taskExecutor;
/**
* Period in milliseconds of the queue-flush task; also the longest time a producer waits for
* space in a full {@link #ircCommandQueue} before giving up.
*/
protected final long chatQueueTimeout;
/**
* Whether one's own channel should automatically be joined
*/
protected final boolean autoJoinOwnChannel;
/**
* Whether join failures should result in removal from current channels
*/
protected final boolean removeChannelOnJoinFailure;
/**
* Whether JOIN/PART events should be enabled (requests the twitch.tv/membership capability)
*/
protected final boolean enableMembershipEvents;
/**
* The maximum number of attempts to make for joining each channel
*/
protected final int maxJoinRetries;
/**
* Minimum milliseconds to wait after a join attempt
*/
protected final long chatJoinTimeout;
/**
* Tracks the timestamp of the last outbound ping
* NOTE(review): never read or written within this class - verify where it is maintained.
*/
protected final AtomicLong lastPing = new AtomicLong();
/**
* Cache of recent number of join attempts for each (lower-cased) channel name; an entry that expires
* without being invalidated is treated as a failed join attempt.
*/
protected final Cache<String, Integer> joinAttemptsByChannelName;
/**
* Creates the chat client, opens the websocket connection, starts the periodic command-queue
* flush task and registers the internal event listeners (command trigger, ROOMSTATE cache,
* join tracking and ban handling).
*
* @param websocketConnection Connection to use; when {@code null} a new one is built from baseUrl, wsPingPeriod and proxyConfig
* @param eventManager EventManager
* @param credentialManager CredentialManager
* @param chatCredential Chat Credential; {@code null} joins the chat anonymously
* @param baseUrl The websocket url for the chat client to connect to (only applied when the connection is created here)
* @param sendCredentialToThirdPartyHost Whether the password should be sent when the baseUrl is not official
* @param commandPrefixes Command Prefixes (copied)
* @param chatQueueSize Chat Queue Size (capacity of the bounded command queue)
* @param ircMessageBucket Bucket for chat
* @param ircWhisperBucket Bucket for whispers
* @param ircJoinBucket Bucket for joins
* @param ircAuthBucket Bucket for auths
* @param taskExecutor ScheduledThreadPoolExecutor
* @param chatQueueTimeout Period of the queue flush task and maximum wait for space in a full queue, in milliseconds
* @param proxyConfig Proxy Configuration; may be {@code null}
* @param autoJoinOwnChannel Whether one's own channel should automatically be joined
* @param enableMembershipEvents Whether JOIN/PART events should be enabled
* @param botOwnerIds Bot Owner IDs
* @param removeChannelOnJoinFailure Whether channels should be removed after a join failure
* @param maxJoinRetries Maximum join retries per channel; when not positive, join attempts are not tracked
* @param chatJoinTimeout Minimum milliseconds to wait after a join attempt
* @param wsPingPeriod WebSocket Ping Period (only applied when the connection is created here)
*/
public TwitchChat(WebsocketConnection websocketConnection, EventManager eventManager, CredentialManager credentialManager, OAuth2Credential chatCredential, String baseUrl, boolean sendCredentialToThirdPartyHost, Collection<String> commandPrefixes, Integer chatQueueSize, Bucket ircMessageBucket, Bucket ircWhisperBucket, Bucket ircJoinBucket, Bucket ircAuthBucket, ScheduledThreadPoolExecutor taskExecutor, long chatQueueTimeout, ProxyConfig proxyConfig, boolean autoJoinOwnChannel, boolean enableMembershipEvents, Collection<String> botOwnerIds, boolean removeChannelOnJoinFailure, int maxJoinRetries, long chatJoinTimeout, int wsPingPeriod) {
this.eventManager = eventManager;
this.credentialManager = credentialManager;
this.chatCredential = chatCredential;
this.sendCredentialToThirdPartyHost = sendCredentialToThirdPartyHost;
// defensive copy: later changes to the caller's collection do not affect command detection
this.commandPrefixes = new ArrayList<>(commandPrefixes);
this.botOwnerIds = botOwnerIds;
// bounded queue; the second argument requests fair (FIFO) ordering of blocked producers/consumers
this.ircCommandQueue = new ArrayBlockingQueue<>(chatQueueSize, true);
this.ircMessageBucket = ircMessageBucket;
this.ircWhisperBucket = ircWhisperBucket;
this.ircJoinBucket = ircJoinBucket;
this.ircAuthBucket = ircAuthBucket;
this.taskExecutor = taskExecutor;
this.chatQueueTimeout = chatQueueTimeout;
this.autoJoinOwnChannel = autoJoinOwnChannel;
this.enableMembershipEvents = enableMembershipEvents;
this.removeChannelOnJoinFailure = removeChannelOnJoinFailure;
this.maxJoinRetries = maxJoinRetries;
this.chatJoinTimeout = chatJoinTimeout;
// init connection: reuse the supplied one, otherwise build one wired to this client's callbacks
if (websocketConnection == null) {
this.connection = new WebsocketConnection(spec -> {
spec.baseUrl(baseUrl);
spec.wsPingPeriod(wsPingPeriod);
spec.onConnected(this::onConnected);
spec.onTextMessage(this::onTextMessage);
spec.onDisconnecting(this::onDisconnecting);
if (proxyConfig != null) {
spec.proxyHost(proxyConfig.getHostname());
spec.proxyPort(proxyConfig.getPort());
spec.proxyUsername(proxyConfig.getUsername());
// NOTE(review): behaviour for a proxy without a password depends on the declared type of getPassword() - confirm
spec.proxyPassword(String.valueOf(proxyConfig.getPassword()));
}
});
} else {
// NOTE(review): a supplied connection is used as-is; none of the callbacks above are attached here
this.connection = websocketConnection;
}
// credential validation
if (this.chatCredential == null) {
log.info("TwitchChat: No ChatAccount provided, Chat will be joined anonymously! Please look at the docs Twitch4J -> Chat if this is unintentional");
} else if (this.chatCredential.getUserName() == null) {
log.debug("TwitchChat: AccessToken does not contain any user information, fetching using the CredentialManager ...");
// ask the registered "twitch" identity provider (or a bare fallback provider) to enrich the credential
Optional<OAuth2Credential> credential = credentialManager.getOAuth2IdentityProviderByName("twitch")
.orElse(new TwitchIdentityProvider(null, null, null))
.getAdditionalCredentialInformation(this.chatCredential);
if (credential.isPresent()) {
this.chatCredential = credential.get();
} else {
// the un-enriched credential is kept; the client still proceeds to connect
log.error("TwitchChat: Failed to get AccessToken Information, the token is probably not valid. Please check the docs Twitch4J -> Chat on how to obtain a valid token.");
}
}
// register with serviceMediator
this.eventManager.getServiceMediator().addService("twitch4j-chat", this);
// register event listeners (the handler is constructed for its side effects; the local is otherwise unused)
IRCEventHandler ircEventHandler = new IRCEventHandler(this);
// connect to irc
// NOTE(review): the connection is opened before flushCommand, queueThread and joinAttemptsByChannelName
// are assigned below; if onConnected can fire before this constructor finishes, the asynchronous join of
// one's own channel could observe those fields as null - confirm how WebsocketConnection#connect dispatches.
connection.connect();
// queue command worker: drains ircCommandQueue while connected; the 'flushing' flag ensures a single active drain
this.flushCommand = () -> {
if (flushing.getAndSet(true)) return;
while (!stopQueueThread && connection.getConnectionState() == WebsocketConnectionState.CONNECTED) {
String command = null;
try {
// Send the command (token already consumed by the producer, hence consumeToken = false)
command = ircCommandQueue.poll();
if (command == null) break;
sendTextToWebSocket(command, false);
// Logging (never print the oauth token)
log.debug("Processed command from queue: [{}].", command.startsWith("PASS") ? "***OAUTH TOKEN HIDDEN***" : command);
} catch (Exception ex) {
log.error("Chat: Unexpected error in worker thread", ex);
// Reschedule command for processing (it is appended to the tail of the queue)
if (command != null) {
try {
ircCommandQueue.offer(command, this.chatQueueTimeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.error("Failed to reschedule command", e);
}
}
break;
}
}
// allow the next expedited flush request and the next drain
flushRequested.set(false);
flushing.set(false);
};
// Thread will start right now and then run every chatQueueTimeout milliseconds
this.queueThread = taskExecutor.scheduleAtFixedRate(flushCommand, 0, this.chatQueueTimeout, TimeUnit.MILLISECONDS);
log.debug("Started IRC Queue Worker");
// Event Handlers
log.debug("Registering the following command triggers: {}", commandPrefixes);
// register event handler
eventManager.onEvent("twitch4j-chat-command-trigger", ChannelMessageEvent.class, this::onChannelMessage);
eventManager.onEvent(IRCMessageEvent.class, event -> {
// we get at least one room state event with channel name + id when we join a channel, so we cache that to provide channel id + name for all events
if ("ROOMSTATE".equalsIgnoreCase(event.getCommandType())) {
// check that channel id / name are present and that we didn't leave the channel yet
if (event.getChannelId() != null) {
channelCacheLock.lock();
try {
// store mapping info into channelIdToChannelName / channelNameToChannelId
event.getChannelName().map(String::toLowerCase).filter(currentChannels::contains).ifPresent(name -> {
String oldName = channelIdToChannelName.put(event.getChannelId(), name);
if (!name.equals(oldName)) {
// the id was previously cached under another name: drop that stale reverse mapping
if (oldName != null) channelNameToChannelId.remove(oldName, event.getChannelId());
channelNameToChannelId.put(name, event.getChannelId());
}
});
} finally {
channelCacheLock.unlock();
}
}
}
});
// Initialize joinAttemptsByChannelName (on an attempt expiring without explicit removal, we retry with exponential backoff)
if (maxJoinRetries > 0) {
final long initialWait = Math.max(chatJoinTimeout, 0);
this.joinAttemptsByChannelName = Caffeine.newBuilder()
.expireAfterWrite(initialWait, TimeUnit.MILLISECONDS)
.scheduler(Scheduler.forScheduledExecutorService(taskExecutor)) // required for prompt removals on java 8
.<String, Integer>evictionListener((name, attempts, cause) -> {
// only expiry counts as a failed attempt; explicit invalidation (successful join) is ignored here
if (cause == RemovalCause.EXPIRED && name != null && attempts != null) {
if (attempts < maxJoinRetries) {
taskExecutor.schedule(() -> {
// skip the retry if the channel was left in the meantime
if (currentChannels.contains(name)) {
issueJoin(name, attempts + 1);
}
}, initialWait * (1L << (Math.min(attempts, 16) + 1)), TimeUnit.MILLISECONDS); // exponential backoff (pow2 optimization); the shift is capped at 17 bits
} else if (removeChannelOnJoinFailure && removeCurrentChannel(name)) {
eventManager.publish(new ChannelJoinFailureEvent(name, ChannelJoinFailureEvent.Reason.RETRIES_EXHAUSTED));
} else {
log.warn("Chat connection exhausted retries when attempting to join channel: {}", name);
}
}
})
.build();
} else {
// a cache with maximum size 0 is used when retries are disabled
this.joinAttemptsByChannelName = Caffeine.newBuilder().maximumSize(0).build(); // optimization
}
// Remove successfully joined channels from joinAttemptsByChannelName (as further retries are not needed)
Consumer<AbstractChannelEvent> joinListener = e -> joinAttemptsByChannelName.invalidate(e.getChannel().getName().toLowerCase());
eventManager.onEvent(ChannelStateEvent.class, joinListener::accept);
eventManager.onEvent(ChannelNoticeEvent.class, joinListener::accept);
eventManager.onEvent(UserStateEvent.class, joinListener::accept);
// Ban Listener: when enabled, a ban/suspension notice removes the channel and publishes a join failure
final Set<NoticeTag> banNotices = EnumSet.of(NoticeTag.MSG_BANNED, NoticeTag.MSG_CHANNEL_SUSPENDED, NoticeTag.TOS_BAN); // bit vector
eventManager.onEvent(ChannelNoticeEvent.class, e -> {
String name = e.getChannel().getName();
NoticeTag type = e.getType();
if (removeChannelOnJoinFailure && banNotices.contains(type) && removeCurrentChannel(name)) {
// MSG_BANNED maps to USER_BANNED; the other two notices are reported as CHANNEL_SUSPENDED
ChannelJoinFailureEvent.Reason reason = type == NoticeTag.MSG_BANNED ? ChannelJoinFailureEvent.Reason.USER_BANNED : ChannelJoinFailureEvent.Reason.CHANNEL_SUSPENDED;
eventManager.publish(new ChannelJoinFailureEvent(name, reason));
}
});
}
/**
 * Callback run once the websocket is open: performs the IRC handshake and (re-)joins channels.
 * <p>
 * Order of operations: request capabilities, authenticate (or pick an anonymous nick),
 * re-issue a JOIN for every tracked channel and finally, if configured, join one's own channel.
 */
protected void onConnected() {
    final String url = connection.getConfig().baseUrl();
    log.info("Connecting to Twitch IRC {}", url);

    // capability negotiation; membership is only requested when JOIN/PART events are wanted
    String capRequest = "CAP REQ :twitch.tv/tags twitch.tv/commands";
    if (enableMembershipEvents) {
        capRequest += " twitch.tv/membership";
    }
    sendTextToWebSocket(capRequest, true);
    sendTextToWebSocket("CAP END", true);

    // authentication
    final String nick;
    if (chatCredential == null) {
        // anonymous login uses a random "justinfan" nick and sends no password
        nick = "justinfan" + ThreadLocalRandom.current().nextInt(100000);
    } else {
        // the real token is only sent to the official host (with or without the port suffix),
        // unless that safeguard was explicitly disabled; otherwise a random nonce is sent instead
        final String officialWithoutPort = TWITCH_WEB_SOCKET_SERVER.substring(0, TWITCH_WEB_SOCKET_SERVER.length() - 4);
        final boolean mayRevealToken = sendCredentialToThirdPartyHost
            || url.equalsIgnoreCase(TWITCH_WEB_SOCKET_SERVER)
            || url.equalsIgnoreCase(officialWithoutPort);
        final String secret = mayRevealToken ? chatCredential.getAccessToken() : CryptoUtils.generateNonce(30);
        sendTextToWebSocket(String.format("pass oauth:%s", secret), true);
        nick = String.valueOf(chatCredential.getUserName()).toLowerCase();
    }
    sendTextToWebSocket(String.format("nick %s", nick), true);

    // channels requested before the connection existed, or lost through a reconnect
    for (String tracked : currentChannels) {
        issueJoin(tracked);
    }

    // own channel - required for sending or receiving whispers
    final boolean identityKnown = chatCredential != null && chatCredential.getUserName() != null;
    if (!identityKnown) {
        log.warn("Chat: The whispers feature is currently not available because the provided credential does not hold information about the user. Please check the documentation on how to pass the token to the credentialManager where it will be enriched with the required information.");
        return;
    }
    if (autoJoinOwnChannel && !currentChannels.contains(nick)) {
        joinChannel(nick);
    }
}
/**
 * Callback for a text frame received from the websocket.
 * <p>
 * A frame may carry several IRC lines; carriage returns are normalised to line feeds,
 * the frame is split on line feeds and every non-empty line is handled individually.
 *
 * @param text raw websocket frame content
 */
protected void onTextMessage(String text) {
    final String[] lines = text.replace("\n\r", "\n").replace("\r", "\n").split("\n");
    for (String line : lines) {
        if (!line.isEmpty()) {
            handleIrcLine(line);
        }
    }
}

/**
 * Handles one IRC line: connection-level messages (CAP, PING, login failure) are answered or
 * logged here, everything else is parsed into an {@link IRCMessageEvent} and published.
 *
 * @param message a single non-empty IRC line
 */
private void handleIrcLine(String message) {
    final String capAckPrefix = ":tmi.twitch.tv CAP * ACK :";

    // parameterized logging: no string is built for every line when the level is disabled
    log.trace("Received WebSocketMessage: {}", message);

    if (message.startsWith(":tmi.twitch.tv 410") || message.startsWith(":tmi.twitch.tv CAP * NAK")) {
        // - CAP rejected
        log.error("Failed to acquire requested IRC capabilities!");
    } else if (message.startsWith(capAckPrefix)) {
        // - CAP ACK: one log entry per granted capability
        for (String cap : message.substring(capAckPrefix.length()).split(" ")) {
            log.debug("Acquired chat capability: {}", cap);
        }
    } else if (message.equalsIgnoreCase("PING :tmi.twitch.tv")) {
        // - Ping
        sendTextToWebSocket("PONG :tmi.twitch.tv", true);
        log.debug("Responding to PING request!");
    } else if (message.equalsIgnoreCase(":tmi.twitch.tv NOTICE * :Login authentication failed")) {
        // - Login failed.
        log.error("Invalid IRC Credentials. Login failed!");
    } else {
        // - Parse IRC Message; a failure for one line must not prevent handling of the following lines
        try {
            IRCMessageEvent event = new IRCMessageEvent(message, channelIdToChannelName, channelNameToChannelId, botOwnerIds);
            if (event.isValid()) {
                eventManager.publish(event);
            } else {
                log.trace("Can't parse {}", event.getRawMessage());
            }
        } catch (Exception ex) {
            log.error(ex.getMessage(), ex);
        }
    }
}
/**
 * Callback run while the websocket is being closed: announces the departure with an IRC QUIT.
 */
protected void onDisconnecting() {
    // safe disconnect
    final String farewell = "QUIT";
    sendTextToWebSocket(farewell, true);
}
/**
 * Connecting to IRC-WS.
 * <p>
 * When the connection is currently disconnected or reconnecting and a chat credential is
 * present, this call first blocks until the auth rate limit grants a token.
 */
public void connect() {
    // read the state once: two separate reads could observe a transition in between and
    // wrongly skip the rate limit
    final WebsocketConnectionState state = connection.getConnectionState();
    final boolean aboutToOpen = state == WebsocketConnectionState.DISCONNECTED
        || state == WebsocketConnectionState.RECONNECTING;
    if (aboutToOpen && chatCredential != null) {
        // Wait for AUTH limit before opening the connection
        ircAuthBucket.asBlocking().consumeUninterruptibly(1L);
    }
    connection.connect();
}
/**
 * Disconnecting from IRC-WS by delegating to the underlying websocket connection.
 */
public void disconnect() {
    this.connection.disconnect();
}
/**
 * Reconnecting to IRC-WS by delegating to the underlying websocket connection.
 */
public void reconnect() {
    this.connection.reconnect();
}
/**
 * Send IRC Command.
 * <p>
 * The command name is upper-cased, the arguments are joined with single spaces and the
 * resulting line is handed to {@link #sendRaw(String)}.
 *
 * @param command IRC Command
 * @param args    command arguments
 */
protected void sendCommand(String command, String... args) {
    final String verb = command.toUpperCase();
    final String joinedArgs = String.join(" ", args);
    sendRaw(verb + " " + joinedArgs);
}
/**
 * Send raw irc command.
 * <p>
 * The command is queued asynchronously once the message rate limit grants a token.
 *
 * @param command raw irc command
 * @return whether the scheduling call produced a (non-null) completion stage; this says nothing
 *         about whether the command was eventually queued or sent
 */
public boolean sendRaw(String command) {
    final Runnable enqueue = () -> queueCommand(command);
    return ircMessageBucket.asScheduler()
        .consume(1, taskExecutor)
        .thenRunAsync(enqueue, taskExecutor) != null;
}
/**
 * Adds a raw irc command to the queue without checking bucket headroom.
 * <p>
 * If the queue is full, waits up to {@link #chatQueueTimeout} milliseconds for space. A command
 * that still cannot be queued is dropped and a warning is logged. After a successful or timed-out
 * attempt an expedited flush of the queue is requested, unless one is running or already pending.
 *
 * @param command Raw IRC command to be queued.
 */
private void queueCommand(String command) {
    // Add command to the queue, waiting for a period of time if necessary
    if (!ircCommandQueue.offer(command)) {
        final boolean accepted;
        try {
            accepted = ircCommandQueue.offer(command, chatQueueTimeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // keep the interrupt visible to the executor that owns this thread
            Thread.currentThread().interrupt();
            log.warn("Chat: unable to add command to full queue", e);
            return;
        } catch (Exception e) {
            log.warn("Chat: unable to add command to full queue", e);
            return;
        }
        if (!accepted) {
            // the timed offer reports a timeout through its return value, not an exception;
            // the command content is deliberately not logged
            log.warn("Chat: unable to add command to full queue within {} ms, the command was dropped", chatQueueTimeout);
            // fall through: the queue is full, so an expedited flush is still worthwhile
        }
    }
    // Expedite command execution if we aren't already flushing the queue and another expedition hasn't already been requested
    if (!flushing.get() && !flushRequested.getAndSet(true)) {
        // allow for some accumulation of requests before flushing
        taskExecutor.schedule(flushCommand, chatQueueTimeout / 20, TimeUnit.MILLISECONDS);
    }
}
/**
 * Send IRC Command (for Login/...)
 * <p>
 * Sends important irc commands for login / capabilities and similar.
 * Will consume tokens to respect the ratelimit, but will bypass the limit if the bucket is empty.
 *
 * @param command      IRC Command
 * @param consumeToken should a token be consumed when sending this text?
 * @return {@code true} if the text was handed to the connection, {@code false} if the connection
 *         was neither connected nor connecting and nothing was sent
 */
private boolean sendTextToWebSocket(String command, Boolean consumeToken) {
    // will send text only if CONNECTED or CONNECTING; the state is read once so that a
    // CONNECTING -> CONNECTED transition between two reads cannot make both checks fail
    final WebsocketConnectionState state = connection.getConnectionState();
    if (state != WebsocketConnectionState.CONNECTED && state != WebsocketConnectionState.CONNECTING) {
        return false;
    }
    // consume tokens if available, but ignore if not as those are important system commands (CAP, Login, ...)
    if (consumeToken) {
        ircMessageBucket.tryConsume(1L);
    }
    // send message
    this.connection.sendText(command);
    return true;
}
/**
 * Joining the channel.
 * <p>
 * The lower-cased name is recorded as a current channel and a JOIN is issued; a channel that
 * is already recorded only produces a warning.
 *
 * @param channelName channel name
 */
@Override
public void joinChannel(String channelName) {
    final String normalized = channelName.toLowerCase();
    channelCacheLock.lock();
    try {
        if (!currentChannels.add(normalized)) {
            log.warn("Already joined channel {}", channelName);
            return;
        }
        issueJoin(normalized);
        log.debug("Joining Channel [{}].", normalized);
    } finally {
        channelCacheLock.unlock();
    }
}
/**
 * Issues a first JOIN for the channel, i.e. one with zero previous attempts recorded.
 *
 * @param channelName channel name
 */
private void issueJoin(String channelName) {
    final int previousAttempts = 0;
    issueJoin(channelName, previousAttempts);
}
/**
 * Queues a JOIN for the channel once the join rate limit grants a token and records the attempt.
 *
 * @param channelName channel name (lower-cased before use)
 * @param attempts    number of join attempts already made for this channel
 */
protected void issueJoin(String channelName, int attempts) {
    final Runnable sendJoin = () -> {
        final String lowered = channelName.toLowerCase();
        queueCommand("JOIN #" + lowered);
        // mark that a join has been initiated to track later success or failure state;
        // an existing higher attempt count is kept
        joinAttemptsByChannelName.asMap().merge(lowered, attempts, Math::max);
    };
    ircJoinBucket.asScheduler().consume(1, taskExecutor).thenRunAsync(sendJoin, taskExecutor);
}
/**
 * Leaving the channel.
 * <p>
 * Removes the lower-cased name from the current channels, issues a PART and clears the
 * cached id/name mapping of the channel.
 *
 * @param channelName channel name
 * @return {@code true} if the channel was tracked and has now been left, {@code false} if it was not tracked
 */
@Override
public boolean leaveChannel(String channelName) {
    final String normalized = channelName.toLowerCase();
    channelCacheLock.lock();
    try {
        final boolean wasTracked = currentChannels.remove(normalized);
        if (!wasTracked) {
            log.warn("Already left channel {}", channelName);
            return false;
        }
        issuePart(normalized);
        log.debug("Leaving Channel [{}].", normalized);
        // clear cache
        final String knownId = channelNameToChannelId.remove(normalized);
        if (knownId != null) {
            channelIdToChannelName.remove(knownId);
        }
        return true;
    } finally {
        channelCacheLock.unlock();
    }
}
/**
 * Queues a PART for the channel once the join rate limit grants a token.
 *
 * @param channelName channel name (lower-cased before use)
 */
private void issuePart(String channelName) {
    final Runnable sendPart = () -> {
        final String lowered = channelName.toLowerCase();
        queueCommand("PART #" + lowered);
    };
    ircJoinBucket.asScheduler().consume(1, taskExecutor).thenRunAsync(sendPart, taskExecutor);
}
/**
 * Stops tracking a channel and clears its cached id/name mapping, without sending a PART.
 * <p>
 * The name is lower-cased first because {@link #currentChannels} only holds lower-cased names,
 * while callers may pass a channel name exactly as it appeared in an event.
 *
 * @param channelName channel name
 * @return {@code true} if the channel was tracked and has been removed, {@code false} otherwise
 */
private boolean removeCurrentChannel(String channelName) {
    final String normalized = channelName.toLowerCase();
    channelCacheLock.lock();
    try {
        if (!currentChannels.remove(normalized)) {
            return false;
        }
        final String id = channelNameToChannelId.remove(normalized);
        if (id != null) {
            channelIdToChannelName.remove(id);
        }
        return true;
    } finally {
        channelCacheLock.unlock();
    }
}
/**
 * Sends a chat message to a channel, optionally prefixed with IRC message tags.
 * <p>
 * Tags are rendered as {@code @key=value;key=value } (values escaped) in front of the PRIVMSG.
 *
 * @param channel channel name (lower-cased before use)
 * @param message message text
 * @param tags    message tags; may be {@code null} or empty
 * @return the result of {@link #sendRaw(String)} for the assembled line
 */
@Override
public boolean sendMessage(String channel, String message, Map<String, Object> tags) {
    final String target = channel.toLowerCase();
    final StringBuilder raw = new StringBuilder();
    if (tags != null && !tags.isEmpty()) {
        // '@' introduces the first tag, ';' separates the following ones, a space ends the tag section
        char separator = '@';
        for (Map.Entry<String, Object> tag : tags.entrySet()) {
            raw.append(separator).append(tag.getKey()).append('=').append(EscapeUtils.escapeTagValue(tag.getValue()));
            separator = ';';
        }
        raw.append(' ');
    }
    raw.append("PRIVMSG #").append(target).append(" :").append(message);
    log.debug("Adding message for channel [{}] with content [{}] to the queue.", target, message);
    return sendRaw(raw.toString());
}
/**
 * Sends a user a private message.
 * <p>
 * The whisper is sent as a {@code /w} command in one's own channel, so the user name of the
 * chat credential is required. Without it (anonymous login, or a credential that carries no user
 * name) nothing is sent, no whisper token is consumed and a warning is logged.
 *
 * @param targetUser username
 * @param message    message
 */
public void sendPrivateMessage(String targetUser, String message) {
    // checked up front: inside the asynchronous task a missing name would only surface as an
    // exception in a future that nobody inspects
    final String ownName = chatCredential != null ? chatCredential.getUserName() : null;
    if (ownName == null) {
        log.warn("Chat: Unable to send a private message to [{}] because the chat credential does not hold a user name.", targetUser);
        return;
    }
    log.debug("Adding private message for user [{}] with content [{}] to the queue.", targetUser, message);
    final String whisper = String.format("PRIVMSG #%s :/w %s %s", ownName.toLowerCase(), targetUser, message);
    ircWhisperBucket.asScheduler().consume(1, taskExecutor).thenRunAsync(
        () -> queueCommand(whisper),
        taskExecutor
    );
}
/**
 * On Channel Message: publishes a {@link CommandEvent} when the message starts with one of the
 * configured command prefixes. Only the first matching prefix (in configuration order) is used.
 *
 * @param event ChannelMessageEvent
 */
private void onChannelMessage(ChannelMessageEvent event) {
    for (String candidate : this.commandPrefixes) {
        if (!event.getMessage().startsWith(candidate)) {
            continue;
        }
        // the command is everything after the prefix
        final String command = event.getMessage().substring(candidate.length());
        log.debug("Detected a command in channel {} with content: {}", event.getChannel().getName(), command);
        // dispatch command event
        eventManager.publish(new CommandEvent(CommandSource.CHANNEL, event.getChannel().getName(), event.getUser(), candidate, command, event.getPermissions()));
        return;
    }
}
/**
 * Close: signals the queue-flush task to stop, cancels its schedule (without interrupting a
 * run that is in progress) and disconnects from the server.
 */
@Override
public void close() {
    stopQueueThread = true;
    queueThread.cancel(false);
    disconnect();
}
/**
 * Checks whether a channel is among the current channels (case-insensitive via lower-casing).
 *
 * @param channelName channel name
 * @return {@code true} if the lower-cased name is currently tracked
 */
@Override
public boolean isChannelJoined(String channelName) {
    final String normalized = channelName.toLowerCase();
    return currentChannels.contains(normalized);
}
/**
 * Returns an unmodifiable snapshot list of all current channels (without # prefix).
 * Later joins or leaves are not reflected in the returned list.
 *
 * @return a list of channel names
 * @deprecated use getChannels() instead
 */
@Deprecated
public List<String> getCurrentChannels() {
    final List<String> snapshot = new ArrayList<>(currentChannels);
    return Collections.unmodifiableList(snapshot);
}
/**
 * Returns an unmodifiable view of the current channels; the view is backed by the internal
 * set, so later joins and leaves are visible through it.
 *
 * @return the current (lower-cased) channel names
 */
@Override
public Set<String> getChannels() {
    final Set<String> readOnlyView = Collections.unmodifiableSet(currentChannels);
    return readOnlyView;
}
/**
 * @return an unmodifiable view of the cached map used for channel id to name mapping
 */
@Override
public Map<String, String> getChannelIdToChannelName() {
    final Map<String, String> readOnlyView = Collections.unmodifiableMap(channelIdToChannelName);
    return readOnlyView;
}
/**
 * @return an unmodifiable view of the cached map used for channel name to id mapping
 */
@Override
public Map<String, String> getChannelNameToChannelId() {
    final Map<String, String> readOnlyView = Collections.unmodifiableMap(channelNameToChannelId);
    return readOnlyView;
}
/**
 * @return the latency value reported by the underlying websocket connection
 */
public long getLatency() {
    final long reported = connection.getLatency();
    return reported;
}
}