DEV Community

Bugfender
Bugfender

Posted on • Originally published at bugfender.com on

How to Start Programming With Reactive X and RxJava2

In the development world, we’re constantly being bombarded by new innovations with the potential to change our world. 5G, augmented reality, machine learning… there’s probably never been a more fertile and exciting time to be a developer.

But, of all the changes and breakthroughs we’re seeing explode before our eyes, none is more exciting than Reactive programming, an asynchronous philosophy which cuts across languages and platforms.

Reactive programming allows us to build our software in a completely agile way. Whenever a particular value changes, all related values change automatically, so there’s no need for the developer to go back and make all the changes themselves. This means that, after inputting a particular task or function, the developer can continue to the next one without waiting for the server to respond.

This has major practical implications for teams like ours. We used to work in an imperative, pull-based methodology, whereby we made some requests and waited for the server to respond. But Reactive programming enables us to work in a push-based way – when we make a request to the server, we know that the sender will send the response back, so we continue with other requests. If you’re constantly iterating at breakneck speed (as we are) this is a huge breakthrough.

ReactiveX (or Reactive Extensions to provide its full title) provides a library for modern reactive programming, so this is what we’ll be diving into today. Specifically, we’ll explain the various components you have to build to use ReactiveX effectively. You can use ReactiveX across both iOS and Android (another major benefit) but we’re going to focus on the RxJava2 variant, using Kotlin, for the sake of simplicity.

In Short

The article will cover:

  • The three main components of Reactive programming
  • How to build these components quickly and easily
  • How to prevent memory leaks
  • How to work with multi-threading

By the end of this article, you should have a solid grasp of the fundamentals of Reactive programming. You might even be ready to start practicing yourself.

Getting Started – Observables

The Reactive philosophy is based on three key elements: observables, subscribers and operators. Let’s look at the first of these.

Essentially, the observable emits data which we want to update or mutate – in other words, create an object from the original, modified to provide different behaviors. The operator determines exactly how the data is modified. Finally, the subscriber reads the data that the observable is emitting, or pushing, and reacts in one of three ways: either by moving to the next stage of the process, signalling an error or marking the process complete.

To begin, let’s create a simple observable:

val observable =
  Observable.create<String> { emitter ->
    emitter.onNext("Hello Rx!")
   emitter.onComplete()
  }

This is the most basic way to create an Observable, It emits the message “Hello Rx!” and then completes.

Now add operators

Operators work by taking one observable and creating another one. This allows us to apply operators in a chain, each one building on the next.

As we’ve said, the most simple observable is “Hello Rx!”. But here are some others:

  • Empty** / *Never* / ***Throw* -> To create an observable that completes, that won’t emit anything but will never complete and another that will throw an error calling onError respectively.
  • From -> To create observables from other objects like iterables, futures, promises and callbacks.
  • Interval To create an observable that emits a sequence of integers spaced by a particular time interval.

If you want to see other operators, this ReactiveX document provides a whole bunch of them.

To see how an operator works in practice, let’s look at the map function_,_ which you can create like this.

callToServer()
  .map { response -> response.contentLength() }
  .map { size -> size.toString() }
  .subscribeBy { print(it) }

The map function takes the server’s response and returns its length. Then another map function converts that length into a string at the end of the stream; we just log the size as a result of the whole transformation flux.

We can chain as many operators as we want, polishing the resulting output to the subscriber.

And finally… subscribers

Now, we need a subscriber that consumes and reacts to the data. Thanks to Kotlin (and most of the languages that carry its Rx equivalent) we can implement an anonymous subscriber in the form of a lambda function (a small autonomous function you can read more about here).

observable
  .subscribeBy { item -> print(item) } 

As we’ve said, the subscriber has three different possible actions: onNext, onComplete, and onError. All three are implemented as lambda functions.

We can create the actions like this:

observable.subscribeBy(
  onNext = { item -> print(item) },
  onError = { },
  onComplete = { print("Completed!") })

When the subscriptions are in place, the observable will automatically call the subscribers onNext and then onComplete (notice that in this case onError won’t be called because any exception can be thrown).

As a result, the subscriber (which implemented anonymously) will call out “Hello Rx!” and then “Completed!” Then, it will terminate.

Reactive programming is based on a simple set of core celemenets.

Disposals

Each time we create a subscription an object called a disposable is created automatically. This can cancel the subscription at any point.

This is vital because sometimes we need to cancel a subscription when it threatens to clog up memory – for example when it’s holding a reference over a class that is waiting to be removed (like the garbage collector in java). Until the subscription is disposed of, the project can’t be destroyed, and if the process lives on it can create a memory leak.

The subscribe function outputs a Disposable object that carries a dispose function, which cancels the subscription. The framework also provides a CompositeDisposable, a collection that holds a bunch of disposables and can dispose of them all in a row.

Best of all, the disposable also provides a function called clear, which, as the name suggests, clears the collection of disposables – wiping them completely so a new set can be created. This is a great option if we are destroying the whole object where the CompositeDisposable is initiated.

Scheduling

One of the most useful features that ReactiveX provides is a simple way to work with concurrency, using schedulers – an abstract way to achieve threading management. ReactiveX is not only thread-safe; it also allows programmers to jump over various schedulers with a single operator. Different mechanisms include the current thread, dispatch queues, operation queues, new threads, thread pools, Executors, etc…

There are two main operators that work with schedulers, observeOn and subscribeOn. observeOn is the much more common of the two: if you want to perform work on a different scheduler, this is the one you’re most likely to use. In the event that observeOn isn’t explicitly specified, work will be performed on whichever thread/scheduler elements are generated.

There are a bunch of already implemented Schedulers that we can use instead of implementing our own.

  • Schedulers.io() -> Used to run IO blocking operations like database/network calls
  • Schedulers.computation() -> Used to perform long computational works or callbacks (Like providing data from sensors and transforming data)
  • Schedulers.newThread() -> It creates a new thread for each new execution
  • Schedulers.trampoline() -> This queue works and executes the schedulers in a FIFO manner on one of the participating threads.

Following the most recent example shown, we can add schedulers this way:

callToServer()
  .map { response -> response.contentLength() }
  .map { size -> size.toString() }
  .observeOn(Schedulers.io())
  .subscribeOn(AndroidSchedulers.mainThread())
  .subscribeBy { print(it) }

As you can see we are running the network call into a specific I/O thread and then we are running the code on the subscriber in the main thread. We’ve used AndroidSchedulers.mainThread() because we were using Android Studio and Kotlin for this example but each language/framework has its own MainThread scheduler.

Next steps

Ok, we’ve reached the end of the beginning. If you like what you’ve read here and want to dive deeper into Reactive programming, there are plenty of resources you can use.

The ReactiveX community page contains a trove of useful documents and has really helped us in the past. You can also check out the GitHub page and RxMarbles page for a more abstract but graphic documentation.

One more thing…

Given you’ve got this far, we thought you wouldn’t mind too much if we gave a little plug to Bugfender, our remote logging tool with in-built reporting and in-app feedback. Bugfender allows you to track logs of apps built using RxJava2, and its easy-to-use cloud-based console is perfect for those just getting to grips with this form of programming. For more information go to https://bugfender.com/.

Top comments (0)