< return home

Observables

This article was originally published on #dev-a-day.

Reactive programming is a paradigm that focuses on asynchronous data flow over time. It's well-fitted to any application that deals with network requests, user input, or any sort of input that comes at varying intervals over time. There are a few different established patterns for dealing with this sort of input, and you're likely already familiar with at least one.

Callbacks

The original solution to this was to use callback functions. Let's look at an example of this:

function loadData(path, callback) {
  try {
    const data = /* load some data from `path` */;
    callback(data, null);
  } catch (err) {
    callback([], err);
  }
}

loadData('/api/users', (users, err) => {
  // do something with `users`
});

This works alright for one-time use, but you quickly run into a problem known as callback hell:

loadData('/api/users', (users, err) => {
  loadData('/api/posts', (posts, err) => {
    loadData('/api/comments', (comments, err) => {
      // as you can imagine, it only gets worse from here
    });
  });
});

Promises

The first widely adopted solution to this came in the form of Promises, originally provided via 3rd party libraries like Bluebird and Q; they've since been incorporated into JS core with ES6. Functions that return a Promise can be chained with a .then() call to handle data returned via resolve and optionally a .catch() call to handle errors throw with reject:

function loadData(path) {
  return new Promise((resolve, reject) => {
    try {
      const data = /* load some data from `path` */;
      resolve(data);
    } catch (err) {
      reject(err);
    }
  });
}

loadData('/api/users')
  .then(users => {
    // do something with users
  })
  .catch(err => {
    // handle the err
  })

While this doesn't entirely avoid callback hell, it at least flattens it out:

loadData('/api/users')
  .then(users => {
    return new Promise((resolve, reject) => {
      loadData('/api/posts')
        .then(resolve)
        .catch(reject);
    });
  })
  .then(posts => {
    return new Promise((resolve, reject) => {
      loadData('/api/comments')
        .then(resolve)
        .catch(reject);
    });
  })
  .then(comments => {
    // do something with `comments`
  });

This opens up a new problem with variable scoping, though: when chaining Promise calls, you have to return every variable you want to use later on down the chain, even if you don't use it in the directly following call:

loadData('/api/users')
  .then(users => {
    return new Promise((resolve, reject) => {
      loadData('/api/posts')
        .then(posts => resolve([users, posts]))
        .catch(reject);
    });
  })
  .then(([users, posts]) => {
    return new Promise((resolve, reject) => {
      loadData('/api/comments')
        .then(comments => resolve([users, posts, comments]))
        .catch(reject);
    });
  })
  .then(([users, posts, comments]) => {
    // do something with `users, `posts, and `comments`
  });

Asynchronous Functions

Our code is less nested but more needlessly complex. Enter ES8's async/await:

function loadData(path) {
  return new Promise((resolve, reject) => {
    try {
      const data = /* load some data from `path` */;
      resolve(data);
    } catch (err) {
      reject(err);
    }
  });
}

async function run() {
  try {
    const users = await loadData('/api/users');
    const posts = await loadData('/api/posts');
    const comments = await loadData('/api/comments');
  } catch (err) {
    // handle `err` (thrown by any of the 3 Promises)
  }
}

run();

Asynchronous functions are an abstraction built on top of Promises, and as you can see often drastically simplify code structure. However, they're still only well-suited to one-shot asynchronous actions such as a single network request. What if we wanted to open a channel and listen for many inputs over time?

Observables

Observables provide an idiomatic solution to handling continuous input streams using a pattern that will be immediately famililar to anyone who's used a Promise. They have not been implemented in native JS, so to use them you'll need to pull from a reactive programming library like RxJS. Here's what an Observable looks like in action:

import { Observable } from 'rxjs';

