DEV Community

Vladyslav Kondratiuk

Propagated Transactions for Node.js Applications

Managing database transactions in Node.js is harder than it looks. A point that is often missed is that a transaction needs a dedicated connection: we have to reserve one connection and run every query of the transaction on it.
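To make the connection requirement concrete, here is a toy model rather than a real driver. ToyConnection and ToyPool are hypothetical classes invented for this illustration, and the round-robin hand-out is a simplification of how a real pool picks a free connection. The sketch only shows that transaction state belongs to one connection, so a query sent through the pool may land outside the transaction.

```javascript
// Hypothetical toy classes for illustration only; no real database is involved.
class ToyConnection {
  constructor(id) {
    this.id = id;
    this.inTransaction = false; // transaction state lives on the connection
  }

  query(sql) {
    if (sql === 'BEGIN') this.inTransaction = true;
    if (sql === 'COMMIT' || sql === 'ROLLBACK') this.inTransaction = false;
    return { connectionId: this.id, transactional: this.inTransaction };
  }
}

class ToyPool {
  constructor(size) {
    this.connections = Array.from({ length: size }, (_, i) => new ToyConnection(i + 1));
    this.next = 0;
  }

  // Simplification: hand out connections in round-robin order.
  acquire() {
    const connection = this.connections[this.next];
    this.next = (this.next + 1) % this.connections.length;
    return connection;
  }

  // A query sent to the pool runs on whichever connection comes next.
  query(sql) {
    return this.acquire().query(sql);
  }
}

// BEGIN and INSERT both go through the pool, so they land on different connections.
function unpinnedInsert() {
  const pool = new ToyPool(2);
  pool.query('BEGIN');
  return pool.query('INSERT INTO "user" ...');
}

// One connection is reserved and reused for the whole transaction.
function pinnedInsert() {
  const pool = new ToyPool(2);
  const connection = pool.acquire();
  connection.query('BEGIN');
  const result = connection.query('INSERT INTO "user" ...');
  connection.query('COMMIT');
  return result;
}

console.log(unpinnedInsert()); // { connectionId: 2, transactional: false }
console.log(pinnedInsert()); // { connectionId: 1, transactional: true }
```

In the unpinned case the insert runs on a connection that never saw BEGIN, so a later rollback could not undo it.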

Problem

Imagine you already have two services, UserService and WalletService, and you want to create a user and a wallet in a single transaction.

class UserService {
  async create(payload) {
    return knex('user').insert(payload);
  }
}

class WalletService {
  async create(payload) {
    return knex('wallet').insert(payload);
  }
}

As mentioned above, to guarantee atomicity we have to reserve a database connection and execute all of the transaction's operations on that specific connection. The naive implementation is to pass the connection through method arguments. We can also add a fallback, const connection = trx || knex;, so that the query still runs outside of a transaction context when no connection object is passed.

class UserService {
  async create(payload, trx = null) {
    const connection = trx || knex;
    return connection('user').insert(payload);
  }
}

class WalletService {
  async create(payload, trx = null) {
    const connection = trx || knex;
    return connection('wallet').insert(payload);
  }
}

So when we trigger our transaction, we end up with something like this:

async create(payload1, payload2) {
  const trx = await knex.transaction();

  try {
    const user = await userService.create(payload1, trx);
    const wallet = await walletService.create(payload2, trx);

    await trx.commit();
  } catch (err) {
    await trx.rollback();
    throw err; // rethrow so the caller learns that the operation failed
  }
}

This approach works, but it scales poorly and is best treated as an anti-pattern. First, it pollutes method arguments with the connection object. Second, for every method to be reusable inside a transaction, the connection has to be threaded through every single method. Third, if we separate the business logic layer (BLL) from the data access layer (DAL), we cannot pass the connection object around without breaking the abstraction. So how should we deal with this problem?

Solution

Use @mokuteki/propagated-transactions. This library uses AsyncLocalStorage under the hood (the API was experimental at first, but it has been stable since Node.js v16.4.0, and even large ORMs such as Sequelize have started using it for transaction management). The package lets you propagate transactions without binding your logic to a specific ORM, database, or driver. It also ships with built-in type definitions, so you can use it from either JavaScript or TypeScript.
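For readers who have not used AsyncLocalStorage, here is a minimal sketch of the mechanism itself, using only Node's built-in node:async_hooks module. The names storage, innerQuery and demo are made up for this illustration, and plain strings stand in for real connections.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');

// One storage instance is shared by the whole application.
const storage = new AsyncLocalStorage();

// Hypothetical inner function: it receives no connection argument,
// yet it can read the store that is active for the current async call chain.
async function innerQuery(table) {
  await new Promise((resolve) => setImmediate(resolve)); // simulate async I/O
  const connection = storage.getStore(); // undefined outside of storage.run()
  return `${table} via ${connection ?? 'pool'}`;
}

async function demo() {
  // Everything awaited inside the callback sees 'connection-1' as the store.
  const inside = await storage.run('connection-1', () => innerQuery('user'));
  // Outside of run() there is no store, so the function falls back to the pool.
  const outside = await innerQuery('user');
  return { inside, outside };
}

demo().then(console.log);
// { inside: 'user via connection-1', outside: 'user via pool' }
```

The value passed to run() survives across await boundaries, which is what makes it possible to drop the connection from method arguments.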

Usage

  1. Create an implementation of the ITransactionRunner interface (provided by the package) for your specific database, driver, or ORM.
  2. Create an instance of PropagatedTransaction and pass the implementation from step 1 into its constructor.
  3. Reserve and store a database connection by starting the transaction with PropagatedTransaction.start().
  4. Create a callback that executes your business logic and calls PropagatedTransaction.commit() / PropagatedTransaction.rollback() inside it.
  5. Run PropagatedTransaction.run(connection, callback), where connection is the stored connection from step 3 and callback is the callback from step 4.
  6. Obtain the connection inside the inner method or abstraction layer and use it to run your query.

JavaScript example

const { PropagatedTransaction } = require('@mokuteki/propagated-transactions');

const knex = require('knex')({
  client: 'pg',
  version: '8.10', // Knex reads `version` from the top level of the config
  connection: {
    host: '127.0.0.1',
    port: 5432,
    user: 'mokuteki',
    password: 'pass123',
    database: 'propagated-test',
  },
});

// Step 1
const KnexTransactionRunner = {
  start: async () => {
    const trx = await knex.transaction();

    return trx;
  },
  commit: async (trx) => {
    return trx.commit();
  },
  rollback: async (trx) => {
    return trx.rollback();
  },
};

// Step 2
module.exports.ptx = new PropagatedTransaction(KnexTransactionRunner);

The method that coordinates both services imports ptx, starts the transaction, and runs the business logic inside it:

const { ptx } = require(/* your ptx path from step 2 */);

// Example method of a class that uses both UserService and WalletService in a transaction
async create(payload1, payload2) {
  // Step 3
  const connection = await ptx.start();

  // Step 4
  const callback = async () => {
    try {
      const user = await userService.create(payload1);
      const wallet = await walletService.create(payload2);

      await ptx.commit();

      return user;
    } catch (err) {
      await ptx.rollback();
      throw err; // rethrow so the caller does not receive undefined on failure
    }
  };

  // Step 5
  const user = await ptx.run(connection, callback);

  return user;
}

Inside the services, read the connection from ptx instead of accepting it as an argument (knex is the instance configured earlier):

const { ptx } = require(/* your ptx path from step 2 */);

class UserService {
  async create(payload) {
    /**
     * Step 6
     * If you run this method in PropagatedTransaction context it will be executed in transaction
     * Otherwise it will be executed as usual query
     */
    const connection = ptx.connection || knex;
    return connection('user').insert(payload);
  }
}

class WalletService {
  async create(payload) {
    // Step 6
    const connection = ptx.connection || knex;
    return connection('wallet').insert(payload);
  }
}
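The ptx.connection || knex lookup is only safe if concurrent requests never see each other's connection. The sketch below checks that property for AsyncLocalStorage directly. It uses only Node built-ins; handleRequest, insert and the 'trx-A' / 'trx-B' labels are hypothetical names, with strings standing in for real transactions.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');
const { setTimeout: sleep } = require('node:timers/promises');

const storage = new AsyncLocalStorage();

// Hypothetical inner call: waits, then reports which store it sees.
async function insert(table, delayMs) {
  await sleep(delayMs); // different delays force the two requests to interleave
  return `${table}@${storage.getStore()}`;
}

// Each request runs its whole call chain inside its own store.
function handleRequest(transactionName, delayMs) {
  return storage.run(transactionName, async () => {
    const user = await insert('user', delayMs);
    const wallet = await insert('wallet', delayMs);
    return [user, wallet];
  });
}

async function demo() {
  // Both requests are in flight at the same time.
  return Promise.all([handleRequest('trx-A', 20), handleRequest('trx-B', 5)]);
}

demo().then(console.log);
// [ [ 'user@trx-A', 'wallet@trx-A' ], [ 'user@trx-B', 'wallet@trx-B' ] ]
```

Even though the second request finishes while the first is still waiting, each call chain keeps its own store.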

TypeScript example + Layers Separation

import { DataSource, QueryRunner } from 'typeorm';
import { PropagatedTransaction, ITransactionRunner } from '@mokuteki/propagated-transactions';

// Step 1
class TypeormTransactionRunner implements ITransactionRunner<QueryRunner> {
  constructor(private readonly dataSource: DataSource) {}

  /**
   * Reserve and return a database connection, and start the transaction on it if the driver needs an explicit call
   */
  public async start(): Promise<QueryRunner> {
    const queryRunner = this.dataSource.createQueryRunner();

    await queryRunner.connect();

    await queryRunner.startTransaction();

    return queryRunner;
  }

  public async commit(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.commitTransaction();

    return queryRunner.release();
  }

  public async rollback(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.rollbackTransaction();

    return queryRunner.release();
  }
}

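// Step 2
// `dataSource` is assumed to be your initialized TypeORM DataSource
export const ptx = new PropagatedTransaction(new TypeormTransactionRunner(dataSource));

The service layer starts the transaction and runs the business logic inside it:

export class UserService {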
  constructor(
    private readonly ptx: PropagatedTransaction, 
    private readonly userRepository: IUserRepository, 
    private readonly walletRepository: IWalletRepository, 
  ) {}

  public async create(
    payload1: ICreateUser, 
    payload2: ICreateWallet
  ): Promise<UserEntity> {
    // Step 3
    const connection = await this.ptx.start();

    // Step 4
    const callback = async () => {
      try {
        const user = await this.userRepository.create(payload1);
        const wallet = await this.walletRepository.create(payload2);

        await this.ptx.commit();

        return user;
      } catch (err) {
        await this.ptx.rollback();
        throw err; // rethrow so the method never resolves to undefined
      }
    };

    // Step 5
    const user = await this.ptx.run<Promise<UserEntity>>(connection, callback);

    return user;
  }
}

The repositories read the propagated connection from ptx and fall back to the default manager:

export class UserRepository implements IUserRepository {
  constructor(
    private readonly manager: EntityManager, 
    private readonly ptx: PropagatedTransaction,
  ) {}

  /**
   * Step 6
   * If you run this method in PropagatedTransaction context it will be executed in transaction
   * Otherwise it will be executed as usual query
   */
  public async create(data: ICreateUser): Promise<UserEntity> {
    const manager = this.ptx.connection?.manager || this.manager;

    const user = manager.getRepository(TypeormUserEntity).create(data);

    return manager.save(user);
  }
}

The wallet repository follows the same pattern:

export class WalletRepository implements IWalletRepository {
  constructor(
    private readonly manager: EntityManager, 
    private readonly ptx: PropagatedTransaction,
  ) {}

  /**
   * Step 6
   * If you run this method in PropagatedTransaction context it will be executed in transaction
   * Otherwise it will be executed as usual query
   */
  public async create(data: ICreateWallet): Promise<WalletEntity> {
    const manager = this.ptx.connection?.manager || this.manager;

    const wallet = manager.getRepository(TypeormWalletEntity).create(data);

    return manager.save(wallet);
  }
}

Overall

Today we've learned how to propagate database transactions without polluting method arguments or breaking abstraction layers.

Library repository: https://github.com/mokuteki225/propagated-transactions
Npm package: https://www.npmjs.com/package/@mokuteki/propagated-transactions

Please leave your opinion in the comments!
