Skip to content

Commit

Permalink
feat: comply with undocumented helix rate limits (#561)
Browse files Browse the repository at this point in the history
* feat: allow broad helix bandwidth customization

* feat: stricter limit for helix ban and unban user

* refactor: simplify request channel id extraction

* feat: deplete ban bucket on 429

* feat: implement createClip rate limit

* feat: implement blocked terms rate limit

* refactor: move util methods to common module

* refactor: create TwitchHelixRateLimitTracker

* refactor: create TwitchHelixTokenManager

* fix: ensure error response body stream is closed

* fix: comply with slower-than-documented helix refill rate
  • Loading branch information
iProdigy committed May 4, 2022
1 parent ef1035d commit a1707a9
Show file tree
Hide file tree
Showing 17 changed files with 612 additions and 226 deletions.
30 changes: 15 additions & 15 deletions chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java
Expand Up @@ -21,6 +21,7 @@
import com.github.twitch4j.chat.events.channel.IRCMessageEvent;
import com.github.twitch4j.chat.events.channel.UserStateEvent;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.util.BucketUtils;
import com.github.twitch4j.common.util.CryptoUtils;
import com.github.twitch4j.common.util.EscapeUtils;
import com.github.twitch4j.common.util.ExponentialBackoffStrategy;
Expand Down Expand Up @@ -674,7 +675,7 @@ protected void sendCommand(String command, String... args) {
* @param command raw irc command
*/
public boolean sendRaw(String command) {
return ircMessageBucket.asScheduler().consume(1, taskExecutor).thenRunAsync(() -> queueCommand(command), taskExecutor) != null;
return BucketUtils.scheduleAgainstBucket(ircMessageBucket, taskExecutor, () -> queueCommand(command)) != null;
}

/**
Expand Down Expand Up @@ -750,14 +751,11 @@ private void issueJoin(String channelName) {
}

protected void issueJoin(String channelName, int attempts) {
ircJoinBucket.asScheduler().consume(1, taskExecutor).thenRunAsync(
() -> {
String name = channelName.toLowerCase();
queueCommand("JOIN #" + name);
joinAttemptsByChannelName.asMap().merge(name, attempts, Math::max); // mark that a join has been initiated to track later success or failure state
},
taskExecutor
);
BucketUtils.scheduleAgainstBucket(ircJoinBucket, taskExecutor, () -> {
String name = channelName.toLowerCase();
queueCommand("JOIN #" + name);
joinAttemptsByChannelName.asMap().merge(name, attempts, Math::max); // mark that a join has been initiated to track later success or failure state
});
}

/**
Expand Down Expand Up @@ -790,9 +788,10 @@ public boolean leaveChannel(String channelName) {
}

private void issuePart(String channelName) {
ircJoinBucket.asScheduler().consume(1, taskExecutor).thenRunAsync(
() -> queueCommand("PART #" + channelName.toLowerCase()),
taskExecutor
BucketUtils.scheduleAgainstBucket(
ircJoinBucket,
taskExecutor,
() -> queueCommand("PART #" + channelName.toLowerCase())
);
}

Expand Down Expand Up @@ -833,9 +832,10 @@ public boolean sendMessage(String channel, String message, Map<String, Object> t
*/
public void sendPrivateMessage(String targetUser, String message) {
log.debug("Adding private message for user [{}] with content [{}] to the queue.", targetUser, message);
ircWhisperBucket.asScheduler().consume(1, taskExecutor).thenRunAsync(
() -> queueCommand(String.format("PRIVMSG #%s :/w %s %s", chatCredential.getUserName().toLowerCase(), targetUser, message)),
taskExecutor
BucketUtils.scheduleAgainstBucket(
ircWhisperBucket,
taskExecutor,
() -> queueCommand(String.format("PRIVMSG #%s :/w %s %s", chatCredential.getUserName().toLowerCase(), targetUser, message))
);
}

Expand Down
Expand Up @@ -12,6 +12,7 @@
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.config.Twitch4JGlobal;
import com.github.twitch4j.common.enums.TwitchLimitType;
import com.github.twitch4j.common.util.BucketUtils;
import com.github.twitch4j.common.util.EventManagerUtils;
import com.github.twitch4j.common.util.ThreadUtils;
import com.github.twitch4j.common.util.TwitchLimitRegistry;
Expand Down Expand Up @@ -269,16 +270,16 @@ public TwitchChat build() {
}

if (ircMessageBucket == null)
ircMessageBucket = userId == null ? TwitchChatLimitHelper.createBucket(this.chatRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_MESSAGE_LIMIT, Collections.singletonList(chatRateLimit));
ircMessageBucket = userId == null ? BucketUtils.createBucket(this.chatRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_MESSAGE_LIMIT, Collections.singletonList(chatRateLimit));

if (ircWhisperBucket == null)
ircWhisperBucket = userId == null ? TwitchChatLimitHelper.createBucket(this.whisperRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_WHISPER_LIMIT, Arrays.asList(whisperRateLimit));
ircWhisperBucket = userId == null ? BucketUtils.createBucket(this.whisperRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_WHISPER_LIMIT, Arrays.asList(whisperRateLimit));

if (ircJoinBucket == null)
ircJoinBucket = userId == null ? TwitchChatLimitHelper.createBucket(this.joinRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_JOIN_LIMIT, Collections.singletonList(joinRateLimit));
ircJoinBucket = userId == null ? BucketUtils.createBucket(this.joinRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_JOIN_LIMIT, Collections.singletonList(joinRateLimit));

if (ircAuthBucket == null)
ircAuthBucket = userId == null ? TwitchChatLimitHelper.createBucket(this.authRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_AUTH_LIMIT, Collections.singletonList(authRateLimit));
ircAuthBucket = userId == null ? BucketUtils.createBucket(this.authRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_AUTH_LIMIT, Collections.singletonList(authRateLimit));

log.debug("TwitchChat: Initializing Module ...");
return new TwitchChat(this.eventManager, this.credentialManager, this.chatAccount, this.baseUrl, this.sendCredentialToThirdPartyHost, this.commandPrefixes, this.chatQueueSize, this.ircMessageBucket, this.ircWhisperBucket, this.ircJoinBucket, this.ircAuthBucket, this.scheduledThreadPoolExecutor, this.chatQueueTimeout, this.proxyConfig, this.autoJoinOwnChannel, this.enableMembershipEvents, this.botOwnerIds, this.removeChannelOnJoinFailure, this.maxJoinRetries, this.chatJoinTimeout, this.wsPingPeriod);
Expand Down
@@ -1,9 +1,9 @@
package com.github.twitch4j.chat.util;

import com.github.twitch4j.common.enums.TwitchLimitType;
import com.github.twitch4j.common.util.BucketUtils;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.local.LocalBucketBuilder;
import lombok.experimental.UtilityClass;

import java.time.Duration;
Expand Down Expand Up @@ -103,24 +103,34 @@ public class TwitchChatLimitHelper {
*/
public final Bandwidth VERIFIED_AUTH_LIMIT = Bandwidth.simple(200, Duration.ofSeconds(10)).withId(AUTH_BANDWIDTH_ID);

/**
* @param limit bandwidth
* @return bucket
* @deprecated in favor of BucketUtils
*/
@Deprecated
public Bucket createBucket(Bandwidth limit) {
return Bucket.builder().addLimit(limit).build();
return BucketUtils.createBucket(limit);
}

/**
* @param limits bandwidths
* @return bucket
* @deprecated in favor of BucketUtils
*/
@Deprecated
public Bucket createBucket(Bandwidth... limits) {
LocalBucketBuilder builder = Bucket.builder();
for (Bandwidth limit : limits) {
builder.addLimit(limit);
}
return builder.build();
return BucketUtils.createBucket(limits);
}

/**
* @param limits bandwidths
* @return bucket
* @deprecated in favor of BucketUtils
*/
@Deprecated
public Bucket createBucket(Iterable<Bandwidth> limits) {
LocalBucketBuilder builder = Bucket.builder();
for (Bandwidth limit : limits) {
builder.addLimit(limit);
}
return builder.build();
return BucketUtils.createBucket(limits);
}

}
@@ -0,0 +1,91 @@
package com.github.twitch4j.common.util;

import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.local.LocalBucketBuilder;
import org.jetbrains.annotations.NotNull;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;

public class BucketUtils {

/**
* Creates a bucket with the specified bandwidth.
*
* @param limit the bandwidth
* @return the bucket
*/
@NotNull
public static Bucket createBucket(@NotNull Bandwidth limit) {
return Bucket.builder().addLimit(limit).build();
}

/**
* Creates a bucket with the specified bandwidths.
*
* @param limits the bandwidths
* @return the bucket
*/
@NotNull
public static Bucket createBucket(@NotNull Bandwidth... limits) {
LocalBucketBuilder builder = Bucket.builder();
for (Bandwidth limit : limits) {
builder.addLimit(limit);
}
return builder.build();
}

/**
* Creates a bucket with the specified bandwidths.
*
* @param limits the bandwidths
* @return the bucket
*/
@NotNull
public static Bucket createBucket(@NotNull Iterable<Bandwidth> limits) {
LocalBucketBuilder builder = Bucket.builder();
for (Bandwidth limit : limits) {
builder.addLimit(limit);
}
return builder.build();
}

/**
* Performs the callable after a token has been consumed from the bucket using the executor.
* <p>
* Note: ExecutionException should be inspected if the passed action can throw an exception.
*
* @param bucket rate limit bucket
* @param executor scheduled executor service for async calls
* @param call task that requires a bucket point
* @return the future result of the call
*/
@NotNull
public static <T> CompletableFuture<T> scheduleAgainstBucket(@NotNull Bucket bucket, @NotNull ScheduledExecutorService executor, @NotNull Callable<T> call) {
if (bucket.tryConsume(1L))
return CompletableFuture.supplyAsync(new SneakySupplier<>(call));

return bucket.asScheduler().consume(1L, executor).thenApplyAsync(v -> new SneakySupplier<>(call).get());
}

/**
* Runs the action after a token has been consumed from the bucket using the executor.
* <p>
* Note: while the executor is used to consume the bucket token, the action is performed on the fork-join common pool, by default.
*
* @param bucket rate limit bucket
* @param executor scheduled executor service for async calls
* @param action runnable that requires a bucket point
* @return a future to track completion progress
*/
@NotNull
public static CompletableFuture<Void> scheduleAgainstBucket(@NotNull Bucket bucket, @NotNull ScheduledExecutorService executor, @NotNull Runnable action) {
if (bucket.tryConsume(1L))
return CompletableFuture.runAsync(action);

return bucket.asScheduler().consume(1L, executor).thenRunAsync(action);
}

}
@@ -0,0 +1,32 @@
package com.github.twitch4j.common.util;

import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.jetbrains.annotations.NotNull;

import java.util.concurrent.Callable;
import java.util.function.Supplier;

/**
* A supplier that can sneakily throw exceptions.
* <p>
* This class should be used sparingly (to avoid hackiness) and carefully (to ensure bubbled exceptions are properly handled).
*
* @param <T> the return type of values provided by the supplier
*/
@RequiredArgsConstructor
public final class SneakySupplier<T> implements Supplier<T> {

/**
* The action to compute the supplied value, possibly throwing an exception.
*/
@NotNull
private final Callable<T> callable;

@Override
@SneakyThrows
public T get() {
return callable.call();
}

}
Expand Up @@ -5,7 +5,6 @@
import io.github.bucket4j.Bucket;
import io.github.bucket4j.BucketConfiguration;
import io.github.bucket4j.TokensInheritanceStrategy;
import io.github.bucket4j.local.LocalBucketBuilder;
import lombok.NonNull;

import java.util.Collections;
Expand Down Expand Up @@ -51,7 +50,7 @@ public void setLimit(@NonNull String userId, @NonNull TwitchLimitType limitType,
return bucket;
}

return constructBucket(limit);
return BucketUtils.createBucket(limit);
});
}

