diff --git a/twitch4j/src/main/java/com/github/twitch4j/IClientHelper.java b/twitch4j/src/main/java/com/github/twitch4j/IClientHelper.java
new file mode 100644
index 000000000..ab5fc40a3
--- /dev/null
+++ b/twitch4j/src/main/java/com/github/twitch4j/IClientHelper.java
@@ -0,0 +1,306 @@
+package com.github.twitch4j;
+
+import com.github.twitch4j.common.util.CollectionUtils;
+import com.github.twitch4j.common.util.ExponentialBackoffStrategy;
+import com.github.twitch4j.domain.ChannelCache;
+import com.github.twitch4j.helix.TwitchHelix;
+import com.github.twitch4j.helix.domain.User;
+import com.github.twitch4j.helix.domain.UserList;
+import org.jetbrains.annotations.Nullable;
+
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static com.github.twitch4j.TwitchClientHelper.MAX_LIMIT;
+import static com.github.twitch4j.TwitchClientHelper.log;
+
+public interface IClientHelper extends AutoCloseable {
+
+ TwitchHelix getTwitchHelix();
+
+ /**
+ * Enable StreamEvent Listener, without invoking a Helix API call
+ *
+ * @param channelId Channel Id
+ * @param channelName Channel Name
+ * @return true if the channel was added, false otherwise
+ */
+ boolean enableStreamEventListener(String channelId, String channelName);
+
+ /**
+ * Disable StreamEventListener, without invoking a Helix API call
+ *
+ * @param channelId Channel Id
+ * @return true if the channel was removed, false otherwise
+ */
+ boolean disableStreamEventListenerForId(String channelId);
+
+ /**
+ * Enable Follow Listener, without invoking a Helix API call
+ *
+ * @param channelId Channel Id
+ * @param channelName Channel Name
+ * @return true if the channel was added, false otherwise
+ */
+ boolean enableFollowEventListener(String channelId, String channelName);
+
+ /**
+ * Disable Follow Listener, without invoking a Helix API call
+ *
+ * @param channelId Channel Id
+ * @return true when a previously-tracked channel was removed, false otherwise
+ */
+ boolean disableFollowEventListenerForId(String channelId);
+
+ /**
+ * Enable Clip Creation Listener, without invoking a Helix API call, starting at a custom timestamp
+ *
+ * @param channelId Channel Id
+ * @param channelName Channel Name
+ * @param startedAt The oldest clip creation timestamp to start the queries at
+ * @return whether the channel was added
+ */
+ boolean enableClipEventListener(String channelId, String channelName, Instant startedAt);
+
+ /**
+ * Disable Clip Creation Listener, without invoking a Helix API call
+ *
+ * @param channelId Channel Id
+ * @return whether a previously-tracked channel was removed
+ */
+ boolean disableClipEventListenerForId(String channelId);
+
+ /**
+ * Get cached information for a channel's stream status and follower count.
+ *
+ * For this information to be valid, the respective event listeners need to be enabled for the channel.
+ *
+ * For thread safety, the setters on this object should not be used; only getters.
+ *
+ * @param channelId The ID of the channel whose cache is to be retrieved.
+ * @return ChannelCache in an optional wrapper.
+ */
+ Optional getCachedInformation(String channelId);
+
+ /**
+ * Updates {@link ExponentialBackoffStrategy#getBaseMillis()} for each of the independent listeners (i.e. stream status and followers)
+ *
+ * @param threadDelay the minimum milliseconds delay between each api call
+ */
+ void setThreadDelay(long threadDelay);
+
+ /**
+ * Enable StreamEvent Listener
+ *
+ * @param channelName Channel Name
+ */
+ @Nullable
+ default User enableStreamEventListener(String channelName) {
+ UserList users = getTwitchHelix().getUsers(null, null, Collections.singletonList(channelName)).execute();
+
+ if (users.getUsers().size() == 1) {
+ User user = users.getUsers().get(0);
+ if (enableStreamEventListener(user.getId(), user.getLogin()))
+ return user;
+ } else {
+ log.error("Failed to add channel {} to stream event listener!", channelName);
+ }
+
+ return null;
+ }
+
+ /**
+ * Enable StreamEvent Listener for the given channel names
+ *
+ * @param channelNames the channel names to be added
+ */
+ default Collection enableStreamEventListener(Iterable channelNames) {
+ return CollectionUtils.chunked(channelNames, MAX_LIMIT).stream()
+ .map(channels -> getTwitchHelix().getUsers(null, null, channels).execute())
+ .map(UserList::getUsers)
+ .filter(Objects::nonNull)
+ .flatMap(Collection::stream)
+ .filter(user -> enableStreamEventListener(user.getId(), user.getLogin()))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Disable StreamEvent Listener
+ *
+ * @param channelName Channel Name
+ */
+ default void disableStreamEventListener(String channelName) {
+ UserList users = getTwitchHelix().getUsers(null, null, Collections.singletonList(channelName)).execute();
+
+ if (users.getUsers().size() == 1) {
+ users.getUsers().forEach(user -> disableStreamEventListenerForId(user.getId()));
+ } else {
+ log.error("Failed to remove channel " + channelName + " from stream event listener!");
+ }
+ }
+
+ /**
+ * Disable StreamEvent Listener for the given channel names
+ *
+ * @param channelNames the channel names to be removed
+ */
+ default void disableStreamEventListener(Iterable channelNames) {
+ CollectionUtils.chunked(channelNames, MAX_LIMIT).forEach(channels -> {
+ UserList users = getTwitchHelix().getUsers(null, null, channels).execute();
+ users.getUsers().forEach(user -> disableStreamEventListenerForId(user.getId()));
+ });
+ }
+
+ /**
+ * Follow Listener
+ *
+ * @param channelName Channel Name
+ */
+ @Nullable
+ default User enableFollowEventListener(String channelName) {
+ UserList users = getTwitchHelix().getUsers(null, null, Collections.singletonList(channelName)).execute();
+
+ if (users.getUsers().size() == 1) {
+ User user = users.getUsers().get(0);
+ if (enableFollowEventListener(user.getId(), user.getLogin()))
+ return user;
+ } else {
+ log.error("Failed to add channel " + channelName + " to Follow Listener, maybe it doesn't exist!");
+ }
+
+ return null;
+ }
+
+ /**
+ * Enable Follow Listener for the given channel names
+ *
+ * @param channelNames the channel names to be added
+ */
+ default Collection enableFollowEventListener(Iterable channelNames) {
+ return CollectionUtils.chunked(channelNames, MAX_LIMIT).stream()
+ .map(channels -> getTwitchHelix().getUsers(null, null, channels).execute())
+ .map(UserList::getUsers)
+ .filter(Objects::nonNull)
+ .flatMap(Collection::stream)
+ .filter(user -> enableFollowEventListener(user.getId(), user.getLogin()))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Disable Follow Listener
+ *
+ * @param channelName Channel Name
+ */
+ default void disableFollowEventListener(String channelName) {
+ UserList users = getTwitchHelix().getUsers(null, null, Collections.singletonList(channelName)).execute();
+
+ if (users.getUsers().size() == 1) {
+ users.getUsers().forEach(user -> disableFollowEventListenerForId(user.getId()));
+ } else {
+ log.error("Failed to remove channel " + channelName + " from follow listener!");
+ }
+ }
+
+ /**
+ * Disable Follow Listener for the given channel names
+ *
+ * @param channelNames the channel names to be removed
+ */
+ default void disableFollowEventListener(Iterable channelNames) {
+ CollectionUtils.chunked(channelNames, MAX_LIMIT).forEach(channels -> {
+ UserList users = getTwitchHelix().getUsers(null, null, channels).execute();
+ users.getUsers().forEach(user -> disableFollowEventListenerForId(user.getId()));
+ });
+ }
+
+ /**
+ * Enable Clip Creation Listener, without invoking a Helix API call
+ *
+ * @param channelId Channel Id
+ * @param channelName Channel Name
+ * @return whether the channel was added
+ */
+ default boolean enableClipEventListener(String channelId, String channelName) {
+ return enableClipEventListener(channelId, channelName, Instant.now());
+ }
+
+ /**
+ * Clip Creation Listener
+ *
+ * @param channelName Channel Name
+ * @return the channel whose clip creations are now tracked, or null (if unable to resolve or already tracking)
+ */
+ @Nullable
+ default User enableClipEventListener(String channelName) {
+ UserList users = getTwitchHelix().getUsers(null, null, Collections.singletonList(channelName)).execute();
+
+ if (users.getUsers().size() == 1) {
+ User user = users.getUsers().get(0);
+ if (enableClipEventListener(user.getId(), user.getLogin()))
+ return user;
+ } else {
+ log.error("Failed to add channel " + channelName + " to Clip Creation Listener, maybe it doesn't exist!");
+ }
+
+ return null;
+ }
+
+ /**
+ * Enable Clip Creation Listener for the given channel names
+ *
+ * @param channelNames the channel names to be added
+ * @return the channels that are freshly tracked for clip creations
+ */
+ default Collection enableClipEventListener(Iterable channelNames) {
+ return CollectionUtils.chunked(channelNames, MAX_LIMIT).stream()
+ .map(channels -> getTwitchHelix().getUsers(null, null, channels).execute())
+ .map(UserList::getUsers)
+ .filter(Objects::nonNull)
+ .flatMap(Collection::stream)
+ .filter(user -> enableClipEventListener(user.getId(), user.getLogin()))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Disable Clip Creation Listener
+ *
+ * @param channelName Channel Name
+ * @return whether a previously-tracked channel was removed
+ */
+ default boolean disableClipEventListener(String channelName) {
+ UserList users = getTwitchHelix().getUsers(null, null, Collections.singletonList(channelName)).execute();
+
+ if (users.getUsers().size() == 1) {
+ return disableClipEventListenerForId(users.getUsers().get(0).getId());
+ } else {
+ log.error("Failed to remove channel " + channelName + " from clip creation listener!");
+ return false;
+ }
+ }
+
+ /**
+ * Disable Clip Creation Listener for the given channel names
+ *
+ * @param channelNames the channel names to be removed
+ */
+ default void disableClipEventListener(Iterable channelNames) {
+ CollectionUtils.chunked(channelNames, MAX_LIMIT).forEach(channels -> {
+ UserList users = getTwitchHelix().getUsers(null, null, channels).execute();
+ users.getUsers().forEach(user -> disableClipEventListenerForId(user.getId()));
+ });
+ }
+
+ /**
+ * Updates {@link ExponentialBackoffStrategy#getBaseMillis()} for each of the independent listeners (i.e. stream status and followers)
+ *
+ * @param threadRate the maximum rate of api calls per second
+ */
+ default void setThreadRate(long threadRate) {
+ this.setThreadDelay(1000 / threadRate);
+ }
+
+}
diff --git a/twitch4j/src/main/java/com/github/twitch4j/TwitchClientHelper.java b/twitch4j/src/main/java/com/github/twitch4j/TwitchClientHelper.java
index adca11577..8bb8d9450 100644
--- a/twitch4j/src/main/java/com/github/twitch4j/TwitchClientHelper.java
+++ b/twitch4j/src/main/java/com/github/twitch4j/TwitchClientHelper.java
@@ -9,28 +9,33 @@
import com.github.twitch4j.common.events.domain.EventUser;
import com.github.twitch4j.common.util.CollectionUtils;
import com.github.twitch4j.common.util.ExponentialBackoffStrategy;
+import com.github.twitch4j.util.PaginationUtil;
import com.github.twitch4j.domain.ChannelCache;
import com.github.twitch4j.events.ChannelChangeGameEvent;
import com.github.twitch4j.events.ChannelChangeTitleEvent;
+import com.github.twitch4j.events.ChannelClipCreatedEvent;
import com.github.twitch4j.events.ChannelFollowCountUpdateEvent;
import com.github.twitch4j.events.ChannelGoLiveEvent;
import com.github.twitch4j.events.ChannelGoOfflineEvent;
import com.github.twitch4j.events.ChannelViewerCountUpdateEvent;
import com.github.twitch4j.helix.TwitchHelix;
-import com.github.twitch4j.helix.domain.*;
+import com.github.twitch4j.helix.domain.Clip;
+import com.github.twitch4j.helix.domain.ClipList;
+import com.github.twitch4j.helix.domain.Follow;
+import com.github.twitch4j.helix.domain.FollowList;
+import com.github.twitch4j.helix.domain.Stream;
+import com.github.twitch4j.helix.domain.StreamList;
import com.netflix.hystrix.HystrixCommand;
+import lombok.Getter;
import lombok.Value;
-import lombok.extern.slf4j.Slf4j;
-import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -43,20 +48,20 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
-import java.util.stream.Collectors;
/**
* A helper class that covers a few basic use cases of most library users
*/
-@Slf4j
-public class TwitchClientHelper implements AutoCloseable {
+public class TwitchClientHelper implements IClientHelper {
+
+ static final Logger log = LoggerFactory.getLogger(TwitchClientHelper.class);
public static final int REQUIRED_THREAD_COUNT = 2;
/**
* The greatest number of streams or followers that can be requested in a single API call
*/
- private static final int MAX_LIMIT = 100;
+ static final int MAX_LIMIT = 100;
/**
* Holds the channels that are checked for live/offline state changes
@@ -69,14 +74,15 @@ public class TwitchClientHelper implements AutoCloseable {
private final Set listenForFollow = ConcurrentHashMap.newKeySet();
/**
- * Twitch Helix
+ * Holds the channels that are checked for new clip creations
*/
- private final TwitchHelix twitchHelix;
+ private final Set listenForClips = ConcurrentHashMap.newKeySet();
/**
- * Event Manager
+ * Twitch Helix
*/
- private final EventManager eventManager;
+ @Getter
+ private final TwitchHelix twitchHelix;
/**
* Event Task - Stream Status
@@ -102,6 +108,18 @@ public class TwitchClientHelper implements AutoCloseable {
*/
private final AtomicReference> followerEventFuture = new AtomicReference<>();
+ /**
+ * Event Task - Clip Creations
+ *
+ * Accepts a channel id as the input; Yields true if the next call should not be delayed
+ */
+ private final Function clipEventTask;
+
+ /**
+ * The {@link Future} associated with clipEventTask, in an atomic wrapper
+ */
+ private final AtomicReference> clipEventFuture = new AtomicReference<>();
+
/**
* Channel Information Cache
*/
@@ -124,6 +142,11 @@ public class TwitchClientHelper implements AutoCloseable {
*/
private final AtomicReference followBackoff;
+ /**
+ * Holds the {@link ExponentialBackoffStrategy} used for the clip creation listener
+ */
+ private final AtomicReference clipBackoff;
+
/**
* Constructor
*
@@ -133,14 +156,14 @@ public class TwitchClientHelper implements AutoCloseable {
*/
public TwitchClientHelper(TwitchHelix twitchHelix, EventManager eventManager, ScheduledThreadPoolExecutor executor) {
this.twitchHelix = twitchHelix;
- this.eventManager = eventManager;
this.executor = executor;
final ExponentialBackoffStrategy defaultBackoff = ExponentialBackoffStrategy.builder().immediateFirst(false).baseMillis(1000L).jitter(false).build();
- liveBackoff = new AtomicReference<>(defaultBackoff);
- followBackoff = new AtomicReference<>(defaultBackoff.copy());
+ this.liveBackoff = new AtomicReference<>(defaultBackoff);
+ this.followBackoff = new AtomicReference<>(defaultBackoff.copy());
+ this.clipBackoff = new AtomicReference<>(defaultBackoff.copy());
- // Threads
+ // Tasks
this.streamStatusEventTask = channels -> {
// check go live / stream events
HystrixCommand hystrixGetAllStreams = twitchHelix.getStreams(null, null, null, channels.size(), null, null, channels, null);
@@ -170,11 +193,11 @@ public TwitchClientHelper(TwitchHelix twitchHelix, EventManager eventManager, Sc
if (stream != null && stream.getType().equalsIgnoreCase("live")) {
// is live
// - live status
- if (currentChannelCache.getIsLive() != null && currentChannelCache.getIsLive() == false) {
+ if (currentChannelCache.getIsLive() != null && !currentChannelCache.getIsLive()) {
dispatchGoLiveEvent = true;
}
currentChannelCache.setIsLive(true);
- boolean wasAlreadyLive = dispatchGoLiveEvent != true && currentChannelCache.getIsLive() == true;
+ boolean wasAlreadyLive = !dispatchGoLiveEvent && currentChannelCache.getIsLive();
// - change stream title event
if (wasAlreadyLive && currentChannelCache.getTitle() != null && !currentChannelCache.getTitle().equalsIgnoreCase(stream.getTitle())) {
@@ -194,7 +217,7 @@ public TwitchClientHelper(TwitchHelix twitchHelix, EventManager eventManager, Sc
}
} else {
// was online previously?
- if (currentChannelCache.getIsLive() != null && currentChannelCache.getIsLive() == true) {
+ if (currentChannelCache.getIsLive() != null && currentChannelCache.getIsLive()) {
dispatchGoOfflineEvent = true;
}
@@ -308,50 +331,50 @@ public TwitchClientHelper(TwitchHelix twitchHelix, EventManager eventManager, Sc
return false;
}
};
- }
+ this.clipEventTask = channelId -> {
+ // check clip creations
+ boolean nextRequestCanBeImmediate = false;
+
+ final ChannelCache currentChannelCache = channelInformation.get(channelId, c -> new ChannelCache());
+ final AtomicReference windowStart = currentChannelCache.getClipWindowStart();
+ final Instant startedAt = windowStart.get();
+ final Instant now = Instant.now();
+
+ if (startedAt == null) {
+ // initialize clip query window started_at
+ nextRequestCanBeImmediate = windowStart.compareAndSet(null, now);
+ } else {
+ // get all clips in range [startedAt, now]
+ final List clips = getClips(channelId, startedAt, now);
- /**
- * Enable StreamEvent Listener
- *
- * @param channelName Channel Name
- */
- @Nullable
- public User enableStreamEventListener(String channelName) {
- UserList users = twitchHelix.getUsers(null, null, Collections.singletonList(channelName)).execute();
-
- if (users.getUsers().size() == 1) {
- User user = users.getUsers().get(0);
- if (enableStreamEventListener(user.getId(), user.getLogin()))
- return user;
- } else {
- log.error("Failed to add channel {} to stream event listener!", channelName);
- }
+ // cache channel name if unknown and construct event channel to be passed to events
+ if (!clips.isEmpty() && clips.get(0) != null && currentChannelCache.getUserName() == null) {
+ currentChannelCache.setUserName(clips.get(0).getBroadcasterName());
+ }
+ final EventChannel channel = new EventChannel(channelId, currentChannelCache.getUserName());
- return null;
- }
+ // loop through queried clips
+ Instant maxCreatedAt = startedAt;
+ for (Clip clip : clips) {
+ if (clip != null && clip.getCreatedAtInstant() != null && clip.getCreatedAtInstant().compareTo(startedAt) > 0) {
+ eventManager.publish(new ChannelClipCreatedEvent(channel, clip)); // found a new clip
- /**
- * Enable StreamEvent Listener for the given channel names
- *
- * @param channelNames the channel names to be added
- */
- public Collection enableStreamEventListener(Iterable channelNames) {
- return CollectionUtils.chunked(channelNames, MAX_LIMIT).stream()
- .map(channels -> twitchHelix.getUsers(null, null, channels).execute())
- .map(UserList::getUsers)
- .filter(Objects::nonNull)
- .flatMap(Collection::stream)
- .filter(user -> enableStreamEventListener(user.getId(), user.getLogin()))
- .collect(Collectors.toList());
+ if (clip.getCreatedAtInstant().compareTo(maxCreatedAt) > 0)
+ maxCreatedAt = clip.getCreatedAtInstant(); // keep track of most recently created clip
+ }
+ }
+
+ // next clip window should start just after the most recent clip we've seen for this channel
+ final Instant nextStartedAt = maxCreatedAt;
+ if (nextStartedAt != startedAt)
+ windowStart.updateAndGet(old -> old == null || old.compareTo(nextStartedAt) < 0 ? nextStartedAt : old);
+ }
+
+ return nextRequestCanBeImmediate;
+ };
}
- /**
- * Enable StreamEvent Listener, without invoking a Helix API call
- *
- * @param channelId Channel Id
- * @param channelName Channel Name
- * @return true if the channel was added, false otherwise
- */
+ @Override
public boolean enableStreamEventListener(String channelId, String channelName) {
// add to set
final boolean add = listenForGoLive.add(channelId);
@@ -365,45 +388,13 @@ public boolean enableStreamEventListener(String channelId, String channelName) {
return add;
}
- /**
- * Disable StreamEvent Listener
- *
- * @param channelName Channel Name
- */
- public void disableStreamEventListener(String channelName) {
- UserList users = twitchHelix.getUsers(null, null, Collections.singletonList(channelName)).execute();
-
- if (users.getUsers().size() == 1) {
- users.getUsers().forEach(user -> disableStreamEventListenerForId(user.getId()));
- } else {
- log.error("Failed to remove channel " + channelName + " from stream event listener!");
- }
- }
-
- /**
- * Disable StreamEvent Listener for the given channel names
- *
- * @param channelNames the channel names to be removed
- */
- public void disableStreamEventListener(Iterable channelNames) {
- CollectionUtils.chunked(channelNames, MAX_LIMIT).forEach(channels -> {
- UserList users = twitchHelix.getUsers(null, null, channels).execute();
- users.getUsers().forEach(user -> disableStreamEventListenerForId(user.getId()));
- });
- }
-
- /**
- * Disable StreamEventListener, without invoking a Helix API call
- *
- * @param channelId Channel Id
- * @return true if the channel was removed, false otherwise
- */
+ @Override
public boolean disableStreamEventListenerForId(String channelId) {
// remove from set
boolean remove = listenForGoLive.remove(channelId);
// invalidate cache
- if (!listenForFollow.contains(channelId)) {
+ if (!listenForFollow.contains(channelId) && !listenForClips.contains(channelId)) {
channelInformation.invalidate(channelId);
} else if (remove) {
ChannelCache info = channelInformation.getIfPresent(channelId);
@@ -418,48 +409,7 @@ public boolean disableStreamEventListenerForId(String channelId) {
return remove;
}
- /**
- * Follow Listener
- *
- * @param channelName Channel Name
- */
- @Nullable
- public User enableFollowEventListener(String channelName) {
- UserList users = twitchHelix.getUsers(null, null, Collections.singletonList(channelName)).execute();
-
- if (users.getUsers().size() == 1) {
- User user = users.getUsers().get(0);
- if (enableFollowEventListener(user.getId(), user.getLogin()))
- return user;
- } else {
- log.error("Failed to add channel " + channelName + " to Follow Listener, maybe it doesn't exist!");
- }
-
- return null;
- }
-
- /**
- * Enable Follow Listener for the given channel names
- *
- * @param channelNames the channel names to be added
- */
- public Collection enableFollowEventListener(Iterable channelNames) {
- return CollectionUtils.chunked(channelNames, MAX_LIMIT).stream()
- .map(channels -> twitchHelix.getUsers(null, null, channels).execute())
- .map(UserList::getUsers)
- .filter(Objects::nonNull)
- .flatMap(Collection::stream)
- .filter(user -> enableFollowEventListener(user.getId(), user.getLogin()))
- .collect(Collectors.toList());
- }
-
- /**
- * Enable Follow Listener, without invoking a Helix API call
- *
- * @param channelId Channel Id
- * @param channelName Channel Name
- * @return true if the channel was added, false otherwise
- */
+ @Override
public boolean enableFollowEventListener(String channelId, String channelName) {
// add to list
final boolean add = listenForFollow.add(channelId);
@@ -473,51 +423,53 @@ public boolean enableFollowEventListener(String channelId, String channelName) {
return add;
}
- /**
- * Disable Follow Listener
- *
- * @param channelName Channel Name
- */
- public void disableFollowEventListener(String channelName) {
- UserList users = twitchHelix.getUsers(null, null, Collections.singletonList(channelName)).execute();
+ @Override
+ public boolean disableFollowEventListenerForId(String channelId) {
+ // remove from set
+ boolean remove = listenForFollow.remove(channelId);
- if (users.getUsers().size() == 1) {
- users.getUsers().forEach(user -> disableFollowEventListenerForId(user.getId()));
- } else {
- log.error("Failed to remove channel " + channelName + " from follow listener!");
+ // invalidate cache
+ if (!listenForGoLive.contains(channelId) && !listenForClips.contains(channelId)) {
+ channelInformation.invalidate(channelId);
+ } else if (remove) {
+ ChannelCache info = channelInformation.getIfPresent(channelId);
+ if (info != null) {
+ info.setLastFollowCheck(null);
+ info.getFollowers().lazySet(null);
+ }
}
+
+ startOrStopEventGenerationThread();
+ return remove;
}
- /**
- * Disable Follow Listener for the given channel names
- *
- * @param channelNames the channel names to be removed
- */
- public void disableFollowEventListener(Iterable channelNames) {
- CollectionUtils.chunked(channelNames, MAX_LIMIT).forEach(channels -> {
- UserList users = twitchHelix.getUsers(null, null, channels).execute();
- users.getUsers().forEach(user -> disableFollowEventListenerForId(user.getId()));
- });
+ @Override
+ public boolean enableClipEventListener(String channelId, String channelName, Instant startedAt) {
+ // add to set
+ final boolean add = listenForClips.add(channelId);
+ if (!add) {
+ log.info("Channel {} already added for Clip Creation Events", channelName);
+ } else {
+ // initialize cache
+ ChannelCache channelCache = channelInformation.get(channelId, s -> new ChannelCache(channelName));
+ channelCache.getClipWindowStart().compareAndSet(null, startedAt);
+ }
+ startOrStopEventGenerationThread();
+ return add;
}
- /**
- * Disable Follow Listener, without invoking a Helix API call
- *
- * @param channelId Channel Id
- * @return true when a previously-tracked channel was removed, false otherwise
- */
- public boolean disableFollowEventListenerForId(String channelId) {
+ @Override
+ public boolean disableClipEventListenerForId(String channelId) {
// remove from set
- boolean remove = listenForFollow.remove(channelId);
+ boolean remove = listenForClips.remove(channelId);
// invalidate cache
- if (!listenForGoLive.contains(channelId)) {
+ if (!listenForGoLive.contains(channelId) && !listenForFollow.contains(channelId)) {
channelInformation.invalidate(channelId);
} else if (remove) {
ChannelCache info = channelInformation.getIfPresent(channelId);
if (info != null) {
- info.setLastFollowCheck(null);
- info.getFollowers().set(null);
+ info.getClipWindowStart().lazySet(null);
}
}
@@ -525,6 +477,44 @@ public boolean disableFollowEventListenerForId(String channelId) {
return remove;
}
+ @Override
+ public void setThreadDelay(long threadDelay) {
+ final UnaryOperator updateBackoff = old -> {
+ ExponentialBackoffStrategy next = old.toBuilder().baseMillis(threadDelay).build();
+ next.setFailures(old.getFailures());
+ return next;
+ };
+
+ this.liveBackoff.getAndUpdate(updateBackoff);
+ this.followBackoff.getAndUpdate(updateBackoff);
+ this.clipBackoff.getAndUpdate(updateBackoff);
+ }
+
+ @Override
+ public Optional getCachedInformation(String channelId) {
+ return Optional.ofNullable(channelInformation.getIfPresent(channelId));
+ }
+
+ @Override
+ public void close() {
+ final Future> streamStatusFuture = this.streamStatusEventFuture.getAndSet(null);
+ if (streamStatusFuture != null)
+ streamStatusFuture.cancel(false);
+
+ final Future> followerFuture = this.followerEventFuture.getAndSet(null);
+ if (followerFuture != null)
+ followerFuture.cancel(false);
+
+ final Future> clipFuture = this.clipEventFuture.getAndSet(null);
+ if (clipFuture != null)
+ clipFuture.cancel(false);
+
+ listenForGoLive.clear();
+ listenForFollow.clear();
+ listenForClips.clear();
+ channelInformation.invalidateAll();
+ }
+
/**
* Start or quit the thread, depending on usage
*/
@@ -534,12 +524,15 @@ private void startOrStopEventGenerationThread() {
// follower event thread
updateListener(listenForFollow::isEmpty, followerEventFuture, this::runRecursiveFollowerCheck, followBackoff);
+
+ // clip creation event thread
+ updateListener(listenForClips::isEmpty, clipEventFuture, this::runRecursiveClipCheck, clipBackoff);
}
/**
* Performs the "heavy lifting" of starting or stopping a listener
*
- * @param stopCondition yields whether or not the listener should be running
+ * @param stopCondition yields whether the listener should be running
* @param futureReference the current listener in an atomic wrapper
* @param startCommand the command to start the listener
* @param backoff the {@link ExponentialBackoffStrategy} for the listener
@@ -578,103 +571,47 @@ private void updateListener(BooleanSupplier stopCondition, AtomicReference(
- executor,
- CollectionUtils.chunked(listenForGoLive, MAX_LIMIT),
- streamStatusEventFuture,
- liveBackoff,
- this::runRecursiveStreamStatusCheck,
- chunk -> {
- streamStatusEventTask.accept(chunk);
- return false; // treat as always consuming from the api rate-limit
- }
- )
- )
- );
- }
+ runRecursiveCheck(streamStatusEventFuture, executor, CollectionUtils.chunked(listenForGoLive, MAX_LIMIT), liveBackoff, this::runRecursiveStreamStatusCheck, chunk -> {
+ streamStatusEventTask.accept(chunk);
+ return false; // treat as always consuming from the api rate-limit
+ });
}
/**
* Initiates the follower listener execution
*/
private void runRecursiveFollowerCheck() {
- if (followerEventFuture.get() != null)
- synchronized (followerEventFuture) {
- if (cancel(followerEventFuture))
- followerEventFuture.set(
- executor.submit(
- new ListenerRunnable<>(
- executor,
- new ArrayList<>(listenForFollow),
- followerEventFuture,
- followBackoff,
- this::runRecursiveFollowerCheck,
- followerEventTask
- )
- )
- );
- }
+ runRecursiveCheck(followerEventFuture, executor, new ArrayList<>(listenForFollow), followBackoff, this::runRecursiveFollowerCheck, followerEventTask);
}
/**
- * Updates {@link ExponentialBackoffStrategy#getBaseMillis()} for each of the independent listeners (i.e. stream status and followers)
- *
- * @param threadRate the maximum rate of api calls per second
+ * Initiates the clip creation listener execution
*/
- public void setThreadRate(long threadRate) {
- this.setThreadDelay(1000 / threadRate);
+ private void runRecursiveClipCheck() {
+ runRecursiveCheck(clipEventFuture, executor, new ArrayList<>(listenForClips), clipBackoff, this::runRecursiveClipCheck, clipEventTask);
}
- /**
- * Updates {@link ExponentialBackoffStrategy#getBaseMillis()} for each of the independent listeners (i.e. stream status and followers)
- *
- * @param threadDelay the minimum milliseconds delay between each api call
- */
- public void setThreadDelay(long threadDelay) {
- final UnaryOperator updateBackoff = old -> {
- ExponentialBackoffStrategy next = old.toBuilder().baseMillis(threadDelay).build();
- next.setFailures(old.getFailures());
- return next;
- };
-
- this.liveBackoff.getAndUpdate(updateBackoff);
- this.followBackoff.getAndUpdate(updateBackoff);
- }
-
- /**
- * Get cached information for a channel's stream status and follower count.
- *
- * For this information to be valid, the respective event listeners need to be enabled for the channel.
- *
- * For thread safety, the setters on this object should not be used; only getters.
- *
- * @param channelId The ID of the channel whose cache is to be retrieved.
- * @return ChannelCache in an optional wrapper.
- */
- public Optional getCachedInformation(String channelId) {
- return Optional.ofNullable(channelInformation.getIfPresent(channelId));
- }
-
- /**
- * Close
- */
- public void close() {
- final Future> streamStatusFuture = this.streamStatusEventFuture.getAndSet(null);
- if (streamStatusFuture != null)
- streamStatusFuture.cancel(false);
-
- final Future> followerFuture = this.followerEventFuture.getAndSet(null);
- if (followerFuture != null)
- followerFuture.cancel(false);
-
- listenForGoLive.clear();
- listenForFollow.clear();
- channelInformation.invalidateAll();
+ private List getClips(String channelId, Instant startedAt, Instant endedAt) {
+ return PaginationUtil.getPaginated(
+ cursor -> {
+ final HystrixCommand commandGetClips = twitchHelix.getClips(null, channelId, null, null, cursor, null, MAX_LIMIT, startedAt, endedAt);
+ try {
+ ClipList result = commandGetClips.execute();
+ clipBackoff.get().reset(); // successful api call
+ return result;
+ } catch (Exception ex) {
+ if (commandGetClips != null && commandGetClips.isFailedExecution()) {
+ log.trace(ex.getMessage(), ex);
+ }
+ log.error("Failed to check for Clip Events: " + ex.getMessage());
+ clipBackoff.get().get(); // increment failures
+ return null;
+ }
+ },
+ ClipList::getData,
+ call -> call.getPagination() != null ? call.getPagination().getCursor() : null,
+ 20
+ );
}
@Value
@@ -727,4 +664,24 @@ private static boolean cancel(AtomicReference> futureRef) {
return future != null && future.cancel(false);
}
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ private static void runRecursiveCheck(AtomicReference> future, ScheduledExecutorService executor, List units, AtomicReference backoff, Runnable startCommand, Function task) {
+ if (future.get() != null)
+ synchronized (future) {
+ if (cancel(future))
+ future.set(
+ executor.submit(
+ new ListenerRunnable<>(
+ executor,
+ units,
+ future,
+ backoff,
+ startCommand,
+ task
+ )
+ )
+ );
+ }
+ }
+
}
diff --git a/twitch4j/src/main/java/com/github/twitch4j/domain/ChannelCache.java b/twitch4j/src/main/java/com/github/twitch4j/domain/ChannelCache.java
index 75a70b075..fef5f1a58 100644
--- a/twitch4j/src/main/java/com/github/twitch4j/domain/ChannelCache.java
+++ b/twitch4j/src/main/java/com/github/twitch4j/domain/ChannelCache.java
@@ -50,6 +50,11 @@ public class ChannelCache {
*/
private final AtomicReference followers = new AtomicReference<>();
+ /**
+ * Clip Query Started At
+ */
+ private final AtomicReference clipWindowStart = new AtomicReference<>();
+
/**
* Construct Channel Cache
*
diff --git a/twitch4j/src/main/java/com/github/twitch4j/events/ChannelClipCreatedEvent.java b/twitch4j/src/main/java/com/github/twitch4j/events/ChannelClipCreatedEvent.java
new file mode 100644
index 000000000..48336eef6
--- /dev/null
+++ b/twitch4j/src/main/java/com/github/twitch4j/events/ChannelClipCreatedEvent.java
@@ -0,0 +1,42 @@
+package com.github.twitch4j.events;
+
+import com.github.twitch4j.common.events.TwitchEvent;
+import com.github.twitch4j.common.events.domain.EventChannel;
+import com.github.twitch4j.common.events.domain.EventUser;
+import com.github.twitch4j.helix.domain.Clip;
+import lombok.EqualsAndHashCode;
+import lombok.Value;
+
+import java.util.Optional;
+
+/**
+ * Called when a new clip is created in a channel.
+ *
+ * Fired by {@link com.github.twitch4j.TwitchClientHelper}; so this event must explicitly be enabled for specific channels.
+ *
+ * Due to Twitch heavily caching the get clips endpoint, these creation events can have multi-minute delays.
+ *
+ * @see com.github.twitch4j.IClientHelper#enableClipEventListener(String, String)
+ */
+@Value
+@EqualsAndHashCode(callSuper = false)
+public class ChannelClipCreatedEvent extends TwitchEvent {
+
+ /**
+ * The channel where the clip was created.
+ */
+ EventChannel channel;
+
+ /**
+ * The clip that was created.
+ */
+ Clip clip;
+
+ /**
+ * @return the user that created the clip
+ */
+ public Optional getCreatingUser() {
+ return Optional.ofNullable(clip.getCreatorId()).map(id -> new EventUser(id, clip.getCreatorName()));
+ }
+
+}
diff --git a/util/src/main/java/com/github/twitch4j/util/PaginationUtil.java b/util/src/main/java/com/github/twitch4j/util/PaginationUtil.java
new file mode 100644
index 000000000..5b72f8ad9
--- /dev/null
+++ b/util/src/main/java/com/github/twitch4j/util/PaginationUtil.java
@@ -0,0 +1,165 @@
+package com.github.twitch4j.util;
+
+import lombok.experimental.UtilityClass;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+@UtilityClass
+public class PaginationUtil {
+
+ /**
+ * Obtains a full (user-specified) collection of results from paginated calls with arbitrary cursors, up to a max number of pages and elements, while optionally avoiding duplicate cursors.
+ *
+ * Implementation notes:
+ *
+ * - strict mode involves more memory usage but can avoid unnecessary loops when max pages and elements are high and the call can yield undesirable duplicate cursors
+ * - even without strict mode, this checks that the next cursor does not equal the previous cursor to avoid some loops while reducing memory footprint
+ * - the max elements constraint can be marginally violated when the final page of results had enough elements to go beyond this threshold
+ * - pagination stops if any threshold is met (it does not wait for both the max pages and max elements thresholds to be exceeded)
+ * - null cursors yielded from a call end further pagination
+ *
+ *
+ * @param callByCursor Performs a query based on the cursor
+ * @param getResult Yields the results contained in the executed call
+ * @param getNext Yields the next cursor to paginate on (or null to cease pagination)
+ * @param maxPages The maximum number of pages to paginate over
+ * @param maxUnits The (approximate) maximum number of elements that can be queried
+ * @param collector The supplier of the collection to store the paginated results
+ * @param strict Whether pagination should stop upon encountering an already-seen cursor
+ * @param first The initial cursor to use in the first call
+ * @param valid Whether a cursor is valid to be used in a subsequent call
+ * @param Result unit class
+ * @param Container class of results and pagination information
+ * @param The type of collection that should be used to store the result units
+ * @param The type of the cursor object; should implement equals and hashcode
+ * @return the paginated results
+ */
+ public static , P> C getPaginated(Function callByCursor, Function> getResult, Function getNext, int maxPages, int maxUnits, Supplier collector, boolean strict, P first, Predicate valid) {
+ final C collection = collector.get();
+
+ Set
cursors = strict ? new HashSet<>(Math.max(16, Math.min(maxPages, 1024))) : null;
+ P cursor = first;
+ int page = 0;
+ do {
+ // Perform the call
+ final K call = callByCursor.apply(cursor);
+ page++;
+ if (call == null) break;
+
+ // Save the results
+ final Collection results = getResult.apply(call);
+ if (results == null || results.isEmpty()) break;
+ collection.addAll(results);
+
+ // Obtain the cursor for the next call
+ final P next = getNext.apply(call);
+ if (Objects.equals(next, cursor)) break; // we got the same cursor back; avoid infinite loop
+ if (strict && next != null && !cursors.add(next)) break; // we've already seen this cursor in the past; avoid infinite loop
+ cursor = next;
+ } while (cursor != null && valid.test(cursor) && (maxPages < 0 || page < maxPages) && (maxUnits < 0 || collection.size() < maxUnits));
+
+ return collection;
+ }
+
+ /**
+ * Obtains a full (user-specified) collection of results from paginated calls, up to a max number of pages and elements, while optionally avoiding duplicate cursors.
+ *
+ * Implementation notes:
+ *
+ * - null or empty cursors yielded from a call end further pagination
+ * - the first call executed is done with a null cursor
+ *
+ *
+ * @param callByCursor Performs a query based on the string cursor (which is initially null)
+ * @param extractResult Yields the results contained in the executed call
+ * @param extractCursor Yields the next cursor to paginate on (or null to cease pagination)
+ * @param maxPages The maximum number of pages to paginate over
+ * @param maxElements The (approximate) maximum number of elements that can be queried
+ * @param createCollection The supplier of the collection to store the paginated results
+ * @param strict Whether pagination should stop upon encountering an already-seen cursor
+ * @param Result unit class
+ * @param Container class of results and pagination information
+ * @param The type of collection that should be used to store the result units
+ * @return the paginated results
+ * @see #getPaginated(Function, Function, Function, int, int, Supplier, boolean, Object, Predicate)
+ */
+ public static > C getPaginated(Function callByCursor, Function> extractResult, Function extractCursor, int maxPages, int maxElements, Supplier createCollection, boolean strict) {
+ return getPaginated(callByCursor, extractResult, extractCursor, maxPages, maxElements, createCollection, strict, null, s -> !s.isEmpty());
+ }
+
+ /**
+ * Obtains a full (user-specified) collection of results from paginated calls, up to a max number of pages and elements.
+ *
+ * @param callByCursor Performs a query based on the string cursor (which is initially null)
+ * @param resultsFromCall Yields the results contained in the executed call
+ * @param nextCursorFromCall Yields the next cursor to paginate on (or null to cease pagination)
+ * @param maxPages The maximum number of pages to paginate over
+ * @param maxElements The (approximate) maximum number of elements that can be queried
+ * @param createCollection The supplier of the collection to store the paginated results
+ * @param Result unit class
+ * @param Container class of results and pagination information
+ * @param The type of collection that should be used to store the result units
+ * @return the paginated results
+ * @see #getPaginated(Function, Function, Function, int, int, Supplier, boolean)
+ */
+ public static > C getPaginated(Function callByCursor, Function> resultsFromCall, Function nextCursorFromCall, int maxPages, int maxElements, Supplier createCollection) {
+ return getPaginated(callByCursor, resultsFromCall, nextCursorFromCall, maxPages, maxElements, createCollection, false);
+ }
+
+ /**
+ * Obtains a full list of results from paginated calls, up to a max number of pages and elements.
+ *
+ * @param callByCursor Performs a query based on the string cursor (which is initially null)
+ * @param resultsFromCall Yields the results contained in the executed call
+ * @param nextCursorFromCall Yields the next cursor to paginate on (or null to cease pagination)
+ * @param maxPages The maximum number of pages to paginate over
+ * @param maxElements The (approximate) maximum number of elements that can be queried
+ * @param Result unit class
+ * @param Container class of results and pagination information
+ * @return the paginated results
+ * @see #getPaginated(Function, Function, Function, int, int, Supplier, boolean)
+ */
+ public static List getPaginated(Function callByCursor, Function> resultsFromCall, Function nextCursorFromCall, int maxPages, int maxElements) {
+ return getPaginated(callByCursor, resultsFromCall, nextCursorFromCall, maxPages, maxElements, ArrayList::new);
+ }
+
+ /**
+ * Obtains a full list of results from paginated calls, up to a max number of pages.
+ *
+ * @param callByCursor Performs a query based on the string cursor (which is initially null)
+ * @param resultsFromCall Yields the results contained in the executed call
+ * @param nextCursorFromCall Yields the next cursor to paginate on (or null to cease pagination)
+ * @param maxPages The maximum number of pages to paginate over
+ * @param Result unit class
+ * @param Container class of results and pagination information
+ * @return the paginated results
+ * @see #getPaginated(Function, Function, Function, int, int, Supplier, boolean)
+ */
+ public static List getPaginated(Function callByCursor, Function> resultsFromCall, Function nextCursorFromCall, int maxPages) {
+ return getPaginated(callByCursor, resultsFromCall, nextCursorFromCall, maxPages, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Obtains a full list of results from paginated calls.
+ *
+ * @param callByCursor Performs a query based on the string cursor (which is initially null)
+ * @param resultsFromCall Yields the results contained in the executed call
+ * @param nextCursorFromCall Yields the next cursor to paginate on (or null to cease pagination)
+ * @param Result unit class
+ * @param Container class of results and pagination information
+ * @return the paginated results
+ * @see #getPaginated(Function, Function, Function, int, int, Supplier, boolean)
+ */
+ public static List getPaginated(Function callByCursor, Function> resultsFromCall, Function nextCursorFromCall) {
+ return getPaginated(callByCursor, resultsFromCall, nextCursorFromCall, Integer.MAX_VALUE);
+ }
+
+}