Em muitos cenários de desenvolvimento, a necessidade de um message broker é inegável. No entanto, adotar soluções como Kafka ou RabbitMQ nem sempre é a opção mais viável ou desejável. Neste tutorial, vamos explorar uma abordagem alternativa e interessante: transformar uma tabela comum do Postgres em um message broker robusto, garantindo a integridade e consistência das mensagens processadas.
O que é um message broker? Trata-se de um sistema que gerencia e armazena uma fila de mensagens, sendo crucial para a comunicação assíncrona entre diferentes partes de um sistema. No nosso caso, o objetivo é criar um sistema onde diferentes sistemas e threads possam consumir mensagens da fila, assegurando que cada mensagem seja processada apenas uma vez, evitando corrupções ou duplicações indesejadas.
Preparando o Terreno
A ideia central é simples: um producer envia ordens de pagamento para uma tabela que atua como fila, com status inicial "PENDING". Em seguida, três clusters de uma aplicação Node executam cron jobs a cada 1 minuto, desempenhando o papel de consumers. Cada job verifica as ordens de pagamento pendentes na tabela, processa o pagamento e atualiza o status para "COMPLETED". O desafio é garantir que uma ordem seja processada apenas uma vez, mesmo com múltiplos consumidores operando simultaneamente.
O objetivo deste artigo não é ensinar como criar uma aplicação com Node do zero, mas apenas mostrar como utilizar o Postgres como message broker numa API feita como Node, por isso iremos pular alguns passos de setup básico.
Também, para fins didáticos, usaremos NestJS como framework para a API, pm2 para o gerenciamento dos clusters da aplicação e prisma para modelagem e como camada de comunicação com nosso banco de dados.
Você pode obter o projeto-base no repositório anti-duhring/postgres-queue-broker.
Primeiro de tudo é necessário iniciar um novo projeto Nest:
nest new postgres-queue-broker
Após isso, iremos instalar os pacotes pm2, @nestjs/schedule para gerenciamento dos cron jobs e o @prisma/client:
npm i pm2 @nestjs/schedule @prisma/client
Esse será o schema da nossa tabela Order, que conterá as ordens de pagamentos:
// This is your Prisma schema file,
// learn more about it in the docs: https://pris.ly/d/prisma-schema
generator client {
  provider = "prisma-client-js"
}
datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}
model Order {
  id        String    @id             @default(uuid())
  message   String
  status    String
  createdAt DateTime  @default(now()) @map("created_at")
  updatedAt DateTime  @updatedAt      @map("updated_at")
}
Após isso, iremos criar um módulo para ser o nosso Producer, ele será responsável por criar uma nova ordem de pagamentos e enviar para a nossa tabela Order.
nest g resource order
order.service.ts:
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service'
import { orderStatus } from '../../common/orderStatus.enum'
@Injectable()
export class OrderService {
    constructor(private readonly prisma: PrismaService) {}
    async create(message: string) {
        const order = await this.prisma.order.create({
            data: {
                message,
                status: orderStatus.PENDING
            }
        })
        return order
    }
}
Trabalhando com Locks Inteligentes
Para alcançar esse objetivo, iremos explorar as funcionalidades do Postgres: as cláusulas FOR UPDATE e SKIP LOCKED. O FOR UPDATE bloqueia as linhas retornadas por uma consulta, garantindo que nenhuma outra transação possa modificá-las até que a transação atual seja concluída. Por sua vez, o SKIP LOCKED permite que uma consulta ignore linhas que estejam bloqueadas, possibilitando o salto sobre linhas já bloqueadas por outros consumidores.
Sabendo isto, iremos criar o módulo do nosso consumer, que consistirá num cron job_ que roda a cada 1min, obtendo as ordens de pagamento pendentes, processando e atualizando seu status:
nest g resource consumer
consumer.service.ts
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { orderStatus } from '../../common/orderStatus.enum';
import { Cron } from '@nestjs/schedule';
@Injectable()
export class ConsumerService {
  constructor(private readonly prisma: PrismaService) {}
  async getPendingOrders() {
    const pendingOrders = await this.prisma.$queryRaw`
                select o.*
                from "Order" o 
                where o.status = 'PENDING'
                order by o.created_at asc 
                for update skip locked
        `;
    return pendingOrders as any[];
  }
  @Cron('0 * * * * *')
  async processOrders() {
    const pendingOrders = await this.getPendingOrders();
    for await (const order of pendingOrders) {
      await this.prisma.order.update({
        where: {
          id: order.id,
        },
        data: {
          status: orderStatus.COMPLETED,
        },
      });
      console.log(`Order ${order.id} has been processed`);
    }
  }
}
Esta consulta permite que cada job consumer obtenha uma ordem de pagamento pendente e a bloqueie, impedindo que outros jobs acessem a mesma ordem simultaneamente. Uma vez que a ordem é processada e o status é atualizado, ela é liberada para futuros consumidores.
Importante mencionar que, num cenário real, é necessário ter cautela com a quantidade de itens carregados, por isso um LIMIT para limitar a quantidade de itens obtidos por cada job pode evitar gargalos e erros de out of memory.
Rodando nossa aplicação
Por fim, basta realizar a build da aplicação.
npm run build
E executar os 3 clusters com o pm2:
pm2 start dist/main.js -i 3
É possível monitorar os logs dos clusters com seguinte comando:
pm2 logs main
Dessa forma podemos ver que cada ordem de pagamento é processada por apenas um cluster:
Conclusão
O uso do Postgres como um message broker alternativo pode ser uma solução valiosa em cenários onde a complexidade do Kafka ou do RabbitMQ não é justificada. Aproveitar as funcionalidades nativas do Postgres, como FOR UPDATE e SKIP LOCKED, possibilita a criação de um sistema robusto que garante a integridade e consistência das mensagens processadas.
Ao explorar essa abordagem, você poderá aprimorar a comunicação assíncrona em seus sistemas, evitando problemas de duplicação e corrupção de mensagens. Dessa forma, o Postgres se mostra não apenas como um excelente banco de dados, mas também como um aliado poderoso na construção de soluções de integração confiáveis. Experimente essa abordagem em seus projetos e desfrute dos benefícios de um message broker sólido e eficiente.
Para saber mais
Esse artigo foi fortemente inspirado pela palestra do Rafael Ponte no canal da ZUP, fica aqui a recomendação para entender a implementação com mais detalhes:
https://www.youtube.com/watch?v=FF6Am0N6eq4&t=3275s
 


 
    
Top comments (0)