diff --git a/twitch4j/src/main/java/com/github/twitch4j/IClientHelper.java b/twitch4j/src/main/java/com/github/twitch4j/IClientHelper.java new file mode 100644 index 000000000..ab5fc40a3 --- /dev/null +++ b/twitch4j/src/main/java/com/github/twitch4j/IClientHelper.java @@ -0,0 +1,306 @@ +package com.github.twitch4j; + +import com.github.twitch4j.common.util.CollectionUtils; +import com.github.twitch4j.common.util.ExponentialBackoffStrategy; +import com.github.twitch4j.domain.ChannelCache; +import com.github.twitch4j.helix.TwitchHelix; +import com.github.twitch4j.helix.domain.User; +import com.github.twitch4j.helix.domain.UserList; +import org.jetbrains.annotations.Nullable; + +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import static com.github.twitch4j.TwitchClientHelper.MAX_LIMIT; +import static com.github.twitch4j.TwitchClientHelper.log; + +public interface IClientHelper extends AutoCloseable { + + TwitchHelix getTwitchHelix(); + + /** + * Enable StreamEvent Listener, without invoking a Helix API call + * + * @param channelId Channel Id + * @param channelName Channel Name + * @return true if the channel was added, false otherwise + */ + boolean enableStreamEventListener(String channelId, String channelName); + + /** + * Disable StreamEventListener, without invoking a Helix API call + * + * @param channelId Channel Id + * @return true if the channel was removed, false otherwise + */ + boolean disableStreamEventListenerForId(String channelId); + + /** + * Enable Follow Listener, without invoking a Helix API call + * + * @param channelId Channel Id + * @param channelName Channel Name + * @return true if the channel was added, false otherwise + */ + boolean enableFollowEventListener(String channelId, String channelName); + + /** + * Disable Follow Listener, without invoking a Helix API call + * + * @param channelId Channel Id + * @return true when a previously-tracked channel was removed, false otherwise + */ + boolean disableFollowEventListenerForId(String channelId); + + /** + * Enable Clip Creation Listener, without invoking a Helix API call, starting at a custom timestamp + * + * @param channelId Channel Id + * @param channelName Channel Name + * @param startedAt The oldest clip creation timestamp to start the queries at + * @return whether the channel was added + */ + boolean enableClipEventListener(String channelId, String channelName, Instant startedAt); + + /** + * Disable Clip Creation Listener, without invoking a Helix API call + * + * @param channelId Channel Id + * @return whether a previously-tracked channel was removed + */ + boolean disableClipEventListenerForId(String channelId); + + /** + * Get cached information for a channel's stream status and follower count. + *

+ * For this information to be valid, the respective event listeners need to be enabled for the channel. + *

+ * For thread safety, the setters on this object should not be used; only getters. + * + * @param channelId The ID of the channel whose cache is to be retrieved. + * @return ChannelCache in an optional wrapper. + */ + Optional getCachedInformation(String channelId); + + /** + * Updates {@link ExponentialBackoffStrategy#getBaseMillis()} for each of the independent listeners (i.e. stream status and followers) + * + * @param threadDelay the minimum milliseconds delay between each api call + */ + void setThreadDelay(long threadDelay); + + /** + * Enable StreamEvent Listener + * + * @param channelName Channel Name + */ + @Nullable + default User enableStreamEventListener(String channelName) { + UserList users = getTwitchHelix().getUsers(null, null, Collections.singletonList(channelName)).execute(); + + if (users.getUsers().size() == 1) { + User user = users.getUsers().get(0); + if (enableStreamEventListener(user.getId(), user.getLogin())) + return user; + } else { + log.error("Failed to add channel {} to stream event listener!", channelName); + } + + return null; + } + + /** + * Enable StreamEvent Listener for the given channel names + * + * @param channelNames the channel names to be added + */ + default Collection enableStreamEventListener(Iterable channelNames) { + return CollectionUtils.chunked(channelNames, MAX_LIMIT).stream() + .map(channels -> getTwitchHelix().getUsers(null, null, channels).execute()) + .map(UserList::getUsers) + .filter(Objects::nonNull) + .flatMap(Collection::stream) + .filter(user -> enableStreamEventListener(user.getId(), user.getLogin())) + .collect(Collectors.toList()); + } + + /** + * Disable StreamEvent Listener + * + * @param channelName Channel Name + */ + default void disableStreamEventListener(String channelName) { + UserList users = getTwitchHelix().getUsers(null, null, Collections.singletonList(channelName)).execute(); + + if (users.getUsers().size() == 1) { + users.getUsers().forEach(user -> disableStreamEventListenerForId(user.getId())); + } else { + log.error("Failed to remove channel " + channelName + " from stream event listener!"); + } + } + + /** + * Disable StreamEvent Listener for the given channel names + * + * @param channelNames the channel names to be removed + */ + default void disableStreamEventListener(Iterable channelNames) { + CollectionUtils.chunked(channelNames, MAX_LIMIT).forEach(channels -> { + UserList users = getTwitchHelix().getUsers(null, null, channels).execute(); + users.getUsers().forEach(user -> disableStreamEventListenerForId(user.getId())); + }); + } + + /** + * Follow Listener + * + * @param channelName Channel Name + */ + @Nullable + default User enableFollowEventListener(String channelName) { + UserList users = getTwitchHelix().getUsers(null, null, Collections.singletonList(channelName)).execute(); + + if (users.getUsers().size() == 1) { + User user = users.getUsers().get(0); + if (enableFollowEventListener(user.getId(), user.getLogin())) + return user; + } else { + log.error("Failed to add channel " + channelName + " to Follow Listener, maybe it doesn't exist!"); + } + + return null; + } + + /** + * Enable Follow Listener for the given channel names + * + * @param channelNames the channel names to be added + */ + default Collection enableFollowEventListener(Iterable channelNames) { + return CollectionUtils.chunked(channelNames, MAX_LIMIT).stream() + .map(channels -> getTwitchHelix().getUsers(null, null, channels).execute()) + .map(UserList::getUsers) + .filter(Objects::nonNull) + .flatMap(Collection::stream) + .filter(user -> enableFollowEventListener(user.getId(), user.getLogin())) + .collect(Collectors.toList()); + } + + /** + * Disable Follow Listener + * + * @param channelName Channel Name + */ + default void disableFollowEventListener(String channelName) { + UserList users = getTwitchHelix().getUsers(null, null, Collections.singletonList(channelName)).execute(); + + if (users.getUsers().size() == 1) { + users.getUsers().forEach(user -> disableFollowEventListenerForId(user.getId())); + } else { + log.error("Failed to remove channel " + channelName + " from follow listener!"); + } + } + + /** + * Disable Follow Listener for the given channel names + * + * @param channelNames the channel names to be removed + */ + default void disableFollowEventListener(Iterable channelNames) { + CollectionUtils.chunked(channelNames, MAX_LIMIT).forEach(channels -> { + UserList users = getTwitchHelix().getUsers(null, null, channels).execute(); + users.getUsers().forEach(user -> disableFollowEventListenerForId(user.getId())); + }); + } + + /** + * Enable Clip Creation Listener, without invoking a Helix API call + * + * @param channelId Channel Id + * @param channelName Channel Name + * @return whether the channel was added + */ + default boolean enableClipEventListener(String channelId, String channelName) { + return enableClipEventListener(channelId, channelName, Instant.now()); + } + + /** + * Clip Creation Listener + * + * @param channelName Channel Name + * @return the channel whose clip creations are now tracked, or null (if unable to resolve or already tracking) + */ + @Nullable + default User enableClipEventListener(String channelName) { + UserList users = getTwitchHelix().getUsers(null, null, Collections.singletonList(channelName)).execute(); + + if (users.getUsers().size() == 1) { + User user = users.getUsers().get(0); + if (enableClipEventListener(user.getId(), user.getLogin())) + return user; + } else { + log.error("Failed to add channel " + channelName + " to Clip Creation Listener, maybe it doesn't exist!"); + } + + return null; + } + + /** + * Enable Clip Creation Listener for the given channel names + * + * @param channelNames the channel names to be added + * @return the channels that are freshly tracked for clip creations + */ + default Collection enableClipEventListener(Iterable channelNames) { + return CollectionUtils.chunked(channelNames, MAX_LIMIT).stream() + .map(channels -> getTwitchHelix().getUsers(null, null, channels).execute()) + .map(UserList::getUsers) + .filter(Objects::nonNull) + .flatMap(Collection::stream) + .filter(user -> enableClipEventListener(user.getId(), user.getLogin())) + .collect(Collectors.toList()); + } + + /** + * Disable Clip Creation Listener + * + * @param channelName Channel Name + * @return whether a previously-tracked channel was removed + */ + default boolean disableClipEventListener(String channelName) { + UserList users = getTwitchHelix().getUsers(null, null, Collections.singletonList(channelName)).execute(); + + if (users.getUsers().size() == 1) { + return disableClipEventListenerForId(users.getUsers().get(0).getId()); + } else { + log.error("Failed to remove channel " + channelName + " from clip creation listener!"); + return false; + } + } + + /** + * Disable Clip Creation Listener for the given channel names + * + * @param channelNames the channel names to be removed + */ + default void disableClipEventListener(Iterable channelNames) { + CollectionUtils.chunked(channelNames, MAX_LIMIT).forEach(channels -> { + UserList users = getTwitchHelix().getUsers(null, null, channels).execute(); + users.getUsers().forEach(user -> disableClipEventListenerForId(user.getId())); + }); + } + + /** + * Updates {@link ExponentialBackoffStrategy#getBaseMillis()} for each of the independent listeners (i.e. stream status and followers) + * + * @param threadRate the maximum rate of api calls per second + */ + default void setThreadRate(long threadRate) { + this.setThreadDelay(1000 / threadRate); + } + +} diff --git a/twitch4j/src/main/java/com/github/twitch4j/TwitchClientHelper.java b/twitch4j/src/main/java/com/github/twitch4j/TwitchClientHelper.java index adca11577..8bb8d9450 100644 --- a/twitch4j/src/main/java/com/github/twitch4j/TwitchClientHelper.java +++ b/twitch4j/src/main/java/com/github/twitch4j/TwitchClientHelper.java @@ -9,28 +9,33 @@ import com.github.twitch4j.common.events.domain.EventUser; import com.github.twitch4j.common.util.CollectionUtils; import com.github.twitch4j.common.util.ExponentialBackoffStrategy; +import com.github.twitch4j.util.PaginationUtil; import com.github.twitch4j.domain.ChannelCache; import com.github.twitch4j.events.ChannelChangeGameEvent; import com.github.twitch4j.events.ChannelChangeTitleEvent; +import com.github.twitch4j.events.ChannelClipCreatedEvent; import com.github.twitch4j.events.ChannelFollowCountUpdateEvent; import com.github.twitch4j.events.ChannelGoLiveEvent; import com.github.twitch4j.events.ChannelGoOfflineEvent; import com.github.twitch4j.events.ChannelViewerCountUpdateEvent; import com.github.twitch4j.helix.TwitchHelix; -import com.github.twitch4j.helix.domain.*; +import com.github.twitch4j.helix.domain.Clip; +import com.github.twitch4j.helix.domain.ClipList; +import com.github.twitch4j.helix.domain.Follow; +import com.github.twitch4j.helix.domain.FollowList; +import com.github.twitch4j.helix.domain.Stream; +import com.github.twitch4j.helix.domain.StreamList; import com.netflix.hystrix.HystrixCommand; +import lombok.Getter; import lombok.Value; -import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Instant; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -43,20 +48,20 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.UnaryOperator; -import java.util.stream.Collectors; /** * A helper class that covers a few basic use cases of most library users */ -@Slf4j -public class TwitchClientHelper implements AutoCloseable { +public class TwitchClientHelper implements IClientHelper { + + static final Logger log = LoggerFactory.getLogger(TwitchClientHelper.class); public static final int REQUIRED_THREAD_COUNT = 2; /** * The greatest number of streams or followers that can be requested in a single API call */ - private static final int MAX_LIMIT = 100; + static final int MAX_LIMIT = 100; /** * Holds the channels that are checked for live/offline state changes @@ -69,14 +74,15 @@ public class TwitchClientHelper implements AutoCloseable { private final Set listenForFollow = ConcurrentHashMap.newKeySet(); /** - * Twitch Helix + * Holds the channels that are checked for new clip creations */ - private final TwitchHelix twitchHelix; + private final Set listenForClips = ConcurrentHashMap.newKeySet(); /** - * Event Manager + * Twitch Helix */ - private final EventManager eventManager; + @Getter + private final TwitchHelix twitchHelix; /** * Event Task - Stream Status @@ -102,6 +108,18 @@ public class TwitchClientHelper implements AutoCloseable { */ private final AtomicReference> followerEventFuture = new AtomicReference<>(); + /** + * Event Task - Clip Creations + *

+ * Accepts a channel id as the input; Yields true if the next call should not be delayed + */ + private final Function clipEventTask; + + /** + * The {@link Future} associated with clipEventTask, in an atomic wrapper + */ + private final AtomicReference> clipEventFuture = new AtomicReference<>(); + /** * Channel Information Cache */ @@ -124,6 +142,11 @@ public class TwitchClientHelper implements AutoCloseable { */ private final AtomicReference followBackoff; + /** + * Holds the {@link ExponentialBackoffStrategy} used for the clip creation listener + */ + private final AtomicReference clipBackoff; + /** * Constructor * @@ -133,14 +156,14 @@ public class TwitchClientHelper implements AutoCloseable { */ public TwitchClientHelper(TwitchHelix twitchHelix, EventManager eventManager, ScheduledThreadPoolExecutor executor) { this.twitchHelix = twitchHelix; - this.eventManager = eventManager; this.executor = executor; final ExponentialBackoffStrategy defaultBackoff = ExponentialBackoffStrategy.builder().immediateFirst(false).baseMillis(1000L).jitter(false).build(); - liveBackoff = new AtomicReference<>(defaultBackoff); - followBackoff = new AtomicReference<>(defaultBackoff.copy()); + this.liveBackoff = new AtomicReference<>(defaultBackoff); + this.followBackoff = new AtomicReference<>(defaultBackoff.copy()); + this.clipBackoff = new AtomicReference<>(defaultBackoff.copy()); - // Threads + // Tasks this.streamStatusEventTask = channels -> { // check go live / stream events HystrixCommand hystrixGetAllStreams = twitchHelix.getStreams(null, null, null, channels.size(), null, null, channels, null); @@ -170,11 +193,11 @@ public TwitchClientHelper(TwitchHelix twitchHelix, EventManager eventManager, Sc if (stream != null && stream.getType().equalsIgnoreCase("live")) { // is live // - live status - if (currentChannelCache.getIsLive() != null && currentChannelCache.getIsLive() == false) { + if (currentChannelCache.getIsLive() != null && !currentChannelCache.getIsLive()) { dispatchGoLiveEvent = true; } currentChannelCache.setIsLive(true); - boolean wasAlreadyLive = dispatchGoLiveEvent != true && currentChannelCache.getIsLive() == true; + boolean wasAlreadyLive = !dispatchGoLiveEvent && currentChannelCache.getIsLive(); // - change stream title event if (wasAlreadyLive && currentChannelCache.getTitle() != null && !currentChannelCache.getTitle().equalsIgnoreCase(stream.getTitle())) { @@ -194,7 +217,7 @@ public TwitchClientHelper(TwitchHelix twitchHelix, EventManager eventManager, Sc } } else { // was online previously? - if (currentChannelCache.getIsLive() != null && currentChannelCache.getIsLive() == true) { + if (currentChannelCache.getIsLive() != null && currentChannelCache.getIsLive()) { dispatchGoOfflineEvent = true; } @@ -308,50 +331,50 @@ public TwitchClientHelper(TwitchHelix twitchHelix, EventManager eventManager, Sc return false; } }; - } + this.clipEventTask = channelId -> { + // check clip creations + boolean nextRequestCanBeImmediate = false; + + final ChannelCache currentChannelCache = channelInformation.get(channelId, c -> new ChannelCache()); + final AtomicReference windowStart = currentChannelCache.getClipWindowStart(); + final Instant startedAt = windowStart.get(); + final Instant now = Instant.now(); + + if (startedAt == null) { + // initialize clip query window started_at + nextRequestCanBeImmediate = windowStart.compareAndSet(null, now); + } else { + // get all clips in range [startedAt, now] + final List clips = getClips(channelId, startedAt, now); - /** - * Enable StreamEvent Listener - * - * @param channelName Channel Name - */ - @Nullable - public User enableStreamEventListener(String channelName) { - UserList users = twitchHelix.getUsers(null, null, Collections.singletonList(channelName)).execute(); - - if (users.getUsers().size() == 1) { - User user = users.getUsers().get(0); - if (enableStreamEventListener(user.getId(), user.getLogin())) - return user; - } else { - log.error("Failed to add channel {} to stream event listener!", channelName); - } + // cache channel name if unknown and construct event channel to be passed to events + if (!clips.isEmpty() && clips.get(0) != null && currentChannelCache.getUserName() == null) { + currentChannelCache.setUserName(clips.get(0).getBroadcasterName()); + } + final EventChannel channel = new EventChannel(channelId, currentChannelCache.getUserName()); - return null; - } + // loop through queried clips + Instant maxCreatedAt = startedAt; + for (Clip clip : clips) { + if (clip != null && clip.getCreatedAtInstant() != null && clip.getCreatedAtInstant().compareTo(startedAt) > 0) { + eventManager.publish(new ChannelClipCreatedEvent(channel, clip)); // found a new clip - /** - * Enable StreamEvent Listener for the given channel names - * - * @param channelNames the channel names to be added - */ - public Collection enableStreamEventListener(Iterable channelNames) { - return CollectionUtils.chunked(channelNames, MAX_LIMIT).stream() - .map(channels -> twitchHelix.getUsers(null, null, channels).execute()) - .map(UserList::getUsers) - .filter(Objects::nonNull) - .flatMap(Collection::stream) - .filter(user -> enableStreamEventListener(user.getId(), user.getLogin())) - .collect(Collectors.toList()); + if (clip.getCreatedAtInstant().compareTo(maxCreatedAt) > 0) + maxCreatedAt = clip.getCreatedAtInstant(); // keep track of most recently created clip + } + } + + // next clip window should start just after the most recent clip we've seen for this channel + final Instant nextStartedAt = maxCreatedAt; + if (nextStartedAt != startedAt) + windowStart.updateAndGet(old -> old == null || old.compareTo(nextStartedAt) < 0 ? nextStartedAt : old); + } + + return nextRequestCanBeImmediate; + }; } - /** - * Enable StreamEvent Listener, without invoking a Helix API call - * - * @param channelId Channel Id - * @param channelName Channel Name - * @return true if the channel was added, false otherwise - */ + @Override public boolean enableStreamEventListener(String channelId, String channelName) { // add to set final boolean add = listenForGoLive.add(channelId); @@ -365,45 +388,13 @@ public boolean enableStreamEventListener(String channelId, String channelName) { return add; } - /** - * Disable StreamEvent Listener - * - * @param channelName Channel Name - */ - public void disableStreamEventListener(String channelName) { - UserList users = twitchHelix.getUsers(null, null, Collections.singletonList(channelName)).execute(); - - if (users.getUsers().size() == 1) { - users.getUsers().forEach(user -> disableStreamEventListenerForId(user.getId())); - } else { - log.error("Failed to remove channel " + channelName + " from stream event listener!"); - } - } - - /** - * Disable StreamEvent Listener for the given channel names - * - * @param channelNames the channel names to be removed - */ - public void disableStreamEventListener(Iterable channelNames) { - CollectionUtils.chunked(channelNames, MAX_LIMIT).forEach(channels -> { - UserList users = twitchHelix.getUsers(null, null, channels).execute(); - users.getUsers().forEach(user -> disableStreamEventListenerForId(user.getId())); - }); - } - - /** - * Disable StreamEventListener, without invoking a Helix API call - * - * @param channelId Channel Id - * @return true if the channel was removed, false otherwise - */ + @Override public boolean disableStreamEventListenerForId(String channelId) { // remove from set boolean remove = listenForGoLive.remove(channelId); // invalidate cache - if (!listenForFollow.contains(channelId)) { + if (!listenForFollow.contains(channelId) && !listenForClips.contains(channelId)) { channelInformation.invalidate(channelId); } else if (remove) { ChannelCache info = channelInformation.getIfPresent(channelId); @@ -418,48 +409,7 @@ public boolean disableStreamEventListenerForId(String channelId) { return remove; } - /** - * Follow Listener - * - * @param channelName Channel Name - */ - @Nullable - public User enableFollowEventListener(String channelName) { - UserList users = twitchHelix.getUsers(null, null, Collections.singletonList(channelName)).execute(); - - if (users.getUsers().size() == 1) { - User user = users.getUsers().get(0); - if (enableFollowEventListener(user.getId(), user.getLogin())) - return user; - } else { - log.error("Failed to add channel " + channelName + " to Follow Listener, maybe it doesn't exist!"); - } - - return null; - } - - /** - * Enable Follow Listener for the given channel names - * - * @param channelNames the channel names to be added - */ - public Collection enableFollowEventListener(Iterable channelNames) { - return CollectionUtils.chunked(channelNames, MAX_LIMIT).stream() - .map(channels -> twitchHelix.getUsers(null, null, channels).execute()) - .map(UserList::getUsers) - .filter(Objects::nonNull) - .flatMap(Collection::stream) - .filter(user -> enableFollowEventListener(user.getId(), user.getLogin())) - .collect(Collectors.toList()); - } - - /** - * Enable Follow Listener, without invoking a Helix API call - * - * @param channelId Channel Id - * @param channelName Channel Name - * @return true if the channel was added, false otherwise - */ + @Override public boolean enableFollowEventListener(String channelId, String channelName) { // add to list final boolean add = listenForFollow.add(channelId); @@ -473,51 +423,53 @@ public boolean enableFollowEventListener(String channelId, String channelName) { return add; } - /** - * Disable Follow Listener - * - * @param channelName Channel Name - */ - public void disableFollowEventListener(String channelName) { - UserList users = twitchHelix.getUsers(null, null, Collections.singletonList(channelName)).execute(); + @Override + public boolean disableFollowEventListenerForId(String channelId) { + // remove from set + boolean remove = listenForFollow.remove(channelId); - if (users.getUsers().size() == 1) { - users.getUsers().forEach(user -> disableFollowEventListenerForId(user.getId())); - } else { - log.error("Failed to remove channel " + channelName + " from follow listener!"); + // invalidate cache + if (!listenForGoLive.contains(channelId) && !listenForClips.contains(channelId)) { + channelInformation.invalidate(channelId); + } else if (remove) { + ChannelCache info = channelInformation.getIfPresent(channelId); + if (info != null) { + info.setLastFollowCheck(null); + info.getFollowers().lazySet(null); + } } + + startOrStopEventGenerationThread(); + return remove; } - /** - * Disable Follow Listener for the given channel names - * - * @param channelNames the channel names to be removed - */ - public void disableFollowEventListener(Iterable channelNames) { - CollectionUtils.chunked(channelNames, MAX_LIMIT).forEach(channels -> { - UserList users = twitchHelix.getUsers(null, null, channels).execute(); - users.getUsers().forEach(user -> disableFollowEventListenerForId(user.getId())); - }); + @Override + public boolean enableClipEventListener(String channelId, String channelName, Instant startedAt) { + // add to set + final boolean add = listenForClips.add(channelId); + if (!add) { + log.info("Channel {} already added for Clip Creation Events", channelName); + } else { + // initialize cache + ChannelCache channelCache = channelInformation.get(channelId, s -> new ChannelCache(channelName)); + channelCache.getClipWindowStart().compareAndSet(null, startedAt); + } + startOrStopEventGenerationThread(); + return add; } - /** - * Disable Follow Listener, without invoking a Helix API call - * - * @param channelId Channel Id - * @return true when a previously-tracked channel was removed, false otherwise - */ - public boolean disableFollowEventListenerForId(String channelId) { + @Override + public boolean disableClipEventListenerForId(String channelId) { // remove from set - boolean remove = listenForFollow.remove(channelId); + boolean remove = listenForClips.remove(channelId); // invalidate cache - if (!listenForGoLive.contains(channelId)) { + if (!listenForGoLive.contains(channelId) && !listenForFollow.contains(channelId)) { channelInformation.invalidate(channelId); } else if (remove) { ChannelCache info = channelInformation.getIfPresent(channelId); if (info != null) { - info.setLastFollowCheck(null); - info.getFollowers().set(null); + info.getClipWindowStart().lazySet(null); } } @@ -525,6 +477,44 @@ public boolean disableFollowEventListenerForId(String channelId) { return remove; } + @Override + public void setThreadDelay(long threadDelay) { + final UnaryOperator updateBackoff = old -> { + ExponentialBackoffStrategy next = old.toBuilder().baseMillis(threadDelay).build(); + next.setFailures(old.getFailures()); + return next; + }; + + this.liveBackoff.getAndUpdate(updateBackoff); + this.followBackoff.getAndUpdate(updateBackoff); + this.clipBackoff.getAndUpdate(updateBackoff); + } + + @Override + public Optional getCachedInformation(String channelId) { + return Optional.ofNullable(channelInformation.getIfPresent(channelId)); + } + + @Override + public void close() { + final Future streamStatusFuture = this.streamStatusEventFuture.getAndSet(null); + if (streamStatusFuture != null) + streamStatusFuture.cancel(false); + + final Future followerFuture = this.followerEventFuture.getAndSet(null); + if (followerFuture != null) + followerFuture.cancel(false); + + final Future clipFuture = this.clipEventFuture.getAndSet(null); + if (clipFuture != null) + clipFuture.cancel(false); + + listenForGoLive.clear(); + listenForFollow.clear(); + listenForClips.clear(); + channelInformation.invalidateAll(); + } + /** * Start or quit the thread, depending on usage */ @@ -534,12 +524,15 @@ private void startOrStopEventGenerationThread() { // follower event thread updateListener(listenForFollow::isEmpty, followerEventFuture, this::runRecursiveFollowerCheck, followBackoff); + + // clip creation event thread + updateListener(listenForClips::isEmpty, clipEventFuture, this::runRecursiveClipCheck, clipBackoff); } /** * Performs the "heavy lifting" of starting or stopping a listener * - * @param stopCondition yields whether or not the listener should be running + * @param stopCondition yields whether the listener should be running * @param futureReference the current listener in an atomic wrapper * @param startCommand the command to start the listener * @param backoff the {@link ExponentialBackoffStrategy} for the listener @@ -578,103 +571,47 @@ private void updateListener(BooleanSupplier stopCondition, AtomicReference( - executor, - CollectionUtils.chunked(listenForGoLive, MAX_LIMIT), - streamStatusEventFuture, - liveBackoff, - this::runRecursiveStreamStatusCheck, - chunk -> { - streamStatusEventTask.accept(chunk); - return false; // treat as always consuming from the api rate-limit - } - ) - ) - ); - } + runRecursiveCheck(streamStatusEventFuture, executor, CollectionUtils.chunked(listenForGoLive, MAX_LIMIT), liveBackoff, this::runRecursiveStreamStatusCheck, chunk -> { + streamStatusEventTask.accept(chunk); + return false; // treat as always consuming from the api rate-limit + }); } /** * Initiates the follower listener execution */ private void runRecursiveFollowerCheck() { - if (followerEventFuture.get() != null) - synchronized (followerEventFuture) { - if (cancel(followerEventFuture)) - followerEventFuture.set( - executor.submit( - new ListenerRunnable<>( - executor, - new ArrayList<>(listenForFollow), - followerEventFuture, - followBackoff, - this::runRecursiveFollowerCheck, - followerEventTask - ) - ) - ); - } + runRecursiveCheck(followerEventFuture, executor, new ArrayList<>(listenForFollow), followBackoff, this::runRecursiveFollowerCheck, followerEventTask); } /** - * Updates {@link ExponentialBackoffStrategy#getBaseMillis()} for each of the independent listeners (i.e. stream status and followers) - * - * @param threadRate the maximum rate of api calls per second + * Initiates the clip creation listener execution */ - public void setThreadRate(long threadRate) { - this.setThreadDelay(1000 / threadRate); + private void runRecursiveClipCheck() { + runRecursiveCheck(clipEventFuture, executor, new ArrayList<>(listenForClips), clipBackoff, this::runRecursiveClipCheck, clipEventTask); } - /** - * Updates {@link ExponentialBackoffStrategy#getBaseMillis()} for each of the independent listeners (i.e. stream status and followers) - * - * @param threadDelay the minimum milliseconds delay between each api call - */ - public void setThreadDelay(long threadDelay) { - final UnaryOperator updateBackoff = old -> { - ExponentialBackoffStrategy next = old.toBuilder().baseMillis(threadDelay).build(); - next.setFailures(old.getFailures()); - return next; - }; - - this.liveBackoff.getAndUpdate(updateBackoff); - this.followBackoff.getAndUpdate(updateBackoff); - } - - /** - * Get cached information for a channel's stream status and follower count. - *

- * For this information to be valid, the respective event listeners need to be enabled for the channel. - *

- * For thread safety, the setters on this object should not be used; only getters. - * - * @param channelId The ID of the channel whose cache is to be retrieved. - * @return ChannelCache in an optional wrapper. - */ - public Optional getCachedInformation(String channelId) { - return Optional.ofNullable(channelInformation.getIfPresent(channelId)); - } - - /** - * Close - */ - public void close() { - final Future streamStatusFuture = this.streamStatusEventFuture.getAndSet(null); - if (streamStatusFuture != null) - streamStatusFuture.cancel(false); - - final Future followerFuture = this.followerEventFuture.getAndSet(null); - if (followerFuture != null) - followerFuture.cancel(false); - - listenForGoLive.clear(); - listenForFollow.clear(); - channelInformation.invalidateAll(); + private List getClips(String channelId, Instant startedAt, Instant endedAt) { + return PaginationUtil.getPaginated( + cursor -> { + final HystrixCommand commandGetClips = twitchHelix.getClips(null, channelId, null, null, cursor, null, MAX_LIMIT, startedAt, endedAt); + try { + ClipList result = commandGetClips.execute(); + clipBackoff.get().reset(); // successful api call + return result; + } catch (Exception ex) { + if (commandGetClips != null && commandGetClips.isFailedExecution()) { + log.trace(ex.getMessage(), ex); + } + log.error("Failed to check for Clip Events: " + ex.getMessage()); + clipBackoff.get().get(); // increment failures + return null; + } + }, + ClipList::getData, + call -> call.getPagination() != null ? call.getPagination().getCursor() : null, + 20 + ); } @Value @@ -727,4 +664,24 @@ private static boolean cancel(AtomicReference> futureRef) { return future != null && future.cancel(false); } + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + private static void runRecursiveCheck(AtomicReference> future, ScheduledExecutorService executor, List units, AtomicReference backoff, Runnable startCommand, Function task) { + if (future.get() != null) + synchronized (future) { + if (cancel(future)) + future.set( + executor.submit( + new ListenerRunnable<>( + executor, + units, + future, + backoff, + startCommand, + task + ) + ) + ); + } + } + } diff --git a/twitch4j/src/main/java/com/github/twitch4j/domain/ChannelCache.java b/twitch4j/src/main/java/com/github/twitch4j/domain/ChannelCache.java index 75a70b075..fef5f1a58 100644 --- a/twitch4j/src/main/java/com/github/twitch4j/domain/ChannelCache.java +++ b/twitch4j/src/main/java/com/github/twitch4j/domain/ChannelCache.java @@ -50,6 +50,11 @@ public class ChannelCache { */ private final AtomicReference followers = new AtomicReference<>(); + /** + * Clip Query Started At + */ + private final AtomicReference clipWindowStart = new AtomicReference<>(); + /** * Construct Channel Cache * diff --git a/twitch4j/src/main/java/com/github/twitch4j/events/ChannelClipCreatedEvent.java b/twitch4j/src/main/java/com/github/twitch4j/events/ChannelClipCreatedEvent.java new file mode 100644 index 000000000..48336eef6 --- /dev/null +++ b/twitch4j/src/main/java/com/github/twitch4j/events/ChannelClipCreatedEvent.java @@ -0,0 +1,42 @@ +package com.github.twitch4j.events; + +import com.github.twitch4j.common.events.TwitchEvent; +import com.github.twitch4j.common.events.domain.EventChannel; +import com.github.twitch4j.common.events.domain.EventUser; +import com.github.twitch4j.helix.domain.Clip; +import lombok.EqualsAndHashCode; +import lombok.Value; + +import java.util.Optional; + +/** + * Called when a new clip is created in a channel. + *

+ * Fired by {@link com.github.twitch4j.TwitchClientHelper}; so this event must explicitly be enabled for specific channels. + *

+ * Due to Twitch heavily caching the get clips endpoint, these creation events can have multi-minute delays. + * + * @see com.github.twitch4j.IClientHelper#enableClipEventListener(String, String) + */ +@Value +@EqualsAndHashCode(callSuper = false) +public class ChannelClipCreatedEvent extends TwitchEvent { + + /** + * The channel where the clip was created. + */ + EventChannel channel; + + /** + * The clip that was created. + */ + Clip clip; + + /** + * @return the user that created the clip + */ + public Optional getCreatingUser() { + return Optional.ofNullable(clip.getCreatorId()).map(id -> new EventUser(id, clip.getCreatorName())); + } + +} diff --git a/util/src/main/java/com/github/twitch4j/util/PaginationUtil.java b/util/src/main/java/com/github/twitch4j/util/PaginationUtil.java new file mode 100644 index 000000000..5b72f8ad9 --- /dev/null +++ b/util/src/main/java/com/github/twitch4j/util/PaginationUtil.java @@ -0,0 +1,165 @@ +package com.github.twitch4j.util; + +import lombok.experimental.UtilityClass; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.function.Supplier; + +@UtilityClass +public class PaginationUtil { + + /** + * Obtains a full (user-specified) collection of results from paginated calls with arbitrary cursors, up to a max number of pages and elements, while optionally avoiding duplicate cursors. + *

+ * Implementation notes: + *

+ * + * @param callByCursor Performs a query based on the cursor + * @param getResult Yields the results contained in the executed call + * @param getNext Yields the next cursor to paginate on (or null to cease pagination) + * @param maxPages The maximum number of pages to paginate over + * @param maxUnits The (approximate) maximum number of elements that can be queried + * @param collector The supplier of the collection to store the paginated results + * @param strict Whether pagination should stop upon encountering an already-seen cursor + * @param first The initial cursor to use in the first call + * @param valid Whether a cursor is valid to be used in a subsequent call + * @param Result unit class + * @param Container class of results and pagination information + * @param The type of collection that should be used to store the result units + * @param

The type of the cursor object; should implement equals and hashcode + * @return the paginated results + */ + public static , P> C getPaginated(Function callByCursor, Function> getResult, Function getNext, int maxPages, int maxUnits, Supplier collector, boolean strict, P first, Predicate

valid) { + final C collection = collector.get(); + + Set

cursors = strict ? new HashSet<>(Math.max(16, Math.min(maxPages, 1024))) : null; + P cursor = first; + int page = 0; + do { + // Perform the call + final K call = callByCursor.apply(cursor); + page++; + if (call == null) break; + + // Save the results + final Collection results = getResult.apply(call); + if (results == null || results.isEmpty()) break; + collection.addAll(results); + + // Obtain the cursor for the next call + final P next = getNext.apply(call); + if (Objects.equals(next, cursor)) break; // we got the same cursor back; avoid infinite loop + if (strict && next != null && !cursors.add(next)) break; // we've already seen this cursor in the past; avoid infinite loop + cursor = next; + } while (cursor != null && valid.test(cursor) && (maxPages < 0 || page < maxPages) && (maxUnits < 0 || collection.size() < maxUnits)); + + return collection; + } + + /** + * Obtains a full (user-specified) collection of results from paginated calls, up to a max number of pages and elements, while optionally avoiding duplicate cursors. + *

+ * Implementation notes: + *

    + *
  • null or empty cursors yielded from a call end further pagination
  • + *
  • the first call executed is done with a null cursor
  • + *
+ * + * @param callByCursor Performs a query based on the string cursor (which is initially null) + * @param extractResult Yields the results contained in the executed call + * @param extractCursor Yields the next cursor to paginate on (or null to cease pagination) + * @param maxPages The maximum number of pages to paginate over + * @param maxElements The (approximate) maximum number of elements that can be queried + * @param createCollection The supplier of the collection to store the paginated results + * @param strict Whether pagination should stop upon encountering an already-seen cursor + * @param Result unit class + * @param Container class of results and pagination information + * @param The type of collection that should be used to store the result units + * @return the paginated results + * @see #getPaginated(Function, Function, Function, int, int, Supplier, boolean, Object, Predicate) + */ + public static > C getPaginated(Function callByCursor, Function> extractResult, Function extractCursor, int maxPages, int maxElements, Supplier createCollection, boolean strict) { + return getPaginated(callByCursor, extractResult, extractCursor, maxPages, maxElements, createCollection, strict, null, s -> !s.isEmpty()); + } + + /** + * Obtains a full (user-specified) collection of results from paginated calls, up to a max number of pages and elements. + * + * @param callByCursor Performs a query based on the string cursor (which is initially null) + * @param resultsFromCall Yields the results contained in the executed call + * @param nextCursorFromCall Yields the next cursor to paginate on (or null to cease pagination) + * @param maxPages The maximum number of pages to paginate over + * @param maxElements The (approximate) maximum number of elements that can be queried + * @param createCollection The supplier of the collection to store the paginated results + * @param Result unit class + * @param Container class of results and pagination information + * @param The type of collection that should be used to store the result units + * @return the paginated results + * @see #getPaginated(Function, Function, Function, int, int, Supplier, boolean) + */ + public static > C getPaginated(Function callByCursor, Function> resultsFromCall, Function nextCursorFromCall, int maxPages, int maxElements, Supplier createCollection) { + return getPaginated(callByCursor, resultsFromCall, nextCursorFromCall, maxPages, maxElements, createCollection, false); + } + + /** + * Obtains a full list of results from paginated calls, up to a max number of pages and elements. + * + * @param callByCursor Performs a query based on the string cursor (which is initially null) + * @param resultsFromCall Yields the results contained in the executed call + * @param nextCursorFromCall Yields the next cursor to paginate on (or null to cease pagination) + * @param maxPages The maximum number of pages to paginate over + * @param maxElements The (approximate) maximum number of elements that can be queried + * @param Result unit class + * @param Container class of results and pagination information + * @return the paginated results + * @see #getPaginated(Function, Function, Function, int, int, Supplier, boolean) + */ + public static List getPaginated(Function callByCursor, Function> resultsFromCall, Function nextCursorFromCall, int maxPages, int maxElements) { + return getPaginated(callByCursor, resultsFromCall, nextCursorFromCall, maxPages, maxElements, ArrayList::new); + } + + /** + * Obtains a full list of results from paginated calls, up to a max number of pages. + * + * @param callByCursor Performs a query based on the string cursor (which is initially null) + * @param resultsFromCall Yields the results contained in the executed call + * @param nextCursorFromCall Yields the next cursor to paginate on (or null to cease pagination) + * @param maxPages The maximum number of pages to paginate over + * @param Result unit class + * @param Container class of results and pagination information + * @return the paginated results + * @see #getPaginated(Function, Function, Function, int, int, Supplier, boolean) + */ + public static List getPaginated(Function callByCursor, Function> resultsFromCall, Function nextCursorFromCall, int maxPages) { + return getPaginated(callByCursor, resultsFromCall, nextCursorFromCall, maxPages, Integer.MAX_VALUE); + } + + /** + * Obtains a full list of results from paginated calls. + * + * @param callByCursor Performs a query based on the string cursor (which is initially null) + * @param resultsFromCall Yields the results contained in the executed call + * @param nextCursorFromCall Yields the next cursor to paginate on (or null to cease pagination) + * @param Result unit class + * @param Container class of results and pagination information + * @return the paginated results + * @see #getPaginated(Function, Function, Function, int, int, Supplier, boolean) + */ + public static List getPaginated(Function callByCursor, Function> resultsFromCall, Function nextCursorFromCall) { + return getPaginated(callByCursor, resultsFromCall, nextCursorFromCall, Integer.MAX_VALUE); + } + +}