The place where random ideas get written down and lost in time.
2017-05-23 - RxJava
Category DEV
Still trying to wrap my head around the basics.
Some high level vocabulary:
- Observable = iterator == the stream.
- Source / emitter == produce values (into an Observable) via onNext / onCompleted.
- Observer = consumer == callback (onNext / onError / onCompleted).
- "Subscribe to an observable using an observer".
⇒ This is my main gripe with RxJava: the vocabulary is nonsensical and confusing. It means that as soon as I stop using it, I will be confused again when I get back to it later.
The flow in a nutshell:
(emitter) ⇒ Observable ⇒ Observer[s].
But in fact it is implemented as such:
Observable.onSubscribe(emitter code) ⇒ Observer/Subscriber(receiving code)
with various intermediate steps in the stream.
A Disposable/Subscription represents the link from an Observer/Subscriber to an Observable/stream.
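To pin the vocabulary down, here is a toy sketch in plain Java. It is not RxJava: MiniRx and its nested Observable, Observer, Emitter and Disposable types are hypothetical names made up for this note, and the real library does a lot more (operators, schedulers, thread safety). It only shows that the observable stores the emitter code and runs it once per subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class MiniRx {
    /** Consumer side: the callbacks an observer provides. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onComplete();
    }

    /** Producer side: what the emitter code uses to push values. */
    interface Emitter<T> extends Observer<T> {
        boolean isDisposed();
    }

    /** The link between one observer and the stream. */
    interface Disposable {
        void dispose();
    }

    /** The stream: it only stores the emitter code and runs it for each subscriber. */
    static final class Observable<T> {
        private final Consumer<Emitter<T>> onSubscribe;

        private Observable(Consumer<Emitter<T>> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        static <T> Observable<T> create(Consumer<Emitter<T>> onSubscribe) {
            return new Observable<>(onSubscribe);
        }

        Disposable subscribe(Observer<T> observer) {
            boolean[] done = {false}; // set once the stream ends or is disposed
            Emitter<T> emitter = new Emitter<T>() {
                public void onNext(T v) { if (!done[0]) observer.onNext(v); }
                public void onError(Throwable e) { if (!done[0]) { done[0] = true; observer.onError(e); } }
                public void onComplete() { if (!done[0]) { done[0] = true; observer.onComplete(); } }
                public boolean isDisposed() { return done[0]; }
            };
            onSubscribe.accept(emitter); // the emitter code runs here, at subscribe time
            return () -> done[0] = true;
        }
    }

    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        Observable<Integer> stream = Observable.create(emitter -> {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onComplete();
            emitter.onNext(3); // ignored: the stream is already complete
        });
        stream.subscribe(new Observer<Integer>() {
            public void onNext(Integer v) { log.add("next " + v); }
            public void onError(Throwable e) { log.add("error"); }
            public void onComplete() { log.add("complete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [next 1, next 2, complete]
    }
}
```

Nothing happens until subscribe() is called, which is why the flow reads "emitter ⇒ Observable ⇒ Observer" but the implementation starts from the subscription.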
Core to remember:
Observable.{from|create}(source) ---> the observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(an observer).
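What the two scheduler calls buy you, as I understand it: subscribeOn picks the thread that runs the source code, observeOn picks the thread that runs the observer callbacks. The sketch below imitates that hop with two plain single-thread executors. It is an analogy only, not how RxJava implements it; ThreadHopDemo and the thread names "io" and "ui" are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadHopDemo {
    public static List<String> demo() {
        // Stand-ins for Schedulers.io() and AndroidSchedulers.mainThread().
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        try {
            // "subscribeOn": the producing code runs on the io thread.
            io.execute(() -> {
                String value = "computed on " + Thread.currentThread().getName();
                // "observeOn": the result is handed over to the ui thread.
                ui.execute(() -> {
                    log.add(value);
                    log.add("delivered on " + Thread.currentThread().getName());
                    done.countDown();
                });
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            ui.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [computed on io, delivered on ui]
    }
}
```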
RxJava2 has Observable vs Flowable:
- Observable = no back pressure.
  - Observer type: #onNext / #onComplete / #onError / #onSubscribe(Disposable).
  - Disposable: #dispose.
  - Specialized types: Single, Completable, Maybe (separate base types that behave like restricted Observables, each with its own observer type).
  - Source: Observable.fromCallable(() -> value).
  - Source: Observable.create() taking an ObservableOnSubscribe whose subscribe() receives an "emitter" object.
- Flowable = has back pressure built-in.
  - Subscriber type: #onNext / #onComplete / #onError / #onSubscribe(Subscription).
  - Subscription: #cancel, #request.
Reactive Streams:
- Publisher: #subscribe(Subscriber)
- Subscriber: #onSubscribe(Subscription)
- Subscription: #request, #cancel
- Processor<T, R>: both a Subscriber<T> and a Publisher<R>, i.e. a processing stage that consumes T and publishes R.
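The JDK (9 and later) ships the same interfaces under java.util.concurrent.Flow, so the request/cancel protocol can be tried without any library. A minimal sketch, assuming a synchronous, single-threaded publisher: ListPublisher and CollectingSubscriber are hypothetical names, and the sketch skips parts of the spec such as signalling an error on request(n) with n <= 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class FlowDemo {
    /** A synchronous Publisher that emits the items of a list, honoring request(n). */
    static final class ListPublisher<T> implements Flow.Publisher<T> {
        private final List<T> items;

        ListPublisher(List<T> items) {
            this.items = items;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private boolean cancelled = false;
                private boolean completed = false;

                @Override
                public void request(long n) {
                    // Emit at most n items: this is the back pressure part.
                    for (long i = 0; i < n && next < items.size() && !cancelled; i++) {
                        subscriber.onNext(items.get(next++));
                    }
                    if (next == items.size() && !cancelled && !completed) {
                        completed = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        }
    }

    /** A Subscriber that records what it receives and asks for 2 items up front. */
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> log = new ArrayList<>();
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(2); }
        @Override public void onNext(String item) { log.add(item); }
        @Override public void onError(Throwable t) { log.add("error: " + t); }
        @Override public void onComplete() { log.add("complete"); }
    }

    public static List<String> demo() {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        new ListPublisher<String>(List.of("a", "b", "c")).subscribe(subscriber);
        // Only 2 items were requested so far, so the log holds a and b.
        subscriber.subscription.request(5); // only "c" is left, then completion
        return subscriber.log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, b, c, complete]
    }
}
```

The subscriber, not the publisher, decides how many items flow, which is the difference from the Observable side where onNext just arrives.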
A few links:
- The missing intro: https://gist.github.com/staltz/868e7e9bc2a7b8c1f754
- RxJava Android tips: http://futurice.com/blog/top-7-tips-for-rxjava-on-android
Using it to replace an Event Bus:
- http://blog.kaush.co/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/
- https://lorentzos.com/rxjava-as-event-bus-the-right-way-10a36bdd49ba
The most obvious pattern is to replace async tasks:
Observable.just( input data )
.map( lambda transforming input to output type; it runs on the io thread because of subscribeOn )
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe( observer using the output )
To replace a timer task:
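subscription = Observable.interval(delay, interval, TimeUnit.unit) # timer() only fires once in RxJava2
.take(20) / map, etc. # for example
.subscribe( observer doing something at interval )
subscription.unsubscribe() # equivalent to timer.cancel; in RxJava2 subscribe() returns a Disposable, so call dispose()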