Managing database transactions might be tough and completely nontrivial in the case of Node.js. A lot of people miss the concept that we need to book a connection for the transaction and use it for all of the following queries.
Problem
Imagine situation when you already have 2 services - UserService
and WalletService
and we want to create user and wallet in one transaction
class UserService {
async create(payload) {
return knex('user').insert(payload);
}
}
class WalletService {
async create(payload) {
return knex('wallet').insert(payload);
}
}
As mentioned above in order to guarantee operations' atomicity we have to book a database connection and execute all of the transaction's operations using this specific connection. Naive implementation is to propagate database connection to the method arguments. We might also add additional check const connection = trx || knex;
to leave an opportunity to run the query outside of the transaction context if we don't pass the connection object
class UserService {
async create(payload, trx = null) {
const connection = trx || knex;
return connection('user').insert(payload);
}
}
class WalletService {
async create(payload, trx = null) {
const connection = trx || knex;
return connection('wallet').insert(payload);
}
}
So when we trigger our transaction we end up with something like this
async create(payload1, payload2) {
const trx = await knex.transaction();
try {
const user = await userService.create(payload1, trx);
const wallet = await walletService.create(payload2, trx);
await trx.commit();
} catch (err) {
await trx.rollback();
}
}
Of course this approach is a total anti-pattern. First of all, we pollute method arguments with the connection object. Secondly, to verify that each of our methods can be reused in transaction we have to propagate database connection in every single method. Thirdly, if we are trying to divide BLL and DAL (business logic layer and data access layer) we can't propagate connection object without breaking abstraction. So how should we deal with this problem?
Solution
Use @mokuteki/propagated-transactions
. This library uses AsyncLocalStorage under the hood (It was an experimental feature, but since v16.4.0 it is stable. As for now even big ORMs such as Sequelize started using it for transaction management). Package provides an opportunity to propagate transactions without binding your logic to specific ORM, Database, Driver, whatever. Also it contains in-built type definitions, so you can use both JavaScript or TypeScript
Usage
- Create an implementation of
ITransactionRunner
interface (provided by the package) for your specific database, driver, ORM, whatever. - Create an instance of
PropagatedTransaction
and pass implementation from step one into constructor - Instantiate and store database connection by starting the transaction with
PropagatedTransaction.start()
- Create a callback that executes business logic, use
PropagatedTransaction.commit()
/PropagatedTransaction.rollback()
inside of it - Run
PropagatedTransaction.run(connection, callback)
, where connection is stored connection from step three, callback is a callback from step four - Obtain connection inside of inner method/abstraction layer and use it to run your query
Javascript example
const { PropagatedTransaction } = require('@mokuteki/propagated-transactions');
const knex = require('knex')({
client: 'pg',
connection: {
version: '8.10',
host: '127.0.0.1',
port: 5432,
user: 'mokuteki',
password: 'pass123',
database: 'propagated-test',
},
});
// Step 1
const KnexTransactionRunner = {
start: async () => {
const trx = await knex.transaction();
return trx;
},
commit: async (trx) => {
return trx.commit();
},
rollback: async (trx) => {
return trx.rollback();
},
};
// Step 2
module.exports.ptx = new PropagatedTransaction(KnexTransactionRunner);
const ptx = require(/*your ptx path from step 2*/);
// Example method of a class that uses both UserService and WalletService in a transaction
async create(payload1, payload2) {
// Step 3
const connection = await ptx.start();
// Step 4
const callback = async () => {
try {
const user = await userService.create(payload1);
const wallet = await walletService.create(payload2);
await ptx.commit();
return user;
} catch (err) {
await ptx.rollback();
}
};
// Step 5
const user = await ptx.run(connection, callback);
return user;
}
const ptx = require(/*your ptx path from step 2*/);
class UserService {
async create(payload) {
/**
* Step 6
* If you run this method in PropagatedTransaction context it will be executed in transaction
* Otherwise it will be executed as usual query
*/
const connection = ptx.connection || knex;
return connection('user').insert(payload);
}
}
class WalletService {
async create(payload) {
// Step 6
const connection = ptx.connection || knex;
return connection('wallet').insert(payload);
}
}
Typescript example + Layers Separation
import { DataSource, QueryRunner } from 'typeorm';
import { PropagatedTransaction, ITransactionRunner } from '@mokuteki/propagated-transactions';
// Step 1
class TypeormTransactionRunner implements ITransactionRunner<QueryRunner> {
constructor(private readonly dataSource: DataSource) {}
/**
* Book and return database connection, run `.start()` method if exists
*/
public async start(): Promise<QueryRunner> {
const queryRunner = this.dataSource.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();
return queryRunner;
}
public async commit(queryRunner: QueryRunner): Promise<void> {
await queryRunner.commitTransaction();
return queryRunner.release();
}
public async rollback(queryRunner: QueryRunner): Promise<void> {
await queryRunner.rollbackTransaction();
return queryRunner.release();
}
}
// Step 2
export const ptx = new PropagatedTransaction(TypeormTransactionRunner);
export class UserService {
constructor(
private readonly ptx: PropagatedTransaction,
private readonly userRepository: IUserRepository,
private readonly walletRepository: IWalletRepository,
) {}
public async create(
payload1: ICreateUser,
payload2: ICreateWallet
): Promise<UserEntity> {
// Step 3
const connection = await this.ptx.start();
// Step 4
const callback = async () => {
try {
const user = await this.userRepository.create(payload1);
const wallet = await this.walletRepository.create(payload2);
await this.ptx.commit();
return user;
} catch (err) {
await this.ptx.rollback();
}
};
// Step 5
const user = await this.ptx.run<Promise<UserEntity>>(connection, callback);
return user;
}
}
export class UserRepository implements IUserRepository {
constructor(
private readonly manager: EntityManager,
private readonly ptx: PropagatedTransaction,
) {}
/**
* Step 6
* If you run this method in PropagatedTransaction context it will be executed in transaction
* Otherwise it will be executed as usual query
*/
public async create(data: ICreateUser): Promise<UserEntity> {
const manager = this.ptx.connection?.manager || this.manager;
const user = manager.getRepository(TypeormUserEntity).create(data);
return manager.save(user)
}
}
export class WalletRepository implements IWalletRepository {
constructor(
private readonly manager: EntityManager,
private readonly ptx: PropagatedTransaction,
) {}
/**
* Step 6
* If you run this method in PropagatedTransaction context it will be executed in transaction
* Otherwise it will be executed as usual query
*/
public async create(data: ICreateWallet): Promise<WalletEntity> {
const manager = this.ptx.connection?.manager || this.manager;
const wallet = manager.getRepository(TypeormWalletEntity).create(data);
return manager.save(wallet)
}
}
Overall
Today we've learned how to properly propagate database transactions without polluting method arguments and breaking abstraction layers
Library repository: https://github.com/mokuteki225/propagated-transactions
Npm package: https://www.npmjs.com/package/@mokuteki/propagated-transactions
Please leave your opinion in the comments!
Top comments (0)