The place where random ideas get written down and lost in time.
2017-08-12 - Simple Rx
Category DEV
I should try to make my own little Rx clone lib.
There are a few good things in the RxJava lib but it also has these issues:
- It does too much for my needs.
- On paper, and in the first few tutorials, it all looks good. But then it breaks down. There's RxJava 1 vs 2. The vocabulary confuses me to no end. The initial learning curve is too steep for what looks like a simple concept on paper.
- Examples of vocabulary confusion:
- An "observable" is actually a stream of events. Then call it a stream.
- "Observers" actually subscribe to a stream. Then call them subscribers.
- A source or an emitter is the same thing.
- A consumer is an observer (subscriber).
- Then RxJava 2 adds its own confusion by having Flowable vs Observable, and the Flowable version of the Observer is now called a Subscriber (as it should be).
- RxStream uses yet another set of names: Publisher, Subscriber, Subscription, Processor.
- The issue with this is the same as with languages like Ruby and Go: they are "almost" easy to get, yet different. When I move away from one for 6 months or a year, I need to learn the ropes again to get familiar with what I wrote a year ago. A "good" framework is one that is intuitive for me, where I can just re-read my old code without wondering what it means or why I wrote it that way.
I'll use the RxStream terminology:
- A Publisher is the source, emitter, generator. It takes a stream and publishes new events onto it.
- A Subscriber is the reader, consumer, observer. It takes a stream and reads from it.
- Schedulers (from RxJava) express which thread a publisher or subscriber runs on.
- A Stream is a pipe that sends events from the publisher(s) to the subscriber(s). It can be a 1-1 or a N-N combination.
- Streams can be created empty and publishers and subscribers added to them in any order.
- Streams are either open, paused, or closed (aka "completed").
- Closing is final. Once closed, publishing generates an error.
- Subscribers can pause the stream, which blocks subscribers.
- Processors are stream filters/maps. They take one or more streams and combine events together into a new output stream.
- Examples: map(lambda X ⇒ Y), take(N), delay(N), merge().
- Subscribers are notified when publishers are added or removed.
- Subscribers are notified when publishers pause the stream.
- Publishers are notified when subscribers are added or removed.
- Streams are by default asynchronous and multithreaded.
- When there are multiple publishers or subscribers, they could all be on different schedulers.
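To make the open / paused / closed life cycle concrete, here is a minimal Java sketch. It is only an illustration under my own assumptions: the class name SimpleStream is made up, events published while paused are buffered (the notes above only say pausing "blocks subscribers"), delivery is synchronous on the caller's thread, and schedulers and add/remove notifications are left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: a single-threaded stream with the three states. */
final class SimpleStream<T> {
    enum State { OPEN, PAUSED, CLOSED }

    private State state = State.OPEN;
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    // Assumption: events published while paused are buffered, not dropped.
    private final List<T> pending = new ArrayList<>();

    SimpleStream<T> addSubscriber(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        return this;
    }

    void pause(boolean paused) {
        if (state == State.CLOSED) {
            throw new IllegalStateException("stream is closed");
        }
        state = paused ? State.PAUSED : State.OPEN;
        if (!paused) {
            flush(); // deliver whatever accumulated during the pause
        }
    }

    boolean isPaused() {
        return state == State.PAUSED;
    }

    /** Closing is final; events still buffered at this point are discarded. */
    void close() {
        state = State.CLOSED;
        pending.clear();
    }

    void publish(T event) {
        if (state == State.CLOSED) {
            throw new IllegalStateException("stream is closed");
        }
        pending.add(event);
        if (state == State.OPEN) {
            flush();
        }
    }

    private void flush() {
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        for (T event : batch) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(event);
            }
        }
    }
}
```

A real version would need locking or a queue per scheduler, since the streams described here are meant to be asynchronous and multithreaded by default.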
Canonical examples:
S = Stream.create() # can be exported via dagger
Or
@Inject Stream S;
S.pause(true);
S.addSubscriber( … ).on( Scheduler );
S.addPublisher( … ).on( Scheduler );
S.addProcessor( map( … ) ).on( Scheduler );
S.pause(false);
S.isPaused();
S.close();
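The processors from the list (map, take) can be sketched as plain wrappers around a downstream subscriber. This is a rough Java illustration, not the planned API: the Processors class and its method signatures are hypothetical, and the take counter is not thread-safe.

```java
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical sketch: processors as wrappers around a downstream subscriber. */
final class Processors {
    private Processors() {}

    /** map: the downstream subscriber receives fn(event) instead of event. */
    static <T, R> Consumer<T> map(Function<T, R> fn, Consumer<R> downstream) {
        return event -> downstream.accept(fn.apply(event));
    }

    /** take: forwards only the first n events and silently ignores the rest. */
    static <T> Consumer<T> take(int n, Consumer<T> downstream) {
        int[] remaining = { n }; // mutable counter captured by the lambda
        return event -> {
            if (remaining[0] > 0) {
                remaining[0]--;
                downstream.accept(event);
            }
        };
    }
}
```

Chaining is then just nesting: take(2, map(fn, subscriber)) yields a subscriber that maps the first two events and drops the others.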
Async task:
Stream.on( Schedulers.io() ) # sets default scheduler unless overridden
.publish( fixed data, e.g. some url )
.publishOn( Schedulers.io() ) # affects the last added publisher
.map( lambda worker: url ⇒ http request ⇒ body response )
.processOn( Schedulers.io() ) # affects the last added processor
.subscribe( subscriber lambda )
.subscribeOn( Schedulers.androidMainThread() ); # affects last subscriber
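For comparison, the same "publish a URL, map it to a response body, hand it to a subscriber on another thread" shape can be approximated today with the JDK's CompletableFuture, treating an Executor as the scheduler. This is a stand-in technique, not the Stream API sketched above; AsyncTaskSketch, run and the fetch parameter are names I made up, and fetch is whatever function performs the actual request.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical sketch: one-shot async pipeline built on CompletableFuture. */
final class AsyncTaskSketch {
    private AsyncTaskSketch() {}

    static CompletableFuture<Void> run(String url,
                                       Function<String, String> fetch,
                                       Consumer<String> subscriber,
                                       Executor ioScheduler,
                                       Executor mainScheduler) {
        return CompletableFuture
                .supplyAsync(() -> url, ioScheduler)          // "publish" the fixed data
                .thenApplyAsync(fetch, ioScheduler)           // "map": url => body
                .thenAcceptAsync(subscriber, mainScheduler);  // "subscribe" on another thread
    }
}
```

Unlike a stream, this handles exactly one event, which is part of why a small multi-event Stream type still seems worth having.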
[Update 2017-08-20]
Once this is implemented, I have one issue: should users be able to publish directly on the stream, or always via publishers?
Example:
_pub = MyCustomPublisher()
_stream = Stream.on(Schedulers.io())
.publishWith( _pub )
.subscribe( SomeSubscriber() );
_pub.publish(42);
_stream.publish(43);
Now we have 2 ways to inject events into the stream: via the publisher and via the stream directly.
The latter bypasses whatever behavior the publisher defines: we might have used a Just() publisher that only provides its constructor arguments, yet we can still add more.
In essence, the publisher does not control whether the stream is "read-only".
So a different strategy is:
- Only Publishers can publish on a stream.
- Users that hold a stream can't publish directly.
Furthermore, we're reducing the scope of each object:
- Stream methods only serve to configure the stream itself.
- Publishers decide if they expose a public publish method.
- When they don't, they become closed generators.
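A rough Java sketch of that second strategy, under my own assumptions: ConfigOnlyStream, Publisher.attach/emit and OpenPublisher are hypothetical names, everything runs synchronously, and Just emits as soon as it is attached, so in this sketch subscribers have to be added first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Stream methods only configure the stream; there is no public publish(). */
final class ConfigOnlyStream<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    ConfigOnlyStream<T> subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        return this;
    }

    ConfigOnlyStream<T> publishWith(Publisher<T> publisher) {
        publisher.attach(this::dispatch); // only the publisher gets a way in
        return this;
    }

    private void dispatch(T event) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }
}

/** Base publisher: subclasses decide whether to expose a public publish(). */
abstract class Publisher<T> {
    private Consumer<T> sink;

    final void attach(Consumer<T> sink) {
        this.sink = sink;
        onAttached();
    }

    protected final void emit(T event) {
        if (sink != null) {
            sink.accept(event);
        }
    }

    protected void onAttached() {}
}

/** Closed generator: emits its constructor arguments and nothing else. */
final class Just<T> extends Publisher<T> {
    private final List<T> values;

    @SafeVarargs
    Just(T... values) {
        this.values = List.of(values);
    }

    @Override
    protected void onAttached() {
        for (T value : values) {
            emit(value);
        }
    }
}

/** Open publisher: chooses to expose publish() to whoever holds it. */
final class OpenPublisher<T> extends Publisher<T> {
    public void publish(T event) {
        emit(event);
    }
}
```

With this shape, holding the stream gives no way to inject events; holding a Just gives none either, and only an OpenPublisher reference lets the caller do the equivalent of _pub.publish(42).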