Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: comply with undocumented helix rate limits #561

Merged
merged 11 commits into from May 4, 2022
30 changes: 15 additions & 15 deletions chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java
Expand Up @@ -21,6 +21,7 @@
import com.github.twitch4j.chat.events.channel.IRCMessageEvent;
import com.github.twitch4j.chat.events.channel.UserStateEvent;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.util.BucketUtils;
import com.github.twitch4j.common.util.CryptoUtils;
import com.github.twitch4j.common.util.EscapeUtils;
import com.github.twitch4j.common.util.ExponentialBackoffStrategy;
Expand Down Expand Up @@ -674,7 +675,7 @@ protected void sendCommand(String command, String... args) {
* @param command raw irc command
*/
public boolean sendRaw(String command) {
return ircMessageBucket.asScheduler().consume(1, taskExecutor).thenRunAsync(() -> queueCommand(command), taskExecutor) != null;
return BucketUtils.scheduleAgainstBucket(ircMessageBucket, taskExecutor, () -> queueCommand(command)) != null;
}

/**
Expand Down Expand Up @@ -750,14 +751,11 @@ private void issueJoin(String channelName) {
}

protected void issueJoin(String channelName, int attempts) {
ircJoinBucket.asScheduler().consume(1, taskExecutor).thenRunAsync(
() -> {
String name = channelName.toLowerCase();
queueCommand("JOIN #" + name);
joinAttemptsByChannelName.asMap().merge(name, attempts, Math::max); // mark that a join has been initiated to track later success or failure state
},
taskExecutor
);
BucketUtils.scheduleAgainstBucket(ircJoinBucket, taskExecutor, () -> {
String name = channelName.toLowerCase();
queueCommand("JOIN #" + name);
joinAttemptsByChannelName.asMap().merge(name, attempts, Math::max); // mark that a join has been initiated to track later success or failure state
});
}

/**
Expand Down Expand Up @@ -790,9 +788,10 @@ public boolean leaveChannel(String channelName) {
}

private void issuePart(String channelName) {
ircJoinBucket.asScheduler().consume(1, taskExecutor).thenRunAsync(
() -> queueCommand("PART #" + channelName.toLowerCase()),
taskExecutor
BucketUtils.scheduleAgainstBucket(
ircJoinBucket,
taskExecutor,
() -> queueCommand("PART #" + channelName.toLowerCase())
);
}

Expand Down Expand Up @@ -833,9 +832,10 @@ public boolean sendMessage(String channel, String message, Map<String, Object> t
*/
public void sendPrivateMessage(String targetUser, String message) {
log.debug("Adding private message for user [{}] with content [{}] to the queue.", targetUser, message);
ircWhisperBucket.asScheduler().consume(1, taskExecutor).thenRunAsync(
() -> queueCommand(String.format("PRIVMSG #%s :/w %s %s", chatCredential.getUserName().toLowerCase(), targetUser, message)),
taskExecutor
BucketUtils.scheduleAgainstBucket(
ircWhisperBucket,
taskExecutor,
() -> queueCommand(String.format("PRIVMSG #%s :/w %s %s", chatCredential.getUserName().toLowerCase(), targetUser, message))
);
}

Expand Down
Expand Up @@ -12,6 +12,7 @@
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.config.Twitch4JGlobal;
import com.github.twitch4j.common.enums.TwitchLimitType;
import com.github.twitch4j.common.util.BucketUtils;
import com.github.twitch4j.common.util.EventManagerUtils;
import com.github.twitch4j.common.util.ThreadUtils;
import com.github.twitch4j.common.util.TwitchLimitRegistry;
Expand Down Expand Up @@ -269,16 +270,16 @@ public TwitchChat build() {
}

if (ircMessageBucket == null)
ircMessageBucket = userId == null ? TwitchChatLimitHelper.createBucket(this.chatRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_MESSAGE_LIMIT, Collections.singletonList(chatRateLimit));
ircMessageBucket = userId == null ? BucketUtils.createBucket(this.chatRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_MESSAGE_LIMIT, Collections.singletonList(chatRateLimit));

if (ircWhisperBucket == null)
ircWhisperBucket = userId == null ? TwitchChatLimitHelper.createBucket(this.whisperRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_WHISPER_LIMIT, Arrays.asList(whisperRateLimit));
ircWhisperBucket = userId == null ? BucketUtils.createBucket(this.whisperRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_WHISPER_LIMIT, Arrays.asList(whisperRateLimit));

if (ircJoinBucket == null)
ircJoinBucket = userId == null ? TwitchChatLimitHelper.createBucket(this.joinRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_JOIN_LIMIT, Collections.singletonList(joinRateLimit));
ircJoinBucket = userId == null ? BucketUtils.createBucket(this.joinRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_JOIN_LIMIT, Collections.singletonList(joinRateLimit));

if (ircAuthBucket == null)
ircAuthBucket = userId == null ? TwitchChatLimitHelper.createBucket(this.authRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_AUTH_LIMIT, Collections.singletonList(authRateLimit));
ircAuthBucket = userId == null ? BucketUtils.createBucket(this.authRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_AUTH_LIMIT, Collections.singletonList(authRateLimit));

log.debug("TwitchChat: Initializing Module ...");
return new TwitchChat(this.eventManager, this.credentialManager, this.chatAccount, this.baseUrl, this.sendCredentialToThirdPartyHost, this.commandPrefixes, this.chatQueueSize, this.ircMessageBucket, this.ircWhisperBucket, this.ircJoinBucket, this.ircAuthBucket, this.scheduledThreadPoolExecutor, this.chatQueueTimeout, this.proxyConfig, this.autoJoinOwnChannel, this.enableMembershipEvents, this.botOwnerIds, this.removeChannelOnJoinFailure, this.maxJoinRetries, this.chatJoinTimeout, this.wsPingPeriod);
Expand Down
@@ -1,9 +1,9 @@
package com.github.twitch4j.chat.util;

import com.github.twitch4j.common.enums.TwitchLimitType;
import com.github.twitch4j.common.util.BucketUtils;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.local.LocalBucketBuilder;
import lombok.experimental.UtilityClass;

import java.time.Duration;
Expand Down Expand Up @@ -103,24 +103,34 @@ public class TwitchChatLimitHelper {
*/
public final Bandwidth VERIFIED_AUTH_LIMIT = Bandwidth.simple(200, Duration.ofSeconds(10)).withId(AUTH_BANDWIDTH_ID);

/**
* @param limit bandwidth
* @return bucket
* @deprecated in favor of BucketUtils
*/
@Deprecated
public Bucket createBucket(Bandwidth limit) {
return Bucket.builder().addLimit(limit).build();
return BucketUtils.createBucket(limit);
}

/**
* @param limits bandwidths
* @return bucket
* @deprecated in favor of BucketUtils
*/
@Deprecated
public Bucket createBucket(Bandwidth... limits) {
LocalBucketBuilder builder = Bucket.builder();
for (Bandwidth limit : limits) {
builder.addLimit(limit);
}
return builder.build();
return BucketUtils.createBucket(limits);
}

/**
* @param limits bandwidths
* @return bucket
* @deprecated in favor of BucketUtils
*/
@Deprecated
public Bucket createBucket(Iterable<Bandwidth> limits) {
LocalBucketBuilder builder = Bucket.builder();
for (Bandwidth limit : limits) {
builder.addLimit(limit);
}
return builder.build();
return BucketUtils.createBucket(limits);
}

}
@@ -0,0 +1,91 @@
package com.github.twitch4j.common.util;

import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.local.LocalBucketBuilder;
import org.jetbrains.annotations.NotNull;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;

public class BucketUtils {

/**
* Creates a bucket with the specified bandwidth.
*
* @param limit the bandwidth
* @return the bucket
*/
@NotNull
public static Bucket createBucket(@NotNull Bandwidth limit) {
return Bucket.builder().addLimit(limit).build();
}

/**
* Creates a bucket with the specified bandwidths.
*
* @param limits the bandwidths
* @return the bucket
*/
@NotNull
public static Bucket createBucket(@NotNull Bandwidth... limits) {
LocalBucketBuilder builder = Bucket.builder();
for (Bandwidth limit : limits) {
builder.addLimit(limit);
}
return builder.build();
}

/**
* Creates a bucket with the specified bandwidths.
*
* @param limits the bandwidths
* @return the bucket
*/
@NotNull
public static Bucket createBucket(@NotNull Iterable<Bandwidth> limits) {
LocalBucketBuilder builder = Bucket.builder();
for (Bandwidth limit : limits) {
builder.addLimit(limit);
}
return builder.build();
}

/**
* Performs the callable after a token has been consumed from the bucket using the executor.
* <p>
* Note: ExecutionException should be inspected if the passed action can throw an exception.
*
* @param bucket rate limit bucket
* @param executor scheduled executor service for async calls
* @param call task that requires a bucket point
* @return the future result of the call
*/
@NotNull
public static <T> CompletableFuture<T> scheduleAgainstBucket(@NotNull Bucket bucket, @NotNull ScheduledExecutorService executor, @NotNull Callable<T> call) {
if (bucket.tryConsume(1L))
return CompletableFuture.supplyAsync(new SneakySupplier<>(call));

return bucket.asScheduler().consume(1L, executor).thenApplyAsync(v -> new SneakySupplier<>(call).get());
}

/**
* Runs the action after a token has been consumed from the bucket using the executor.
* <p>
* Note: while the executor is used to consume the bucket token, the action is performed on the fork-join common pool, by default.
*
* @param bucket rate limit bucket
* @param executor scheduled executor service for async calls
* @param action runnable that requires a bucket point
* @return a future to track completion progress
*/
@NotNull
public static CompletableFuture<Void> scheduleAgainstBucket(@NotNull Bucket bucket, @NotNull ScheduledExecutorService executor, @NotNull Runnable action) {
if (bucket.tryConsume(1L))
return CompletableFuture.runAsync(action);

return bucket.asScheduler().consume(1L, executor).thenRunAsync(action);
}

}
@@ -0,0 +1,32 @@
package com.github.twitch4j.common.util;

import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.jetbrains.annotations.NotNull;

import java.util.concurrent.Callable;
import java.util.function.Supplier;

/**
* A supplier that can sneakily throw exceptions.
* <p>
* This class should be used sparingly (to avoid hackiness) and carefully (to ensure bubbled exceptions are properly handled).
*
* @param <T> the return type of values provided by the supplier
*/
@RequiredArgsConstructor
public final class SneakySupplier<T> implements Supplier<T> {

/**
* The action to compute the supplied value, possibly throwing an exception.
*/
@NotNull
private final Callable<T> callable;

@Override
@SneakyThrows
public T get() {
return callable.call();
}

}
Expand Up @@ -5,7 +5,6 @@
import io.github.bucket4j.Bucket;
import io.github.bucket4j.BucketConfiguration;
import io.github.bucket4j.TokensInheritanceStrategy;
import io.github.bucket4j.local.LocalBucketBuilder;
import lombok.NonNull;

import java.util.Collections;
Expand Down Expand Up @@ -51,7 +50,7 @@ public void setLimit(@NonNull String userId, @NonNull TwitchLimitType limitType,
return bucket;
}

return constructBucket(limit);
return BucketUtils.createBucket(limit);
});
}

