The place where random ideas get written down and lost in time.

2017-08-12 - Simple Rx

Category DEV

I should try to make my own little Rx clone lib.

There are a few good things in the RxJava lib but it also has these issues:

  • It does too much for my needs.
  • On paper, and when viewing a few first tutorials, it's all good. But then it breaks down. There's RxJava 1 vs 2. The vocabulary confuses me to no end. There's too much of an initial learning curve for what looks like a simple concept on the paper.
  • Example of vocabulary confusion:
    • An "observable" is actually a stream of events. Then call it a stream.
    • "Observers" actually subscribe to a stream. Then call them subscribers.
    • A source or an emitter is the same thing.
    • A consumer is an observer (subscriber).
    • Then RxJava 2 adds its own confusion by having Flowable vs Observable, and their version of the Observer is now called a Subscriber (as it should).
    • RxStream uses even different names: Publisher, Subscriber, Subscription, Processor.
  • The issue with this is the same as with languages like Ruby and Go: They are "almost" easy to get but yet different. When I move away from one for 6 months or a year, I need to go through the ropes again to get familiar with what I wrote a year ago. A "good" framework is something that is intuitive for me and I can just re-read my old code without wondering what it means or why I wrote it that way.

I'll use the RxStream terminology:

  • A Publisher is the source, emitter, generator. It takes a stream and publishes new events onto it.
  • A Subscriber is the reader, consumer, observer. It takes a stream and reads from it.
  • Schedulers (from RxJava) express which thread a publisher or subscriber runs onto.
  • A Stream is a pipe that sends events from the publisher(s) to the subscriber(s). It can be a 1-1 or a N-N combination.
  • Streams can be created empty and publishers and subscribers added to them in any order.
  • Streams are either open, pause or closed (aka "completed").
    • Closing is final. Once closed, publishing generates an error.
    • Subscribers can pause the stream, which blocks subscribers.
  • Processors are stream filters/maps. They take one or more streams and combine events  together into a new output stream.
    • Example: map(lambda X ⇒ Y), take(N), delay(N), merge().
  • Subscribers are notified when publishers are added or removed.
  • Subscribers are notified when publishers pause the stream.
  • Publishers are notified when subscribers are added or removed.
  • Streams are by default asynchronous and multithreaded.
    • When there are multiple publishers or subscribers, they could all be on different schedulers.

Canonical examples:

        S = Stream.create()  # can be exported via dagger

Or

        @Inject Stream S;

        S.pause(true);

        S.addSubscriber( … ).on( Scheduler );

        S.addPublisher( … ).on( Scheduler );

        S.addProcessor( map( … ) ).on( Scheduler );

        S.pause(false);

        S.isPaused();

        S.close();

Async task:

        Stream.on( Scheduler.io() )                # sets default scheduler unless overridden

                .publish( fixed data, e.g. some url )

                .publishOn( Schedulers.io() )  # affects the last added publisher

                .map( lambda worker: url ⇒ http request ⇒ body response )

                .processOn( Schedulers.io() )  # affects the last added processor

                .subscribe( subscriber lambda )

                .subscribeOn( Schedulers.androidMainThread() ); # affects last subscriber

[Update 2017-08-20]

Once this is implemented, I have one issue: should users be able to publish directly on the stream, or always via subscribers?

Example:

        _pub = MyCustomPublisher()

        _stream = Stream.on(Schedulers.io())

                .publishWith( _pub )

                .subscribe( SomeSubscriber() );

        _pub.publish(42);

        _stream.publish(43);

Now we have 2 ways to inject events in the stream: via the publisher and via the stream directly.

The latter overrides whatever behavior from the publisher: we might have used a Just() publisher that only provides its constructor arguments yet we can still add more.

In essence, the publisher does not control whether the stream is "read-only".

So a different strategy is:

  • Only Publishers can publish on a stream.
  • Users that hold a stream can't publish directly.

Furthermore, we're reducing the scope of each objects:

  • Stream methods only serve to configure the stream itself.
  • Publishers decide if they expose a public publish method.
    • When they don't, they become closed generators.


 Generated on 2025-01-09 by Rig4j 0.1-Exp-f2c0035