loading...

Tap-ts: type-safe eavesdropping

mstn profile image Marco ใƒป7 min read

In a previous post, we talked about the issues we face when we implement a logging solution in a serverless context.

In order to run log commands in parallel and make our Lambda faster, we do not want to wait for responses from an external logging service. However, because the AWS platform may freeze the execution context, we have to "flush" any pending process before returning a value in the main Lambda handler.

If you want to understand better why, please refer to my previous post about this topic. You can run the experiments from this repo, as well.

Here, we want to present a NodeJS package, tap-ts, that offers the right abstractions to solve this problem.

This package resolves the general problem of eavesdropping a computation. Thus, its purpose goes beyond logging.

I think that it is also a nice case study to show how functional design with types can be actually better than other paradigms.

We use fp-ts, a Typescript library that introduces some common basic functional types. You do not need a deep functional programming knowledge to understand the rest of this post, though.

  • Not needed now, but I recommend this "getting started" tutorial about fp-ts by @gcanti .

How to tap a computation

Ramda has a useful function tap to inspect the content of a computation. For example, the following snippet defines a function that takes a number and adds 1.

const R = require('ramda');

const print = n => console.log(n);

const fun = R.compose(
  R.tap(print),
  R.add(1),
  R.tap(print),
)

The R.tap function intercepts the state of the computation at different stages and prints its value.

The side effect print does not interfere with the main computation.

We would like to do the same in fp-ts. However, our tap function should work with async side effects and run taps in parallel in a non-blocking way.

More importantly, if tapping is asynchronous, we must be sure that pending promises are resolved before the end of the Lambda function. For example, this Ramda code does not work as expected in serverless (but it does in nodejs!) for the reasons we discussed in our previous post.

const R = require('ramda');

const printAsync = n => sendHttpRequest(n); // returns a Promise

const fun = R.compose(
  R.tap(printAsync),
  R.add(1),
  R.tap(printAsync),
)

ObservableTasks

Task in fp-ts represents a computation that does not fail. Under the hood, Task is a wrapper for a Promise whose type is () => Promise<A>. The benefits of using Task instead of Promise are better composition and no callback hell.

import * as T from 'fp-ts/lib/Task';

import { pipe } from 'fp-ts/lib/pipeable';

// equivalent to () => Promise.resolve({ statusCode: 200, body: 'Hello, World!' })
const fakeApiCall = (token: string) => T.of({ statusCode: 200, body: 'Hello, World!' });
const getUserToken = (username: string, password: string) => T.of({ statusCode: 200, body: 'XYZ'  });

const getBody = response => response.body;

const runTask = pipe(
  // get a task that returns a token
  getUserToken('marco', 'password'),
  // concatenate the result of the execution of the previous task
  // with another task
  T.chain(fakeApiCall),
  // project the result of the task built from the composition
  // of fakeApiCall and getUserToken
  T.map(getBody),
);

// runTask has type () => Promise()
// is a description of a side effect, not its actual value
// thus we need to "run" it in order to get a response

const result = await runTask();
// 'Hello, World!'

A naive implementation of tap could be based on T.map as shown in the following snippet. Here, we log the current value in the computation chain and return it without any change.

const runTask = pipe(
  getUserToken('marco', 'password'),
  T.map( token => {
    console.log(token);
    return token;
  }),
  T.chain(fakeApiCall),
  T.map( response => {
    console.log(response);
    return response;
  }),
  T.map(getBody),
);

The problem with this implementation, which is roughly what Rambda does, is that it does not work with async tapping in serverless.

const runTask = pipe(
  ...
  T.map( token => {
    const promise = asyncLog(token);
    return token;
  }),
  ...
);

We need to collect deferred computation, i.e. promises, that are generated during the computation.

We can create a global array promises to store promises and call await Promise.all(promises) before the end, but it does not sound very functional.

An alternative approach is to use chain for tapping.

const runTask = pipe(
  ...
  T.chain( token => {
    return pipe(
     asyncLogTask(token),
     T.map( () => token )
    );
  }),
  ...
);

The problem here is that we wait for the resolution of the promise embedded in asyncLogTask. In other words, we block the computation and don't perform tapping in parallel.

For all these reasons, we introduce a new concept: ObservableTask.

An ObservableTask is a Task that can be "chained" as usual, but it can be tapped. The idea here is that ObservableTask is a Task that carries around also a state with pending promises.