const obs = new Observable(observer => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

obs.subscribe(value => {
  console.log(value);
});
// 1
// 2
// 3

An Observable's .subscribe() method works just like a Promise's .then(), except it's invoked each time the Observable pushes a new value with .next(). RxJS provides some useful helper methods to build Observables for you from existing data sources, both synchronous and asynchronous. Let's look at of:

import { of } from 'rxjs';

const obs = of(1, 2, 3);

obs.subscribe(value => {
  console.log(value);
});
// 1
// 2
// 3

of will create an Observable from any iterable collection that will emit each value of the collection. When calling .subscribe() on an Observable, you can subscribe a few different ways:

import { of } from 'rxjs';

const obs = of(1, 2, 3);

// single callback on `.next()`
obs.subscribe(val => /* do something with val */);

// multiple callbacks for `.next()`, `.error()`, and `.complete()`
obs.subscribe(
  val => /* do something with `val` */,
  err => /* handle `err` */,
  () => /* Observable completed */,
);

// observer Object with `next`, `error`, and `complete` props
obs.subscribe({
  next: val => /* do something with `val` */,
  error: err => /* handle `err` */,
  complete: () => /* Observable completed */,
});

If you're already working in a project that relies on Promises and want to try out Observables, just wrap them with RxJS's from.

import { from } from 'rxjs';

const obs = from(loadData('api/users'));

obs.subscribe({
  next: users => /* do something with `users` */,
  error: err => /* handle `err` */,
});

Unicast vs. Multicast Observables

Standard Observables are unicast, which means each subscriber created by calling .subscribe() on the Observable gets a new instance of the Observable. When running synchronous, repeatable Observables like we've used above, this is usually fine.

import { of } from 'rxjs';

const obs = of(1, 2, 3);

obs.subscribe(val => console.log('first', val));
obs.subscribe(val => console.log('second', val));
// first 1
// first 2
// first 3
// second 1
// second 2
// second 3

Subjects

In the Real World™, this is rarely the intended result. If we want to hook up multiple subscribers, we generally want to have each subscriber react to every next output at the same time. To have multiple subscriptions hooked up to the same source Observable, we need to use a Subject. RxJS Subjects accept inputs via their .next() method and in turn send these inputs along to any number of subscribers in tandem. Subjects may also be used as a subscriber for an Observable like so:

import { from, Subject } from 'rxjs';

const sub = new Subject();

sub.subscribe(val => console.log('first', val));
sub.subscribe(val => console.log('second', val));

const obs = from(1, 2, 3);
obs.subscribe(sub);
// first 1
// second 1
// first 2
// second 2
// first 3
// second 3

To get this effect from an existing Observable, we have to multicast that observable Observable by piping its outputs through a Subject:

import { of } from 'rxjs';

const obs = of(1, 2, 3);
const sub = new Subject();
const multicasted = obs.multicast(sub);

sub.subscribe(val => console.log(val));

multicasted.connect(); // I'll explain this in a sec

Just remember - if you only care about one subscriber or each subscriber in exlusion, use an Observable. If you want to have your output synced between multiple subscribers in real time, use a Subject.

Hot vs. Cold Observables

By default, Observables are cold - they won't begin outputting values until the first observer subscribes. Often this is best, since any values output before an observer had subscribed would essentially be thrown away. In our previous example, we had to call .connect() on a multicasted Observable before it would begin emitting values to subscribers. We have to do this because there is an inherit disconnect within multicasted Observables and their Subject. Observers don't subscribe directly to the source Observable, but instead subscribe to the multicasted Subject. As a result, the source Observable never sees an observer subscribe, and since it's cold, it will never start emitting values. Calling .connect() on a multicast Subject will trigger its source Observable and have it start emitting values.

This behavior may be desirable in certain situtations, as we'll see in a moment. Observables that begin emitting values upon instantiation instead of waiting for an observer to subscribe are referred to as hot. So, when may this be useful?

Replay Subjects

Say we want to keep a record of all messages sent over a channel, but we may not care to view them until the user navigates to the route responsible for displaying these messages. Instead of hooking up a distinct service that subscribes and records these messages, we can hand that function over to RxJS in the form of a ReplaySubject:

import { from, ReplaySubject } from 'rxjs';
import { getMessagesForChannel } from '../api';

const obs = from(getMessagesForChannel('channel'));
const sub = new ReplaySubject();
obs.subscribe(sub);

// channel emits 'foo'
// channel emits 'bar'
// channel emits 'baz'

// sometime later, once the user navigates to `/messages`

sub.subscribe(message => {
  console.log(message);
});
// immediately:
// foo
// bar
// baz

Though the observer didn't subscribe until after the channel had emitted 3 values, it's callback is immediately invoked 3 times as sub replays every input it had previously received.

Behavior Subjects

Similarly, you may want to get the last emitted value from a hot Subject when you subscribe. Imagine a Subject that emits the color theme for a website whenever the user changes to a new theme - any components that want to resopnd to this change would need to immediately get the most recent theme value on mount, and then listen for future changes and respond accordingly. This may be accomplished using a BehaviorSubject:

import { from, BehaviorSubject } from 'rxjs';
import { getTheme } from '../services/theme.service';

const obs = from(getTheme());
const sub = new BehaviorSubject();
obs.subscribe(sub);

// user changes theme to 'blue'
// user changes theme to 'red'

// sometime later, when a new component is mounted

sub.subscribe(theme => {
  console.log(theme);
});
// immediately:
// red

Conclusion

RxJS comes packed with a ton of useful features for working with input streams, and today we've only looked at a small subset of those features. If you're working on a project that deals heavily with asynchronous streams, you should absolutely consider implementing Observables today!

ramblings by Aaron Ross, otherwise known as superhawk610
> ...
© 2023 all rights reserved
built with Gatsby