RxJS - the mergeMap Operator
Do you ever program in TypeScript? Do you use the Promise API? Have you ever
used the async/await functions in TypeScript (or JavaScript) to chain nested
asynchronous requests? And have you ever wondered how you can achieve similar
results using RxJS? In this post, we’ll explore how to take advantage of RxJS
operators as part of an asynchronous workflow, and how to avoid the complexity
of inner subscriptions.
Introduction
A long time ago, when the TypeScript tools were still fairly young, I was doing a bunch of work with JavaScript Promises. Promises are a form of monad. In general, what a promise means in programming is:
- The value of the Promise is not (usually) known at the time of creation
- The value of the Promise is bound exactly once (either in the form of success, or an error)
- Obtaining the value of a Promise requires some form of “unwrapping” operation
If you’re a .NET developer, and haven’t started working in JavaScript or TypeScript yet, then take note that JavaScript Promises are analogous to the Task<T> class.
The act of “unwrapping” a Promise usually means one of two things:
- Registering a continuation function
- Blocking, and waiting on the original asynchronous operation to complete
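To make those two forms of unwrapping concrete, here is a minimal sketch. The names delay, settledOnce, viaContinuation, and viaAwait are made up for illustration. Note that in JavaScript, await does not block the thread the way Task<T>.Wait() can in .NET; it suspends the async function and resumes it later, much like a continuation.

```typescript
// Hypothetical helper: resolves with `value` after `ms` milliseconds.
function delay<T>(value: T, ms: number): Promise<T> {
  return new Promise<T>((resolve) => {
    setTimeout(() => resolve(value), ms);
  });
}

// A Promise is bound exactly once: only the first resolve/reject call counts.
const settledOnce = new Promise<string>((resolve, reject) => {
  resolve('first');
  resolve('second');             // ignored, the Promise is already settled
  reject(new Error('too late')); // ignored as well
});

// Unwrapping by registering a continuation function with then().
function viaContinuation(): Promise<string> {
  return settledOnce.then((value) => value.toUpperCase());
}

// Unwrapping with await: this function is suspended until the value arrives.
async function viaAwait(): Promise<string> {
  const value = await delay('hello', 10);
  return value.toUpperCase();
}
```

Both functions still return a Promise, so the caller has to unwrap the result again, either with then() or with another await.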
From this point forward, I will only use TypeScript to refer to items unique to TypeScript. Anything that I say about JavaScript is generally applicable to TypeScript, because TypeScript is a superset of JavaScript.
Before async/await made it into the JavaScript language specification, and prior to implementation inside the TypeScript compiler, I used to write a lot of code similar to this:
export function myAsyncFunction(): Promise<string> {
  const promiseReturningHttpClient = new HttpClient();
  return promiseReturningHttpClient.get(
    '/app/data/users?firstName=John&lastName=Doe').then((data) => {
      return promiseReturningHttpClient.get(
        `/app/data/users/${data.userId}/colors`).then((colors) => {
          // Return the value so the outer Promise resolves with it.
          return colors[0] || '';
        });
    });
}
This is what some people referred to as “callback hell,” for obvious reasons. There are only two callbacks here; I’m sure you can imagine how much worse it would be with, say, four levels of nesting.
Now, let’s consider what this looks like when using the async/await keywords:
export async function myAsyncFunction(): Promise<string> {
  const promiseReturningHttpClient = new HttpClient();
  // First request: look up the user.
  const userData = await promiseReturningHttpClient.get(
    '/app/data/users?firstName=John&lastName=Doe');
  // Second request: depends on the result of the first.
  const colorData = await promiseReturningHttpClient.get(
    `/app/data/users/${userData.userId}/colors`);
  return colorData[0] || '';
}
That’s a much better async workflow than the first one I wrote. It’s fairly easy to understand, and there’s not a whole lot of complexity buried in there.
Now, Promises are great: Angular provides great support for asynchronous module initialization, and mandates that you give it a Promise (not an Observable) if you want to take advantage of that feature (I personally use it quite extensively). But Promises aren’t the only way to program asynchronously. In fact, as standard and wonderful as Promises are, I’d still much rather use RxJS Observables: they’re composable, they’re pipeable, and when an API’s contract requires a Promise, they can easily be converted to one. Let’s dig in a bit to see what I’m talking about.
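As a rough sketch of that last point, the snippet below converts an Observable to a Promise only at the boundary that needs one. It assumes RxJS 7 or later, where firstValueFrom is available (older versions offered the now-deprecated toPromise() method instead). The loadColors and favoriteColor functions are hypothetical stand-ins, not part of any real API.

```typescript
import { Observable, firstValueFrom, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical stand-in for an Observable-based data source.
function loadColors(): Observable<string[]> {
  return of(['red', 'green', 'blue']);
}

// Compose with operators first, then convert at the edge that wants a Promise.
function favoriteColor(): Promise<string> {
  const color$ = loadColors().pipe(
    map((colors) => colors[0] || '')
  );
  // Resolves with the first emitted value, then unsubscribes from the source.
  return firstValueFrom(color$);
}
```

One caveat: firstValueFrom rejects if the source completes without emitting anything, so it fits request/response style Observables best.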
Introducing the mergeMap Operator
RxJS provides a fundamental API that has significantly simplified much of the
work that I’m doing in the Angular world. Frequently, we’ll have a set of REST
endpoints that we want to get data from, and an async workflow that depends on
multiple asynchronous operations before anything meaningful or useful can be
performed. The mergeMap operator removes much of this complexity.
What mergeMap allows you to do is create and complete observables as part of
your asynchronous pipeline: it maps each incoming value to a new (inner)
Observable, subscribes to it, and emits that Observable’s values into the
outer pipeline. Continuing with the previous example, let’s now assume that
HttpClient.get() returns an Observable, instead of a Promise:
import { Observable } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

export function myAsyncFunction(): Observable<string> {
  // HttpClient here is assumed to return Observables from get().
  const client = new HttpClient();
  const request = client.get('/app/data/users?firstName=John&lastName=Doe');
  return request.pipe(
    map(value$ => `/app/data/users/${value$.userId}/colors`), // Create a url
    mergeMap(url$ => client.get(url$)), // Returning our observable
    map(value$ => value$[0] || '') // Performing our selection operation
  );
}
In this example, mergeMap is used to take the url$ constructed in the previous
stage, send an HTTP request, and return the results to the pipeline. The
mechanics of this API also take care of unwrapping the value on our behalf,
and placing the asynchronously-obtained values back into the pipeline.
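To see what mergeMap saves us from, here is a small self-contained sketch that contrasts nested subscriptions with the flattened pipeline. The getUser and getColors functions are hypothetical in-memory stand-ins for the two HTTP calls, and the User shape is assumed for illustration.

```typescript
import { Observable, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Assumed response shape, for illustration only.
interface User {
  userId: number;
}

// Hypothetical in-memory stand-ins for the two HTTP requests.
function getUser(): Observable<User> {
  return of({ userId: 42 });
}

function getColors(userId: number): Observable<string[]> {
  return of(userId === 42 ? ['teal', 'orange'] : []);
}

// Inner subscription: works, but every dependent request adds a nesting level.
function nested(onColor: (color: string) => void): void {
  getUser().subscribe((user) => {
    getColors(user.userId).subscribe((colors) => {
      onColor(colors[0] || '');
    });
  });
}

// mergeMap subscribes to the inner Observable and re-emits its values,
// so the pipeline stays flat and the caller subscribes only once.
function flattened(): Observable<string> {
  return getUser().pipe(
    mergeMap((user) => getColors(user.userId)),
    map((colors) => colors[0] || '')
  );
}

const results: string[] = [];
nested((color) => results.push(`nested: ${color}`));
flattened().subscribe((color) => results.push(`flattened: ${color}`));
```

Keep in mind that mergeMap does not cancel earlier inner Observables. If the source emits several values, the inner requests run concurrently and their results may interleave; operators such as concatMap or switchMap are worth a look when ordering or cancellation matters.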
Summary
I love RxJS and its various pipeline capabilities. The fact that we can begin
an asynchronous operation, manipulate the resulting values, start additional
asynchronous work, which is automatically unwrapped for us, and continue
manipulating future results, is really exciting to me. I recently migrated a
project that was using Promises, from before there was async/await support in
JavaScript, and decided to embrace the use of Observables. I hit a few snags
where I couldn’t figure out what was going on with my use of RxJS, and fell
back to creating Promises out of my Observables. Then I discovered the
composability of Observables, and started employing the Observable.pipe()
method everywhere I could. I haven’t looked back since. It has very concisely
framed all of my asynchronous code, and on the off chance that I need to
refactor some small part, it’s very easy to lift parts out of a pipeline,
re-invoke the call, and create a new pipe with the original parts.
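As one possible illustration of lifting a stage out of a pipeline, the sketch below turns the “first element or fallback” selection into a reusable operator. The firstOrDefault name is made up for this example; OperatorFunction is the RxJS type for a function that takes one Observable and returns another.

```typescript
import { Observable, OperatorFunction, of } from 'rxjs';
import { map } from 'rxjs/operators';

// A stage lifted out of a pipeline: emits the first element of each array,
// or `fallback` when the array is empty.
function firstOrDefault<T>(fallback: T): OperatorFunction<T[], T> {
  return (source: Observable<T[]>) =>
    source.pipe(
      map((items) => (items.length > 0 ? items[0] : fallback))
    );
}

// The lifted stage drops into any pipe() call like a built-in operator.
const picked: string[] = [];
of(['red', 'green'])
  .pipe(firstOrDefault<string>(''))
  .subscribe((color) => picked.push(color));
of([] as string[])
  .pipe(firstOrDefault<string>('none'))
  .subscribe((color) => picked.push(color));
```

Because an operator is just a function from one Observable to another, the same stage can be reused across pipelines without changing the code around it.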
Thanks for reading this post. As always, I hope something I’ve presented here helps you on one of your projects, or inspires you to open your editor and start a new one!