The place where random ideas get written down and lost in time.

2017-05-23 - RxJava

Category DEV

Still trying to wrap around the basics.

Some high level vocabulary:

  • Observable = iterator == the stream.
  • Source / emitter == produce values (into an Observable) via onNext / onCompleted.
  • Observer = consumer == callback (onNext / onError / onCompleted).
    • "Subscribe to an observable using an observer".

⇒ This is my main grip with RxJava: the vocabulary is nonsensical and confusing. It means as soon as I stop using it I will get confused when I get back to it later.

The flow in a nutshell:

(emitter) ⇒ Observable ⇒ Observer[s].

But in fact it is implemented as such:

Observable.onSubscribe(emitter code) ⇒ Observer/Subscriber(receiving code)

with various intermediate steps in the stream.

A Disposable/Subscription represents the link from an Observer/Subscriber to an Observable/stream.

Core to remember:

Observable.{from|create}(source) ---> the observable

        .subscribeOn(Schedulers.io())

        .observeOn(AndroidSchedulers.mainThread())

        .subscribe(an observer).

RxJava2 has Observable vs Flowable:

  • Observable = no back pressure.
    • Observer type: #onNext / #onComplete / #onError / #onSubscribe(Disposable).
    • Disposable: #dispose.
    • Specialized types: Single, Completable, Maybe (types of Observable)
    • Source: fromCallable(() -> return value),
    • Source: Observable.create() using a "subscriber" with an "emitter" object.
  • Flowable = has back pressure built-in.
    • Subscriber type: #onNext / #onComplete / #onError / #onSubscribe(Subscription).
    • Subscription: #cancel, #request.

RxStreams:

  • Publisher: #subscribe(Subscriber)
  • Subscriber: #onSubscribe(Subscription)
  • Subscription: #request, #cancel
  • Processor<T, R>: converts a Subscriber T into a Publisher R.

A few links:

Using it to replace an Event Bus:

Useful videos:

The most obvious pattern is to replace async tasks:

Observable.just( input data )

        .map( async lambda, transforming input to output type )

        .subscribeOn(Schedulers.io())

        .observeOn(AndroidSchedulers.mainThread())

        .subscribe( observer using the output )

To replace a timer task:

subscription = Observable.timer(delay, interval, TimeUnit.unit)

        .take(20) / map, etc. # for example

        .subscribe( observer doing something at interval )

subscription.unsubscribe() # equivalent to timer.cancel


 Generated on 2025-01-09 by Rig4j 0.1-Exp-f2c0035