Tutorial: Creating Custom RxJS Operators

RxJS is a popular library available for TypeScript and
JavaScript.

It provides APIs for the creation of applications and libraries using asynchronous streams of data and reactive methods.
It’s one of the foundation libraries of Angular.

It includes over 100 operators: functions that take an Observable stream of data and return a new Observable, so they
can be used in chains of operators.

Many of the operators are low level, and combining them through the pipe method creates a powerful way to work
with data.
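
As a small illustration of combining low-level operators through pipe, the sketch below chains filter and map. The sample values and the `evenSquares` names are made up for this example and are not part of the tutorial's running code.

```typescript
import { from } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Hypothetical example data, not part of the tutorial's running example
const numbers$ = from([1, 2, 3, 4, 5, 6]);

// Chain two low-level operators: keep the even numbers, then square them
const evenSquares$ = numbers$.pipe(
  filter((value) => value % 2 === 0),
  map((value) => value * value)
);

// `from` on an array emits synchronously, so the values are collected
// by the time `subscribe` returns
const evenSquares: number[] = [];
evenSquares$.subscribe((value) => evenSquares.push(value));

console.log(evenSquares); // [4, 16, 36]
```

Each operator on its own does very little; the chain is what expresses the intent.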

Creating custom operators for a domain

The good news is it’s also very easy to create new higher-level operators for our domain code – these can be used where
you find duplicate or complicated operations.

By creating operators we can also ensure well-tested code using marble testing, and the operators can be shared among
your team to make your code more readable and stable.

There are two types of operators that can be created, a MonoTypeOperatorFunction and an OperatorFunction, and all
operators must do two things:

  • Return a function which accepts as its parameter the source Observable from the previous step in the stream
  • Return an Observable of the same type for a MonoTypeOperatorFunction, or of a different type for an
    OperatorFunction, by using the source with pipe
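
To make the two shapes concrete, here is a minimal sketch of one operator of each type. The `addOne` and `toLabel` operators are hypothetical names chosen for illustration; only the `MonoTypeOperatorFunction` and `OperatorFunction` types come from RxJS.

```typescript
import { from, MonoTypeOperatorFunction, OperatorFunction } from 'rxjs';
import { map } from 'rxjs/operators';

// Same type in and out: Observable<number> -> Observable<number>
function addOne(): MonoTypeOperatorFunction<number> {
  return (source) => source.pipe(map((value) => value + 1));
}

// Different type out: Observable<number> -> Observable<string>
function toLabel(prefix: string): OperatorFunction<number, string> {
  return (source) => source.pipe(map((value) => `${prefix}${value}`));
}

// Both kinds of operator can sit in the same pipe chain
const labels: string[] = [];
from([1, 2, 3])
  .pipe(addOne(), toLabel('#'))
  .subscribe((label) => labels.push(label));

console.log(labels); // ['#2', '#3', '#4']
```

In both cases the outer function takes the configuration and the returned function takes the source Observable.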

Below we’ll have an example of each, but first, to support creating the operators we need some code to simplify:

import { from } from 'rxjs';
import { map, tap } from 'rxjs/operators';

// Create a cold source that will emit each number
const source$ = from([1, 2, 3, 4, 5]);

// Create a cold source that multiplies each number by `5`
const multiplyByFive$ = source$.pipe(map(value => value * 5));
// Create a cold source that multiplies each number by `10`
const multiplyByTen$ = source$.pipe(map(value => value * 10));

// Subscribe to the sources and console.log the output
multiplyByFive$.pipe(tap(console.log)).subscribe();
// Output: `5, 10, 15, 20, 25`

multiplyByTen$.pipe(tap(console.log)).subscribe();
// Output: `10, 20, 30, 40, 50`

Creating MonoTypeOperatorFunction for single types

As the name suggests a MonoTypeOperatorFunction is a function that works with a single type of data – the input and
output value must be of the same type.

Looking at our code we can identify two multiplication operations in our code that are the same. To turn this into an
operator the function will look like this:

import { MonoTypeOperatorFunction } from 'rxjs';
import { map } from 'rxjs/operators';

// Returns an operator that multiplies every emitted number by `factor`
export function multiply(factor: number): MonoTypeOperatorFunction<number> {
  return (source) => source.pipe(map(value => value * factor));
}

Here, we are returning an arrow function that takes the previous source, which must be an Observable<number>. The
source is piped to map, which converts each source value to a new value; in our case we multiply it by the factor.

TypeScript understands that the output must also be a number, and if you try to return another value type the code
will fail with a compile error.

Writing a marble test

Marble testing is a way to write tests for RxJS operators that deal
with data over time – data is not static due to its asynchronous nature and cannot always be guaranteed in a specific
order. Luckily the test for this operator is simple.

My personal preference has been to write these tests in Jest using rxjs-marbles and jest-marbles, but other libraries for writing these tests are available.

Using marbles, we can set up a mock source that will emit 5 numbers at the specified frames.

The test result contains two things:

  • A subscriptions string, which is used with toHaveSubscriptions to check that the operator handles the end of the
    subscription properly
  • An output Observable that contains the results of the operator and is compared against the expectations
    using toBeObservable

In this test, we’ll pass a source of numbers and multiply by 10:

import { marbles } from "rxjs-marbles/jest";
import { multiply } from './multiply';

describe("multiply", () => {
  it("should multiply by 10", marbles(m => {
    const input = m.hot('-a-b-c-d-e-|', {a: 2, b: 3, c: 4, d: 5, e: 6});
    const subs = '^----------!';
    const expected = m.cold('-a-b-c-d-e-|', {a: 20, b: 30, c: 40, d: 50, e: 60});
    m.expect(input.pipe(multiply(10))).toBeObservable(expected);
    m.expect(input).toHaveSubscriptions(subs);
  }));
});
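
For readers who are not using Jest, roughly the same check can be sketched with the TestScheduler that ships with RxJS in rxjs/testing. This is an alternative to the rxjs-marbles test above, not the setup used in this tutorial. The operator is inlined so the sketch runs on its own, and the `comparisons` counter is only there to show that both expectations were evaluated.

```typescript
import { deepStrictEqual } from 'assert';
import { MonoTypeOperatorFunction } from 'rxjs';
import { map } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

// Inlined copy of the tutorial's operator so the sketch is self-contained
function multiply(factor: number): MonoTypeOperatorFunction<number> {
  return (source) => source.pipe(map((value) => value * factor));
}

// The scheduler hands us actual and expected frames to compare;
// any deep-equality assertion can be plugged in here
let comparisons = 0;
const scheduler = new TestScheduler((actual, expected) => {
  comparisons++;
  deepStrictEqual(actual, expected);
});

scheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
  // '-' is one frame of virtual time, letters are emissions, '|' is completion
  const input = hot('-a-b-c-|', { a: 2, b: 3, c: 4 });
  // '^' marks the subscription and '!' the unsubscription
  const subs = '^------!';

  expectObservable(input.pipe(multiply(10))).toBe('-a-b-c-|', { a: 20, b: 30, c: 40 });
  expectSubscriptions(input.subscriptions).toBe(subs);
});
```

The expectations are evaluated when the scheduler flushes at the end of run, so a mismatch surfaces as a thrown assertion error.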

Update Code

Now that the operator is created, it can be used in the existing code from above. Ideally the operator should be part
of a shared library of code:

import { from } from 'rxjs';
import { multiply } from '@myorg/rxjs-library'

const source$ = from([1, 2, 3, 4, 5]);

const multiplyByFive$ = source$.pipe(multiply(5));
const multiplyByTen$ = source$.pipe(multiply(10));

Already much more readable! Our code explains our intent, but we haven’t really reduced the duplication of our sources.

Changing the API with OperatorFunction

In our domain, we know we always want more than one value from a source, and by using an OperatorFunction we can
reduce our duplicate code even more.

This would introduce an API change, but with proper tests, we should be able to migrate our code easily.

Our source value is still a single number, but the API has changed:

  • The input factor can be a single value or an array of values
  • The return value is now an array of values, regardless of the input.

Because the operator always returns an array, users are not forced to check the type of the response, and this single
API can be well documented and predictable when we use it in our code:

import { OperatorFunction } from 'rxjs';
import { map } from 'rxjs/operators';

export function multiply(factor: number | number[]): OperatorFunction<number, number[]> {
  return source => source.pipe(map(value => (Array.isArray(factor) ? factor : [factor]).map(f => value * f)))
}
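
One possible refinement, offered as a sketch and not as the library's actual implementation, is to normalise the factor to an array once when the operator is created, not on every emitted value. The name `multiplyAll` is hypothetical and only used here to keep it apart from the tutorial's multiply.

```typescript
import { from, OperatorFunction } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical variant of the tutorial's operator: the factor is normalised
// to an array once, when the operator is created, not on every emission
function multiplyAll(factor: number | number[]): OperatorFunction<number, number[]> {
  const factors = Array.isArray(factor) ? factor : [factor];
  return (source) => source.pipe(map((value) => factors.map((f) => value * f)));
}

// A single factor still produces arrays, each of length 1
const single: number[][] = [];
from([1, 2, 3]).pipe(multiplyAll(10)).subscribe((values) => single.push(values));
console.log(single); // [[10], [20], [30]]

// An array of factors produces one result per factor, in the same order
const multiple: number[][] = [];
from([1, 2, 3]).pipe(multiplyAll([5, 10])).subscribe((values) => multiple.push(values));
console.log(multiple); // [[5, 10], [10, 20], [15, 30]]
```

The behaviour is the same as the version above; the only difference is where the Array.isArray check runs.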

Updating the tests

First, we need to update the existing test. Here we only have to change the values in our expected Observable: we
now expect an array of numbers regardless of the input, but with a single factor the array length will be 1:

it("should multiply by 10", marbles(m => {
  const input = m.hot('-a-b-c-d-e-|', {a: 2, b: 3, c: 4, d: 5, e: 6});
  const subs = '^----------!';
  const expected = m.cold('-a-b-c-d-e-|', {a: [20], b: [30], c: [40], d: [50], e: [60]});
  m.expect(input.pipe(multiply(10))).toBeObservable(expected);
  m.expect(input).toHaveSubscriptions(subs);
}));

To ensure full coverage, we should also test the case where we have an array input for the multiplication factor:

it("should multiply by 5 and 10", marbles(m => {
  const input = m.hot('-a-b-c-d-e-|', {a: 2, b: 3, c: 4, d: 5, e: 6});
  const subs = '^----------!';
  const expected = m.cold('-a-b-c-d-e-|', {a: [10, 20], b: [15, 30], c: [20, 40], d: [25, 50], e: [30, 60]});
  m.expect(input.pipe(multiply([5, 10]))).toBeObservable(expected);
  m.expect(input).toHaveSubscriptions(subs);
}));

Update Code

We can now update the code further. Here we can remove the two additional cold Observables and create a single one
using our new multiply operator, passing it an array containing our factors:

import { from } from 'rxjs';
import { multiply } from '@myorg/rxjs-library'

const source$ = from([1, 2, 3, 4, 5]);

const multiplyValues$ = source$.pipe(multiply([5, 10]));

Now we can subscribe to the multiplyValues$ source and get our new result, which contains the multiplication by
both factors:

import { tap } from 'rxjs/operators';

multiplyValues$.pipe(tap(console.log)).subscribe();
// Output: `[5, 10], [10, 20], [15, 30], [20, 40], [25, 50]`
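
Since every emission is an array in the same order as the factors, a subscriber can destructure it directly. The sketch below inlines the operator so it runs on its own; the `byFive` and `byTen` names are illustrative.

```typescript
import { from, OperatorFunction } from 'rxjs';
import { map } from 'rxjs/operators';

// Inlined copy of the tutorial's operator so the sketch runs on its own
function multiply(factor: number | number[]): OperatorFunction<number, number[]> {
  return (source) =>
    source.pipe(map((value) => (Array.isArray(factor) ? factor : [factor]).map((f) => value * f)));
}

const lines: string[] = [];
from([1, 2, 3])
  .pipe(multiply([5, 10]))
  // Destructure each emitted array in the same order as the factors
  .subscribe(([byFive, byTen]) => lines.push(`x5=${byFive} x10=${byTen}`));

console.log(lines); // ['x5=5 x10=10', 'x5=10 x10=20', 'x5=15 x10=30']
```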

Next Steps

You can see a working version of this operator on StackBlitz by opening the console to see the result.

This operator is just a taste of what’s possible with RxJS – diving into the API you’ll find many more operators to help
work with data in other synchronous and asynchronous operations.

A collection of pre-built operators for your projects


Now for a shameless plug: my own library, RxJS Ninja, is a collection of over 130 operators for working with various
types of data (such as arrays or numbers) and streams, allowing for modifying, filtering and querying the data.

It is still in active development, but you might find useful operators that provide clearer intent for your RxJS code.

You can check out the source code on GitHub. There you can also find
a starter project for creating your own TypeScript libraries like
this.