The Writer monad

The Writer monad is a particular data structure that allows to accumulate a secondary value along with the return value of the main computation (informal definition from here).

If you think of tapping as a "secondary computation", the Writer monad seems what we need. We are not inventing anything new, though. Logging is one of the main use cases for the Writer monad. You can find a huge literature on the Web.

An ObservableTask<A> is nothing else but a Writer monad defined as Writer<Taps, Task<A>>. Think of Writer<Taps, Task<A>> as a data structure where Taps is the state with pending promises and Task<A> is the main computation returning a value of type A.

We can chain and map observable tasks as we do with traditional tasks, but we can also eavesdrop a computation without (almost) interfering with it. Indeed, the tap function is similar to Ramda tap, but the main difference is that tapping does not block the main computation and is deferred.

Here's a simple example.

import { start, chain, tap, map, end } from 'tap-ts/lib/Task';

// an async probe
const probe = (n: number): T.Task<void> => () => logAsync(`Read ${n}`);

const increase = n => n+1;

const runTask = pipe(
  // start a new ObservableTask
  // with an initial value
  start(0), 
  // log the current value
  tap(probe), 
  // here, it is like a traditional Task.chain
  // we do not wait for the resolution of logAsync(`Read ${n}`) promise
  chain(() => T.of(100)), 
  // log the current value
  tap(probe),
  // here, it is like a traditional Task.map
  map(increase), 
  tap(probe),
  map(increase),
  // before getting the result we need to release resources:
  // we wait for still pending promises
  end(), 
);
const result = await runTask();
expect(result).toBe(102);

Unhandled exceptions

A probe can be any function of type (a: A) => Task<void>. Thus, it can raise exceptions. For example, TypeError or ReferenceError are quite common in Javascript programming.

In general, it is a good practice to wrap the body of a Lambda handler with try-catch. In this way, exceptions are caught and handled gracefully.

So, in theory, exceptions occurring in tap are already captured. However, we do not want to fail the whole computation for failures in log commands. Logging is just a secondary computation.

For this reason, tap wraps callbacks in try-catch on our behalf.

Types for a better Developer Experience

As you can see, an observable computation begins and terminates with start and end, respectively.

The function start is a way to transform a traditional Task into an ObservableTask. In order to get back a traditional Task, we need to call the end function.

Basically, we define an "observation window". If we want to get the result of a computation, we must call end, explicitly. If we do not do that, we get a typing error. In this way, types force us to release resources used in the observation window.

In particular, end takes care of flushing pending promises on our behalf.

In this way, types help us to avoid runtime bugs. In general, types are not a substitute for unit or integration tests, but, in this particular context, a compile time error is able to detect a problem that will be hard to catch with traditional tests or debugging.

In OOP, it is common to have classes with a close or flush method whose goal is to close database connections, release used memory and so on. It is the responsibility of a programmer to call these methods. Unfortunately, in most cases, the type system does not help us to remember to do that!

Challenge. Following this line of thoughts, we should try-catch also chain and map in observable tasks. If something bad happens there, we need to wait for pending promises before returning. At the moment, we do not do that. It seems a good idea for your first PR. ๐Ÿงก

Interrupt logging preemptively

As we discussed in the previous post, we want to avoid Lambda timeouts when possible. Logging could cause timeouts (e.g. slow or temporarily unavailable logging service).

The end function is also able to interrupt logging preemptively if Lambda time is running out.

In this case, it is better to sacrifice "logging" in order to avoid timeouts in the first place. Losing some log messages is better than making a whole computation fail.

First, the return value of the main computation may be already available, it does not make sense to recompute it.

Then, Lambda failures might trigger a new Lambda execution; this means that side effects, which are not canceled on failure, are run twice or more.

In addition, if logging is slow because the external service is unavailable, we do not really want to send more requests.

Conclusions

In this post we presented tap-ts, a package for fp-ts that solves some problems with logging in a serverless context as discussed in a previous post.

It is based on the Writer monad, a well-known design pattern in functional programming, which is used for implementing logging.

This pattern allows running logging "in parallel", improving in general the performances of a Lambda function, but it also "reminds" us to wait for pending processes before returning a value. This is important in a serverless context where pending processes in a frozen Lambda context may cause unpredictable behaviors and be a source of nasty bugs, hard to find with testing and debugging.

Posted on by:

Discussion

markdown guide