subscribe Method Examples

This section contains minimal examples of each of the five signatures for the subscribe method. The following code shows an example of the basic method with no arguments:

Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(); (2)
  1. Set up a Flux that produces three values when a subscriber attaches.

  2. Subscribe in the simplest way.

The preceding code produces no visible output, but it does work. The Flux produces three values. If we provide a lambda, we can make the values visible. The next example for the subscribe method shows one way to make the values appear:

Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(i -> System.out.println(i)); (2)
  1. Set up a Flux that produces three values when a subscriber attaches.

  2. Subscribe with a subscriber that will print the values.

The preceding code produces the following output:

1
2
3

To demonstrate the next signature, we intentionally introduce an error, as shown in the following example:

Flux<Integer> ints = Flux.range(1, 4) (1)
      .map(i -> { (2)
        if (i <= 3) return i; (3)
        throw new RuntimeException("Got to 4"); (4)
      });
ints.subscribe(i -> System.out.println(i), (5)
      error -> System.err.println("Error: " + error));
  1. Set up a Flux that produces four values when a subscriber attaches.

  2. We need a map so that we can handle some values differently.

  3. For most values, return the value.

  4. For one value, force an error.

  5. Subscribe with a subscriber that includes an error handler.

We now have two lambda expressions: one for the content we expect and one for errors. The preceding code produces the following output:

1
2
3
Error: java.lang.RuntimeException: Got to 4

The next signature of the subscribe method includes both an error handler and a handler for completion events, as shown in the following example:

Flux<Integer> ints = Flux.range(1, 4); (1)
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error: " + error),
    () -> System.out.println("Done")); (2)
  1. Set up a Flux that produces four values when a subscriber attaches.

  2. Subscribe with a subscriber that includes a handler for completion events.

Error signals and completion signals are both terminal events and are exclusive of one another (you never get both). To make the completion consumer work, we must take care not to trigger an error.

The completion callback takes no input, as represented by an empty pair of parentheses; it matches the run method of the Runnable interface. The preceding code produces the following output:

1
2
3
4
Done
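To see the mutual exclusivity of the two terminal signals side by side, the following sketch records every signal a three-lambda subscriber receives, once for a sequence that completes and once for one that fails. The class name TerminalSignals and its helper methods are hypothetical names chosen for illustration, and the failing source reuses the map-and-throw pattern from the earlier example:

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical helper class, not part of Reactor.
public class TerminalSignals {

    // Records every signal that a three-lambda subscriber receives.
    static List<String> collect(Flux<Integer> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(
                i -> events.add("next " + i),
                error -> events.add("error " + error.getMessage()),
                () -> events.add("done"));
        return events;
    }

    // Same shape as the earlier error example: the third value fails.
    static Flux<Integer> failing() {
        return Flux.range(1, 3).map(i -> {
            if (i < 3) return i;
            throw new RuntimeException("Got to 3");
        });
    }

    public static void main(String[] args) {
        // Completes normally: the completion callback runs, the error one does not.
        System.out.println(collect(Flux.range(1, 2)));  // [next 1, next 2, done]
        // Fails: the error callback runs, the completion one does not.
        System.out.println(collect(failing()));         // [next 1, next 2, error Got to 3]
    }
}
```

Because Flux.range emits synchronously on the subscribing thread, both lists are fully populated by the time subscribe returns. An asynchronous source would need some form of waiting before the list is read.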

Cancelling a subscribe() with Its Disposable

All these lambda-based variants of subscribe() have a Disposable return type. Here, the Disposable interface represents the fact that the subscription can be cancelled by calling its dispose() method.

For a Flux or Mono, cancellation is a signal that the source should stop producing elements. However, it is NOT guaranteed to be immediate: some sources might produce elements so fast that they complete before receiving the cancel instruction.
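As a minimal sketch of cancellation through the returned Disposable, the following example subscribes to Flux.never() (a source that never emits or terminates, so the timing caveat above does not apply) and uses doOnCancel to observe the cancel signal reaching the source. The class name CancelExample and its run method are hypothetical:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;

// Hypothetical example class, not part of Reactor.
public class CancelExample {

    // Returns {disposedBefore, cancelSeenBefore, disposedAfter, cancelSeenAfter}.
    static boolean[] run() {
        AtomicBoolean cancelSeen = new AtomicBoolean();

        // Flux.never() stays open until the subscriber cancels.
        Disposable disposable = Flux.<Integer>never()
                .doOnCancel(() -> cancelSeen.set(true))
                .subscribe(i -> System.out.println(i));

        boolean disposedBefore = disposable.isDisposed();
        boolean cancelSeenBefore = cancelSeen.get();

        // Cancels the subscription; the cancel signal travels upstream.
        disposable.dispose();

        return new boolean[] {
                disposedBefore, cancelSeenBefore, disposable.isDisposed(), cancelSeen.get()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));  // [false, false, true, true]
    }
}
```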

Some utilities around Disposable are available in the Disposables class. Among these, Disposables.swap() creates a Disposable wrapper that lets you atomically cancel and replace a concrete Disposable. This can be useful, for instance, in a UI scenario where you want to cancel a request and replace it with a new one whenever the user clicks on a button. Disposing the wrapper itself closes it. Doing so disposes the current concrete value and all future attempted replacements.
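The swap behavior described above can be sketched as follows. The example uses the update method of Disposable.Swap, which disposes the previously held Disposable when a new one is set. Subscriptions to Flux.never() stand in for long-running requests; the class name SwapExample is hypothetical:

```java
import java.util.Arrays;

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;

// Hypothetical example class, not part of Reactor.
public class SwapExample {

    static boolean[] run() {
        Disposable.Swap current = Disposables.swap();

        // First "request": held by the wrapper.
        Disposable first = Flux.never().subscribe();
        current.update(first);

        // Second "request": update() replaces the first and disposes it.
        Disposable second = Flux.never().subscribe();
        current.update(second);
        boolean firstCancelled = first.isDisposed();
        boolean secondActive = !second.isDisposed();

        // Disposing the wrapper disposes the current value...
        current.dispose();
        boolean secondCancelled = second.isDisposed();

        // ...and any replacement attempted afterwards.
        Disposable late = Flux.never().subscribe();
        boolean accepted = current.update(late);

        return new boolean[] {
                firstCancelled, secondActive, secondCancelled, accepted, late.isDisposed()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));  // [true, true, true, false, true]
    }
}
```

Disposable.Swap also offers a replace method, which sets a new value without disposing the previous one; update is the variant that matches the cancel-and-replace scenario.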

Another interesting utility is Disposables.composite(…). This composite lets you collect several Disposable instances (for instance, multiple in-flight requests associated with a service call) and dispose all of them at once later on. Once the composite’s dispose() method has been called, any attempt to add another Disposable immediately disposes it.
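A minimal sketch of the composite behavior, again using subscriptions to Flux.never() as stand-ins for in-flight requests. The class name CompositeExample and the summary string it builds are hypothetical:

```java
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;

// Hypothetical example class, not part of Reactor.
public class CompositeExample {

    static String run() {
        Disposable.Composite inFlight = Disposables.composite();

        Disposable a = Flux.never().subscribe();
        Disposable b = Flux.never().subscribe();
        inFlight.add(a);
        inFlight.add(b);
        int sizeBefore = inFlight.size();

        // Disposes every Disposable collected so far.
        inFlight.dispose();

        // Adding to a disposed composite disposes the newcomer right away.
        Disposable late = Flux.never().subscribe();
        boolean added = inFlight.add(late);

        return "size=" + sizeBefore
                + " a=" + a.isDisposed()
                + " b=" + b.isDisposed()
                + " added=" + added
                + " late=" + late.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(run());  // size=2 a=true b=true added=false late=true
    }
}
```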

An Alternative to Lambdas: BaseSubscriber

There is an additional subscribe method that is more generic and takes a full-blown Subscriber rather than composing one out of lambdas. In order to help with writing such a Subscriber, we provide an extendable class called BaseSubscriber.

Warning
Instances of BaseSubscriber (or subclasses of it) are single-use, meaning that a BaseSubscriber cancels its subscription to the first Publisher if it is subscribed to a second Publisher. That is because using an instance twice would violate the Reactive Streams rule that the onNext method of a Subscriber must not be called in parallel. As a result, anonymous implementations are fine only if they are declared directly within the call to Publisher#subscribe(Subscriber).

Now we can implement one of these. We call it a SampleSubscriber. The following example shows how it would be attached to a Flux:

SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(ss);

The following example shows what SampleSubscriber could look like, as a minimalistic implementation of a BaseSubscriber:

package io.projectreactor.samples;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;

public class SampleSubscriber<T> extends BaseSubscriber<T> {

	@Override
	public void hookOnSubscribe(Subscription subscription) {
		System.out.println("Subscribed");
		// Ask for the first element only.
		request(1);
	}

	@Override
	public void hookOnNext(T value) {
		System.out.println(value);
		// Ask for the next element, one at a time.
		request(1);
	}
}

The SampleSubscriber class extends BaseSubscriber, which is the recommended abstract class for user-defined Subscribers in Reactor. The class offers hooks that can be overridden to tune the subscriber’s behavior. By default, it triggers an unbounded request and behaves exactly like subscribe(). However, extending BaseSubscriber is much more useful when you want a custom request amount.

For a custom request amount, the bare minimum is to implement hookOnSubscribe(Subscription subscription) and hookOnNext(T value), as we did. In our case, the hookOnSubscribe method prints a statement to standard output and makes the first request. Then the hookOnNext method prints each value and performs additional requests, one request at a time.

Subscribing the SampleSubscriber to the Flux shown earlier produces the following output:

Subscribed
1
2
3
4

BaseSubscriber also offers a requestUnbounded() method to switch to unbounded mode (equivalent to request(Long.MAX_VALUE)), as well as a cancel() method.
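As a sketch of how these two methods combine, the following subscriber switches to unbounded mode on subscription and cancels itself once it has seen three values. The class name FirstThreeSubscriber and the limit of three are assumptions made for illustration:

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical example class, not part of Reactor.
public class FirstThreeSubscriber extends BaseSubscriber<Integer> {

    final List<Integer> received = new ArrayList<>();

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // Same as request(Long.MAX_VALUE): no per-element requests needed.
        requestUnbounded();
    }

    @Override
    protected void hookOnNext(Integer value) {
        received.add(value);
        if (received.size() == 3) {
            // Tell the source to stop producing elements.
            cancel();
        }
    }

    static List<Integer> run() {
        FirstThreeSubscriber subscriber = new FirstThreeSubscriber();
        Flux.range(1, 10).subscribe(subscriber);
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run());  // [1, 2, 3]
    }
}
```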

It also has additional hooks: hookOnComplete, hookOnError, hookOnCancel, and hookFinally (which is always called when the sequence terminates, with the type of termination passed in as a SignalType parameter).

Note
You almost certainly want to implement the hookOnError, hookOnCancel, and hookOnComplete methods. You may also want to implement the hookFinally method. SampleSubscriber is the absolute minimum implementation of a Subscriber that performs bounded requests.
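The following sketch extends the SampleSubscriber idea with the remaining hooks and records each one as it fires, so that the three ways a subscription can end (completion, error, cancellation) become visible. The class name LifecycleSubscriber and its static helpers are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

// Hypothetical example class, not part of Reactor.
public class LifecycleSubscriber<T> extends BaseSubscriber<T> {

    final List<String> events = new ArrayList<>();

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        events.add("subscribed");
        request(1);
    }

    @Override
    protected void hookOnNext(T value) {
        events.add("next " + value);
        request(1);
    }

    @Override
    protected void hookOnComplete() {
        events.add("complete");
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        events.add("error " + throwable.getMessage());
    }

    @Override
    protected void hookOnCancel() {
        events.add("cancel");
    }

    @Override
    protected void hookFinally(SignalType type) {
        // Runs last, whichever way the sequence ended.
        events.add("finally " + type.name());
    }

    // Subscribes a fresh instance and returns what it recorded.
    static <T> List<String> record(Flux<T> source) {
        LifecycleSubscriber<T> subscriber = new LifecycleSubscriber<>();
        source.subscribe(subscriber);
        return subscriber.events;
    }

    // Subscribes to a source that never terminates, then cancels.
    static List<String> recordCancel() {
        LifecycleSubscriber<Object> subscriber = new LifecycleSubscriber<>();
        Flux.never().subscribe(subscriber);
        subscriber.cancel();
        return subscriber.events;
    }

    public static void main(String[] args) {
        // [subscribed, next 1, next 2, complete, finally ON_COMPLETE]
        System.out.println(record(Flux.range(1, 2)));
        // [subscribed, error boom, finally ON_ERROR]
        System.out.println(record(Flux.error(new IllegalStateException("boom"))));
        // [subscribed, cancel, finally CANCEL]
        System.out.println(recordCancel());
    }
}
```

Each run uses a new subscriber instance, in keeping with the single-use warning above.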