Expand All @@ -76,19 +75,13 @@ public Optional<Bucket> getBucket(@NonNull String userId, @NonNull TwitchLimitTy
* @return the shared rate limit bucket for this user and limit type
*/
public Bucket getOrInitializeBucket(@NonNull String userId, @NonNull TwitchLimitType limitType, @NonNull List<Bandwidth> limit) {
return getBucketsByUser(userId).computeIfAbsent(limitType, l -> constructBucket(limit));
return getBucketsByUser(userId).computeIfAbsent(limitType, l -> BucketUtils.createBucket(limit));
}

private Map<TwitchLimitType, Bucket> getBucketsByUser(String userId) {
return limits.computeIfAbsent(userId, s -> Collections.synchronizedMap(new EnumMap<>(TwitchLimitType.class)));
}

private static Bucket constructBucket(List<Bandwidth> limits) {
LocalBucketBuilder builder = Bucket.builder();
limits.forEach(builder::addLimit);
return builder.build();
}

/**
* @return the single thread-safe instance of the limit registry.
*/
Expand Down
Expand Up @@ -14,6 +14,7 @@
import org.apache.commons.lang3.exception.ContextedRuntimeException;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

@RequiredArgsConstructor
Expand All @@ -39,8 +40,8 @@ public class TwitchExtensionsErrorDecoder implements ErrorDecoder {
public Exception decode(String methodKey, Response response) {
Exception ex;

try {
String responseBody = response.body() == null ? "" : IOUtils.toString(response.body().asInputStream(), StandardCharsets.UTF_8.name());
try (InputStream is = response.body() == null ? null : response.body().asInputStream()) {
String responseBody = is == null ? "" : IOUtils.toString(is, StandardCharsets.UTF_8);

if (response.status() == 401) {
ex = new UnauthorizedException()
Expand All @@ -53,7 +54,7 @@ public Exception decode(String methodKey, Response response) {
.addContextValue("requestMethod", response.request().httpMethod())
.addContextValue("responseBody", responseBody);
} else if (response.status() == 429) {
ex = new ContextedRuntimeException("To many requests!")
ex = new ContextedRuntimeException("Too many requests!")
.addContextValue("requestUrl", response.request().url())
.addContextValue("requestMethod", response.request().httpMethod())
.addContextValue("responseBody", responseBody);
Expand All @@ -70,7 +71,8 @@ public Exception decode(String methodKey, Response response) {
.addContextValue("responseBody", responseBody)
.addContextValue("errorType", error.getError())
.addContextValue("errorStatus", error.getStatus())
.addContextValue("errorType", error.getMessage());
.addContextValue("errorType", error.getMessage())
.addContextValue("errorMessage", error.getMessage());
}
} catch (IOException fallbackToDefault) {
ex = defaultDecoder.decode(methodKey, response);
Expand Down