Expand All @@ -76,19 +75,13 @@ public Optional<Bucket> getBucket(@NonNull String userId, @NonNull TwitchLimitTy
* @return the shared rate limit bucket for this user and limit type
*/
public Bucket getOrInitializeBucket(@NonNull String userId, @NonNull TwitchLimitType limitType, @NonNull List<Bandwidth> limit) {
return getBucketsByUser(userId).computeIfAbsent(limitType, l -> constructBucket(limit));
return getBucketsByUser(userId).computeIfAbsent(limitType, l -> BucketUtils.createBucket(limit));
}

private Map<TwitchLimitType, Bucket> getBucketsByUser(String userId) {
return limits.computeIfAbsent(userId, s -> Collections.synchronizedMap(new EnumMap<>(TwitchLimitType.class)));
}

private static Bucket constructBucket(List<Bandwidth> limits) {
LocalBucketBuilder builder = Bucket.builder();
limits.forEach(builder::addLimit);
return builder.build();
}

/**
* @return the single thread-safe instance of the limit registry.
*/
Expand Down
Expand Up @@ -14,6 +14,7 @@
import org.apache.commons.lang3.exception.ContextedRuntimeException;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

@RequiredArgsConstructor
Expand All @@ -39,8 +40,8 @@ public class TwitchExtensionsErrorDecoder implements ErrorDecoder {
public Exception decode(String methodKey, Response response) {
Exception ex;

try {
String responseBody = response.body() == null ? "" : IOUtils.toString(response.body().asInputStream(), StandardCharsets.UTF_8.name());
try (InputStream is = response.body() == null ? null : response.body().asInputStream()) {
String responseBody = is == null ? "" : IOUtils.toString(is, StandardCharsets.UTF_8);

if (response.status() == 401) {
ex = new UnauthorizedException()
Expand All @@ -53,7 +54,7 @@ public Exception decode(String methodKey, Response response) {
.addContextValue("requestMethod", response.request().httpMethod())
.addContextValue("responseBody", responseBody);
} else if (response.status() == 429) {
ex = new ContextedRuntimeException("To many requests!")
ex = new ContextedRuntimeException("Too many requests!")
.addContextValue("requestUrl", response.request().url())
.addContextValue("requestMethod", response.request().httpMethod())
.addContextValue("responseBody", responseBody);
Expand All @@ -70,7 +71,8 @@ public Exception decode(String methodKey, Response response) {
.addContextValue("responseBody", responseBody)
.addContextValue("errorType", error.getError())
.addContextValue("errorStatus", error.getStatus())
.addContextValue("errorType", error.getMessage());
.addContextValue("errorType", error.getMessage())
.addContextValue("errorMessage", error.getMessage());
}
} catch (IOException fallbackToDefault) {
ex = defaultDecoder.decode(methodKey, response);
Expand Down

0 comments on commit a1707a9

Please sign in to comment.