DEV Community

Mateus Vinícius
Mateus Vinícius

Posted on • Edited on

Usando postgres como message broker

Em muitos cenários de desenvolvimento, a necessidade de um message broker é inegável. No entanto, adotar soluções como Kafka ou RabbitMQ nem sempre é a opção mais viável ou desejável. Neste tutorial, vamos explorar uma abordagem alternativa e interessante: transformar uma tabela comum do Postgres em um message broker robusto, garantindo a integridade e consistência das mensagens processadas.

O que é um message broker? Trata-se de um sistema que gerencia e armazena uma fila de mensagens, sendo crucial para a comunicação assíncrona entre diferentes partes de um sistema. No nosso caso, o objetivo é criar um sistema onde diferentes sistemas e threads possam consumir mensagens da fila, assegurando que cada mensagem seja processada apenas uma vez, evitando corrupções ou duplicações indesejadas.

Preparando o Terreno

A ideia central é simples: um producer envia ordens de pagamento para uma tabela que atua como fila, com status inicial "PENDING". Em seguida, três clusters de uma aplicação Node executam cron jobs a cada 1 minuto, desempenhando o papel de consumers. Cada job verifica as ordens de pagamento pendentes na tabela, processa o pagamento e atualiza o status para "COMPLETED". O desafio é garantir que uma ordem seja processada apenas uma vez, mesmo com múltiplos consumidores operando simultaneamente.

O objetivo deste artigo não é ensinar como criar uma aplicação com Node do zero, mas apenas mostrar como utilizar o Postgres como message broker numa API feita como Node, por isso iremos pular alguns passos de setup básico.

Também, para fins didáticos, usaremos NestJS como framework para a API, pm2 para o gerenciamento dos clusters da aplicação e prisma para modelagem e como camada de comunicação com nosso banco de dados.

Você pode obter o projeto-base no repositório anti-duhring/postgres-queue-broker.

Primeiro de tudo é necessário iniciar um novo projeto Nest:

nest new postgres-queue-broker
Enter fullscreen mode Exit fullscreen mode

Após isso, iremos instalar os pacotes pm2, @nestjs/schedule para gerenciamento dos cron jobs e o @prisma/client:

npm i pm2 @nestjs/schedule @prisma/client
Enter fullscreen mode Exit fullscreen mode

Esse será o schema da nossa tabela Order, que conterá as ordens de pagamentos:

// This is your Prisma schema file,
// learn more about it in the docs: https://pris.ly/d/prisma-schema

generator client {
  provider = "prisma-client-js"
}

datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

model Order {
  id        String    @id             @default(uuid())
  message   String
  status    String
  createdAt DateTime  @default(now()) @map("created_at")
  updatedAt DateTime  @updatedAt      @map("updated_at")
}
Enter fullscreen mode Exit fullscreen mode

Após isso, iremos criar um módulo para ser o nosso Producer, ele será responsável por criar uma nova ordem de pagamentos e enviar para a nossa tabela Order.

nest g resource order
Enter fullscreen mode Exit fullscreen mode

order.service.ts:

import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service'
import { orderStatus } from '../../common/orderStatus.enum'

@Injectable()
export class OrderService {
    constructor(private readonly prisma: PrismaService) {}

    async create(message: string) {
        const order = await this.prisma.order.create({
            data: {
                message,
                status: orderStatus.PENDING
            }
        })

        return order
    }
}
Enter fullscreen mode Exit fullscreen mode

Trabalhando com Locks Inteligentes

Para alcançar esse objetivo, iremos explorar as funcionalidades do Postgres: as cláusulas FOR UPDATE e SKIP LOCKED. O FOR UPDATE bloqueia as linhas retornadas por uma consulta, garantindo que nenhuma outra transação possa modificá-las até que a transação atual seja concluída. Por sua vez, o SKIP LOCKED permite que uma consulta ignore linhas que estejam bloqueadas, possibilitando o salto sobre linhas já bloqueadas por outros consumidores.

Sabendo isto, iremos criar o módulo do nosso consumer, que consistirá num cron job_ que roda a cada 1min, obtendo as ordens de pagamento pendentes, processando e atualizando seu status:

nest g resource consumer
Enter fullscreen mode Exit fullscreen mode

consumer.service.ts

import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { orderStatus } from '../../common/orderStatus.enum';
import { Cron } from '@nestjs/schedule';

@Injectable()
export class ConsumerService {
  constructor(private readonly prisma: PrismaService) {}

  async getPendingOrders() {
    const pendingOrders = await this.prisma.$queryRaw`
                select o.*
                from "Order" o 
                where o.status = 'PENDING'
                order by o.created_at asc 
                for update skip locked
        `;

    return pendingOrders as any[];
  }

  @Cron('0 * * * * *')
  async processOrders() {
    const pendingOrders = await this.getPendingOrders();

    for await (const order of pendingOrders) {
      await this.prisma.order.update({
        where: {
          id: order.id,
        },
        data: {
          status: orderStatus.COMPLETED,
        },
      });
      console.log(`Order ${order.id} has been processed`);
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Esta consulta permite que cada job consumer obtenha uma ordem de pagamento pendente e a bloqueie, impedindo que outros jobs acessem a mesma ordem simultaneamente. Uma vez que a ordem é processada e o status é atualizado, ela é liberada para futuros consumidores.

Importante mencionar que, num cenário real, é necessário ter cautela com a quantidade de itens carregados, por isso um LIMIT para limitar a quantidade de itens obtidos por cada job pode evitar gargalos e erros de out of memory.

Rodando nossa aplicação

Por fim, basta realizar a build da aplicação.

npm run build
Enter fullscreen mode Exit fullscreen mode

E executar os 3 clusters com o pm2:

pm2 start dist/main.js -i 3
Enter fullscreen mode Exit fullscreen mode

É possível monitorar os logs dos clusters com seguinte comando:

pm2 logs main
Enter fullscreen mode Exit fullscreen mode

Dessa forma podemos ver que cada ordem de pagamento é processada por apenas um cluster:

Log mostrando que cada ordem de pagamento foi processada por um cluster diferente

Conclusão

O uso do Postgres como um message broker alternativo pode ser uma solução valiosa em cenários onde a complexidade do Kafka ou do RabbitMQ não é justificada. Aproveitar as funcionalidades nativas do Postgres, como FOR UPDATE e SKIP LOCKED, possibilita a criação de um sistema robusto que garante a integridade e consistência das mensagens processadas.

Ao explorar essa abordagem, você poderá aprimorar a comunicação assíncrona em seus sistemas, evitando problemas de duplicação e corrupção de mensagens. Dessa forma, o Postgres se mostra não apenas como um excelente banco de dados, mas também como um aliado poderoso na construção de soluções de integração confiáveis. Experimente essa abordagem em seus projetos e desfrute dos benefícios de um message broker sólido e eficiente.

Para saber mais

Esse artigo foi fortemente inspirado pela palestra do Rafael Ponte no canal da ZUP, fica aqui a recomendação para entender a implementação com mais detalhes:
https://www.youtube.com/watch?v=FF6Am0N6eq4&t=3275s

Top comments (0)