DEV Community

Cover image for Observables with "forward" and "reverse" pipelines
Dario Mannu
Dario Mannu

Posted on • Edited on

Observables with "forward" and "reverse" pipelines

If you know observables you know the pipeline pattern, which is the composition of a high-level operation by adding processing steps, defined as functions.

In RxJS this is accomplished with the pipe() function (not the .pipe method) as follows:

import { pipe } from 'rxjs';

const toUpperCase = () => pipe(
  map((s: string) => string.toUpperCase()),
  // maybe other steps...
);
Enter fullscreen mode Exit fullscreen mode

The above returns a custom operator that can be embedded in other pipelines:

const pipeline2 = pipe(
  filter(),
  toUpperCase(),
);
Enter fullscreen mode Exit fullscreen mode

Otherwise, it can be used to perform a partial application for streams:

const stream = // ... whatever;
const piped = toUpperCase(stream);

piped.subscribe(...);
Enter fullscreen mode Exit fullscreen mode

Plucking

Let's examine this pattern in the context of a UI component:

const PostTitle = pipe(
  map(post => post.title)
);

const Component = () => {
  const data = fetch('/api').then(r=>r.json());

  return rml`
    <div>${PostTitle(data)}</div>
  `;
}
Enter fullscreen mode Exit fullscreen mode

In this example, the expression PostTitle(data) returns a new stream that goes through the given pipeline to extract the title from the response.

If we had a proper pipeline operator in JS, it could even look like:
<div>${data |> PostTitle}</div>.


Form submission handling

Now, let's examine a different case.

We have a form. On click we want to feed a stream the value of its text box.
We also want to keep a good separation of concerns.

In order to keep the stream clean, as a pure stream of strings, regardless of where those come from, we want make use of the Event Adapter pattern: a translator that takes events from their original form and emits exactly the type of data we need.

The implementation of an Event Adapter for streams can be in fact done with a reverse pipeline, which is a pipeline that unlike the ones above, defines the processing steps at the input of a given stream:

const Component = () => {
  // We start creating a stream of strings
  const stream = new Subject<string>();

  // Then we create an event adapter
  const AsText = inputPipe(
    map((e: Event) => e.target.box1.value)
  );

  return rml`
    <form onsubmit="${AsText(stream)}">
      <input name="box1" id="box1">
      <button>submit</button>
    </form>
  `;
}
Enter fullscreen mode Exit fullscreen mode

AsText above is a Subject. Any data in goes through the given pipeline and the result feeds the actual stream we want to feed.

When to use which?

It's very simple how to distinguish normal "forward" pipelines from "reverse" pipelines:

  • You use forward pipelines when sinking data from your models to your sinks (e.g.: to your HTML).
  • You use reverse pipelines to transform or extract from raw DOM Events the data needed by your models.

Play

If it's not interactive, it's not true! Click the one below and play around a bit. Any trouble? Leave a comment below!

Now, if you like what's going on here, come say hi in our Discord Channel where we're discussing hop topics such as Stream-Oriented Programming!

Learn More

Top comments (2)

Collapse
 
framemuse profile image
Valery Zinchenko

But what if I need two different transformers on input and output?

Collapse
 
dariomannu profile image
Dario Mannu • Edited

Great question! You can use both on the same stream, if you want:

const stream = new Subject<string>()

const EventAdapter1 = inputPipe(
  filter(whatever),
  map((e: Event) => e.target.value)
);

const OutputFormatter1 = pipe(
  filter(str => str != ''),
  map((data: string) => data.toUpperCase())
);

target.innerHTML = rml`
  <button onclick=${EventAdapter1(stream)}">
    ${OutputFormatter1(stream)}
  </button>
`;
Enter fullscreen mode Exit fullscreen mode

This way, EventAdapter1 will apply a transformation to Event objects before they reach the stream, and OutputFormatter1 will change the output before it goes innerHTML