Manage Async Code Like a PRO with RxJS

Paweł Trysła
2017-06-13

If you’ve ever struggled with managing async code, or felt that Promises lack essential features such as cancellation and better composition, or are feature-poor in general, here comes the Saviour: Observables and RxJS.

What is an Observable?

Observable itself is a new type proposed for the ECMAScript specification by proposal-observable. Since this is only a proposal, you can’t use it right now without an Observable library.

In this article, we will focus on the most popular implementation of Observables — RxJS, so quick disclaimer:

Observables, operators and implementation details mentioned in this article apply to RxJS only.

You should treat an Observable the same as any other type, like Array or Promise. You can instantiate it, transform it, pass it around and do whatever you like with it.

You can also treat an Observable as a set of (usually async) events, meaning an Observable can emit multiple values (events) over time. This is one of the most significant differences from Promises: once a Promise resolves, it’s done, and at that point it’s just a wrapper around the data. You can attach <rte-code>then<rte-code> to it afterwards, but you will always get the same value. The second huge difference is that Observables are lazy/cold, whereas Promises are eager/hot. It means that once a Promise is created, its work starts immediately, whether or not anyone is listening. On the other hand, Observables won’t do anything unless you subscribe to them (we will talk about it later).
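
To make the eager/lazy difference tangible, here is a small dependency-free sketch. Note that <rte-code>createObservable<rte-code> is a hypothetical toy helper written for this illustration, not the RxJS implementation; it only mimics the fact that the producer function runs on subscription.

```javascript
const log = [];

// A Promise is eager: its executor runs as soon as the Promise is constructed.
const promise = new Promise((resolve) => {
  log.push('promise executor ran');
  resolve(1);
});

// Toy stand-in for an Observable (an assumption, not RxJS):
// it only stores the producer and runs it when someone subscribes.
function createObservable(producer) {
  return {
    subscribe(next) {
      producer(next);
    },
  };
}

const value$ = createObservable((next) => {
  log.push('observable producer ran');
  next(1);
  next(2); // unlike a Promise, more than one value can be delivered
});

log.push('before subscribe');

const received = [];
value$.subscribe((value) => received.push(value));

console.log(log);
console.log(received);
```

The Promise executor shows up in the log before anything else, while the toy Observable's producer only runs after the subscribe call.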

Despite the differences, Observables and Promises also share some similar features. One of them is notification channels. With Promises, you have 2 channels: resolve and error. You can add a handler function to resolve channel using <rte-code>then<rte-code>, which will get called when Promise resolves with a value, and error handler to error channel using <rte-code>catch<rte-code>.

Observables also implement those 2 channels, named next and error, but they have another one as well: complete. The next and error channels are analogous to Promise’s resolve and error. The complete channel is fired only when the Observable is done emitting values. You can think of Promise’s resolve channel as a combination of next and complete: a Promise can’t resolve multiple times, so when you get a value it also completes.

Creating an Observable

Let’s see how we can create an Observable instance:

Simple Observable creation

In this example, we use the <rte-code>Observable.create<rte-code> factory provided by RxJS, which accepts a function. Inside it, we have access to an <rte-code>observer<rte-code> object, on which we call the <rte-code>next<rte-code> method, passing values to be emitted by the Observable. In this case, we pass the numbers from <rte-code>0<rte-code> to <rte-code>4<rte-code>, one every second.

$ is a common suffix used to mark a variable that holds a stream of data/events.

As I mentioned earlier, Observables are cold, so you won’t see <rte-code>console.log<rte-code> being called if you try to run this example.

Transformation and composition

RxJS gives you an extremely powerful toolset to transform or compose your Observables. Those tools are called operators. Remember when you wanted to change every element of an array? You would use the Array#map method. You can do the same with Observables!

Simple filter and map operations

The code above will first check if each value emitted from <rte-code>event$<rte-code> fulfils the function we passed to the <rte-code>filter<rte-code> operator. If so, it will map this value to a string. Every value that doesn’t fulfil <rte-code>filter<rte-code>’s function will be discarded. This highlights another useful feature: the output Observable doesn’t have to emit the same number of values as the input one does. In other words:

Input/output ratio of emitted values doesn’t have to be 1.

Note that we assigned the results of those operations to a new variable. That’s because each operator returns a new Observable: all operators are pure, the same as <rte-code>Array#map<rte-code> and <rte-code>Array#filter<rte-code>.
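
The idea that an operator returns a new Observable and leaves its input untouched can be sketched without RxJS. Everything below (<rte-code>fromArray<rte-code>, <rte-code>filter<rte-code>, <rte-code>map<rte-code>, and the even-number predicate) is a hypothetical toy written for this illustration, in the "first argument is the input" style, and is not how RxJS implements its operators.

```javascript
// Toy source (an assumption, not RxJS): emits every array element on subscribe.
function fromArray(values) {
  return {
    subscribe(next) {
      values.forEach((value) => next(value));
    },
  };
}

// Each toy operator takes an input Observable and returns a NEW one.
function filter(source, predicate) {
  return {
    subscribe(next) {
      source.subscribe((value) => {
        if (predicate(value)) next(value);
      });
    },
  };
}

function map(source, project) {
  return {
    subscribe(next) {
      source.subscribe((value) => next(project(value)));
    },
  };
}

const event$ = fromArray([0, 1, 2, 3, 4]);
const label$ = map(filter(event$, (value) => value % 2 === 0), (value) => `even: ${value}`);

const labels = [];
label$.subscribe((value) => labels.push(value));

// The input Observable is unchanged and still emits all five values.
const raw = [];
event$.subscribe((value) => raw.push(value));

console.log(labels);
console.log(raw);
```

Five values go in and three come out, so the input/output ratio is not 1, while subscribing to the original source still yields every value.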

Subscription

Up to this point, our Observable won’t do anything, since we haven’t subscribed to it yet. Let’s do it now:

Simple subscription with next handler only

As you can see, we need to call the <rte-code>subscribe<rte-code> method on our Observable. Subscribing starts its execution, so it begins emitting values. In the console you will see:

<rte-code>subscribe<rte-code> method also returns a subscription object, which you can use to unsubscribe (stop observable execution or free resources) by calling <rte-code>subscription.unsubscribe()<rte-code>.

In this example, we’ve only passed a single function to the <rte-code>subscribe<rte-code> method; however, it’s worth knowing that you can also pass an error handler and a complete handler:

Next, error and complete handlers
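
As a rough, dependency-free illustration of the three handlers and of unsubscribing, consider the sketch below. <rte-code>createObservable<rte-code>, <rte-code>handlers<rte-code>, <rte-code>emit<rte-code> and the in-memory listener set are all hypothetical helpers invented for this example; they only mimic the behaviour described in this section and are not the RxJS API.

```javascript
// Toy stand-in for an Observable (an assumption, not RxJS): subscribe takes
// next, error and complete handlers and returns a subscription object.
function createObservable(producer) {
  return {
    subscribe(next, error = () => {}, complete = () => {}) {
      let closed = false;
      const teardown = producer({
        next: (value) => { if (!closed) next(value); },
        error: (err) => { if (!closed) { closed = true; error(err); } },
        complete: () => { if (!closed) { closed = true; complete(); } },
      });
      return {
        unsubscribe() {
          closed = true;
          if (typeof teardown === 'function') teardown();
        },
      };
    },
  };
}

const events = [];
const handlers = (label) => [
  (value) => events.push(`${label} next ${value}`),
  (err) => events.push(`${label} error ${err.message}`),
  () => events.push(`${label} complete`),
];

const ok$ = createObservable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
  observer.next(3); // ignored: this toy Observable has already completed
});

const failing$ = createObservable((observer) => {
  observer.next(1);
  observer.error(new Error('boom'));
});

ok$.subscribe(...handlers('ok'));
failing$.subscribe(...handlers('failing'));

// A hypothetical in-memory event source, used to show unsubscribe.
const listeners = new Set();
const emit = (value) => listeners.forEach((listener) => listener(value));
const tick$ = createObservable((observer) => {
  listeners.add(observer.next);
  return () => listeners.delete(observer.next); // teardown frees the listener
});

const subscription = tick$.subscribe(...handlers('tick'));
emit(0);
subscription.unsubscribe();
emit(1); // nobody is listening any more

console.log(events);
```

Only one of error or complete ever fires for a given subscription, and after unsubscribe the teardown function has removed the listener, so later events never reach the handlers.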

What exactly are the operators?

Operators are nothing more than pure functions which take an input (current) Observable and return a new Observable with the modifications applied, which is why they are chainable. Each operator belongs to 1 of 9 categories. You can see all of them here, but the most commonly used are:

  • creation operators
  • transformation operators
  • filtering operators
  • combination operators
  • error handling operators

Each operator, regardless of its category, can be:

  • an instance operator: it uses the Observable instance to which it is bound, in other words, it uses the <rte-code>this<rte-code> keyword to refer to the input Observable
  • a static operator: it isn’t bound to any specific Observable instance, and instead of using <rte-code>this<rte-code>, it uses its first argument as the input Observable

Almost all creation operators are static, whereas the rest are instance operators. However, some of them such as <rte-code>concat<rte-code> or <rte-code>merge<rte-code> have both instance and static versions. You can read more on that matter here.

RxJS gives you ~120 operators to use, which is an overwhelming number, but don’t worry: most of them are just more specific variants of others or are used in very specific cases. Take, for example, the <rte-code>map<rte-code> operator. If you know what it does, you also know what <rte-code>mapTo<rte-code> does: like <rte-code>map<rte-code>, it projects each value to another one, but instead of accepting a function to compute the new value, it emits the same constant value every time.
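
The relationship between the two can be sketched in a few lines. The <rte-code>fromArray<rte-code>, <rte-code>map<rte-code> and <rte-code>mapTo<rte-code> functions below are hypothetical toys that borrow the operator names for illustration only; they are not the RxJS implementations.

```javascript
// Toy source (an assumption, not RxJS): emits every array element on subscribe.
function fromArray(values) {
  return {
    subscribe(next) {
      values.forEach((value) => next(value));
    },
  };
}

// Toy map: projects every value through a function.
function map(source, project) {
  return {
    subscribe(next) {
      source.subscribe((value) => next(project(value)));
    },
  };
}

// Toy mapTo: the same idea, but the projection ignores its input.
function mapTo(source, constant) {
  return map(source, () => constant);
}

const number$ = fromArray([1, 2, 3]);

const scaled = [];
map(number$, (value) => value * 10).subscribe((value) => scaled.push(value));

const clicked = [];
mapTo(number$, 'clicked').subscribe((value) => clicked.push(value));

console.log(scaled);
console.log(clicked);
```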

Here’s the list of the most common operators, that you should get familiar with before diving deeper:

If you ever get stuck wondering which operator to use, here is an extremely useful Q&A-based guide to help you find the right one.

Error handling and chain isolation

Let’s talk a bit more about error handling. Usually, while developing your app, you have to deal with 3rd party modules or need to fetch some data from a remote server. Regardless of what you need to accomplish, the code can always throw an error, which will cause the Observable to close itself and run some teardown logic, so no more events will be able to go through it! Once the error is thrown, the Observable is done, aka dead.

Sometimes, this kind of behaviour is desired. In other cases, you need to handle errors properly. So how do you do that? Let’s first look at the example:

Naive implementation of error handling

As we can see, this Observable should make an Ajax call on a specific event: buttonClicked. If the request fails, the thrown error will be handled by the catch operator. Even though the errored Observable (userInput$) is replaced by a new Observable.of('ajaxFailed'), the error will still cause it to close. No events will go through it anymore.

To fix this problem, we need chain isolation:

Proper implementation of error handling

We just moved <rte-code>catch<rte-code> into the inner chain created by <rte-code>ajax.getJSON<rte-code>. Now, if an error is raised, we replace it with a new Observable (in the inner chain), so in every case the final output of the inner Observable chain will be a non-errored Observable.

Most of the time, you don’t want your Observable to die, so just isolate the error-prone chain from the main one, and add <rte-code>catch<rte-code> to it.
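
The difference between the naive and the isolated placement of the error handler can be reproduced with a small, synchronous toy. Everything here is an assumption made for the sake of illustration: <rte-code>createObservable<rte-code>, <rte-code>of<rte-code>, <rte-code>mergeMap<rte-code>, <rte-code>catchError<rte-code> and <rte-code>fetchUser<rte-code> are hand-written stand-ins (with <rte-code>fetchUser<rte-code> failing for id 2 on purpose), not RxJS operators or a real Ajax call.

```javascript
// Toy Observable (an assumption, not RxJS): once `error` is delivered, the
// subscription is closed and every later value is ignored.
function createObservable(producer) {
  return {
    subscribe(observer) {
      let closed = false;
      producer({
        next: (value) => { if (!closed) observer.next(value); },
        error: (err) => { if (!closed) { closed = true; observer.error(err); } },
      });
    },
  };
}

const of = (value) => createObservable((observer) => observer.next(value));

// Maps each outer value to an inner Observable and forwards its values and errors.
const mergeMap = (source, project) => createObservable((observer) =>
  source.subscribe({
    next: (value) => project(value).subscribe(observer),
    error: (err) => observer.error(err),
  }));

// Replaces an errored source with the Observable returned by `fallback`.
const catchError = (source, fallback) => createObservable((observer) =>
  source.subscribe({
    next: (value) => observer.next(value),
    error: (err) => fallback(err).subscribe(observer),
  }));

// Hypothetical request: succeeds for every id except 2.
const fetchUser = (id) => (id === 2
  ? createObservable((observer) => observer.error(new Error('request failed')))
  : of(`user ${id}`));

// Three simulated clicks carrying ids 1, 2 and 3.
const click$ = createObservable((observer) => [1, 2, 3].forEach((id) => observer.next(id)));

const collect = (source) => {
  const values = [];
  source.subscribe({ next: (value) => values.push(value), error: () => {} });
  return values;
};

// Naive: the handler sits on the outer chain, so the first failure closes it.
const naive = collect(catchError(mergeMap(click$, fetchUser), () => of('ajaxFailed')));

// Isolated: the handler sits on the inner chain, so the outer chain keeps running.
const isolated = collect(mergeMap(click$, (id) => catchError(fetchUser(id), () => of('ajaxFailed'))));

console.log(naive);
console.log(isolated);
```

In the naive variant the third click is lost because the outer chain has already closed, whereas the isolated variant still delivers the result for id 3 after the failure.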

Observables can also be synchronous

Although Observables are extremely useful for managing asynchronous code, like WebSockets, events or user input, they can also be beneficial for synchronous tasks. Suppose you have some big data set, like an array. Observables might come in handy when you need multiple <rte-code>Array#filter<rte-code> and <rte-code>Array#map<rte-code> operations chained together.

Consider the following example:

Modify large data set using array methods vs RxJS operators

Both variants will produce the same result; however, the Observable version won’t create intermediate arrays, which can make it more performant on large data sets. That’s because all RxJS operators are applied in a single iteration, whereas the Array implementation goes through the whole array to filter elements, then again over the intermediate array created by the Array#filter method.
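
The "single iteration" point is easier to see when the order of calls is recorded. The sketch below compares plain array methods with toy push-based operators; <rte-code>fromArray<rte-code>, <rte-code>filter<rte-code> and <rte-code>map<rte-code> are hypothetical stand-ins written for this example, not RxJS, and the snippet says nothing about real-world performance, which is worth measuring for your own data.

```javascript
const data = [1, 2, 3, 4];

// Helpers that record when the predicate and the projection are called.
const tracedIsEven = (trace) => (n) => { trace.push(`filter ${n}`); return n % 2 === 0; };
const tracedSquare = (trace) => (n) => { trace.push(`map ${n}`); return n * n; };

// Array version: filter walks the whole array and builds an intermediate
// array, then map walks that intermediate array.
const arrayTrace = [];
const viaArray = data.filter(tracedIsEven(arrayTrace)).map(tracedSquare(arrayTrace));

// Toy push-based operators (an assumption, not RxJS).
function fromArray(values) {
  return { subscribe(next) { values.forEach((value) => next(value)); } };
}
function filter(source, predicate) {
  return { subscribe(next) { source.subscribe((value) => { if (predicate(value)) next(value); }); } };
}
function map(source, project) {
  return { subscribe(next) { source.subscribe((value) => next(project(value))); } };
}

// Stream version: each element travels through filter and map before the
// next element is read, so no intermediate array is created.
const streamTrace = [];
const viaStream = [];
map(filter(fromArray(data), tracedIsEven(streamTrace)), tracedSquare(streamTrace))
  .subscribe((value) => viaStream.push(value));

console.log(viaArray, arrayTrace);
console.log(viaStream, streamTrace);
```

Both variants end up with the same values, but the array trace shows all filter calls followed by all map calls, while the stream trace interleaves them element by element.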

What’s next?

In case you want to broaden your knowledge of RxJS and Observables, I strongly suggest watching RxJS 5 Thinking Reactively and Advanced RxJS: State Management and Animations, both by Ben Lesh, or reading the RxJS documentation.

In the next article, we will talk about how to use RxJS to manage side effects in a Redux app.

Author:
Paweł Trysła
Fullstack Developer, the tooling guy with extensive knowledge of Webpack and Babel. Huge fan of Electron, functional programming and RxJS.