DEV Community

Cover image for Observable Operators: Merge & Concat
Parwinder 👨🏻‍💻
Parwinder 👨🏻‍💻

Posted on

Observable Operators: Merge & Concat

Merge

Merge as the name suggests merges all given input Observables into output Observables without doing any operations on the input. The output Observable will complete when all the input Observables complete. If any input Observables have an error, that error is sent to the output Observable immediately.

import Rx from 'rxjs';

const interval1 = Rx.Observable.interval(1000).map(i => `first: ${i}`);
const interval2 = Rx.Observable.interval(500).map(i => `second: ${i}`);

const combinedInterval = Rx.Observable.merge(interval1, interval2).take(10);

combinedInterval.subscribe(
    data => console.log(`${data}`)
);

We have combined two interval Observables into a combinedInterval Observable. The first interval will output every second and the second one will do it every half second. The output will be:

second: 0
first: 0
second: 1
second: 2
first: 1
second: 3
second: 4
first: 2
second: 5
second: 6

merge also allows you to specify the concurrency of merged Observable. For example, if I combine 3 interval based Observables but I only want two of them to run at a time, I can pass a schedule(like) parameter.

import Rx from 'rxjs';

const interval1 = Rx.Observable.interval(1000).map(i => `first: ${i}`).take(10);
const interval2 = Rx.Observable.interval(500).map(i => `second: ${i}`).take(5);
const interval3 = Rx.Observable.interval(500).map(i => `third: ${i}`).take(10);

const combinedInterval = Rx.Observable.merge(interval1, interval2, interval3, 2); // 2 at a time

combinedInterval.subscribe(
    data => console.log(`${data}`)
);

First and second Observable run concurrently until the five values from the second Observable has been emitted. Then the third Observable joins the first one in emitting values. The output will look like:

second: 0
first: 0
second: 1
second: 2
first: 1
second: 3
second: 4
first: 2
third: 0
third: 1
first: 3
third: 2
third: 3
first: 4
third: 4
third: 5
first: 5
third: 6
third: 7
first: 6
third: 8
third: 9
first: 7
first: 8
first: 9

Concat

Concat is slightly different from the merge. Both combine the Observables, but concat waits for one Observable to complete before it starts the next one. All the Observables concatenated run in sequence. Whereas in case of a merge, all of them run but provide a single stream of output.

import Rx from 'rxjs';

const interval1 = Rx.Observable.interval(1000).map(i => `first: ${i}`).take(5);
const interval2 = Rx.Observable.interval(500).map(i => `second: ${i}`).take(5);

const combinedInterval = Rx.Observable.concat(interval1, interval2);

combinedInterval.subscribe(
    data => console.log(`${data}`)
);

So concat will combine the output but maintain the output order. The console log will be:

first: 0
first: 1
first: 2
first: 3
first: 4
second: 0
second: 1
second: 2
second: 3
second: 4

Top comments (0)