Java reactive streams workshop: basic implementations of Publisher, Subscriber, Subscription and Processor.

codecop/java9-introduction-to-reactive-programming-workshop

 
 


License: GPL v3

java9-introduction-to-reactive-programming-workshop

preface

The main goal of this project is to explore basic features of reactive streams introduced in Java 9:

  • Publisher
  • Subscriber
  • Subscription (Backpressure)
  • Processor (SubmissionPublisher)

introduction

general overview

  • reactive programming (idea formulated by Erik Meijer)
    • the applications we develop must be responsive and able to react to stimuli in the system
  • the main objective of reactive programming is NOT to be as fast as possible but to use resources (CPU, memory, ...) as efficiently as possible
    • the core idea behind reactive is to release resources whenever possible
  • reactive programming is functional programming+
  • before java 8: completely imperative + object oriented
  • functional (style): declarative + higher-order function
    • functional programming: function composition + lazy evaluation
  • functional programming and exceptions are mutually exclusive
  • exceptions + java 8 streams: if you are driving a car and the radio breaks, the most illogical thing to do is to turn back
    • you should turn off the radio and continue the journey
  • Michael Feathers: OO makes code understandable by encapsulating moving parts. FP makes code understandable by minimizing moving parts.
    • I moving part: immutability
    • II moving part: control flow (in imperative - we are going up and down to follow the flow)

manifesto

  • https://www.reactivemanifesto.org/
    • OOP four pillars:
      • abstraction (programming is based on making abstractions),
      • encapsulation (best practice for every programming style),
      • inheritance (golang has no inheritance),
      • polymorphism (makes actually OOP)
    • reactive four pillars
      • responsive
        • infinite-scrolling
        • providing responsiveness
          • efficiency is attained not by doing tasks faster, but by avoiding those that shouldn't be done in the first place
      • resilient - make failure first-class citizen (it is okay to fail)
      • elastic - the only reasonable direction to scale is horizontally
      • message driven - do not expose your database; export your data instead
    • reactive manifesto conclusions
      • blocking I/O limits opportunities for parallelism, so nonblocking I/O is preferred
      • synchronous interaction limits opportunities for parallelism - asynchronous interaction is preferred
      • polling reduces opportunity to use fewer resources, so an event-driven style is preferred
      • if one node can bring down all other nodes, that’s a waste of resources
        • you need isolation of errors (resilience) to avoid losing all your work
      • systems need to be elastic: if there’s less demand - use fewer resources
        • if there’s more demand - use more resources, but never more than required

concurrency context

  • shared mutability
    Thread th = new Thread(new Runnable() {
        @Override
        public void run() {
            // run() takes no arguments and returns nothing, so a thread
            // can only be useful through shared mutable state
            // how do we work with threads if shared mutability is dangerous
            // and the thread API forces us to use it?
        }
    });
    
  • in the past the structure of sequential code was very different from the structure of concurrent code
    • with stream: the structure of sequential code is the same as the structure of concurrent code
  • how many threads should you create?
    • computation intensive <= # of cores
      • Schedulers.computation()
    • IO intensive = NCPU * UCPU * (1 + W/C)
      • NCPU is the number of cores, available through Runtime.getRuntime().availableProcessors()
      • UCPU is the target CPU utilization (between 0 and 1)
      • W/C is the ratio of wait time to compute time
      • Schedulers.io()
    • number of threads is therefore strictly limited
      • by memory also
  • example of reactive application
    • excel: if you modify one cell, it propagates to other cells
    • google docs - hundreds of people use it simultaneously
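
The thread-count rules above can be turned into a small helper. This is only a sketch: the class and method names are made up for illustration, and the inputs (target utilization, wait/compute ratio) are values you would have to measure for your own workload.

```java
// Illustrative helper, not part of the workshop code.
final class ThreadPoolSizing {

    /** IO-intensive work: threads = NCPU * UCPU * (1 + W/C), rounded to the nearest integer. */
    static int ioBoundThreads(int cores, double targetUtilization, double waitToComputeRatio) {
        if (cores < 1 || targetUtilization <= 0 || targetUtilization > 1 || waitToComputeRatio < 0) {
            throw new IllegalArgumentException("invalid sizing parameters");
        }
        double threads = cores * targetUtilization * (1 + waitToComputeRatio);
        return Math.max(1, (int) Math.round(threads));
    }

    /** Computation-intensive work: no more threads than cores. */
    static int cpuBoundThreads(int cores) {
        return Math.max(1, cores);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-bound pool size: " + cpuBoundThreads(cores));
        // assumed workload: 80% target utilization, 9 units of waiting per unit of computing
        System.out.println("IO-bound pool size:  " + ioBoundThreads(cores, 0.8, 9.0));
    }
}
```

For example, 8 cores at full utilization with W/C = 9 gives 8 * 1.0 * (1 + 9) = 80 threads, which shows why IO-heavy pools are usually much larger than the core count.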

java 8 streams

  • a stream is not a data structure; it is actually a collection of functions (with a data source: network, file, etc.)
  • limitations
    • stream cannot be reused
    • single pipeline (a single terminal operation)
      • cannot split into two
    • no exception handling
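
The "cannot be reused" limitation is easy to reproduce. The sketch below (class and method names are illustrative) runs two terminal operations on the same stream instance and then shows one common workaround, a Supplier that builds a fresh pipeline for every use.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Illustrative only, not part of the workshop code.
final class StreamReuse {

    /** Returns true if a second terminal operation on the same stream instance fails. */
    static boolean secondTerminalOperationFails() {
        Stream<String> languages = Stream.of("Java", "C", "Go");
        languages.forEach(s -> { });      // first terminal operation consumes the stream
        try {
            languages.count();            // second terminal operation on the same instance
            return false;
        } catch (IllegalStateException e) {
            return true;                  // the stream has already been operated upon
        }
    }

    /** A Supplier creates a new stream per call, so each pipeline gets its own terminal operation. */
    static long countTwice(Supplier<Stream<String>> source) {
        return source.get().count() + source.get().count();
    }

    public static void main(String[] args) {
        System.out.println("second terminal operation fails: " + secondTerminalOperationFails());
        List<String> data = List.of("Java", "C", "Go");
        System.out.println("counted via supplier: " + countTwice(data::stream));
    }
}
```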

reactive streams

  • vs java 8 streams

    | java streams | reactive streams |
    |---|---|
    | pipeline | pipeline |
    | push data | push/pull data |
    | lazy | lazy |
    | 0, 1, oo | 0, 1, oo |
    | data only | 3 channels: data, error, complete |
    | exceptions: good luck | deal with it downstream (error is just another form of data) |
    | sequential vs parallel | synch vs async |
    | single pipeline (one terminal operation) | multiple subscribers |
  • vs CompletableFuture

    | reactive streams | CompletableFuture/Promises |
    |---|---|
    | 0, 1, oo | 0, 1 |
    | 3 channels | 2 channels (data, error) |
  • there are three key factors that make a stream reactive:

    • the data is processed asynchronously
    • the backpressure mechanism (the strategy for coping with a very fast producer) is non-blocking
    • the fact that the downstream can be slower than the upstream is somehow represented in the domain model
      • the Twitter streaming API, where you can be disconnected if consuming too slow
  • Observable (producer) vs Observer pattern

    • it's that, plus
      • signal end of data stream
      • propagate error
      • evaluation may be synchronous, asynchronous or lazy
  • nonblocking backpressure

    • BUFFER - buffers all onNext values until the downstream consumes them
    • DROP - drops the most recent onNext value if the downstream can't keep up
    • ERROR - signals a MissingBackpressureException in case the downstream can't keep up
    • LATEST - keeps only the latest onNext value, overwriting any previous value if the downstream can't keep up
    • MISSING - OnNext events are written without any buffering or dropping
  • hot vs cold

    • cold = every subscriber starts fresh subscription
      • like iterator, if you start again you start from the beginning
    • hot = start from a point in time, like football online transmission
  • visualizations: https://rxmarbles.com/
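
The strategy names in the backpressure list come from RxJava, but a DROP-like behaviour can also be sketched with the JDK alone. The example below assumes a subscriber that never requests anything, so the per-subscriber buffer fills up; SubmissionPublisher.offer then calls the drop handler instead of blocking. The class name and the buffer size are illustrative choices.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only, not part of the workshop code.
final class DropOnOverflow {

    /** Offers {@code total} items to a subscriber that never signals demand; returns the number of drops. */
    static int offerToStalledSubscriber(int total, int bufferCapacity) {
        AtomicInteger dropped = new AtomicInteger();
        try (SubmissionPublisher<Integer> publisher =
                     new SubmissionPublisher<>(ForkJoinPool.commonPool(), bufferCapacity)) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) { /* never calls request(n) */ }
                @Override public void onNext(Integer item) { }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
            for (int i = 0; i < total; i++) {
                // offer() does not block: when the subscriber's buffer is full the handler runs.
                // Returning false means "do not retry", so the item is dropped for that subscriber.
                publisher.offer(i, (subscriber, item) -> {
                    dropped.incrementAndGet();
                    return false;
                });
            }
        }
        return dropped.get();
    }

    public static void main(String[] args) {
        System.out.println("dropped: " + offerToStalledSubscriber(100, 4));
    }
}
```

Using submit() instead of offer() would block the producer once the buffer is full, which is the blocking alternative to the non-blocking strategies listed above.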

past and future

  • there were times when you had to go to a bank and talk to a person
  • there were times when you had to go to a travel agency to buy tickets
  • in the past, companies built products for their employees to use (nobody cared what the employees thought) and made those employees available to us, the customers
  • now companies build products for real users and for IoT

push vs pull protocols

  • push protocols:

    • the client opens a connection to the server and keeps it constantly active
    • the server will send (push) all new events to the client using that single always-on connection
    • in other words, the server PUSHes the new events to the client
  • pull protocols:

    • the client periodically connects to the server, checks for and gets (pulls) recent events and then closes the connection and disconnects from the server
    • the client repeats this whole procedure to get updated about new events
    • in this mode, the client periodically PULLs the new events from the server
  • code example

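    import com.google.common.collect.Lists; // Guava
    import rx.Observable;                   // RxJava 1.x

    import java.util.Iterator;
    import java.util.List;

    public class PushVsPullExample {

        // pull: the consumer asks for the next element whenever it is ready
        private static void pullExample() {
            final List<String> list = Lists.newArrayList("Java", "C", "C++", "PHP", "Go");

            final Iterator<String> iterator = list.iterator();

            while (iterator.hasNext()) {
                System.out.println(iterator.next());
            }
        }

        // push: the source calls the observer for each element, then signals completion
        private static void pushExample() {
            final List<String> list = Lists.newArrayList("Java", "C", "C++", "PHP", "Go");

            final Observable<String> observable = Observable.from(list);

            // three channels: data, error, complete
            observable.subscribe(System.out::println, System.out::println, () -> System.out.println("We are done!"));
        }
    }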
    

definitions

  • Flow.Publisher - source of data
  • Flow.Subscriber - destination of data
  • Flow.Subscription - message control linking a Flow.Publisher and a Flow.Subscriber (the Subscriber signals demand to the Publisher through the Subscription)
  • Flow.Processor - a component that acts as both a Subscriber and a Publisher (it can consume input and produce output)
  • SubmissionPublisher (java.util.concurrent, not nested in Flow) - the only general-purpose implementation of Flow.Publisher shipped with the Java 9 JDK; it asynchronously issues submitted (non-null) items to current subscribers until it is closed

data flow

PUBLISHER -> PROCESSOR -> PROCESSOR -> SUBSCRIBER

We have two scenarios:

  • Publisher is slow, Subscriber is fast (best scenario)
  • Publisher is fast, Subscriber is slow (the Subscriber must deal with excess data; the most naive approach is to drop it, so that data is lost)

Note that if multiple subscribers are attached to one publisher, they all receive elements in the same order.
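
A PUBLISHER -> PROCESSOR -> SUBSCRIBER chain can be sketched with the JDK classes alone. The MappingProcessor below is a hypothetical stage written for this note, not the workshop's implementation; it extends SubmissionPublisher so that it can republish mapped items, and the mapping function is an assumption.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical mapping stage: subscribes to T, republishes R. */
final class MappingProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription upstream;

    MappingProcessor(Function<? super T, ? extends R> mapper) { this.mapper = mapper; }

    @Override public void onSubscribe(Flow.Subscription subscription) {
        upstream = subscription;
        upstream.request(1);
    }
    @Override public void onNext(T item) {
        submit(mapper.apply(item)); // forward downstream; blocks while downstream buffers are full
        upstream.request(1);        // only then ask upstream for the next item
    }
    @Override public void onError(Throwable t) { closeExceptionally(t); }
    @Override public void onComplete() { close(); }
}

final class PipelineDemo {
    static List<String> run(List<Integer> input) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        SubmissionPublisher<Integer> source = new SubmissionPublisher<>();
        MappingProcessor<Integer, String> mapping =
                new MappingProcessor<Integer, String>(i -> "new mapping: " + i * 2);

        mapping.subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;
            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            @Override public void onNext(String item) { received.add(item); subscription.request(1); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });
        source.subscribe(mapping);
        input.forEach(source::submit);
        source.close(); // onComplete travels through the processor to the final subscriber

        try {
            if (!done.await(5, TimeUnit.SECONDS)) throw new IllegalStateException("timed out");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        run(List.of(1, 2, 3)).forEach(item -> System.out.println("onNext, item: " + item));
    }
}
```

Each stage requests one item at a time, so a slow final subscriber slows the processor, which in turn slows the source instead of losing data.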

interaction steps

  1. implement Flow.Publisher (for example by using SubmissionPublisher<T>) and Flow.Subscriber
  2. the subscriber is subscribed to the publisher by calling the publisher's subscribe(Flow.Subscriber<? super T> subscriber) method
    • take a look at the SubmissionPublisher.subscribe(Subscriber<? super T> subscriber) method
    • success: the publisher asynchronously calls the onSubscribe(Flow.Subscription subscription) method of the subscriber
    • failure: the onError(Throwable throwable) method of the subscriber is called with an IllegalStateException, and the interaction ends
  3. the subscriber requests N items from the publisher by calling request(N) on the Subscription
  4. further requests can be sent regardless of whether earlier ones have been fulfilled (request is non-blocking)
  5. the publisher calls the onNext(T item) method of the subscriber and sends one item in each call
    • if there are no more items to send, the publisher calls the onComplete() method of the subscriber to signal the end of the stream, and the interaction ends
    • note that if the subscriber requests Long.MAX_VALUE items, demand is effectively unbounded: backpressure is switched off and the stream behaves as a pure push stream
  6. if the publisher encounters an error, it calls onError(Throwable throwable) on the subscriber
  7. the subscriber can cancel its subscription by calling the cancel() method on its subscription
    • if a subscription is cancelled, the interaction ends
    • it is possible for the subscriber to receive items after cancellation if there were pending requests before
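
The interaction steps above can be traced with a subscriber that records every signal it receives. This is a sketch under assumptions: BatchingSubscriber and InteractionDemo are illustrative names, and requesting in fixed-size batches is just one possible demand policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Illustrative subscriber: requests items in batches and logs each signal. */
final class BatchingSubscriber<T> implements Flow.Subscriber<T> {
    final List<String> events = new CopyOnWriteArrayList<>();
    final CountDownLatch finished = new CountDownLatch(1);
    private final int batchSize;
    private Flow.Subscription subscription;
    private int remainingInBatch;

    BatchingSubscriber(int batchSize) { this.batchSize = batchSize; }

    @Override public void onSubscribe(Flow.Subscription subscription) { // step 2: subscription accepted
        this.subscription = subscription;
        events.add("onSubscribe");
        requestBatch();                                                  // step 3: first request(N)
    }
    @Override public void onNext(T item) {                               // step 5: one item per call
        events.add("onNext " + item);
        if (--remainingInBatch == 0) requestBatch();                     // step 4: ask for more
    }
    @Override public void onError(Throwable t) {                         // step 6
        events.add("onError " + t);
        finished.countDown();
    }
    @Override public void onComplete() {                                 // end of stream
        events.add("onComplete");
        finished.countDown();
    }
    private void requestBatch() {
        remainingInBatch = batchSize;
        subscription.request(batchSize);
    }
}

final class InteractionDemo {
    static List<String> run(int batchSize, List<Integer> items) {
        BatchingSubscriber<Integer> subscriber = new BatchingSubscriber<>(batchSize);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() leads to onComplete once the buffered items have been delivered
        try {
            if (!subscriber.finished.await(5, TimeUnit.SECONDS)) throw new IllegalStateException("timed out");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(subscriber.events);
    }

    public static void main(String[] args) {
        System.out.println(run(2, List.of(1, 2, 3)));
        // prints [onSubscribe, onNext 1, onNext 2, onNext 3, onComplete]
    }
}
```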

additional remarks

A correct onSubscribe override looks like this:

@Override
public void onSubscribe(Flow.Subscription subscription) {
    if (this.subscription == null) {
        this.subscription = subscription;
        this.subscription.request(1); // we handle backpressure through subscription
    }
    else {
        subscription.cancel(); // we handle cancellation through subscription
    }
}
  • we want our Subscriber to talk to only one Publisher: a Subscription represents the link between a single Subscriber and a single Publisher, so if we already hold a subscription we cancel any further incoming one

    • think about subscriber as a radio receiver, subscriptions as radio waves, and publisher as radio station
  • the actual goal of including these interfaces in the JDK is to provide a Service Provider Interface (SPI) layer

    • this should eventually serve as a unification layer for different components that have reactive and streaming nature, but may expose their own custom APIs, and thus not be able to interoperate with other similar implementations

tests

We test it by running:

  • RunAnswer
  • RunWorkshop

the output should be:

onNext, item: new mapping: 2
onNext, item: new mapping: 4
onNext, item: new mapping: 6
onNext, item: new mapping: 8
onNext, item: new mapping: 10
onNext, item: new mapping: 12
...
