Skip to content

Commit

Permalink
3.x: Add RxJavaPlugins.createExecutorScheduler (#7306)
Browse files Browse the repository at this point in the history
* 3.x: Add RxJavaPlugins.createExecutorScheduler

* Fix newline
  • Loading branch information
akarnokd committed Aug 2, 2021
1 parent e24e725 commit 5b0dd58
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
22 changes: 21 additions & 1 deletion src/main/java/io/reactivex/rxjava3/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.*;

import org.reactivestreams.Subscriber;

Expand Down Expand Up @@ -1302,6 +1302,26 @@ public static Scheduler createSingleScheduler(@NonNull ThreadFactory threadFacto
return new SingleScheduler(Objects.requireNonNull(threadFactory, "threadFactory is null"));
}

/**
* Create an instance of a {@link Scheduler} by wrapping an existing {@link Executor}.
* <p>
* This method allows creating an {@code Executor}-backed {@code Scheduler} before the {@link Schedulers} class
* would initialize the standard {@code Scheduler}s.
*
* @param executor the {@code Executor} to wrap and turn into a {@code Scheduler}.
* @param interruptibleWorker if {@code true}, the tasks submitted to the {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker} will
* be interrupted when the task is disposed.
* @param fair if {@code true}, tasks submitted to the {@code Scheduler} or {@code Worker} will be executed by the underlying {@code Executor} one after the other, still
* in a FIFO and non-overlapping manner, but allows interleaving with other tasks submitted to the underlying {@code Executor}.
* If {@code false}, the underlying FIFO scheme will execute as many tasks as it can before giving up the underlying {@code Executor} thread.
* @return the new {@code Scheduler} wrapping the {@code Executor}
* @since 3.1.0
*/
@NonNull
public static Scheduler createExecutorScheduler(@NonNull Executor executor, boolean interruptibleWorker, boolean fair) {
return new ExecutorScheduler(executor, interruptibleWorker, fair);
}

/**
* Wraps the call to the function in try-catch and propagates thrown
* checked exceptions as RuntimeException.
Expand Down
19 changes: 16 additions & 3 deletions src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,18 @@ public static Scheduler single() {
* execute those tasks "unexpectedly".
* <p>
* Note that this method returns a new {@code Scheduler} instance, even for the same {@code Executor} instance.
* <p>
* It is possible to wrap an {@code Executor} into a {@code Scheduler} without triggering the initialization of all the
* standard schedulers by using the {@link RxJavaPlugins#createExecutorScheduler(Executor, boolean, boolean)} method
* before the {@code Schedulers} class itself is accessed.
* @param executor
* the executor to wrap
* @return the new {@code Scheduler} wrapping the {@code Executor}
* @see #from(Executor, boolean, boolean)
*/
@NonNull
public static Scheduler from(@NonNull Executor executor) {
return new ExecutorScheduler(executor, false, false);
return from(executor, false, false);
}

/**
Expand Down Expand Up @@ -452,6 +456,10 @@ public static Scheduler from(@NonNull Executor executor) {
* execute those tasks "unexpectedly".
* <p>
* Note that this method returns a new {@code Scheduler} instance, even for the same {@code Executor} instance.
* <p>
* It is possible to wrap an {@code Executor} into a {@code Scheduler} without triggering the initialization of all the
* standard schedulers by using the {@link RxJavaPlugins#createExecutorScheduler(Executor, boolean, boolean)} method
* before the {@code Schedulers} class itself is accessed.
* <p>History: 2.2.6 - experimental
* @param executor
* the executor to wrap
Expand All @@ -463,7 +471,7 @@ public static Scheduler from(@NonNull Executor executor) {
*/
@NonNull
public static Scheduler from(@NonNull Executor executor, boolean interruptibleWorker) {
return new ExecutorScheduler(executor, interruptibleWorker, false);
return from(executor, interruptibleWorker, false);
}

/**
Expand Down Expand Up @@ -532,6 +540,11 @@ public static Scheduler from(@NonNull Executor executor, boolean interruptibleWo
* execute those tasks "unexpectedly".
* <p>
* Note that this method returns a new {@code Scheduler} instance, even for the same {@code Executor} instance.
* <p>
* It is possible to wrap an {@code Executor} into a {@code Scheduler} without triggering the initialization of all the
* standard schedulers by using the {@link RxJavaPlugins#createExecutorScheduler(Executor, boolean, boolean)} method
* before the {@code Schedulers} class itself is accessed.
*
* @param executor
* the executor to wrap
* @param interruptibleWorker if {@code true}, the tasks submitted to the {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker} will
Expand All @@ -544,7 +557,7 @@ public static Scheduler from(@NonNull Executor executor, boolean interruptibleWo
*/
@NonNull
public static Scheduler from(@NonNull Executor executor, boolean interruptibleWorker, boolean fair) {
return new ExecutorScheduler(executor, interruptibleWorker, fair);
return RxJavaPlugins.createExecutorScheduler(executor, interruptibleWorker, fair);
}

/**
Expand Down

0 comments on commit 5b0dd58

Please sign in to comment.